You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2011/02/16 19:03:15 UTC

svn commit: r1071333 - /uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java

Author: cwiklik
Date: Wed Feb 16 18:03:15 2011
New Revision: 1071333

URL: http://svn.apache.org/viewvc?rev=1071333&view=rev
Log:
UIMA-2055 Combined 4 versions of sendRequest() into one. Did the same with sendCasToRemoteEndpoint() 

Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1071333&r1=1071332&r2=1071333&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Wed Feb 16 18:03:15 2011
@@ -19,8 +19,6 @@
 
 package org.apache.uima.adapter.jms.activemq;
 
-import java.io.BufferedOutputStream;
-import java.io.BufferedWriter;
 import java.io.ByteArrayOutputStream;
 import java.io.NotSerializableException;
 import java.io.PrintWriter;
@@ -30,20 +28,16 @@ import java.net.InetAddress;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Set;
-import java.util.StringTokenizer;
+import java.util.Map.Entry;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
-import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
-import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
 import javax.jms.TemporaryQueue;
 import javax.jms.TextMessage;
@@ -54,17 +48,17 @@ import org.apache.activemq.ActiveMQConne
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.Channel;
+import org.apache.uima.aae.InProcessCache.CacheEntry;
 import org.apache.uima.aae.InputChannel;
 import org.apache.uima.aae.OutputChannel;
 import org.apache.uima.aae.SerializerCache;
 import org.apache.uima.aae.UIMAEE_Constants;
 import org.apache.uima.aae.UimaSerializer;
-import org.apache.uima.aae.InProcessCache.CacheEntry;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
 import org.apache.uima.aae.controller.AnalysisEngineController;
 import org.apache.uima.aae.controller.Endpoint;
-import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
 import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
+import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
 import org.apache.uima.aae.delegate.Delegate;
 import org.apache.uima.aae.error.AsynchAEException;
 import org.apache.uima.aae.error.ErrorContext;
@@ -546,65 +540,37 @@ public class JmsOutputChannel implements
     return endpointConnection;
   }
 
-  /**
-   * Sends request message to a delegate.
-   * 
-   * @param aCommand
-   *          - the type of request [Process|GetMeta]
-   * @param anEndpoint
-   *          - the destination where the delegate receives messages
-   * 
-   * @throws AsynchAEException
-   */
-  public void sendRequest(int aCommand, String aCasReferenceId, Endpoint anEndpoint)
-          throws AsynchAEException {
-    try {
-
-      if ( aCommand == AsynchAEMessage.ReleaseCAS ) {
-        anEndpoint.setReplyEndpoint(true);
-        anEndpoint.setIsCasMultiplier(true);
-        anEndpoint.setFreeCasEndpoint(true);
-      }
-      JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
-
-      TextMessage tm = endpointConnection.produceTextMessage("");
-      tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
-      tm.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
-      populateHeaderWithRequestContext(tm, anEndpoint, aCommand);
-      if (aCommand == AsynchAEMessage.ReleaseCAS || aCommand == AsynchAEMessage.Stop) {
+  private void logRequest(String key, Endpoint anEndpoint ) {
+    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+              "sendRequest", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+              "key", new Object[] { getAnalysisEngineController().getComponentName(), anEndpoint.getDelegateKey() });
+    }
 
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(
-                  Level.FINE,
-                  CLASS_NAME.getName(),
-                  "sendRequest",
-                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                  "UIMAJMS_release_cas_req__FINE",
-                  new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint(),
-                      aCasReferenceId });
+  }
+  private Delegate startGetMetaTimerAndGetDelegate( Endpoint anEndpoint ) {
+    Delegate delegate = null;
+    if (anEndpoint.getDestination() != null) {
+      String replyQueueName = ((ActiveMQDestination) anEndpoint.getDestination())
+              .getPhysicalName().replaceAll(":", "_");
+      if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController) {
+        String delegateKey = ((AggregateAnalysisEngineController) getAnalysisEngineController())
+                .lookUpDelegateKey(anEndpoint.getEndpoint());
+        ServiceInfo serviceInfo = ((AggregateAnalysisEngineController) getAnalysisEngineController())
+                .getDelegateServiceInfo(delegateKey);
+        if (serviceInfo != null) {
+          serviceInfo.setReplyQueueName(replyQueueName);
+          serviceInfo.setServiceKey(delegateKey);
+        }
+        delegate = lookupDelegate(delegateKey);
+        if (delegate.getGetMetaTimeout() > 0) {
+          System.out.println("--------------- Service:"+getAnalysisEngineController().getComponentName()+" Starting getMeta Timer For Delegate:"+anEndpoint.getDelegateKey());
+          delegate.startGetMetaRequestTimer();
         }
       }
-      // Only used to send a Stop or ReleaseCas request so probably no need to start a connection
-      // timer ?
-      endpointConnection.send(tm, 0, true);
-    } catch (JMSException e) {
-      // Unable to establish connection to the endpoint. Logit and continue
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendRequest",
-                JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
-                new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
-      }
-    } catch (ServiceShutdownException e) {
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendRequest",
-                JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_shutdown__INFO",
-                new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
-      }
-    } catch (Exception e) {
-      throw new AsynchAEException(e);
-    }
+    } 
+    return delegate;
   }
-
   /**
    * Sends request message to a delegate.
    * 
@@ -615,89 +581,54 @@ public class JmsOutputChannel implements
    * 
    * @throws AsynchAEException
    */
