You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by sc...@apache.org on 2008/10/10 00:28:31 UTC

svn commit: r703279 - in /incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms: activemq/ client/

Author: schor
Date: Thu Oct  9 15:28:27 2008
New Revision: 703279

URL: http://svn.apache.org/viewvc?rev=703279&view=rev
Log:
[UIMA-1196] apply patch to support binary serialization

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=703279&r1=703278&r2=703279&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Thu Oct  9 15:28:27 2008
@@ -24,6 +24,7 @@
 import java.util.Timer;
 import java.util.TimerTask;
 
+import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
@@ -357,6 +358,37 @@
 		}
 		throw new AsynchAEException(new InvalidMessageException("Unable to produce Message Object"));
 	}
+  public BytesMessage produceByteMessage() throws AsynchAEException
+  {
+    Assert.notNull(producerSession);
+    boolean done = false;
+    int retryCount = 4;
+    while (retryCount > 0)
+    {
+      try
+      {
+        retryCount--;
+        return producerSession.createBytesMessage();
+      }
+      catch ( javax.jms.IllegalStateException e)
+      {
+        try
+        {
+          open();
+        }
+        catch ( ServiceShutdownException ex)
+        {
+          ex.printStackTrace();
+        }
+
+      }
+      catch ( Exception e)
+      {
+        throw new AsynchAEException(e);
+      }
+    }
+    throw new AsynchAEException(new InvalidMessageException("Unable to produce BytesMessage Object"));
+  }
 
 	public ObjectMessage produceObjectMessage() throws AsynchAEException
 	{

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=703279&r1=703278&r2=703279&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Thu Oct  9 15:28:27 2008
@@ -282,7 +282,8 @@
 		{
 			int payload = aMessage.getIntProperty(AsynchAEMessage.Payload);
 			if ( payload != AsynchAEMessage.XMIPayload && 
-				 payload != AsynchAEMessage.CASRefID &&
+	         payload != AsynchAEMessage.BinaryPayload &&
+	         payload != AsynchAEMessage.CASRefID &&
 				 payload != AsynchAEMessage.Exception &&
 				 payload != AsynchAEMessage.Metadata 
 				)
@@ -414,6 +415,8 @@
 			{
 			case AsynchAEMessage.XMIPayload:
 				return "XMIPayload";
+      case AsynchAEMessage.BinaryPayload:
+        return "BinaryPayload";
 			case AsynchAEMessage.CASRefID:
 				return "CASRefID";
 			case AsynchAEMessage.Metadata:

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=703279&r1=703278&r2=703279&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Thu Oct  9 15:28:27 2008
@@ -29,6 +29,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
+import javax.jms.BytesMessage;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -549,17 +550,33 @@
 		{
 			if (anEndpoint.isRemote())
 			{
-				String serializedCAS = getSerializedCasAndReleaseIt(false, aCasReferenceId,anEndpoint, anEndpoint.isRetryEnabled());
-				if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) )
-				{
-		            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
-		                    "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_serialized_cas__FINEST",
-		                    new Object[] { getAnalysisEngineController().getComponentName(), anEndpoint.getEndpoint(),aCasReferenceId,serializedCAS  });
-				}
-				
-				
-				//	Send process request to remote delegate and start timeout timer
-				sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0);
+			  if ( anEndpoint.getSerializer().equals("xmi")) {
+	        String serializedCAS = getSerializedCasAndReleaseIt(false, aCasReferenceId,anEndpoint, anEndpoint.isRetryEnabled());
+	        if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) )
+	        {
+	                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+	                        "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_serialized_cas__FINEST",
+	                        new Object[] { getAnalysisEngineController().getComponentName(), anEndpoint.getEndpoint(),aCasReferenceId,serializedCAS  });
+	        }
+	        
+	        
+	        //  Send process request to remote delegate and start timeout timer
+	        sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0);
+			  } else {
+	        byte[] serializedCAS = getBinaryCasAndReleaseIt(false, aCasReferenceId,anEndpoint, anEndpoint.isRetryEnabled());
+	        if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) )
+	        {
+	                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+	                        "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_binary_cas__FINEST",
+	                        new Object[] { getAnalysisEngineController().getComponentName(), anEndpoint.getEndpoint(),aCasReferenceId,serializedCAS  });
+	        }
+	        
+	        
+	        //  Send process request to remote delegate and start timeout timer
+	        sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0);
+			    
+			  }
+			  
 			}
 			else
 			{
@@ -594,6 +611,9 @@
 		try
 		{
 			boolean cacheSerializedCas = endpointRetryEnabled(endpoints);
+      // The default serialization strategy for parallel step is xmi.
+      // Binary serialization doesnt support merge.
+      endpoints[0].setSerializer("xmi");
 			
 			//	Serialize CAS using serializer defined in the first endpoint. All endpoints in the parallel Flow 
 			//	must use the same format (either XCAS or XMI)
@@ -614,8 +634,9 @@
 					}
 					
 					
-					
-					
+					// The default serialization strategy for parallel step is xmi.
+					// Binary serialization doesnt support merge.
+					endpoints[i].setSerializer("xmi");
 					
 					sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, endpoints[i], true, 0);
 				}
