You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by sc...@apache.org on 2008/10/10 00:28:31 UTC
svn commit: r703279 - in
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms:
activemq/ client/
Author: schor
Date: Thu Oct 9 15:28:27 2008
New Revision: 703279
URL: http://svn.apache.org/viewvc?rev=703279&view=rev
Log:
[UIMA-1196] apply patch to support binary serialization
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=703279&r1=703278&r2=703279&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Thu Oct 9 15:28:27 2008
@@ -24,6 +24,7 @@
import java.util.Timer;
import java.util.TimerTask;
+import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -357,6 +358,37 @@
}
throw new AsynchAEException(new InvalidMessageException("Unable to produce Message Object"));
}
+ public BytesMessage produceByteMessage() throws AsynchAEException
+ {
+ Assert.notNull(producerSession);
+ boolean done = false;
+ int retryCount = 4;
+ while (retryCount > 0)
+ {
+ try
+ {
+ retryCount--;
+ return producerSession.createBytesMessage();
+ }
+ catch ( javax.jms.IllegalStateException e)
+ {
+ try
+ {
+ open();
+ }
+ catch ( ServiceShutdownException ex)
+ {
+ ex.printStackTrace();
+ }
+
+ }
+ catch ( Exception e)
+ {
+ throw new AsynchAEException(e);
+ }
+ }
+ throw new AsynchAEException(new InvalidMessageException("Unable to produce BytesMessage Object"));
+ }
public ObjectMessage produceObjectMessage() throws AsynchAEException
{
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=703279&r1=703278&r2=703279&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Thu Oct 9 15:28:27 2008
@@ -282,7 +282,8 @@
{
int payload = aMessage.getIntProperty(AsynchAEMessage.Payload);
if ( payload != AsynchAEMessage.XMIPayload &&
- payload != AsynchAEMessage.CASRefID &&
+ payload != AsynchAEMessage.BinaryPayload &&
+ payload != AsynchAEMessage.CASRefID &&
payload != AsynchAEMessage.Exception &&
payload != AsynchAEMessage.Metadata
)
@@ -414,6 +415,8 @@
{
case AsynchAEMessage.XMIPayload:
return "XMIPayload";
+ case AsynchAEMessage.BinaryPayload:
+ return "BinaryPayload";
case AsynchAEMessage.CASRefID:
return "CASRefID";
case AsynchAEMessage.Metadata:
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=703279&r1=703278&r2=703279&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Thu Oct 9 15:28:27 2008
@@ -29,6 +29,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -549,17 +550,33 @@
{
if (anEndpoint.isRemote())
{
- String serializedCAS = getSerializedCasAndReleaseIt(false, aCasReferenceId,anEndpoint, anEndpoint.isRetryEnabled());
- if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) )
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
- "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_serialized_cas__FINEST",
- new Object[] { getAnalysisEngineController().getComponentName(), anEndpoint.getEndpoint(),aCasReferenceId,serializedCAS });
- }
-
-
- // Send process request to remote delegate and start timeout timer
- sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0);
+ if ( anEndpoint.getSerializer().equals("xmi")) {
+ String serializedCAS = getSerializedCasAndReleaseIt(false, aCasReferenceId,anEndpoint, anEndpoint.isRetryEnabled());
+ if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) )
+ {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+ "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_serialized_cas__FINEST",
+ new Object[] { getAnalysisEngineController().getComponentName(), anEndpoint.getEndpoint(),aCasReferenceId,serializedCAS });
+ }
+
+
+ // Send process request to remote delegate and start timeout timer
+ sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0);
+ } else {
+ byte[] serializedCAS = getBinaryCasAndReleaseIt(false, aCasReferenceId,anEndpoint, anEndpoint.isRetryEnabled());
+ if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) )
+ {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+ "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_binary_cas__FINEST",
+ new Object[] { getAnalysisEngineController().getComponentName(), anEndpoint.getEndpoint(),aCasReferenceId,serializedCAS });
+ }
+
+
+ // Send process request to remote delegate and start timeout timer
+ sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0);
+
+ }
+
}
else
{
@@ -594,6 +611,9 @@
try
{
boolean cacheSerializedCas = endpointRetryEnabled(endpoints);
+ // Parallel steps always use xmi serialization, so override the first endpoint's serializer.
+ // Binary serialization doesn't support merge.
+ endpoints[0].setSerializer("xmi");
// Serialize CAS using serializer defined in the first endpoint. All endpoints in the parallel Flow
// must use the same format (either XCAS or XMI)
@@ -614,8 +634,9 @@
}
-
-
+ // Parallel steps always use xmi serialization, so override this endpoint's serializer.
+ // Binary serialization doesn't support merge.
+ endpoints[i].setSerializer("xmi");
sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, endpoints[i], true, 0);
}
@@ -678,9 +699,15 @@
anEndpoint.setReplyEndpoint(true);
if ( anEndpoint.isRemote() )
{
- // Serializes CAS and releases it back to CAS Pool
- String serializedCAS = getSerializedCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint.isRetryEnabled());
- sendCasToRemoteEndpoint(false, serializedCAS, entry, anEndpoint, false);
+ if ( anEndpoint.getSerializer().equals("xmi")) {
+ // Serializes CAS and releases it back to CAS Pool
+ String serializedCAS = getSerializedCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint.isRetryEnabled());
+ sendCasToRemoteEndpoint(false, serializedCAS, entry, anEndpoint, false);
+ } else {
+ byte[] binaryCas = getBinaryCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint.isRetryEnabled());
+ sendCasToRemoteEndpoint(false, binaryCas, entry, anEndpoint, false);
+ }
+
}
else
{
@@ -789,9 +816,22 @@
new Object[] { anEndpoint.getEndpoint(), aCasReferenceId });
if ( anEndpoint.isRemote() )
{
-
- String serializedCAS = getSerializedCas(true, aCasReferenceId, anEndpoint, false);
- sendCasToRemoteEndpoint(false, serializedCAS, null, aCasReferenceId, anEndpoint, false, 0);
+// String serializedCAS = getSerializedCas(true, aCasReferenceId, anEndpoint, false);
+// sendCasToRemoteEndpoint(false, serializedCAS, null, aCasReferenceId, anEndpoint, false, 0);
+
+ if ( anEndpoint.getSerializer().equals("xmi")) {
+ // Serializes CAS and releases it back to CAS Pool
+ String serializedCAS = getSerializedCas(true, aCasReferenceId, anEndpoint, false);
+ sendCasToRemoteEndpoint(false, serializedCAS, null, aCasReferenceId, anEndpoint, false, 0);
+ } else {
+ CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+ byte[] binaryCas = getBinaryCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint.isRetryEnabled());
+ sendCasToRemoteEndpoint(false, binaryCas, entry, anEndpoint, false);
+ }
+
+
+
+
}
else
{
@@ -976,7 +1016,43 @@
}
}
-
+ private byte[] getBinaryCas( boolean isReply, String aCasReferenceId, Endpoint anEndpoint, boolean cacheSerializedCas ) throws Exception
+ {
+ CAS cas = null;
+ try
+ {
+ byte[] serializedCAS = null;
+ // Using the CAS reference id, retrieve the CAS from the shared cache
+ cas = getAnalysisEngineController().getInProcessCache().getCasByReference(aCasReferenceId);
+ ServicePerformance casStats = getAnalysisEngineController().getCasStatistics(aCasReferenceId);
+ CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+ long t1 = getAnalysisEngineController().getCpuTime();
+ // Serialize CAS for remote Delegates
+ String serializer = anEndpoint.getSerializer();
+ if ( serializer.equals("binary")) {
+ serializedCAS = UimaSerializer.serializeCasToBinary(cas);
+ } else {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "getBinaryCas", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_invalid_serializer__WARNING",
+ new Object[] { getAnalysisEngineController().getName(),serializer, anEndpoint.getEndpoint()});
+ throw new UimaEEServiceException("Invalid Serializer:"+serializer+" For Endpoint:"+anEndpoint.getEndpoint());
+ }
+ long timeToSerializeCas = getAnalysisEngineController().getCpuTime()-t1;
+
+ getAnalysisEngineController().incrementSerializationTime(timeToSerializeCas);
+
+ entry.incrementTimeToSerializeCAS(timeToSerializeCas);
+ casStats.incrementCasSerializationTime(timeToSerializeCas);
+ getAnalysisEngineController().getServicePerformance().
+ incrementCasSerializationTime(timeToSerializeCas);
+ return serializedCAS;
+ }
+ catch( Exception e)
+ {
+ throw new AsynchAEException(e);
+ }
+
+ }
private String getSerializedCas( boolean isReply, String aCasReferenceId, Endpoint anEndpoint, boolean cacheSerializedCas ) throws Exception
{
@@ -1022,12 +1098,56 @@
throw new AsynchAEException(e);
}
}
+ private byte[] getSerializedBinaryCas( boolean isReply, String aCasReferenceId, Endpoint anEndpoint, boolean cacheSerializedCas ) throws Exception
+ {
+ CAS cas = null;
+ try
+ {
+ byte[] serializedCAS = null;
+ // Using the CAS reference id, retrieve the CAS from the shared cache
+ cas = getAnalysisEngineController().getInProcessCache().getCasByReference(aCasReferenceId);
+ ServicePerformance casStats = getAnalysisEngineController().getCasStatistics(aCasReferenceId);
+ CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+ long t1 = getAnalysisEngineController().getCpuTime();
+ serializedCAS = UimaSerializer.serializeCasToBinary(cas);
+ long timeToSerializeCas = getAnalysisEngineController().getCpuTime()-t1;
+
+ getAnalysisEngineController().incrementSerializationTime(timeToSerializeCas);
+
+ entry.incrementTimeToSerializeCAS(timeToSerializeCas);
+ casStats.incrementCasSerializationTime(timeToSerializeCas);
+ getAnalysisEngineController().getServicePerformance().
+ incrementCasSerializationTime(timeToSerializeCas);
+ return serializedCAS;
+ }
+ catch( Exception e)
+ {
+ throw new AsynchAEException(e);
+ }
+ }
+ private byte[] getBinaryCasAndReleaseIt( boolean isReply, String aCasReferenceId, Endpoint anEndpoint, boolean cacheSerializedCas ) throws Exception
+ {
+ try
+ {
+ return getBinaryCas(isReply, aCasReferenceId, anEndpoint, cacheSerializedCas);
+ }
+ catch( Exception e)
+ {
+ throw new AsynchAEException(e);
+ }
+ finally
+ {
+ if ( getAnalysisEngineController() instanceof PrimitiveAnalysisEngineController && anEndpoint.isRemote() )
+ {
+ getAnalysisEngineController().dropCAS(aCasReferenceId, true );
+ }
+ }
+ }
private String getSerializedCasAndReleaseIt( boolean isReply, String aCasReferenceId, Endpoint anEndpoint, boolean cacheSerializedCas ) throws Exception
{
- CAS cas = null;
try
{
return getSerializedCas(isReply, aCasReferenceId, anEndpoint, cacheSerializedCas);
@@ -1056,7 +1176,7 @@
}
return false;
}
- private void populateStats( TextMessage aTextMessage, Endpoint anEndpoint, String aCasReferenceId, int anAdminCommand, boolean isRequest) throws Exception
+ private void populateStats( Message aTextMessage, Endpoint anEndpoint, String aCasReferenceId, int anAdminCommand, boolean isRequest) throws Exception
{
if ( anEndpoint.isFinal() )
{
@@ -1433,6 +1553,128 @@
}
}
+
+ private void sendCasToRemoteEndpoint( boolean isRequest, byte[] aSerializedCAS, String anInputCasReferenceId, String aCasReferenceId, Endpoint anEndpoint, boolean startTimer, long sequence)
+ throws AsynchAEException, ServiceShutdownException
+ {
+
+ try
+ {
+ if ( aborting )
+ {
+ return;
+ }
+
+ // Get the connection object for a given endpoint
+ JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
+ // Create an empty JMS BytesMessage
+ BytesMessage tm = endpointConnection.produceByteMessage();
+ tm.writeBytes(aSerializedCAS);
+ tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.BinaryPayload);
+ // Add Cas Reference Id to the outgoing JMS Header
+ tm.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
+ // Add common properties to the JMS Header
+ if ( isRequest == true )
+ {
+ populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process);
+ }
+ else
+ {
+ populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
+ CacheEntry entry = this.getCacheEntry(aCasReferenceId);
+ tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
+ }
+ // The following is true when the analytic is a CAS Multiplier
+ if ( sequence > 0 && !isRequest )
+ {
+ // Override MessageType set in the populateHeaderWithContext above.
+ // Make the reply message look like a request. This message will contain a new CAS
+ // produced by the CAS Multiplier. The client will treat this CAS
+ // differently from the input CAS.
+ tm.setIntProperty( AsynchAEMessage.MessageType, AsynchAEMessage.Request);
+ tm.setStringProperty(AsynchAEMessage.InputCasReference, anInputCasReferenceId);
+ // Add a sequence number assigned to this CAS by the controller
+ tm.setLongProperty(AsynchAEMessage.CasSequence, sequence);
+ isRequest = true;
+ if ( freeCASTempQueue != null )
+ {
+ // Attach a temp queue to the outgoing message. This is a queue where
+ // Free CAS notifications need to be sent from the client
+ tm.setJMSReplyTo(freeCASTempQueue);
+ }
+ if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) )
+ {
+ CacheEntry cacheEntry = getCacheEntry(aCasReferenceId);
+ if ( cacheEntry != null )
+ {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
+ new Object[] {getAnalysisEngineController().getComponentName(),"Remote", anEndpoint.getEndpoint(), aCasReferenceId, anInputCasReferenceId, cacheEntry.getInputCasReferenceId() });
+ }
+ }
+ }
+
+ // Add stats
+ populateStats(tm, anEndpoint, aCasReferenceId, AsynchAEMessage.Process, isRequest);
+ if ( startTimer)
+ {
+ // Start a timer for this request. The amount of time to wait
+ // for response is provided in configuration for the endpoint
+ anEndpoint.startProcessRequestTimer(aCasReferenceId);
+ }
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_new_msg_to_remote_FINE",
+ new Object[] {getAnalysisEngineController().getName(),endpointConnection.getServerUri(), endpointConnection.getEndpoint() });
+
+ // By default, start a timer associated with the connection to the endpoint. Once a connection is established with an
+ // endpoint it is cached and reused for subsequent messaging. If the connection is not used within a given interval
+ // the timer silently expires and closes the connection. This mechanism is similar to what a Web Server does when
+ // managing sessions. The timer is not started when the remote delegate should respond to a temporary queue, which
+ // is implied by anEndpoint.getDestination() != null, or when the message being sent is not a request.
+ boolean startConnectionTimer = true;
+
+ if ( anEndpoint.getDestination() != null || !isRequest )
+ {
+ startConnectionTimer = false;
+ }
+ // ----------------------------------------------------
+ // Send Request Message to the Endpoint
+ // ----------------------------------------------------
+ endpointConnection.send(tm, startConnectionTimer);
+// if ( getAnalysisEngineController().isTopLevelComponent() )
+// {
+// getAnalysisEngineController().getInProcessCache().dumpContents(getAnalysisEngineController().getComponentName());
+// }
+ if ( !isRequest )
+ {
+ addIdleTime(tm);
+ }
+ }
+ catch( JMSException e)
+ {
+ // Unable to establish connection to the endpoint. Log it and continue
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "sendCasToRemoteDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
+ new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint()});
+
+ }
+
+ catch( ServiceShutdownException e)
+ {
+ throw e;
+ }
+ catch( AsynchAEException e)
+ {
+ throw e;
+ }
+ catch( Exception e)
+ {
+ throw new AsynchAEException(e);
+ }
+
+ }
+
+
private void sendCasToRemoteEndpoint( boolean isRequest, String aSerializedCAS, CacheEntry entry, Endpoint anEndpoint, boolean startTimer )
throws AsynchAEException, ServiceShutdownException
@@ -1561,6 +1803,125 @@
}
+
+ private void sendCasToRemoteEndpoint( boolean isRequest, byte[] aSerializedCAS, CacheEntry entry, Endpoint anEndpoint, boolean startTimer )
+ throws AsynchAEException, ServiceShutdownException
+ {
+
+ try
+ {
+ if ( aborting )
+ {
+ return;
+ }
+ // Get the connection object for a given endpoint
+ JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
+ // Create an empty JMS BytesMessage
+ BytesMessage tm = endpointConnection.produceByteMessage();
+ tm.writeBytes(aSerializedCAS);
+ tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.BinaryPayload);
+ // Add Cas Reference Id to the outgoing JMS Header
+ tm.setStringProperty(AsynchAEMessage.CasReference, entry.getCasReferenceId());
+ // Add common properties to the JMS Header
+ if ( isRequest == true )
+ {
+ populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process);
+ }
+ else
+ {
+ populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
+// tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
+ }
+ // The following is true when the analytic is a CAS Multiplier
+ if ( entry.isSubordinate() && !isRequest )
+ {
+ // Override MessageType set in the populateHeaderWithContext above.
+ // Make the reply message look like a request. This message will contain a new CAS
+ // produced by the CAS Multiplier. The client will treat this CAS
+ // differently from the input CAS.
+ tm.setIntProperty( AsynchAEMessage.MessageType, AsynchAEMessage.Request);
+
+ isRequest = true;
+ // Save the id of the parent CAS
+ tm.setStringProperty(AsynchAEMessage.InputCasReference, getTopParentCasReferenceId(entry.getCasReferenceId()));
+ // Add a sequence number assigned to this CAS by the controller
+ tm.setLongProperty(AsynchAEMessage.CasSequence, entry.getCasSequence());
+ // If this is a Cas Multiplier, add a reference to a special queue where
+ // the client sends Free Cas Notifications
+ if ( freeCASTempQueue != null )
+ {
+ // Attach a temp queue to the outgoing message. This is a queue where
+ // Free CAS notifications need to be sent from the client
+ tm.setJMSReplyTo(freeCASTempQueue);
+ }
+ if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) )
+ {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
+ new Object[] {getAnalysisEngineController().getComponentName(),"Remote", anEndpoint.getEndpoint(), entry.getCasReferenceId(), entry.getInputCasReferenceId(), entry.getInputCasReferenceId() });
+ }
+ }
+
+ // Add stats
+ populateStats(tm, anEndpoint, entry.getCasReferenceId(), AsynchAEMessage.Process, isRequest);
+ if ( startTimer)
+ {
+ // Start a timer for this request. The amount of time to wait
+ // for response is provided in configuration for the endpoint
+ anEndpoint.startProcessRequestTimer(entry.getCasReferenceId());
+ }
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_new_msg_to_remote_FINE",
+ new Object[] {getAnalysisEngineController().getName(),endpointConnection.getServerUri(), endpointConnection.getEndpoint() });
+
+ // By default, start a timer associated with the connection to the endpoint. Once a connection is established with an
+ // endpoint it is cached and reused for subsequent messaging. If the connection is not used within a given interval
+ // the timer silently expires and closes the connection. This mechanism is similar to what a Web Server does when
+ // managing sessions. The timer is not started when the remote delegate should respond to a temporary queue, which
+ // is implied by anEndpoint.getDestination() != null, or when the message being sent is not a request.
+ boolean startConnectionTimer = true;
+
+ if ( anEndpoint.getDestination() != null || !isRequest )
+ {
+ startConnectionTimer = false;
+ }
+
+ // ----------------------------------------------------
+ // Send Request Message to the Endpoint
+ // ----------------------------------------------------
+ endpointConnection.send(tm, startConnectionTimer);
+
+ if ( !isRequest )
+ {
+ addIdleTime(tm);
+ }
+ }
+ catch( JMSException e)
+ {
+ // Unable to establish connection to the endpoint. Log it and continue
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "sendCasToRemoteDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
+ new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint()});
+
+ }
+
+ catch( ServiceShutdownException e)
+ {
+ throw e;
+ }
+ catch( AsynchAEException e)
+ {
+ throw e;
+ }
+ catch( Exception e)
+ {
+ throw new AsynchAEException(e);
+ }
+
+ }
+
+
+
private String getTopParentCasReferenceId( String casReferenceId ) throws Exception
{
if ( !getAnalysisEngineController().getInProcessCache().entryExists(casReferenceId) )
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java?rev=703279&r1=703278&r2=703279&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java Thu Oct 9 15:28:27 2008
@@ -20,6 +20,7 @@
import java.util.List;
+import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -103,6 +104,14 @@
}
return session.createTextMessage();
}
+ public BytesMessage createBytesMessage() throws Exception
+ {
+ if ( session == null )
+ {
+ throw new JMSException("Unable To Create JMS BytesMessage. Reason: JMS Session Not Initialized");
+ }
+ return session.createBytesMessage();
+ }
/**
* Cleanup any jms resources used by the worker thread
*/
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=703279&r1=703278&r2=703279&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Thu Oct 9 15:28:27 2008
@@ -23,8 +23,10 @@
import java.util.Map;
import java.util.Properties;
+import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
+import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
@@ -34,6 +36,7 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.uima.UIMAFramework;
@@ -95,6 +98,10 @@
{
return new ActiveMQTextMessage();
}
+ protected BytesMessage createBytesMessage() throws ResourceInitializationException
+ {
+ return new ActiveMQBytesMessage();
+ }
/**
* Called at the end of collectionProcessingComplete - WAS closes receiving
@@ -118,7 +125,7 @@
throw new ResourceProcessException(e);
}
}
- protected void setMetaRequestMessage(TextMessage msg) throws Exception
+ protected void setMetaRequestMessage(Message msg) throws Exception
{
msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName());
@@ -126,16 +133,18 @@
msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.GetMeta);
msg.setJMSReplyTo(consumerDestination);
- ((ActiveMQTextMessage) msg).setText("");
+ if ( msg instanceof TextMessage ) {
+ ((ActiveMQTextMessage) msg).setText("");
+ }
}
/**
* Initialize JMS Message with properties relevant to Process CAS request.
*/
- protected void setCASMessage(String aCasReferenceId, CAS aCAS, TextMessage msg) throws ResourceProcessException
+ protected void setCASMessage(String aCasReferenceId, CAS aCAS, Message msg) throws ResourceProcessException
{
try{
- setCommonProperties(aCasReferenceId, msg);
- msg.setText(serializeCAS(aCAS));
+ setCommonProperties(aCasReferenceId, msg, "xmi");
+ ((TextMessage)msg).setText(serializeCAS(aCAS));
}
catch (Exception e)
{
@@ -143,18 +152,30 @@
}
}
- protected void setCASMessage(String aCasReferenceId, String aSerializedCAS, TextMessage msg) throws ResourceProcessException
+ protected void setCASMessage(String aCasReferenceId, String aSerializedCAS, Message msg) throws ResourceProcessException
{
try{
- setCommonProperties(aCasReferenceId, msg);
- msg.setText(aSerializedCAS);
+ setCommonProperties(aCasReferenceId, msg, "xmi");
+ ((TextMessage)msg).setText(aSerializedCAS);
}
catch (Exception e)
{
throw new ResourceProcessException(e);
}
}
- protected void setCommonProperties( String aCasReferenceId, TextMessage msg) throws ResourceProcessException
+ protected void setCASMessage(String aCasReferenceId, byte[] aSerializedCAS, Message msg) throws ResourceProcessException
+ {
+ try{
+ setCommonProperties(aCasReferenceId, msg, "binary");
+ ((BytesMessage)msg).writeBytes(aSerializedCAS);
+ }
+ catch (Exception e)
+ {
+ throw new ResourceProcessException(e);
+ }
+ }
+
+ protected void setCommonProperties( String aCasReferenceId, Message msg, String aSerializationStrategy) throws ResourceProcessException
{
try{
msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName());
@@ -163,7 +184,13 @@
msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Process);
msg.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
- msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload);
+
+ if ( aSerializationStrategy.equals("binary")) {
+ msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.BinaryPayload);
+ } else if ( aSerializationStrategy.equals("xmi")) {
+ msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload);
+ }
+
msg.setBooleanProperty(AsynchAEMessage.AcceptsDeltaCas, true);
msg.setJMSReplyTo(consumerDestination);
@@ -230,7 +257,7 @@
}
}
- public void setCPCMessage(TextMessage msg) throws Exception
+ public void setCPCMessage(Message msg) throws Exception
{
msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName());
msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
@@ -239,7 +266,9 @@
msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
msg.setBooleanProperty(AsynchAEMessage.RemoveEndpoint, true);
msg.setJMSReplyTo(consumerDestination);
- msg.setText("");
+ if ( msg instanceof TextMessage ) {
+ ((TextMessage)msg).setText("");
+ }
}
protected Connection getConnection( String aBrokerURI ) throws Exception
{
@@ -422,6 +451,10 @@
{
applicationName = (String) anApplicationContext.get(UimaAsynchronousEngine.ApplicationName);
}
+ if (anApplicationContext.containsKey(UimaAsynchronousEngine.SerializationStrategy))
+ {
+ super.serializationStrategy = (String) anApplicationContext.get(UimaAsynchronousEngine.SerializationStrategy);
+ }
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "initialize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_init_uimaee_client__CONFIG", new Object[] { brokerURI, 0, casPoolSize, processTimeout, metadataTimeout, cpcTimeout });
@@ -479,7 +512,9 @@
{
throw new ResourceInitializationException(e);
}
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "initialize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_as_initialized_CONFIG", new Object[] { "" });
+ if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "initialize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_as_initialized__INFO", new Object[] { super.serializationStrategy });
+ }
}
/**