-  public void sendRequest(int aCommand, Endpoint anEndpoint) {
+  public void sendRequest(int aCommand, String aCasReferenceId, Endpoint anEndpoint) 
+  throws AsynchAEException {
     Delegate delegate = null;
     try {
       JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
 
-      TextMessage tm = endpointConnection.produceTextMessage("");
+      Message tm = endpointConnection.produceTextMessage("");
       tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
-      tm.setText(""); // Need this to prevent the Broker from throwing an exception when sending a
-      // message to C++ service
+      switch(aCommand) {
+        case AsynchAEMessage.CollectionProcessComplete:
+          logRequest("UIMAEE_send_cpc_req__FINE", anEndpoint);
+        break;
+        case AsynchAEMessage.ReleaseCAS:
+          tm.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
+          logRequest("UIMAJMS_releasecas_request__endpoint__FINEST", anEndpoint);
+        break;
+        case AsynchAEMessage.GetMeta:
+          delegate = startGetMetaTimerAndGetDelegate(anEndpoint);
+          logRequest("UIMAEE_service_sending_getmeta_request__FINE", anEndpoint);
+        break;
+        case AsynchAEMessage.Stop:
+          tm.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
+          logRequest("UIMAEE_service_sending_stop_request__FINE", anEndpoint);
+        break;
+        
+        case AsynchAEMessage.Process:
+          logRequest("UIMAEE_service_sending_process_request__FINE", anEndpoint);
+          serializeCasAndSend(getAnalysisEngineController().
+                  getInProcessCache().
+                    getCacheEntryForCAS(aCasReferenceId), anEndpoint);
+          return;  /// <<<<< RETURN - Done here >>>>
+          
+        
+      };
 
       populateHeaderWithRequestContext(tm, anEndpoint, aCommand);
 
       // For remotes add a special property to the message. This property
       // will be echoed back by the service. This property enables matching
       // the reply with the right endpoint object managed by the aggregate.
-      if (anEndpoint.isRemote()) {
-        tm.setStringProperty(AsynchAEMessage.EndpointServer, anEndpoint.getServerURI());
-      }
-      boolean startTimer = false;
-      // Start timer for endpoints that are remote and are managed by a different broker
-      // than this service. If an endpoint contains a destination object, the outgoing
-      // request will contain a JMSReplyTo object which will point to a temp queue
-      if (anEndpoint.isRemote() && anEndpoint.getDestination() == null) {
-        startTimer = true;
-      }
-      if (aCommand == AsynchAEMessage.CollectionProcessComplete) {
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-                  "sendRequest", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                  "UIMAEE_send_cpc_req__FINE", new Object[] { anEndpoint.getEndpoint() });
-        }
-      } else if (aCommand == AsynchAEMessage.ReleaseCAS) {
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(
-                  Level.FINEST,
-                  CLASS_NAME.getName(),
-                  "sendRequest",
-                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                  "UIMAJMS_releasecas_request__endpoint__FINEST",
-                  new Object[] { getAnalysisEngineController().getName(),
-                      endpointConnection.getEndpoint() });
-        }
-      } else if (aCommand == AsynchAEMessage.GetMeta) {
-        if (anEndpoint.getDestination() != null) {
-          String replyQueueName = ((ActiveMQDestination) anEndpoint.getDestination())
-                  .getPhysicalName().replaceAll(":", "_");
-          if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController) {
-            String delegateKey = ((AggregateAnalysisEngineController) getAnalysisEngineController())
-                    .lookUpDelegateKey(anEndpoint.getEndpoint());
-            ServiceInfo serviceInfo = ((AggregateAnalysisEngineController) getAnalysisEngineController())
-                    .getDelegateServiceInfo(delegateKey);
-            if (serviceInfo != null) {
-              serviceInfo.setReplyQueueName(replyQueueName);
-              serviceInfo.setServiceKey(delegateKey);
-            }
-            delegate = lookupDelegate(delegateKey);
-            if (delegate.getGetMetaTimeout() > 0) {
-              delegate.startGetMetaRequestTimer();
-            }
-          }
-        } else if (!anEndpoint.isRemote()) {
-          ServiceInfo serviceInfo = ((AggregateAnalysisEngineController) getAnalysisEngineController())
-                  .getServiceInfo();
-          if (serviceInfo != null) {
-            serviceInfo.setReplyQueueName(controllerInputEndpoint);
-          }
-        }
-      } else {
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(
-                  Level.FINEST,
-                  CLASS_NAME.getName(),
-                  "sendRequest",
-                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                  "UIMAJMS_metadata_request__endpoint__FINEST",
-                  new Object[] { endpointConnection.getEndpoint(),
-                      endpointConnection.getServerUri() });
-        }
-      }
-      if (endpointConnection.send(tm, 0, startTimer) != true) {
+       tm.setStringProperty(AsynchAEMessage.EndpointServer, anEndpoint.getServerURI());
+
+      if (endpointConnection.send(tm, 0, true) != true) {
         throw new ServiceNotFoundException();
       }
 
+    } catch (AsynchAEException e) {
+      throw e;
     } catch (Exception e) {
       if (delegate != null && aCommand == AsynchAEMessage.GetMeta) {
         delegate.cancelDelegateTimer();
@@ -710,165 +641,43 @@ public class JmsOutputChannel implements
               getAnalysisEngineController());
     }
   }