@@ -678,9 +699,15 @@
 			anEndpoint.setReplyEndpoint(true);
 			if ( anEndpoint.isRemote() )
 			{
-				//	Serializes CAS and releases it back to CAS Pool
-				String serializedCAS = getSerializedCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint.isRetryEnabled());
-				sendCasToRemoteEndpoint(false, serializedCAS, entry, anEndpoint, false);
+			  if ( anEndpoint.getSerializer().equals("xmi")) {
+	        //  Serializes CAS and releases it back to CAS Pool
+	        String serializedCAS = getSerializedCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint.isRetryEnabled());
+	        sendCasToRemoteEndpoint(false, serializedCAS, entry, anEndpoint, false);
+			  } else {
+			    byte[] binaryCas = getBinaryCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint.isRetryEnabled());
+          sendCasToRemoteEndpoint(false, binaryCas, entry, anEndpoint, false);
+			  }
+			    
 			}
 			else
 			{
@@ -789,9 +816,22 @@
                     new Object[] { anEndpoint.getEndpoint(), aCasReferenceId });
 			if ( anEndpoint.isRemote() )
 			{
-				
-				String serializedCAS = getSerializedCas(true, aCasReferenceId, anEndpoint, false);
-				sendCasToRemoteEndpoint(false, serializedCAS, null, aCasReferenceId, anEndpoint, false, 0);
+//				String serializedCAS = getSerializedCas(true, aCasReferenceId, anEndpoint, false);
+//				sendCasToRemoteEndpoint(false, serializedCAS, null, aCasReferenceId, anEndpoint, false, 0);
+
+        if ( anEndpoint.getSerializer().equals("xmi")) {
+          //  Serializes CAS and releases it back to CAS Pool
+          String serializedCAS = getSerializedCas(true, aCasReferenceId, anEndpoint, false);
+          sendCasToRemoteEndpoint(false, serializedCAS, null, aCasReferenceId, anEndpoint, false, 0);
+        } else {
+          CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+          byte[] binaryCas = getBinaryCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint.isRetryEnabled());
+          sendCasToRemoteEndpoint(false, binaryCas, entry, anEndpoint, false);
+        }
+          
+			
+			
+			
 			}
 			else
 			{
@@ -976,7 +1016,43 @@
 		}
 	}
 	
