You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/02/19 22:46:01 UTC
svn commit: r746009 -
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
Author: cwiklik
Date: Thu Feb 19 21:46:00 2009
New Revision: 746009
URL: http://svn.apache.org/viewvc?rev=746009&view=rev
Log:
UIMA-1194 Added time waiting for free cas to the outgoing reply. Removed dead code. Removed code that handled messaging to collocated delegates.
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=746009&r1=746008&r2=746009&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Thu Feb 19 21:46:00 2009
@@ -263,12 +263,6 @@
bos.close();
}
}
-// if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) )
-// {
-// UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
-// "serializeCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_dump_serialized_cas__FINEST",
-// new Object[] {aCasReferenceId, serializedCas});
-// }
LongNumericStatistic statistic;
if ( (statistic = getAnalysisEngineController().getMonitor().getLongNumericStatistic("",Monitor.TotalSerializeTime)) != null )
@@ -622,7 +616,7 @@
}
else
{
- sendCasToCollocatedDelegate(true, aCasReferenceId, null, anEndpoint, true,0 );
+ // Not supported
}
}
catch( ServiceShutdownException e)
@@ -712,12 +706,7 @@
}
else
{
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_reply_with_sequence__FINE",
- new Object[] { anEndpoint.getEndpoint(), aNewCasReferenceId, sequence });
- }
- sendCasToCollocatedDelegate(false, anInputCasReferenceId, aNewCasReferenceId, anEndpoint, false, sequence);
+ // Not supported
}
}
catch( ServiceShutdownException e)
@@ -758,26 +747,7 @@
}
else
{
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_reply_with_sequence__FINE",
- new Object[] { anEndpoint.getEndpoint(), entry.getCasReferenceId(), entry.getCasSequence() });
- }
- if ( getAnalysisEngineController() instanceof AggregateAnalysisEngineController && getAnalysisEngineController().isCasMultiplier() )
- {
- if ( ((AggregateAnalysisEngineController)getAnalysisEngineController()).getMessageOrigin(entry.getInputCasReferenceId()) != null)
- {
- sendCasToCollocatedDelegate(false, entry.getInputCasReferenceId(), entry.getCasReferenceId(), anEndpoint, false, entry.getCasSequence());
- }
- else
- {
- sendCasToCollocatedDelegate(false, entry.getCasReferenceId(), null, anEndpoint, false, 0);
- }
- }
- else
- {
- sendCasToCollocatedDelegate(false, entry.getInputCasReferenceId(), entry.getCasReferenceId(), anEndpoint, false, entry.getCasSequence());
- }
+ // Not supported
}
}
catch( ServiceShutdownException e)
@@ -887,14 +857,10 @@
}
sendCasToRemoteEndpoint(false, binaryCas, entry, anEndpoint, false);
}
-
-
-
-
}
else
{
- sendCasToCollocatedDelegate(false, aCasReferenceId, null, anEndpoint, false, 0);
+ // Not supported
}
}
catch( ServiceShutdownException e)
@@ -1181,34 +1147,6 @@
throw new AsynchAEException(e);
}
}
- private byte[] getSerializedBinaryCas( boolean isReply, String aCasReferenceId, Endpoint anEndpoint, boolean cacheSerializedCas ) throws Exception
- {
- CAS cas = null;
- try
- {
- byte[] serializedCAS = null;
- // Using Cas reference Id retrieve CAS from the shared Cash
- cas = getAnalysisEngineController().getInProcessCache().getCasByReference(aCasReferenceId);
- ServicePerformance casStats = getAnalysisEngineController().getCasStatistics(aCasReferenceId);
- CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
- long t1 = getAnalysisEngineController().getCpuTime();
- serializedCAS = uimaSerializer.serializeCasToBinary(cas);
- long timeToSerializeCas = getAnalysisEngineController().getCpuTime()-t1;
-
- getAnalysisEngineController().incrementSerializationTime(timeToSerializeCas);
-
- entry.incrementTimeToSerializeCAS(timeToSerializeCas);
- casStats.incrementCasSerializationTime(timeToSerializeCas);
- getAnalysisEngineController().getServicePerformance().
- incrementCasSerializationTime(timeToSerializeCas);
- return serializedCAS;
- }
- catch( Exception e)
- {
- throw new AsynchAEException(e);
- }
- }
-
private byte[] getBinaryCasAndReleaseIt( boolean isReply, String aCasReferenceId, Endpoint anEndpoint, boolean cacheSerializedCas ) throws Exception
{
try
@@ -1282,8 +1220,10 @@
aTextMessage.setLongProperty(AsynchAEMessage.TimeToSerializeCAS, casStats.getRawCasSerializationTime());
aTextMessage.setLongProperty(AsynchAEMessage.TimeToDeserializeCAS, casStats.getRawCasDeserializationTime());
aTextMessage.setLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime());
- long iT =getAnalysisEngineController().getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process);
- aTextMessage.setLongProperty(AsynchAEMessage.IdleTime, iT );
+ aTextMessage.setLongProperty(AsynchAEMessage.TimeWaitingForCAS,
+ getAnalysisEngineController().getServicePerformance().getTimeWaitingForCAS());
+ long iT =getAnalysisEngineController().getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process);
+ aTextMessage.setLongProperty(AsynchAEMessage.IdleTime, iT );
String lookupKey = getAnalysisEngineController().getName();
long arrivalTime = getAnalysisEngineController().getTime( aCasReferenceId, lookupKey); //serviceInputEndpoint);
long timeInService = getAnalysisEngineController().getCpuTime()-arrivalTime;
@@ -1416,62 +1356,6 @@
aMessage.setStringProperty(UIMAMessage.ServerURI, anEndpoint.getServerURI());
}
}
- /**
- * @deprecated
- * @param aMessage
- * @param anEndpoint
- * @param isProcessRequest
- * @throws Exception
- */
- private void populateHeaderWithContext( Message aMessage, Endpoint anEndpoint, boolean isProcessRequest ) throws Exception
- {
- if (anEndpoint.isRemote())
- {
- aMessage.setStringProperty(AsynchAEMessage.MessageFrom, controllerInputEndpoint);
-
- if ( getAnalysisEngineController() instanceof AggregateAnalysisEngineController && isProcessRequest)
- {
- String protocol = serviceProtocolList;
- if ( anEndpoint.getServerURI().trim().toLowerCase().startsWith("http"))
- {
- protocol = extractURLWithProtocol(serviceProtocolList, "http");
- }
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "populateHeaderWithContext", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_remote_FINE",
- new Object[] {protocol, anEndpoint.getEndpoint()});
- }
- aMessage.setStringProperty(UIMAMessage.ServerURI, protocol);
- }
- else
- {
- aMessage.setStringProperty(UIMAMessage.ServerURI, serverURI);
- }
- }
- else
- {
- aMessage.setStringProperty(AsynchAEMessage.MessageFrom, controllerInputEndpoint);
- aMessage.setStringProperty(UIMAMessage.ServerURI, anEndpoint.getServerURI());
- }
- if ( isProcessRequest )
- {
- aMessage.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Process);
- if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController)
- {
- aMessage.setStringProperty(AsynchAEMessage.MessageFrom, controllerInputEndpoint);
- aMessage.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
-
- }
- else
- {
- aMessage.setStringProperty(AsynchAEMessage.MessageFrom, serviceInputEndpoint);
- aMessage.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Response);
- }
-
- }
-
- }
-
public AnalysisEngineController getAnalysisEngineController()
{
return analysisEngineController;
@@ -1493,33 +1377,6 @@
this.controllerInputEndpoint = controllerInputEndpoint;
}
- private void ackMessage(String aCasReferenceId) throws Exception
- {
- if ( getAnalysisEngineController().getInputChannel().getSessionAckMode() == Session.CLIENT_ACKNOWLEDGE )
- {
- ((Message)getAnalysisEngineController().
- getInProcessCache().
- getMessageAccessorByReference(aCasReferenceId).
- getRawMessage()).acknowledge();
-
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "ackMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_ack_msg__FINE",
- new Object[] { aCasReferenceId });
- }
- }
-
- }
- private boolean requiresAck( Message aMessage, boolean isFinal, int ackMode) throws JMSException
- {
- if ( aMessage.getIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Response &&
- isFinal && ackMode == Session.CLIENT_ACKNOWLEDGE )
- {
- return true;
- }
- return false;
- }
-
private void dispatch( Message aMessage, Endpoint anEndpoint, CacheEntry entry, boolean isRequest, JmsEndpointConnection_impl endpointConnection, long msgSize ) throws Exception {
// Add stats
populateStats(aMessage, anEndpoint, entry.getCasReferenceId(), AsynchAEMessage.Process, isRequest);
@@ -1968,148 +1825,11 @@
return entry.getCasReferenceId();
}
- private boolean isProcessReply( Message aMessage )
- {
- try
- {
- if ( aMessage.getIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Response &&
- aMessage.getIntProperty(AsynchAEMessage.Command) == AsynchAEMessage.Process )
- {
- return true;
- }
- }
- catch( JMSException e) {}
- return false;
- }
private void addIdleTime( Message aMessage )
{
-
-/*
- if ( isProcessReply(aMessage ) &&
- ( getAnalysisEngineController() instanceof AggregateAnalysisEngineController ||
- !getAnalysisEngineController().isCasMultiplier() ) )
-
-*/
long t = System.nanoTime();
getAnalysisEngineController().saveReplyTime(t, "");
}
- private void sendCasToCollocatedDelegate(boolean isRequest, String anInputCasReferenceId, String aNewCasReferenceId, Endpoint anEndpoint, boolean startTimer, long sequence)
- throws AsynchAEException, ServiceShutdownException
- {
- try
- {
- // Determine which of the CAS id's to log. If the CAS was generated by a CM its sequence
- // will be > 0. In this instance use the id in 'aNewCASReferenceId'. Otherwise use the
- // the default which is the id of an Input CAS.
- String id2Log = anInputCasReferenceId;
- if ( sequence > 0 )
- {
- id2Log = aNewCasReferenceId;
- }
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "sendCasToCollocatedDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_cas_to_collocated_service__FINE",
- new Object[] {anEndpoint.getEndpoint(), id2Log});
- }
- // Get the connection object for a given endpoint
- JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
- // Create empty JMS Text Message
- TextMessage tm = endpointConnection.produceTextMessage("");
- // Add common properties to the JMS Header
- if ( isRequest == true )
- {
- populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process);
- }
- else
- {
- populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
- }
-
-
- tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.CASRefID);
- // Add Cas Reference Id to the outgoing JMS Header
- if ( aNewCasReferenceId != null )
- {
- tm.setStringProperty(AsynchAEMessage.CasReference, aNewCasReferenceId);
-/*
- // Add the initial Input Cas Reference Id. This is the top ancestor from
- // which all other CASes are produced
- addTopCASParentReferenceId(tm, anInputCasReferenceId);
-*/
- tm.setStringProperty(AsynchAEMessage.InputCasReference, anInputCasReferenceId);
- tm.setLongProperty(AsynchAEMessage.CasSequence, sequence);
- // Override MessageType set in the populateHeaderWithContext above.
- // Add stats
- populateStats(tm, anEndpoint, aNewCasReferenceId, AsynchAEMessage.Process, isRequest);
-
- // Make the reply message look like a request. This message will contain a new CAS
- // produced by the CAS Multiplier. The client will treat this CAS
- // differently from the input CAS.
- tm.setIntProperty( AsynchAEMessage.MessageType, AsynchAEMessage.Request);
- isRequest = true;
-
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) )
- {
- CacheEntry cacheEntry = getCacheEntry(aNewCasReferenceId);
- if ( cacheEntry != null )
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "sendCasToCollocatedDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
- new Object[] {getAnalysisEngineController().getComponentName(),"Collocated", anEndpoint.getEndpoint(), aNewCasReferenceId, anInputCasReferenceId, cacheEntry.getInputCasReferenceId() });
- }
- }
-
-
- }
- else
- {
- tm.setStringProperty(AsynchAEMessage.CasReference, anInputCasReferenceId);
-
- if ( getAnalysisEngineController().getInProcessCache() != null &&
- getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(anInputCasReferenceId).isAborted() )
- {
- tm.setBooleanProperty(AsynchAEMessage.Aborted, true);
- }
-
- // Add stats
- populateStats(tm, anEndpoint, anInputCasReferenceId, AsynchAEMessage.Process, isRequest);
- if ( startTimer )
- {
- // Start a timer for this request. The amount of time to wait
- // for response is provided in configuration for the endpoint
- anEndpoint.startProcessRequestTimer(anInputCasReferenceId);
- }
- }
- // ----------------------------------------------------
- // Send Request Messsage to Delegate
- // ----------------------------------------------------
- endpointConnection.send(tm, 0, startTimer);
- if ( !isRequest )
- {
- addIdleTime(tm);
- }
-
- }
- catch( JMSException e)
- {
- // Unable to establish connection to the endpoint. Logit and continue
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "sendCasToCollocatedDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
- new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint()});
- }
- }
- catch( ServiceShutdownException e)
- {
- throw e;
- }
- catch( Exception e)
- {
- throw new AsynchAEException(e);
- }
-
- }
-
private CacheEntry getCacheEntry( String aCasReferenceId) throws Exception
{
CacheEntry cacheEntry = null;
@@ -2120,22 +1840,6 @@
}
return cacheEntry;
}
- private void addTopCASParentReferenceId( TextMessage tm, String aCasReferenceId) throws Exception
- {
- CacheEntry cacheEntry = null;
- if ( ( cacheEntry = getCacheEntry(aCasReferenceId)) != null )
- {
- if ( cacheEntry.getInputCasReferenceId() == null )
- {
- tm.setStringProperty(AsynchAEMessage.InputCasReference, aCasReferenceId);
- }
- else
- {
- tm.setStringProperty(AsynchAEMessage.InputCasReference, cacheEntry.getInputCasReferenceId());
- }
- }
-
- }
public void stop()
{
aborting = true;