-
-  public void sendRequest(String aCasReferenceId, Endpoint anEndpoint) throws AsynchAEException {
-
-    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendRequest",
-              JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE",
-              new Object[] { anEndpoint.getEndpoint() });
-    }
-    try {
-      if (anEndpoint.isRemote()) {
-        if (anEndpoint.getSerializer().equals("xmi")) {
-          String serializedCAS = getSerializedCasAndReleaseIt(false, aCasReferenceId, anEndpoint,
-                  anEndpoint.isRetryEnabled());
-          if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(
-                    Level.FINEST,
-                    CLASS_NAME.getName(),
-                    "sendRequest",
-                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_sending_serialized_cas__FINEST",
-                    new Object[] { getAnalysisEngineController().getComponentName(),
-                        anEndpoint.getEndpoint(), aCasReferenceId, serializedCAS });
-          }
-          // Send process request to remote delegate and start timeout timer
-          sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0);
-        } else {
-          byte[] serializedCAS = getBinaryCasAndReleaseIt(false, aCasReferenceId, anEndpoint,
-                  anEndpoint.isRetryEnabled());
-          if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(
-                    Level.FINEST,
-                    CLASS_NAME.getName(),
-                    "sendRequest",
-                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_sending_binary_cas__FINEST",
-                    new Object[] { getAnalysisEngineController().getComponentName(),
-                        anEndpoint.getEndpoint(), aCasReferenceId, serializedCAS });
-          }
-
-          // Send process request to remote delegate and start timeout timer
-          sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0);
-
-        }
-
-      } else {
-        // Not supported
+  private void serializeCasAndSend(CacheEntry entry, Endpoint anEndpoint) throws Exception {
+    if (anEndpoint.getSerializer().equals("xmi")) {
+      String serializedCAS = getSerializedCasAndReleaseIt(false, entry.getCasReferenceId(), anEndpoint,
+              anEndpoint.isRetryEnabled());
+      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(
+                Level.FINEST,
+                CLASS_NAME.getName(),
+                "sendRequest",
+                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAJMS_sending_serialized_cas__FINEST",
+                new Object[] { getAnalysisEngineController().getComponentName(),
+                    anEndpoint.getDelegateKey(), entry.getCasReferenceId(), serializedCAS });
       }
-    } catch (ServiceShutdownException e) {
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                "sendRequest", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAEE_service_exception_WARNING", getAnalysisEngineController().getComponentName());
-        
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAJMS_exception__WARNING", e);
+      // Send process request to remote delegate and start timeout timer
+      sendCasToRemoteEndpoint(true, serializedCAS, entry, anEndpoint, true);
+    } else {
+      byte[] serializedCAS = getBinaryCasAndReleaseIt(false, entry.getCasReferenceId(), anEndpoint,
+              anEndpoint.isRetryEnabled());
+      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(
+                Level.FINEST,
+                CLASS_NAME.getName(),
+                "sendRequest",
+                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAJMS_sending_binary_cas__FINEST",
+                new Object[] { getAnalysisEngineController().getComponentName(),
+                    anEndpoint.getDelegateKey(), entry.getCasReferenceId(), serializedCAS });
       }
-    }
+      
+      // Send process request to remote delegate and start timeout timer
+      sendCasToRemoteEndpoint(true, serializedCAS, entry, anEndpoint, true);
 
-    catch (AsynchAEException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new AsynchAEException(e);
     }
-  }
-
-  /**
-   * Sends request message to process CAS to the given destinations. This method enables processing
-   * of the same CAS in multiple Analysis Engines at the same time.
-   * 
-   * @param aCasReferenceId
-   * @param endpoints
-   * @throws AsynchAEException
-   */
-  public void sendRequest(String aCasReferenceId, Endpoint[] endpoints) throws AsynchAEException {
-    Endpoint currentEndpoint = null;
-    try {
-      boolean cacheSerializedCas = endpointRetryEnabled(endpoints);
-      // The default serialization strategy for parallel step is xmi.
-      // Binary serialization doesnt support merge.
-      endpoints[0].setSerializer("xmi");
-
-      // Serialize CAS using serializer defined in the first endpoint. All endpoints in the parallel
-      // Flow
-      // must use the same format (either XCAS or XMI)
-      String serializedCAS = getSerializedCasAndReleaseIt(false, aCasReferenceId, endpoints[0],
-              cacheSerializedCas);
-      // Using provided endpoints, create JMS message for each destination and sent the serialized
-      // CAS to it.
-      for (int i = 0; i < endpoints.length; i++) {
-        // For remote delegates, optionally cache serialized CAS in case a retry on timeout is
-        // required.
-        if (endpoints[i].isRemote()) {
-
-          currentEndpoint = endpoints[i];
-          if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(
-                    Level.FINEST,
-                    CLASS_NAME.getName(),
-                    "sendRequest",
-                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_sending_serialized_cas__FINEST",
-                    new Object[] { getAnalysisEngineController().getComponentName(),
-                        endpoints[i].getEndpoint(), aCasReferenceId, serializedCAS });
-          }
-
-          // The default serialization strategy for parallel step is xmi.
-          // Binary serialization doesnt support merge.
-          endpoints[i].setSerializer("xmi");
 
-          sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, endpoints[i], true, 0);
-        } else {
-          // Currently this use case is not supported. Parallel processing of CAS is only supported
-          // with remote Delegates
-        }
-      }
-    } catch (Exception e) {
-      // Handle the error
-      ErrorContext errorContext = new ErrorContext();
-      errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
-      errorContext.add(AsynchAEMessage.Endpoint, currentEndpoint);
-      errorContext.add(AsynchAEMessage.CasReference, aCasReferenceId);
-      getAnalysisEngineController().getErrorHandlerChain().handle(e, errorContext,
-              getAnalysisEngineController());
-    }
   }
 
-  public void sendReply(CAS aCas, String anInputCasReferenceId, String aNewCasReferenceId,
-          Endpoint anEndpoint, long sequence) throws AsynchAEException {
-    try {
-      anEndpoint.setReplyEndpoint(true);
-      if (anEndpoint.isRemote()) {
-        // Serializes CAS and releases it back to CAS Pool
-        String serializedCAS = getSerializedCas(true, aNewCasReferenceId, anEndpoint, anEndpoint
-                .isRetryEnabled());
-        sendCasToRemoteEndpoint(false, serializedCAS, anInputCasReferenceId, aNewCasReferenceId,
-                anEndpoint, false, sequence);
-      } else {
-        // Not supported
-      }
-    } catch (ServiceShutdownException e) {
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                "sendReply", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAEE_service_exception_WARNING", getAnalysisEngineController().getComponentName());
-        
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAJMS_exception__WARNING", e);
-      }
-    } catch (AsynchAEException e) {
-      throw e;
-    }
-
-    catch (Exception e) {
-      throw new AsynchAEException(e);
-    }
-
-  }
 
   public void sendReply(CacheEntry entry, Endpoint anEndpoint) throws AsynchAEException {
     try {
@@ -985,232 +794,100 @@ public class JmsOutputChannel implements
 	      }
 
   }