-	
+	private byte[] getBinaryCas( boolean isReply, String aCasReferenceId, Endpoint anEndpoint, boolean cacheSerializedCas ) throws Exception
+	{
+    CAS cas = null;
+    try
+    {
+      byte[] serializedCAS = null;
+      //  Using Cas reference Id retrieve CAS from the shared Cash
+      cas = getAnalysisEngineController().getInProcessCache().getCasByReference(aCasReferenceId);
+      ServicePerformance casStats = getAnalysisEngineController().getCasStatistics(aCasReferenceId);
+        CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+        long t1 = getAnalysisEngineController().getCpuTime();
+        //  Serialize CAS for remote Delegates
+        String serializer = anEndpoint.getSerializer();
+        if ( serializer.equals("binary")) {
+          serializedCAS = UimaSerializer.serializeCasToBinary(cas);
+        } else {
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+                  "getBinaryCas", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_invalid_serializer__WARNING",
+                  new Object[] { getAnalysisEngineController().getName(),serializer, anEndpoint.getEndpoint()});
+          throw new UimaEEServiceException("Invalid Serializer:"+serializer+" For Endpoint:"+anEndpoint.getEndpoint());
+        }
+        long timeToSerializeCas = getAnalysisEngineController().getCpuTime()-t1;
+        
+        getAnalysisEngineController().incrementSerializationTime(timeToSerializeCas);
+        
+        entry.incrementTimeToSerializeCAS(timeToSerializeCas);
+        casStats.incrementCasSerializationTime(timeToSerializeCas);
+        getAnalysisEngineController().getServicePerformance().
+          incrementCasSerializationTime(timeToSerializeCas);
+      return serializedCAS;
+    }
+    catch( Exception e)
+    {
+      throw new AsynchAEException(e);
+    }
+	  
+	}
 	
 	private String getSerializedCas( boolean isReply, String aCasReferenceId, Endpoint anEndpoint, boolean cacheSerializedCas ) throws Exception
 	{
@@ -1022,12 +1098,56 @@
 			throw new AsynchAEException(e);
 		}
 	}
+  private byte[] getSerializedBinaryCas( boolean isReply, String aCasReferenceId, Endpoint anEndpoint, boolean cacheSerializedCas ) throws Exception
+  {
+    CAS cas = null;
+    try
+    {
+      byte[] serializedCAS = null;
+      //  Using Cas reference Id retrieve CAS from the shared Cash
+      cas = getAnalysisEngineController().getInProcessCache().getCasByReference(aCasReferenceId);
+      ServicePerformance casStats = getAnalysisEngineController().getCasStatistics(aCasReferenceId);
+        CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+        long t1 = getAnalysisEngineController().getCpuTime();
+        serializedCAS = UimaSerializer.serializeCasToBinary(cas);
+        long timeToSerializeCas = getAnalysisEngineController().getCpuTime()-t1;
+        
+        getAnalysisEngineController().incrementSerializationTime(timeToSerializeCas);
+        
+        entry.incrementTimeToSerializeCAS(timeToSerializeCas);
+        casStats.incrementCasSerializationTime(timeToSerializeCas);
+        getAnalysisEngineController().getServicePerformance().
+          incrementCasSerializationTime(timeToSerializeCas);
+      return serializedCAS;
+    }
+    catch( Exception e)
+    {
+      throw new AsynchAEException(e);
+    }
+  }
 	
+  private byte[] getBinaryCasAndReleaseIt( boolean isReply, String aCasReferenceId, Endpoint anEndpoint, boolean cacheSerializedCas ) throws Exception
+  {
+    try
+    {
+      return getBinaryCas(isReply, aCasReferenceId, anEndpoint, cacheSerializedCas);
+    }
+    catch( Exception e)
+    {
+      throw new AsynchAEException(e);
+    }
+    finally
+    {
+      if ( getAnalysisEngineController() instanceof PrimitiveAnalysisEngineController && anEndpoint.isRemote() )
+      {
+        getAnalysisEngineController().dropCAS(aCasReferenceId, true );
+      }
+    }
+  }
 	
 	
 	private String getSerializedCasAndReleaseIt( boolean isReply, String aCasReferenceId, Endpoint anEndpoint, boolean cacheSerializedCas ) throws Exception
 	{
-		CAS cas = null;
 		try
 		{
 			return getSerializedCas(isReply, aCasReferenceId, anEndpoint, cacheSerializedCas);
@@ -1056,7 +1176,7 @@
 		}
 		return false;
 	}
-	private void populateStats( TextMessage aTextMessage, Endpoint anEndpoint, String aCasReferenceId, int anAdminCommand, boolean isRequest) throws Exception
+	private void populateStats( Message aTextMessage, Endpoint anEndpoint, String aCasReferenceId, int anAdminCommand, boolean isRequest) throws Exception
 	{
 		if ( anEndpoint.isFinal() )
 		{
@@ -1433,6 +1553,128 @@
 		}
 		
 	}
+
+  private void sendCasToRemoteEndpoint( boolean isRequest, byte[] aSerializedCAS, String anInputCasReferenceId, String aCasReferenceId, Endpoint anEndpoint, boolean startTimer, long sequence) 
+  throws AsynchAEException, ServiceShutdownException
+  {
+    
+    try
+    {
+      if ( aborting )
+      {
+        return;
+      }
+      
+      //  Get the connection object for a given endpoint
+      JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
+      //  Create empty JMS Text Message
+      BytesMessage tm = endpointConnection.produceByteMessage();
+      tm.writeBytes(aSerializedCAS);
+      tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.BinaryPayload); 
+      //  Add Cas Reference Id to the outgoing JMS Header
+      tm.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
+      //  Add common properties to the JMS Header
+      if ( isRequest == true )
+      {
+        populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process); 
+      }
+      else
+      {
+        populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
+        CacheEntry entry = this.getCacheEntry(aCasReferenceId);
+        tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
+      }
+      //  The following is true when the analytic is a CAS Multiplier
+      if ( sequence > 0 && !isRequest )
+      {
+        //  Override MessageType set in the populateHeaderWithContext above.
+        //  Make the reply message look like a request. This message will contain a new CAS 
+        //  produced by the CAS Multiplier. The client will treat this CAS
+        //  differently from the input CAS. 
+        tm.setIntProperty( AsynchAEMessage.MessageType, AsynchAEMessage.Request);
+        tm.setStringProperty(AsynchAEMessage.InputCasReference, anInputCasReferenceId);
+        //  Add a sequence number assigned to this CAS by the controller
+        tm.setLongProperty(AsynchAEMessage.CasSequence, sequence);
+        isRequest = true;
+        if ( freeCASTempQueue != null )
+        {
+          //  Attach a temp queue to the outgoing message. This a queue where
+          //  Free CAS notifications need to be sent from the client
+          tm.setJMSReplyTo(freeCASTempQueue);
+        }
+        if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) )
+        {
+          CacheEntry cacheEntry = getCacheEntry(aCasReferenceId);
+          if ( cacheEntry != null )
+          {
+            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+                          "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
+                          new Object[] {getAnalysisEngineController().getComponentName(),"Remote", anEndpoint.getEndpoint(), aCasReferenceId, anInputCasReferenceId, cacheEntry.getInputCasReferenceId() });
+          }
+        }
+      }
+
+      //  Add stats
+      populateStats(tm, anEndpoint, aCasReferenceId, AsynchAEMessage.Process, isRequest);
+      if ( startTimer)
+      {
+        //  Start a timer for this request. The amount of time to wait
+        //  for response is provided in configuration for the endpoint
+        anEndpoint.startProcessRequestTimer(aCasReferenceId);
+      }
+      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+                    "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_new_msg_to_remote_FINE",
+                    new Object[] {getAnalysisEngineController().getName(),endpointConnection.getServerUri(), endpointConnection.getEndpoint() });
+
+      //  By default start a timer associated with a connection to the endpoint. Once a connection is established with an
+      //  endpoint it is cached and reused for subsequent messaging. If the connection is not used within a given interval
+      //  the timer silently expires and closes the connection. This mechanism is similar to what Web Server does when
+      //  managing sessions. In case when we want the remote delegate to respond to a temporary queue, which is implied
+      //  by anEndpoint.getDestination != null, we dont start the timer.
+      boolean startConnectionTimer = true;
+      
+      if ( anEndpoint.getDestination() != null || !isRequest )
+      {
+        startConnectionTimer = false;
+      }
+      // ----------------------------------------------------
+      //  Send Request Messsage to the Endpoint
+      // ----------------------------------------------------
+      endpointConnection.send(tm, startConnectionTimer);
+//      if ( getAnalysisEngineController().isTopLevelComponent() )
+//      {
+//        getAnalysisEngineController().getInProcessCache().dumpContents(getAnalysisEngineController().getComponentName());
+//      }
+      if ( !isRequest )
+      {
+        addIdleTime(tm);
+      }
+    }
+    catch( JMSException e)
+    {
+      //  Unable to establish connection to the endpoint. Logit and continue
+      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+                    "sendCasToRemoteDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
+                    new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint()});
+      
+    }
+
+    catch( ServiceShutdownException e)
+    {
+      throw e;
+    }
+    catch( AsynchAEException e)
+    {
+      throw e;
+    }
+    catch( Exception e)
+    {
+      throw new AsynchAEException(e);
+    }
+    
+  }
+	
+	
 	
 	private void sendCasToRemoteEndpoint( boolean isRequest, String aSerializedCAS, CacheEntry entry,  Endpoint anEndpoint, boolean startTimer ) 
 	throws AsynchAEException, ServiceShutdownException