-  public void sendReply(int aCommand, Endpoint anEndpoint, String aCasReferenceId)
-          throws AsynchAEException {
-	  sendReply(aCommand, anEndpoint, aCasReferenceId, false);
-  }
 
-  public void sendReply(int aCommand, Endpoint anEndpoint) throws AsynchAEException {
+  /**
+   * Sends JMS Reply Message to a given endpoint. The reply message contains given Throwable (with
+   * full stack)
+   * 
+   * @param t
+   *          - Throwable to include in the reply message
+   * @param anEndpoint
+   *          - an endpoint to receive the reply message
+   * @param aCasReferenceId
+   *          - a unique CAS reference id
+   * 
+   * @throws AsynchAEException
+   */
+  public void sendReply(Throwable t, String aCasReferenceId, String aParentCasReferenceId,
+          Endpoint anEndpoint, int aCommand) throws AsynchAEException {
     anEndpoint.setReplyEndpoint(true);
     try {
+      Throwable wrapper = null;
+      if (!(t instanceof UimaEEServiceException)) {
+        // Strip off AsyncAEException and replace with UimaEEServiceException
+        if (t instanceof AsynchAEException && t.getCause() != null) {
+          wrapper = new UimaEEServiceException(t.getCause());
+        } else {
+          wrapper = new UimaEEServiceException(t);
+        }
+      }
       if (aborting) {
         return;
       }
+      anEndpoint.setReplyEndpoint(true);
       JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
+      // Create Message that will contain serialized Exception with stack
+      ObjectMessage om = endpointConnection.produceObjectMessage();
+      //  Now try to catch non-serializable exception. The Throwable passed into this method may
+      //  not be serializable. Catch the exception, and create a wrapper containing stringified  
+      //  stack trace.
+      try {
+          //  serialize the Throwable
+          if (wrapper == null) {
+            om.setObject(t);
+          } else {
+            om.setObject(wrapper);
+          }
+      } catch( RuntimeException e) {
+          //  Check if we failed due to non-serializable object in the Throwable
+          if ( e.getCause() != null && e.getCause() instanceof NotSerializableException ) {
+            //  stringify the stack trace
+            StringWriter sw = new StringWriter();
+            t.printStackTrace(new PrintWriter(sw));
+            wrapper = new UimaEEServiceException(sw.toString());
+            //  serialize the new wrapper
+            om.setObject(wrapper);
+          } else {
+            throw e;  // rethrow
+          }
+        } catch( Exception e) {
+          throw e; // rethrow
+      }
+        // Add common header properties
+      populateHeaderWithResponseContext(om, anEndpoint, aCommand); // AsynchAEMessage.Process);
 
-      TextMessage tm = endpointConnection.produceTextMessage("");
-      tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
-      populateHeaderWithResponseContext(tm, anEndpoint, aCommand);
+      om.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.Exception);
+      if (aCasReferenceId != null) {
+        om.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
+        if (aParentCasReferenceId != null) {
+          om.setStringProperty(AsynchAEMessage.InputCasReference, aParentCasReferenceId);
+        }
+      }
 
-      endpointConnection.send(tm, 0, false);
-      addIdleTime(tm);
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(
-                Level.FINE,
-                CLASS_NAME.getName(),
-                "sendReply",
-                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAJMS_cpc_reply_sent__FINE",
-                new Object[] { getAnalysisEngineController().getComponentName(),
-                    anEndpoint.getEndpoint() });
+        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendReply",
+                JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_exception__FINE",
+                new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
       }
+      // Dispatch Message to destination
+      endpointConnection.send(om, 0, false);
+      addIdleTime(om);
     } catch (JMSException e) {
       // Unable to establish connection to the endpoint. Logit and continue
+      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendReply",
+                JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
+                new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
+      }
+    } catch (ServiceShutdownException e) {
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                 "sendReply", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                 "UIMAEE_service_exception_WARNING", getAnalysisEngineController().getComponentName());
 
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "sendReply",
-                JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
-                e);
-      }
-    }
-
-    catch (ServiceShutdownException e) {
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                "sendReply", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAEE_service_exception_WARNING", getAnalysisEngineController().getComponentName());
-
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAJMS_exception__WARNING", e);
-      }
-    }
-
-    catch (AsynchAEException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new AsynchAEException(e);
-    }
-  }
-
-  /**
-   * Sends JMS Reply Message to a given endpoint. The reply message contains given Throwable (with
-   * full stack)
-   * 
-   * @param aCasReferenceId
-   *          - a unique CAS reference id
-   * @param anEndpoint
-   *          - an endpoint to receive the reply message
-   * 
-   * @throws AsynchAEException
-   */
-  public void sendReply(String aCasReferenceId, Endpoint anEndpoint) throws AsynchAEException {
-    anEndpoint.setReplyEndpoint(true);
-    try {
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendReply",
-                JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_replyto_endpoint__FINE",
-                new Object[] { anEndpoint.getEndpoint(), aCasReferenceId });
-      }
-      if (anEndpoint.isRemote()) {
-        CacheEntry entry = null;
-        try {
-          entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(
-                  aCasReferenceId);
-
-        } catch (Exception e) {
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(
-                    Level.FINE,
-                    CLASS_NAME.getName(),
-                    "sendReply",
-                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_cas_not_found__INFO",
-                    new Object[] { getAnalysisEngineController().getComponentName(),
-                        anEndpoint.getEndpoint(), aCasReferenceId });
-          }
-          return;
-        }
-        if (anEndpoint.getSerializer().equals("xmi")) {
-          // Serializes CAS and releases it back to CAS Pool
-          String serializedCAS = getSerializedCas(true, aCasReferenceId, anEndpoint, false);
-          sendCasToRemoteEndpoint(false, serializedCAS, null, aCasReferenceId, anEndpoint, false, 0);
-        } else {
-          byte[] binaryCas = getBinaryCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint
-                  .isRetryEnabled());
-          if (binaryCas == null) {
-            return;
-          }
-          sendCasToRemoteEndpoint(false, binaryCas, entry, anEndpoint, false);
-        }
-      } else {
-        // Not supported
-      }
-    } catch (ServiceShutdownException e) {
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                "sendReply", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAEE_service_exception_WARNING", getAnalysisEngineController().getComponentName());
-
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAJMS_exception__WARNING", e);
-      }
-    }
-
-    catch (AsynchAEException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new AsynchAEException(e);
-    }
-  }
-
-  /**
-   * Sends JMS Reply Message to a given endpoint. The reply message contains given Throwable (with
-   * full stack)
-   * 
-   * @param t
-   *          - Throwable to include in the reply message
-   * @param anEndpoint
-   *          - an endpoint to receive the reply message
-   * @param aCasReferenceId
-   *          - a unique CAS reference id
-   * 
-   * @throws AsynchAEException
-   */
-  public void sendReply(Throwable t, String aCasReferenceId, String aParentCasReferenceId,
-          Endpoint anEndpoint, int aCommand) throws AsynchAEException {
-    anEndpoint.setReplyEndpoint(true);
-    try {
-      Throwable wrapper = null;
-      if (!(t instanceof UimaEEServiceException)) {
-        // Strip off AsyncAEException and replace with UimaEEServiceException
-        if (t instanceof AsynchAEException && t.getCause() != null) {
-          wrapper = new UimaEEServiceException(t.getCause());
-        } else {
-          wrapper = new UimaEEServiceException(t);
-        }
-      }
-      if (aborting) {
-        return;
-      }
-      anEndpoint.setReplyEndpoint(true);
-      JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
-      // Create Message that will contain serialized Exception with stack
-      ObjectMessage om = endpointConnection.produceObjectMessage();
-      //  Now try to catch non-serializable exception. The Throwable passed into this method may
-      //  not be serializable. Catch the exception, and create a wrapper containing stringified  
-      //  stack trace.
-      try {
-          //  serialize the Throwable
-          if (wrapper == null) {
-            om.setObject(t);
-          } else {
-            om.setObject(wrapper);
-          }
-      } catch( RuntimeException e) {
-          //  Check if we failed due to non-serializable object in the Throwable
-          if ( e.getCause() != null && e.getCause() instanceof NotSerializableException ) {
-            //  stringify the stack trace
-            StringWriter sw = new StringWriter();
-            t.printStackTrace(new PrintWriter(sw));
-            wrapper = new UimaEEServiceException(sw.toString());
-            //  serialize the new wrapper
-            om.setObject(wrapper);
-          } else {
-            throw e;  // rethrow
-          }
-        } catch( Exception e) {
-          throw e; // rethrow
-      }
-        // Add common header properties
-      populateHeaderWithResponseContext(om, anEndpoint, aCommand); // AsynchAEMessage.Process);
-
-      om.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.Exception);
-      if (aCasReferenceId != null) {
-        om.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
-        if (aParentCasReferenceId != null) {
-          om.setStringProperty(AsynchAEMessage.InputCasReference, aParentCasReferenceId);
-        }
-      }
-
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendReply",
-                JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_exception__FINE",
-                new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
-      }
-      // Dispatch Message to destination
-      endpointConnection.send(om, 0, false);
-      addIdleTime(om);
-    } catch (JMSException e) {
-      // Unable to establish connection to the endpoint. Logit and continue
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendReply",
-                JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
-                new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
-      }
-    } catch (ServiceShutdownException e) {
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                "sendReply", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAEE_service_exception_WARNING", getAnalysisEngineController().getComponentName());
-
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAJMS_exception__WARNING", e);
+        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+                "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAJMS_exception__WARNING", e);
       }
     } catch (AsynchAEException e) {
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
@@ -1470,15 +1147,6 @@ public class JmsOutputChannel implements
     }
   }
 