@@ -1561,6 +1803,125 @@
 		
 	}
 	
+
+  private void sendCasToRemoteEndpoint( boolean isRequest, byte[] aSerializedCAS, CacheEntry entry,  Endpoint anEndpoint, boolean startTimer ) 
+  throws AsynchAEException, ServiceShutdownException
+  {
+    
+    try
+    {
+      if ( aborting )
+      {
+        return;
+      }
+      //  Get the connection object for a given endpoint
+      JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
+      //  Create empty JMS Text Message
+      BytesMessage tm = endpointConnection.produceByteMessage();
+      tm.writeBytes(aSerializedCAS);
+      tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.BinaryPayload); 
+      //  Add Cas Reference Id to the outgoing JMS Header
+      tm.setStringProperty(AsynchAEMessage.CasReference, entry.getCasReferenceId());
+      //  Add common properties to the JMS Header
+      if ( isRequest == true )
+      {
+        populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process); 
+      }
+      else
+      {
+        populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
+//        tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
+      }
+      //  The following is true when the analytic is a CAS Multiplier
+      if ( entry.isSubordinate() && !isRequest )
+      {
+        //  Override MessageType set in the populateHeaderWithContext above.
+        //  Make the reply message look like a request. This message will contain a new CAS 
+        //  produced by the CAS Multiplier. The client will treat this CAS
+        //  differently from the input CAS. 
+        tm.setIntProperty( AsynchAEMessage.MessageType, AsynchAEMessage.Request);
+
+        isRequest = true;
+        //  Save the id of the parent CAS
+        tm.setStringProperty(AsynchAEMessage.InputCasReference, getTopParentCasReferenceId(entry.getCasReferenceId()));
+        //  Add a sequence number assigned to this CAS by the controller
+        tm.setLongProperty(AsynchAEMessage.CasSequence, entry.getCasSequence());
+        //  If this is a Cas Multiplier, add a reference to a special queue where
+        //  the client sends Free Cas Notifications
+        if ( freeCASTempQueue != null )
+        {
+          //  Attach a temp queue to the outgoing message. This is a queue where
+          //  Free CAS notifications need to be sent from the client
+          tm.setJMSReplyTo(freeCASTempQueue);
+        }
+        if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) )
+        {
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+                        "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
+                        new Object[] {getAnalysisEngineController().getComponentName(),"Remote", anEndpoint.getEndpoint(), entry.getCasReferenceId(), entry.getInputCasReferenceId(), entry.getInputCasReferenceId() });
+        }
+      }
+
+      //  Add stats
+      populateStats(tm, anEndpoint, entry.getCasReferenceId(), AsynchAEMessage.Process, isRequest);
+      if ( startTimer)
+      {
+        //  Start a timer for this request. The amount of time to wait
+        //  for response is provided in configuration for the endpoint
+        anEndpoint.startProcessRequestTimer(entry.getCasReferenceId());
+      }
+      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+                    "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_new_msg_to_remote_FINE",
+                    new Object[] {getAnalysisEngineController().getName(),endpointConnection.getServerUri(), endpointConnection.getEndpoint() });
+
+      //  By default start a timer associated with a connection to the endpoint. Once a connection is established with an
+      //  endpoint it is cached and reused for subsequent messaging. If the connection is not used within a given interval
+      //  the timer silently expires and closes the connection. This mechanism is similar to what Web Server does when
+      //  managing sessions. In case when we want the remote delegate to respond to a temporary queue, which is implied
+      //  by anEndpoint.getDestination != null, we dont start the timer.
+      boolean startConnectionTimer = true;
+      
+      if ( anEndpoint.getDestination() != null || !isRequest )
+      {
+        startConnectionTimer = false;
+      }
+      
+      // ----------------------------------------------------
+      //  Send Request Messsage to the Endpoint
+      // ----------------------------------------------------
+      endpointConnection.send(tm, startConnectionTimer);
+
+      if ( !isRequest )
+      {
+        addIdleTime(tm);
+      }
+    }
+    catch( JMSException e)
+    {
+      //  Unable to establish connection to the endpoint. Logit and continue
+      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+                    "sendCasToRemoteDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
+                    new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint()});
+      
+    }
+
+    catch( ServiceShutdownException e)
+    {
+      throw e;
+    }
+    catch( AsynchAEException e)
+    {
+      throw e;
+    }
+    catch( Exception e)
+    {
+      throw new AsynchAEException(e);
+    }
+    
+  }
+	
+	
+	
 	private String getTopParentCasReferenceId( String casReferenceId ) throws Exception
 	{
 		if ( !getAnalysisEngineController().getInProcessCache().entryExists(casReferenceId) )

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java?rev=703279&r1=703278&r2=703279&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java Thu Oct  9 15:28:27 2008
@@ -20,6 +20,7 @@
 
 import java.util.List;
 
+import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
@@ -103,6 +104,14 @@
     	}
 	  return session.createTextMessage();
 	}
+  public BytesMessage createBytesMessage() throws Exception
+  {
+      if ( session == null )
+      {
+        throw new JMSException("Unable To Create JMS BytesMessage. Reason: JMS Session Not Initialized");
+      }
+    return session.createBytesMessage();
+  }
 	/**
 	 * Cleanup any jms resources used by the worker thread
 	 */

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=703279&r1=703278&r2=703279&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Thu Oct  9 15:28:27 2008
@@ -23,8 +23,10 @@
 import java.util.Map;
 import java.util.Properties;
 
+import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
@@ -34,6 +36,7 @@
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.command.ActiveMQBytesMessage;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.uima.UIMAFramework;
@@ -95,6 +98,10 @@
 	{
 		return 	new ActiveMQTextMessage();
 	}
+  protected BytesMessage createBytesMessage() throws ResourceInitializationException
+  {
+    return  new ActiveMQBytesMessage();
+  }
 
 	/**
 	 * Called at the end of collectionProcessingComplete - WAS closes receiving
@@ -118,7 +125,7 @@
 			throw new ResourceProcessException(e);
 		}
 	}
-	protected void setMetaRequestMessage(TextMessage msg) throws Exception
+	protected void setMetaRequestMessage(Message msg) throws Exception
 	{
 		msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName());
 
@@ -126,16 +133,18 @@
 		msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
 		msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.GetMeta);
 		msg.setJMSReplyTo(consumerDestination);
-		((ActiveMQTextMessage) msg).setText("");
+		if ( msg instanceof TextMessage ) {
+	    ((ActiveMQTextMessage) msg).setText("");
+		}
 	}
 	/**
 	 * Initialize JMS Message with properties relevant to Process CAS request.
 	 */