-  private boolean endpointRetryEnabled(Endpoint[] endpoints) {
-    for (int i = 0; i < endpoints.length; i++) {
-      if (endpoints[i].isRetryEnabled()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   private void populateStats(Message aTextMessage, Endpoint anEndpoint, String aCasReferenceId,
           int anAdminCommand, boolean isRequest) throws Exception {
     if (anEndpoint.isFinal()) {
@@ -1750,9 +1418,10 @@ public class JmsOutputChannel implements
     }
   }
 
-  private void sendCasToRemoteEndpoint(boolean isRequest, String aSerializedCAS,
-          String anInputCasReferenceId, String aCasReferenceId, Endpoint anEndpoint,
-          boolean startTimer, long sequence) throws AsynchAEException, ServiceShutdownException {
+  private void sendCasToRemoteEndpoint(boolean isRequest, Object aSerializedCAS, CacheEntry entry,
+          Endpoint anEndpoint, boolean startTimer) throws AsynchAEException,
+          ServiceShutdownException {
+    CasStateEntry casStateEntry = null;
     long msgSize = 0;
     try {
       if (aborting) {
@@ -1762,33 +1431,9 @@ public class JmsOutputChannel implements
       //  Otherwise this is a request so use a broker specified in the endpoint object.
       String brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
 
-      CacheEntry entry = this.getCacheEntry(aCasReferenceId);
-      if (entry == null) {
-        throw new AsynchAEException("Controller:"
-                + getAnalysisEngineController().getComponentName()
-                + " Unable to Send Message To Remote Endpoint: " + anEndpoint.getEndpoint()
-                + " CAS:" + aCasReferenceId + " Not In The Cache");
-      }
-
-      // Get the connection object for a given endpoint
-      JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
-      if (endpointConnection == null) {
-        throw new AsynchAEException("Controller:"
-                + getAnalysisEngineController().getComponentName()
-                + " Unable to Send Message To Remote Endpoint: " + anEndpoint.getEndpoint()
-                + " Connection is Invalid. InputCasReferenceId:" + anInputCasReferenceId
-                + " CasReferenceId:" + aCasReferenceId + " Sequece:" + sequence);
-      }
-      if (!endpointConnection.isOpen()) {
-        if (!isRequest) {
-          return;
-        }
-      }
-      TextMessage tm = null;
-      try {
-        // Create empty JMS Text Message
-        tm = endpointConnection.produceTextMessage("");
-      } catch (AsynchAEException ex) {
+      casStateEntry = getAnalysisEngineController().getLocalCache().lookupEntry(
+              entry.getCasReferenceId());
+      if (casStateEntry == null) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(
                 Level.WARNING,
                 CLASS_NAME.getName(),
@@ -1796,346 +1441,36 @@ public class JmsOutputChannel implements
                 JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                 "UIMAJMS_unable_to_send_reply__WARNING",
                 new Object[] { getAnalysisEngineController().getComponentName(),
-                	anEndpoint.getDestination(), brokerConnectionURL, anInputCasReferenceId, aCasReferenceId, sequence, ex  });
-        return;
-      }
-      // Save Serialized CAS in case we need to re-send it for analysis
-      if (anEndpoint.isRetryEnabled()
-              && getAnalysisEngineController().getInProcessCache()
-                      .getSerializedCAS(aCasReferenceId) == null) {
-        getAnalysisEngineController().getInProcessCache().saveSerializedCAS(aCasReferenceId,
-                aSerializedCAS);
-      }
-      if (aSerializedCAS != null) {
-        msgSize = aSerializedCAS.length();
-      }
-      tm.setText(aSerializedCAS);
-      tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload);
-      // Add Cas Reference Id to the outgoing JMS Header
-      tm.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
-
-      // Add common properties to the JMS Header
-      if (isRequest == true) {
-        populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process);
-      } else {
-        populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
-        tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
-      }
-      // The following is true when the analytic is a CAS Multiplier
-      if (sequence > 0 && !isRequest) {
-        // Override MessageType set in the populateHeaderWithContext above.
-        // Make the reply message look like a request. This message will contain a new CAS
-        // produced by the CAS Multiplier. The client will treat this CAS
-        // differently from the input CAS.
-        tm.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
-        tm.setStringProperty(AsynchAEMessage.InputCasReference, anInputCasReferenceId);
-        // Add a sequence number assigned to this CAS by the controller
-        tm.setLongProperty(AsynchAEMessage.CasSequence, sequence);
-        isRequest = true;
-        // Add the name of the FreeCas Queue
-        if (freeCASTempQueue != null) {
-          // Attach a temp queue to the outgoing message. This a queue where
-          // Free CAS notifications need to be sent from the client
-          tm.setJMSReplyTo(freeCASTempQueue);
-        }
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-          if (entry != null) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(
-                    Level.FINE,
-                    CLASS_NAME.getName(),
-                    "sendCasToRemoteEndpoint",
-                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
-                    new Object[] { getAnalysisEngineController().getComponentName(), "Remote",
-                        anEndpoint.getEndpoint(), aCasReferenceId, anInputCasReferenceId,
-                        entry.getInputCasReferenceId() });
-          }
-        }
-      }
-      dispatch(tm, anEndpoint, entry, isRequest, endpointConnection, msgSize);
-    } catch (JMSException e) {
-      // Unable to establish connection to the endpoint. Log it and continue
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-                "sendCasToRemoteDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAJMS_unable_to_connect__INFO",
-                new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
-      }
-    }
-
-    catch (ServiceShutdownException e) {
-      throw e;
-    } catch (AsynchAEException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new AsynchAEException(e);
-    }
-
-  }
-
-  private void sendCasToRemoteEndpoint(boolean isRequest, byte[] aSerializedCAS,
-          String anInputCasReferenceId, String aCasReferenceId, Endpoint anEndpoint,
-          boolean startTimer, long sequence) throws AsynchAEException, ServiceShutdownException {
-    long msgSize = 0;
-    try {
-      if (aborting) {
+                  anEndpoint.getDestination(), brokerConnectionURL, 
+                  entry.getInputCasReferenceId() == null ? "" : entry.getInputCasReferenceId(), 
+                          entry.getCasReferenceId(), 0, 
+                          new Exception("Unable to lookup entry in Local Cache for a given Cas Id")  });
         return;
       }
-      //  If this is a reply to a client, use the same broker URL that manages this service input queue.
-      //  Otherwise this is a request so use a broker specified in the endpoint object.
-      String brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
 
-      if (aSerializedCAS != null) {
-        msgSize = aSerializedCAS.length;
-      }
-      CacheEntry entry = this.getCacheEntry(aCasReferenceId);
-      if (entry == null) {
-        throw new AsynchAEException("Controller:"
-                + getAnalysisEngineController().getComponentName()
-                + " Unable to Send Message To Remote Endpoint: " + anEndpoint.getEndpoint()
-                + " CAS:" + aCasReferenceId + " Not In The Cache");
-      }
       // Get the connection object for a given endpoint
       JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
-      if (endpointConnection == null) {
-        throw new AsynchAEException("Controller:"
-                + getAnalysisEngineController().getComponentName()
-                + " Unable to Send Message To Remote Endpoint: " + anEndpoint.getEndpoint()
-                + " Connection is Invalid. InputCasReferenceId:" + anInputCasReferenceId
-                + " CasReferenceId:" + aCasReferenceId + " Sequece:" + sequence);
-      }
-      if (!endpointConnection.isOpen()) {
-        if (!isRequest) {
-          return;
-        }
-      }
-
-      BytesMessage tm = null;
-      try {
-        // Create empty JMS Text Message
-        tm = endpointConnection.produceByteMessage();
-      } catch (AsynchAEException ex) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(
-                  Level.WARNING,
-                  CLASS_NAME.getName(),
-                  "sendCasToRemoteDelegate",
-                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                  "UIMAJMS_unable_to_send_reply__WARNING",
-                  new Object[] { getAnalysisEngineController().getComponentName(),
-                  	anEndpoint.getDestination(), brokerConnectionURL, anInputCasReferenceId, aCasReferenceId, sequence, ex  });
-        return;
-      }
-
-      tm.writeBytes(aSerializedCAS);
-      tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.BinaryPayload);
-      // Add Cas Reference Id to the outgoing JMS Header
-      tm.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
-      // Add common properties to the JMS Header
-      if (isRequest == true) {
-        populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process);
-      } else {
-        populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
-        tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
-      }
-      // The following is true when the analytic is a CAS Multiplier
-      if (sequence > 0 && !isRequest) {
-        // Override MessageType set in the populateHeaderWithContext above.
-        // Make the reply message look like a request. This message will contain a new CAS
-        // produced by the CAS Multiplier. The client will treat this CAS
-        // differently from the input CAS.
-        tm.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
-        tm.setStringProperty(AsynchAEMessage.InputCasReference, anInputCasReferenceId);
-        // Add a sequence number assigned to this CAS by the controller
-        tm.setLongProperty(AsynchAEMessage.CasSequence, sequence);
-        isRequest = true;
-        if (freeCASTempQueue != null) {
-          // Attach a temp queue to the outgoing message. This a queue where
-          // Free CAS notifications need to be sent from the client
-          tm.setJMSReplyTo(freeCASTempQueue);
-        }
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-          if (entry != null) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(
-                    Level.FINE,
-                    CLASS_NAME.getName(),
-                    "sendCasToRemoteEndpoint",
-                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
-                    new Object[] { getAnalysisEngineController().getComponentName(), "Remote",
-                        anEndpoint.getEndpoint(), aCasReferenceId, anInputCasReferenceId,
-                        entry.getInputCasReferenceId() });
-          }
-        }
-      }
-      dispatch(tm, anEndpoint, entry, isRequest, endpointConnection, msgSize);
-    } catch (JMSException e) {
-      // Unable to establish connection to the endpoint. Logit and continue
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-                "sendCasToRemoteDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAJMS_unable_to_connect__INFO",
-                new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
-      }
-    }
-
-    catch (ServiceShutdownException e) {
-      throw e;
-    } catch (AsynchAEException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new AsynchAEException(e);
-    }
-
-  }
-
-  private void sendCasToRemoteEndpoint(boolean isRequest, String aSerializedCAS, CacheEntry entry,
-          Endpoint anEndpoint, boolean startTimer) throws AsynchAEException,
-          ServiceShutdownException {
-    CasStateEntry casStateEntry = null;
-    long msgSize = 0;
-    try {
-      if (aborting) {
-        return;
-      }
-      //  If this is a reply to a client, use the same broker URL that manages this service input queue.
-      //  Otherwise this is a request so use a broker specified in the endpoint object.
-      String brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
 
-      casStateEntry = getAnalysisEngineController().getLocalCache().lookupEntry(
-              entry.getCasReferenceId());
-      // Get the connection object for a given endpoint
-      JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
       if (!endpointConnection.isOpen()) {
         if (!isRequest) {
           return;
         }
       }
-      // Create empty JMS Text Message
-      TextMessage tm = null;
+      Message tm = null;
       try {
-        // Create empty JMS Text Message
-        tm = endpointConnection.produceTextMessage("");
-      } catch (AsynchAEException ex) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(
-                  Level.WARNING,
-                  CLASS_NAME.getName(),
-                  "sendCasToRemoteDelegate",
-                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                  "UIMAJMS_unable_to_send_reply__WARNING",
-                  new Object[] { getAnalysisEngineController().getComponentName(),
-                  	anEndpoint.getDestination(), brokerConnectionURL, entry.getInputCasReferenceId() == null ? "" : entry.getInputCasReferenceId(), entry.getCasReferenceId(), 0, ex  });
-        return;
-      }
-
-      // Save Serialized CAS in case we need to re-send it for analysis
-      if (anEndpoint.isRetryEnabled()
-              && getAnalysisEngineController().getInProcessCache().getSerializedCAS(
-                      entry.getCasReferenceId()) == null) {
-        getAnalysisEngineController().getInProcessCache().saveSerializedCAS(
-                entry.getCasReferenceId(), aSerializedCAS);
-      }
-      if (aSerializedCAS != null) {
-        msgSize = aSerializedCAS.length();
-      }
-
-      tm.setText(aSerializedCAS);
-      tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload);
-      // Add Cas Reference Id to the outgoing JMS Header
-      tm.setStringProperty(AsynchAEMessage.CasReference, entry.getCasReferenceId());
-      // Add common properties to the JMS Header
-      if (isRequest == true) {
-        populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process);
-      } else {
-        populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
-        tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
-      }
-      // The following is true when the analytic is a CAS Multiplier
-      if (casStateEntry.isSubordinate() && !isRequest) {
-        // Override MessageType set in the populateHeaderWithContext above.
-        // Make the reply message look like a request. This message will contain a new CAS
-        // produced by the CAS Multiplier. The client will treat this CAS
-        // differently from the input CAS.
-        tm.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
-
-        isRequest = true;
-        // Save the id of the parent CAS
-        tm.setStringProperty(AsynchAEMessage.InputCasReference, getTopParentCasReferenceId(entry
-                .getCasReferenceId()));
-        // Add a sequence number assigned to this CAS by the controller
-        tm.setLongProperty(AsynchAEMessage.CasSequence, entry.getCasSequence());
-        // If this is a Cas Multiplier, add a reference to a special queue where
-        // the client sends Free Cas Notifications
-        if (freeCASTempQueue != null) {
-          // Attach a temp queue to the outgoing message. This is a queue where
-          // Free CAS notifications need to be sent from the client
-          tm.setJMSReplyTo(freeCASTempQueue);
-        }
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(
-                  Level.FINE,
-                  CLASS_NAME.getName(),
-                  "sendCasToRemoteEndpoint",
-                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                  "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
-                  new Object[] { getAnalysisEngineController().getComponentName(), "Remote",
-                      anEndpoint.getEndpoint(), entry.getCasReferenceId(),
-                      entry.getInputCasReferenceId(), entry.getInputCasReferenceId() });
-        }
-      }
-
-      dispatch(tm, anEndpoint, entry, isRequest, endpointConnection, msgSize);
-
-    } catch (JMSException e) {
-      // Unable to establish connection to the endpoint. Logit and continue
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-                "sendCasToRemoteDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAJMS_unable_to_connect__INFO",
-                new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
-      }
-    }
-
-    catch (ServiceShutdownException e) {
-      throw e;
-    } catch (AsynchAEException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new AsynchAEException(e);
-    }
-
-  }
-
-  private void sendCasToRemoteEndpoint(boolean isRequest, byte[] aSerializedCAS, CacheEntry entry,
-          Endpoint anEndpoint, boolean startTimer) throws AsynchAEException,
-          ServiceShutdownException {
-    CasStateEntry casStateEntry = null;
-    long msgSize = 0;
-    try {
-      if (aborting) {
-        return;
-      }
-      //  If this is a reply to a client, use the same broker URL that manages this service input queue.
-      //  Otherwise this is a request so use a broker specified in the endpoint object.
-      String brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
-
-      casStateEntry = getAnalysisEngineController().getLocalCache().lookupEntry(
-              entry.getCasReferenceId());
-      // Get the connection object for a given endpoint
-      JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
+        if ( anEndpoint.getSerializer().equals("xmi")) {
+          tm = endpointConnection.produceTextMessage((String)aSerializedCAS);
+          if (aSerializedCAS != null) {
+            msgSize = ((String)aSerializedCAS).length();
+          }
 
-      if (aSerializedCAS != null) {
-        msgSize = aSerializedCAS.length;
-      }
-      if (!endpointConnection.isOpen()) {
-        if (!isRequest) {
-          return;
+        } else {
+          // Create empty JMS Bytes Message
+          tm = endpointConnection.produceByteMessage((byte[])aSerializedCAS);
+          if (aSerializedCAS != null) {
+            msgSize = ((byte[])aSerializedCAS).length;
+          }
         }
-      }
-      // Create empty JMS Text Message
-      BytesMessage tm = null;
-      try {
-        // Create empty JMS Text Message
-        tm = endpointConnection.produceByteMessage();
       } catch (AsynchAEException ex) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(
                   Level.WARNING,
@@ -2147,7 +1482,6 @@ public class JmsOutputChannel implements
                   	anEndpoint.getDestination(), brokerConnectionURL, entry.getInputCasReferenceId() == null ? "" : entry.getInputCasReferenceId(), entry.getCasReferenceId(), 0, ex  });
         return;
       }
-      tm.writeBytes(aSerializedCAS);
       tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.BinaryPayload);
       // Add Cas Reference Id to the outgoing JMS Header
       tm.setStringProperty(AsynchAEMessage.CasReference, entry.getCasReferenceId());