-	protected void setCASMessage(String aCasReferenceId, CAS aCAS, TextMessage msg) throws ResourceProcessException
+	protected void setCASMessage(String aCasReferenceId, CAS aCAS, Message msg) throws ResourceProcessException
 	{
 		try{
-			setCommonProperties(aCasReferenceId, msg);
-			msg.setText(serializeCAS(aCAS));
+			setCommonProperties(aCasReferenceId, msg, "xmi");
+			((TextMessage)msg).setText(serializeCAS(aCAS));
 		}
 		catch (Exception e)
 		{
@@ -143,18 +152,30 @@
 		}
 	}
 	
-	protected void setCASMessage(String aCasReferenceId, String aSerializedCAS, TextMessage msg) throws ResourceProcessException
+	protected void setCASMessage(String aCasReferenceId, String aSerializedCAS, Message msg) throws ResourceProcessException
 	{
 		try{
-			setCommonProperties(aCasReferenceId, msg);
-			msg.setText(aSerializedCAS);
+			setCommonProperties(aCasReferenceId, msg, "xmi");
+			((TextMessage)msg).setText(aSerializedCAS);
 		}
 		catch (Exception e)
 		{
 			throw new ResourceProcessException(e);
 		}
 	}
-    protected void setCommonProperties( String aCasReferenceId, TextMessage msg) throws ResourceProcessException
+  protected void setCASMessage(String aCasReferenceId, byte[] aSerializedCAS, Message msg) throws ResourceProcessException
+  {
+    try{
+      setCommonProperties(aCasReferenceId, msg, "binary");
+      ((BytesMessage)msg).writeBytes(aSerializedCAS);
+    }
+    catch (Exception e)
+    {
+      throw new ResourceProcessException(e);
+    }
+  }
+
+  protected void setCommonProperties( String aCasReferenceId, Message msg, String aSerializationStrategy) throws ResourceProcessException
     {
 		try{
 			msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName());
@@ -163,7 +184,13 @@
 			msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
 			msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Process);
 			msg.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
-			msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload);
+			
+			if ( aSerializationStrategy.equals("binary")) {
+	      msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.BinaryPayload);
+			} else if ( aSerializationStrategy.equals("xmi")) {
+        msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload);
+			}
+
 			msg.setBooleanProperty(AsynchAEMessage.AcceptsDeltaCas, true);
 			msg.setJMSReplyTo(consumerDestination);
 			
@@ -230,7 +257,7 @@
 		}
 	}
 
-	public void setCPCMessage(TextMessage msg) throws Exception
+	public void setCPCMessage(Message msg) throws Exception
 	{
 		msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName());
 		msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
@@ -239,7 +266,9 @@
 		msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
 		msg.setBooleanProperty(AsynchAEMessage.RemoveEndpoint, true);
 		msg.setJMSReplyTo(consumerDestination);
-		msg.setText("");
+    if ( msg instanceof TextMessage ) {
+      ((TextMessage)msg).setText("");
+    }
 	}
 	protected Connection getConnection( String aBrokerURI ) throws Exception
 	{
@@ -422,6 +451,10 @@
 		{
 			applicationName = (String) anApplicationContext.get(UimaAsynchronousEngine.ApplicationName);
 		}
+    if (anApplicationContext.containsKey(UimaAsynchronousEngine.SerializationStrategy))
+    {
+      super.serializationStrategy = (String) anApplicationContext.get(UimaAsynchronousEngine.SerializationStrategy);
+    }
 
 		UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "initialize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_init_uimaee_client__CONFIG", new Object[] { brokerURI, 0, casPoolSize, processTimeout, metadataTimeout, cpcTimeout });
 
@@ -479,7 +512,9 @@
 		{
 			throw new ResourceInitializationException(e);
 		}
-		UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "initialize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_as_initialized_CONFIG", new Object[] { "" });
+		if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+	    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "initialize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_as_initialized__INFO", new Object[] { super.serializationStrategy });
+		}
 
 	}
 	/**