@@ -2158,9 +1492,6 @@ public class JmsOutputChannel implements
         populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);   
         tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
       }
-      if (casStateEntry == null) {
-        return;
-      }
       // The following is true when the analytic is a CAS Multiplier
       if (casStateEntry.isSubordinate() && !isRequest) {
         // Override MessageType set in the populateHeaderWithContext above.
@@ -2257,15 +1588,6 @@ public class JmsOutputChannel implements
     getAnalysisEngineController().saveReplyTime(t, "");
   }
 
-  private CacheEntry getCacheEntry(String aCasReferenceId) throws Exception {
-    CacheEntry cacheEntry = null;
-    if (getAnalysisEngineController().getInProcessCache().entryExists(aCasReferenceId)) {
-      cacheEntry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(
-              aCasReferenceId);
-    }
-    return cacheEntry;
-  }
-
   public void stop() {
     stop(Channel.CloseAllChannels);
   }
@@ -2343,16 +1665,6 @@ public class JmsOutputChannel implements
     }
   }
 
-  private String extractURLWithProtocol(String aProtocolList, String aProtocol) {
-    StringTokenizer tokenizer = new StringTokenizer(aProtocolList, ",");
-    while (tokenizer.hasMoreTokens()) {
-      String token = tokenizer.nextToken().trim();
-      if (token.toLowerCase().startsWith(aProtocol.toLowerCase())) {
-        return token;
-      }
-    }
-    return null;
-  }
 
   public void cancelTimers() {
     if (connectionMap.size() > 0) {