You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/02/26 18:54:13 UTC

svn commit: r1825401 [11/11] - in /uima/uima-as/branches/uima-as-3: uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ uimaj-as-activemq/src/main/java/org/apache/uim...

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/Listener.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/Listener.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/Listener.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/Listener.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.client;
+import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
+public interface Listener extends LifecycleListener  {
+	public enum Type {
+		GetMeta, ProcessCAS, FreeCAS, Reply, Target, Unknown
+	}
+	
+	public Transport getTransport();
+	
+	public Type getType();
+	
+	public Object getEndpoint();
+	
+	// for delegate reply listener this should be delegate key
+	public String getName(); 
+}

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/CasPool.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/CasPool.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/CasPool.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/CasPool.java Mon Feb 26 18:54:11 2018
@@ -51,4 +51,18 @@ public interface CasPool {
    * @param heapSize the new initial fs heap size
    */
   public void setInitialFsHeapSize(int heapSize);
+  
+  /**
+   * Returns boolean to indicate if JCasCache should be enabled/disabled
+   *
+   * @return - boolean
+   */
+  public boolean disableJCasCache();
+  
+  /**
+   * Disables or enables JCasCache.
+   *
+   * @param disable - true or false
+   */
+  public void disableJCasCache(boolean disable);
 }

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/DelegateConfiguration.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/DelegateConfiguration.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/DelegateConfiguration.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/DelegateConfiguration.java Mon Feb 26 18:54:11 2018
@@ -150,4 +150,17 @@ public interface DelegateConfiguration {
    */
   public CollectionProcessCompleteErrorHandlingSettings getCollectionProcessCompleteErrorHandlingSettings();
   
+  /**
+   * Returns the value of the disableJCasCache
+   * @return true if configured to disable JCasCache. False otherwise
+   */
+  public boolean disableJCasCache();
+  
+  /**
+   * Sets the value of the disableJCasCache
+   * 
+   * @param disable - true to disable JCasCache. False otherwise
+   */
+  public void disableJCasCache(boolean disable);
+
 }

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/ServiceContext.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/ServiceContext.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/ServiceContext.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/ServiceContext.java Mon Feb 26 18:54:11 2018
@@ -289,5 +289,14 @@ public interface ServiceContext {
    * @return the cp c additional action
    */
   public Action getCpCAdditionalAction();
-  
+  /**
+   * Enables/disables JCas Cache
+   * 
+   */
+  public void disableJCasCache(boolean disable);
+  /**
+   * Gets a boolean to indicate if JCas Cache is enabled/disabled
+   * 
+   */
+  public boolean disableJCasCache();
 }

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/CasMultiplierImpl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/CasMultiplierImpl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/CasMultiplierImpl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/CasMultiplierImpl.java Mon Feb 26 18:54:11 2018
@@ -38,7 +38,7 @@ public class CasMultiplierImpl implement
    * @param cmt the cmt
    */
   protected CasMultiplierImpl(CasMultiplierType cmt ) { 
-    this(cmt,1,2000000,false);
+    this(cmt,1,2000000,false,false);
   }
   
   /**
@@ -49,11 +49,12 @@ public class CasMultiplierImpl implement
    * @param initialHeapSize the initial heap size
    * @param processParentLast the process parent last
    */
-  protected CasMultiplierImpl(CasMultiplierType cmt, int casPoolSize, int initialHeapSize, boolean processParentLast) {
+  protected CasMultiplierImpl(CasMultiplierType cmt, int casPoolSize, int initialHeapSize, boolean processParentLast, boolean disableJCasCache) {
     this.cmt = cmt;
     setCasPoolSize(casPoolSize);
     setInitialFsHeapSize(initialHeapSize);
     setProcessParentLast(processParentLast);
+    disableJCasCache(disableJCasCache);
   }
   
   /**
@@ -67,6 +68,7 @@ public class CasMultiplierImpl implement
     setCasPoolSize(context.getCasPoolSize());
     setInitialFsHeapSize(context.getInitialHeapSize());
     setProcessParentLast(context.processParentLast());
+    disableJCasCache(context.disableJCasCache());
 
 //    if ( props.containsKey(UimaASDeploymentDescriptor.CASPOOL_CAS_COUNT)) {
 //      setAttr(UimaASDeploymentDescriptor.CASPOOL_CAS_COUNT,props);
@@ -151,4 +153,12 @@ public int getCasPoolSize() {
     Assert.notNull(cmt);
     cmt.setProcessParentLast(Boolean.toString(processParentLast));
   }
+  public boolean disableJCasCache() {
+	  Assert.notNull(cmt);
+	  return cmt.getDisableJCasCache();
+  }
+  public void disableJCasCache(boolean disable ) {
+	  Assert.notNull(cmt);
+	  cmt.setDisableJCasCache(disable);
+  }
 }

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/CasPoolImpl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/CasPoolImpl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/CasPoolImpl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/CasPoolImpl.java Mon Feb 26 18:54:11 2018
@@ -45,6 +45,7 @@ public class CasPoolImpl implements CasP
     this.cpt = cpt;
     setNumberOfCases(context.getCasPoolSize());
     setInitialFsHeapSize(context.getInitialHeapSize());
+    disableJCasCache(context.disableJCasCache());
 
 //    if ( props.containsKey(UimaASDeploymentDescriptor.CASPOOL_CAS_COUNT)) {
 //      setAttr(UimaASDeploymentDescriptor.CASPOOL_CAS_COUNT,props);
@@ -99,5 +100,18 @@ public int getNumberOfCases() {
     Assert.notNull(cpt);
     cpt.setInitialFsHeapSize(heapSize);
   }
-
+  /* (non-Javadoc)
+   * @see org.apache.uima.resourceSpecifier.factory.CasPool#disableJCasCache()
+   */
+  public boolean disableJCasCache() {
+	  Assert.notNull(cpt);
+	 return cpt.getDisableJCasCache(); 
+  }
+  /* (non-Javadoc)
+   * @see org.apache.uima.resourceSpecifier.factory.CasPool#disableJCasCache(boolean)
+   */
+  public void disableJCasCache(boolean disable) {
+	  Assert.notNull(cpt);
+	  cpt.setDisableJCasCache(disable);
+  }
 }

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/ColocatedDelegateEngineImpl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/ColocatedDelegateEngineImpl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/ColocatedDelegateEngineImpl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/ColocatedDelegateEngineImpl.java Mon Feb 26 18:54:11 2018
@@ -52,7 +52,7 @@ public class ColocatedDelegateEngineImpl
     setKey(cdc.getKey());
     setReplyQueueScaleup(cdc.getInternalReplyQueueScaleout());
     if ( cdc.isCasMultiplier()) {
-      cm = new CasMultiplierImpl(dcaet.addNewCasMultiplier(), cdc.getCasPoolSize(), cdc.getInitialHeapSize(), cdc.processParentLast());
+      cm = new CasMultiplierImpl(dcaet.addNewCasMultiplier(), cdc.getCasPoolSize(), cdc.getInitialHeapSize(), cdc.processParentLast(),cdc.disableJCasCache());
     }
     configuration= cdc;
   }
@@ -111,7 +111,11 @@ public class ColocatedDelegateEngineImpl
    * @see org.apache.uima.resourceSpecifier.factory.ColocatedDelegateEngine#setAsync()
    */
   public void setAsync() {
-    dcaet.setAsync("true");
+	if ( isAggregate()) {
+	   dcaet.setAsync("true");
+	} else {
+       dcaet.setAsync("false");
+	}
   }
 
   /* (non-Javadoc)

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/RemoteDelegateEngineImpl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/RemoteDelegateEngineImpl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/RemoteDelegateEngineImpl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/RemoteDelegateEngineImpl.java Mon Feb 26 18:54:11 2018
@@ -113,7 +113,7 @@ public class RemoteDelegateEngineImpl im
    * @param context the context
    */
   private void addCasMultiplier(CasMultiplierType cmt, DelegateConfiguration cdc, ServiceContext context) {
-    cm = new CasMultiplierImpl(cmt, cdc.getCasPoolSize(), cdc.getInitialHeapSize(), cdc.processParentLast());
+    cm = new CasMultiplierImpl(cmt, cdc.getCasPoolSize(), cdc.getInitialHeapSize(), cdc.processParentLast(), cdc.disableJCasCache());
   }
 
   /**

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/ServiceContextImpl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/ServiceContextImpl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/ServiceContextImpl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/ServiceContextImpl.java Mon Feb 26 18:54:11 2018
@@ -84,6 +84,10 @@ public class ServiceContextImpl implemen
   
   /** The cpc additional action. */
   private Action cpcAdditionalAction = Action.Terminate;
+  
+  /** The disable/enable of JCasCache */
+  private boolean disableJCasCache=true;
+
   /**
    * This class describes UIMA AS service. The minimal information needed is the service name, description, the AnalysisEngine
    * descriptor and the queue name. If not provided, the class uses defaults to fully describe the service.
@@ -373,5 +377,16 @@ public class ServiceContextImpl implemen
   public boolean isAsync() {
     return async;
   }
-
+  /* (non-Javadoc)
+   * @see org.apache.uima.resourceSpecifier.factory.ServiceContext#disableJCasCache(boolean)
+   */
+  public void disableJCasCache(boolean disable) {
+	  disableJCasCache = disable;
+  }
+  /* (non-Javadoc)
+   * @see org.apache.uima.resourceSpecifier.factory.ServiceContext#disableJCasCache()
+   */
+  public boolean disableJCasCache() {
+	  return disableJCasCache;
+  }
 }

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java Mon Feb 26 18:54:11 2018
@@ -40,6 +40,7 @@ import org.apache.uima.adapter.jms.JmsCo
 import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientRequest;
 import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.SharedConnection;
 import org.apache.uima.adapter.jms.message.PendingMessage;
+import org.apache.uima.adapter.jms.message.PendingMessageImpl;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.cas.SerialFormat;
 import org.apache.uima.jms.error.handler.BrokerConnectionException;
@@ -54,11 +55,11 @@ import org.apache.uima.util.impl.Process
  * worker thread terminates when the uima ee client calls doStop() method.
  */
 public abstract class BaseMessageSender implements Runnable, MessageSender {
-  private static final Class CLASS_NAME = BaseMessageSender.class;
+  private static final Class<?> CLASS_NAME = BaseMessageSender.class;
 
   // A reference to a shared queue where application threads enqueue messages
   // to be sent
-  protected BlockingQueue<PendingMessage> messageQueue = new LinkedBlockingQueue<PendingMessage>();
+  protected BlockingQueue<PendingMessage> messageQueue = new LinkedBlockingQueue<>();
 
   // Global flag controlling lifecycle of this thread. It will be set to true
   // when the
@@ -116,7 +117,7 @@ public abstract class BaseMessageSender
     done = true;
     
     // Create an empty message to deliver to the queue that is blocking
-    PendingMessage emptyMessage = new PendingMessage(0);
+    PendingMessage emptyMessage = new PendingMessageImpl(0);
     
     messageQueue.add(emptyMessage);
     synchronized(emptyMessage) {
@@ -191,7 +192,7 @@ public abstract class BaseMessageSender
         if (pm.getMessageType() == AsynchAEMessage.Process) {
           //  fetch the cache entry for this CAS
           ClientRequest cacheEntry = (ClientRequest) engine.getCache().get(
-                  pm.get(AsynchAEMessage.CasReference));
+                  pm.getPropertyAsString(AsynchAEMessage.CasReference));
           if ( cacheEntry != null ) {
             //  We are rejecting any Process requests until connection to broker
             //  is recovered
@@ -505,12 +506,12 @@ public abstract class BaseMessageSender
         break;
 
       case AsynchAEMessage.Process:
-        String casReferenceId = (String) aPm.get(AsynchAEMessage.CasReference);
+        String casReferenceId = aPm.getPropertyAsString(AsynchAEMessage.CasReference);
         if (engine.getSerialFormat() == SerialFormat.XMI) {
-          String serializedCAS = (String) aPm.get(AsynchAEMessage.CAS);
+          String serializedCAS = aPm.getPropertyAsString(AsynchAEMessage.CAS);
           engine.setCASMessage(casReferenceId, serializedCAS, anOutgoingMessage);
         } else {
-          byte[] serializedCAS = (byte[]) aPm.get(AsynchAEMessage.CAS);
+          byte[] serializedCAS =  aPm.getPropertyAsBytesArray(AsynchAEMessage.CAS);
           engine.setCASMessage(casReferenceId, serializedCAS, anOutgoingMessage);
 
         }
@@ -523,15 +524,15 @@ public abstract class BaseMessageSender
         break;
         
       case AsynchAEMessage.ReleaseCAS:
-          String casRefId = (String) aPm.get(AsynchAEMessage.CasReference);
+          String casRefId = aPm.getPropertyAsString(AsynchAEMessage.CasReference);
           String selector = 
-        			 (String) aPm.get(AsynchAEMessage.TargetingSelector) ;
+        			 aPm.getPropertyAsString(AsynchAEMessage.TargetingSelector) ;
 
           engine.setFreeCasMessage(anOutgoingMessage, casRefId, selector);
           break;
           
       case AsynchAEMessage.Stop:
-          String casRefId2 = (String) aPm.get(AsynchAEMessage.CasReference);
+          String casRefId2 = aPm.getPropertyAsString(AsynchAEMessage.CasReference);
           engine.setStopMessage(anOutgoingMessage, casRefId2);
     	  break;
     }

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Mon Feb 26 18:54:11 2018
@@ -83,10 +83,15 @@ import org.apache.uima.aae.error.UimaEES
 import org.apache.uima.aae.jmx.UimaASClientInfo;
 import org.apache.uima.aae.jmx.UimaASClientInfoMBean;
 import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.UimaASClientDirectMessage;
+import org.apache.uima.aae.message.UimaASClientMessage;
 import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
+import org.apache.uima.aae.service.UimaASService;
 import org.apache.uima.adapter.jms.ConnectionValidator;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.message.PendingMessage;
+import org.apache.uima.adapter.jms.message.PendingMessageImpl;
+import org.apache.uima.as.client.DirectMessage;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.cas.SerialFormat;
 import org.apache.uima.cas.impl.AllowPreexistingFS;
@@ -173,8 +178,13 @@ public abstract class BaseUIMAAsynchrono
 
   protected AtomicLong totalCasRequestsSentBetweenCpCs = new AtomicLong();
 
-  protected ConcurrentHashMap springContainerRegistry = new ConcurrentHashMap();
+//  protected ConcurrentHashMap springContainerRegistry = new ConcurrentHashMap();
 
+  protected Map<String, UimaASService> serviceRegistry = 
+		  new ConcurrentHashMap<>();
+
+  abstract protected UimaASService getServiceReference();
+  
   protected MessageConsumer consumer = null;
 
   protected SerialFormat serialFormat = SerialFormat.XMI;
@@ -199,7 +209,7 @@ public abstract class BaseUIMAAsynchrono
 
   private Object casProducerMux = new Object();
 
-  protected BlockingQueue<PendingMessage> pendingMessageQueue = new LinkedBlockingQueue<PendingMessage>();
+  protected BlockingQueue<PendingMessage> pendingMessageQueue = new LinkedBlockingQueue<>();
 
   // Create Semaphore that will signal when the producer object is initialized
   protected Semaphore producerSemaphore = new Semaphore(1);
@@ -250,7 +260,7 @@ public abstract class BaseUIMAAsynchrono
 
   abstract protected void setStopMessage(Message msg, String aCasReferenceId) throws Exception;
 
-  abstract protected void setCPCMessage(Message msg) throws Exception;
+  abstract public void setCPCMessage(Message msg) throws Exception;
 
   abstract public void initialize(Map anApplicationContext) throws ResourceInitializationException;
 
@@ -270,12 +280,28 @@ public abstract class BaseUIMAAsynchrono
   
   abstract protected void dispatchFreeCasRequest(String casReferenceId, Message message) throws Exception;
 
+ abstract protected void beforeProcessReply(String casReferenceId);
+  
+  abstract protected boolean isServiceRemote();
+
   // enables/disable timer per CAS. Defaul is to use single timer for
   // all outstanding CASes
-  protected volatile boolean timerPerCAS=false;
+  public volatile boolean timerPerCAS=false;
   
   //abstract protected  String getBrokerURI();
+  
+  private long replyCount=1;
+  
+  protected Transport transport;
 
+  public boolean isRunning() {
+	  return running;
+  }
+  
+  public ClientServiceDelegate getServiceDelegate() {
+	  return serviceDelegate;
+  }
+  
   protected void setBrokeryURI(String brokerURI ) {
 	  this.brokerURI = brokerURI;
   }
@@ -381,6 +407,8 @@ public abstract class BaseUIMAAsynchrono
   }
 
   private void addMessage(PendingMessage msg) {
+	  System.out.println("Client addMessage() - adding message to pendingMessageQueue");
+
     pendingMessageQueue.add(msg);
   }
 
@@ -446,10 +474,10 @@ public abstract class BaseUIMAAsynchrono
 
       clientCache.put(uniqueIdentifier, requestToCache);
 
-      PendingMessage msg = new PendingMessage(AsynchAEMessage.CollectionProcessComplete);
+      PendingMessage msg = new PendingMessageImpl(AsynchAEMessage.CollectionProcessComplete);
       if (cpcTimeout > 0) {
         requestToCache.startTimer();
-        msg.put(UimaAsynchronousEngine.CpcTimeout, String.valueOf(cpcTimeout));
+        msg.addProperty(UimaAsynchronousEngine.CpcTimeout, String.valueOf(cpcTimeout));
       }
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
@@ -492,9 +520,16 @@ public abstract class BaseUIMAAsynchrono
     Iterator it = clientCache.keySet().iterator();
     while (it.hasNext()) {
       ClientRequest entry = clientCache.get((String) it.next());
-      if (entry != null && entry.getCAS() != null) {
-        entry.getCAS().release();
+      System.out.println(".......... Client.releaseCacheEntries() - Client.hashCode()-"+this.hashCode()+" has "+clientCache.size()+ " entries remaining");
+      if ( entry.isThisClientTheCasOwner(this) ) {
+    	  if (entry != null && entry.getCAS() != null) {
+        	  System.out.println("............. Client is the owner of CAS:"+entry.getCasReferenceId()+" - CAS release is permitted");
+              entry.getCAS().release();
+            }
       }
+//      if (entry != null && entry.getCAS() != null) {
+//        entry.getCAS().release();
+//      }
     }
   }
 
@@ -569,13 +604,13 @@ public abstract class BaseUIMAAsynchrono
                   JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopped_as_client_INFO",
                   new Object[] {});
         }
-        for (Iterator i = springContainerRegistry.entrySet().iterator(); i.hasNext();) {
+        for (Iterator i = serviceRegistry.entrySet().iterator(); i.hasNext();) {
           Map.Entry entry = (Map.Entry) i.next();
           Object key = entry.getKey();
           undeploy((String) key);
         }
         asynchManager = null;
-        springContainerRegistry.clear();
+        serviceRegistry.clear();
         listeners.clear();
         clientCache.clear();
         threadQueue.clear();
@@ -773,8 +808,9 @@ public abstract class BaseUIMAAsynchrono
   }
 
   protected void sendMetaRequest() throws Exception {
-    PendingMessage msg = new PendingMessage(AsynchAEMessage.GetMeta);
-    ClientRequest requestToCache = new ClientRequest(uniqueIdentifier, this); // , metadataTimeout);
+    PendingMessage msg = new PendingMessageImpl(AsynchAEMessage.GetMeta);
+    ClientRequest requestToCache = new ClientRequest(uniqueIdentifier, this); 
+    
     requestToCache.setIsRemote(remoteService);
     requestToCache.setMetaRequest(true);
     requestToCache.setMetadataTimeout(metadataTimeout);
@@ -849,7 +885,7 @@ public abstract class BaseUIMAAsynchrono
     }
     //	If the CR is done, enter a polling loop waiting for outstanding CASes to return
     //  from a service
-    if (hasNext == false ) {
+    if (!hasNext) {
     	Object mObject = new Object();
     	//	if client is running and there are outstanding CASe go sleep for awhile and
     	//  try again, until all CASes come back from a service
@@ -870,7 +906,38 @@ public abstract class BaseUIMAAsynchrono
   protected ConcurrentHashMap<String, ClientRequest> getCache() {
     return clientCache;
   }
-
+  private void prepareRemoteRequest(PendingMessage msg, ClientRequest requestToCache) throws Exception {
+      long t1 = System.nanoTime();
+      switch (serialFormat) {
+      case XMI:
+        XmiSerializationSharedData serSharedData = new XmiSerializationSharedData();
+        String serializedCAS = serializeCAS(requestToCache.getCAS(), serSharedData);
+        msg.addProperty(AsynchAEMessage.CAS,serializedCAS);
+        if (remoteService) {  // always true 5/2013
+          // Store the serialized CAS in case the timeout occurs and need to send the
+          // the offending CAS to listeners for reporting
+          requestToCache.setCAS(serializedCAS);
+          requestToCache.setXmiSerializationSharedData(serSharedData);
+        }
+        break;
+      case BINARY:
+        byte[] serializedBinaryCAS = uimaSerializer.serializeCasToBinary(requestToCache.getCAS());
+        msg.addProperty(AsynchAEMessage.CAS,serializedBinaryCAS);
+        break;
+      case COMPRESSED_FILTERED:
+        // can't use uimaserializer directly - project doesn't have ref to this one
+        // for storing the reuse info
+        BinaryCasSerDes6 bcs = new BinaryCasSerDes6(requestToCache.getCAS(), this.getRemoteTypeSystem());
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+        bcs.serialize(baos);
+        requestToCache.setCompress6ReuseInfo(bcs.getReuseInfo());           
+        msg.addProperty(AsynchAEMessage.CAS,baos.toByteArray());
+        break;
+      default:
+        throw new UIMARuntimeException(new Exception("Internal Error - Unsupported Serialization Format:"+serialFormat));    
+      }
+      requestToCache.setSerializationTime(System.nanoTime() - t1);
+  }
   /**
    * Sends a given CAS for analysis to the UIMA EE Service.
    * 
@@ -895,104 +962,37 @@ public abstract class BaseUIMAAsynchrono
           }
           return null;
         }
-
-        clientCache.put(casReferenceId, requestToCache);
-        PendingMessage msg = new PendingMessage(AsynchAEMessage.Process);
-        long t1 = System.nanoTime();
-        switch (serialFormat) {
-        case XMI:
-          XmiSerializationSharedData serSharedData = new XmiSerializationSharedData();
-          String serializedCAS = serializeCAS(aCAS, serSharedData);
-          msg.put(AsynchAEMessage.CAS, serializedCAS);
-          if (remoteService) {  // always true 5/2013
-            // Store the serialized CAS in case the timeout occurs and need to send the
-            // the offending CAS to listeners for reporting
-            requestToCache.setCAS(serializedCAS);
-            requestToCache.setXmiSerializationSharedData(serSharedData);
-          }
-          break;
-        case BINARY:
-          byte[] serializedBinaryCAS = uimaSerializer.serializeCasToBinary(aCAS);
-          msg.put(AsynchAEMessage.CAS, serializedBinaryCAS);
-          break;
-        case COMPRESSED_FILTERED:
-          // can't use uimaserializer directly - project doesn't have ref to this one
-          // for storing the reuse info
-          BinaryCasSerDes6 bcs = new BinaryCasSerDes6(aCAS, this.getRemoteTypeSystem());
-          ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
-          bcs.serialize(baos);
-          requestToCache.setCompress6ReuseInfo(bcs.getReuseInfo());           
-          msg.put(AsynchAEMessage.CAS, baos.toByteArray());
-          break;
-        default:
-          throw new UIMARuntimeException(new Exception("Internal Error"));    
+        // Save the CAS in the cache entry
+        requestToCache.setCAS(aCAS);
+        // add current work item to the client cache
+        if ( !clientCache.containsKey(casReferenceId)) {
+            clientCache.put(requestToCache.getCasReferenceId(), requestToCache);
         }
+ 
+        // pending message is an object which will be added to the work queue for dispatching
+        PendingMessage msg = 
+        		new PendingMessageImpl(AsynchAEMessage.Process);
+   
+		msg.addProperty(AsynchAEMessage.CasReference, requestToCache.getCasReferenceId());
         
-        requestToCache.setCAS(aCAS);
-
-        requestToCache.setSerializationTime(System.nanoTime() - t1);
-        msg.put(AsynchAEMessage.CasReference, casReferenceId);
-        requestToCache.setIsRemote(remoteService);
+        if ( isServiceRemote() ) {
+            prepareRemoteRequest(msg, requestToCache);
+            requestToCache.setIsRemote(true);
+        } else {
+    		msg.addProperty(AsynchAEMessage.CAS, aCAS);
+        }
         requestToCache.setEndpoint(getEndPointName());
         requestToCache.setProcessTimeout(processTimeout);
         requestToCache.clearTimeoutException();
 
         // The sendCAS() method is synchronized no need to synchronize the code below
         if (serviceDelegate.getState() == Delegate.TIMEOUT_STATE ) {
-          SharedConnection sharedConnection = lookupConnection(getBrokerURI());
-          
-          //  Send Ping to service as getMeta request
-          if ( sharedConnection != null && !serviceDelegate.isAwaitingPingReply() && sharedConnection.isOpen() ) {
-            serviceDelegate.setAwaitingPingReply();
-            // Add the cas to a list of CASes pending reply. Also start the timer if necessary
-			// serviceDelegate.addCasToOutstandingList(requestToCache.getCasReferenceId());
-        	  
-        	  // since the service is in time out state, we dont send CASes to it just yet. Instead, place
-        	  // a CAS in a pending dispatch list. CASes from this list will be sent once a response to PING
-        	  // arrives.
-            serviceDelegate.addCasToPendingDispatchList(requestToCache.getCasReferenceId(), aCAS.hashCode(), timerPerCAS); 
-            if ( cpcReadySemaphore.availablePermits() > 0 ) {
-              acquireCpcReadySemaphore();
-            }
-
-            // Send PING Request to check delegate's availability
-            sendMetaRequest();
-            // @@@@@@@@@@@@@@@ Changed on 4/20 serviceDelegate.cancelDelegateTimer();
-            // Start a timer for GetMeta ping and associate a cas id
-            // with this timer. The delegate is currently in a timed out
-            // state due to a timeout on a CAS with a given casReferenceId.
-            //  
-            serviceDelegate.startGetMetaRequestTimer(casReferenceId);
-            
-            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendCAS",
-                      JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_sending_ping__FINE",
-                      new Object[] { serviceDelegate.getKey() });
+            if ( remoteService ) {
+           // 	handleDelegateTimeout(requestToCache, aCAS);
+            	return requestToCache.getCasReferenceId();
             }
-            return casReferenceId;
-          } else {
-            if ( !requestToCache.isSynchronousInvocation() && !sharedConnection.isOpen() ) {
-              Exception exception = new BrokerConnectionException("Unable To Deliver CAS:"+requestToCache.getCasReferenceId()+" To Destination. Connection To Broker "+getBrokerURI()+" Has Been Lost");
-              handleException(exception, requestToCache.getCasReferenceId(), null, requestToCache, true);
-              return casReferenceId;
-            } else {
-              //  Add to the outstanding list.  
-			  //  serviceDelegate.addCasToOutstandingList(requestToCache.getCasReferenceId());
-        	  // since the service is in time out state, we dont send CASes to it just yet. Instead, place
-        	  // a CAS in a pending dispatch list. CASes from this list will be sent once a response to PING
-        	  // arrives.
-              serviceDelegate.addCasToPendingDispatchList(requestToCache.getCasReferenceId(), aCAS.hashCode(), timerPerCAS);
-              return casReferenceId;
-            }
-          }
         }
-        SharedConnection sharedConnection = lookupConnection(getBrokerURI());
-        if ( sharedConnection != null &&  !sharedConnection.isOpen() ) {
-          if (requestToCache != null && !requestToCache.isSynchronousInvocation() && aCAS != null ) {
-            aCAS.release();
-          }
-          throw new ResourceProcessException(new BrokerConnectionException("Unable To Deliver Message To Destination. Connection To Broker "+sharedConnection.getBroker()+" Has Been Lost")); 
-        }    
+
         // Incremented number of outstanding CASes sent to a service. When a reply comes
         // this counter is decremented
         outstandingCasRequests.incrementAndGet();
@@ -1001,14 +1001,22 @@ public abstract class BaseUIMAAsynchrono
         totalCasRequestsSentBetweenCpCs.incrementAndGet();
         // Add message to the pending queue
         addMessage(msg);
+        
       } catch (ResourceProcessException e) {
-        clientCache.remove(casReferenceId);
+       	  if ( clientCache.containsKey(requestToCache.getCasReferenceId())) {
+    	        clientCache.remove(requestToCache.getCasReferenceId());
+       	  }
         throw e;
       } catch (Exception e) {
-        clientCache.remove(casReferenceId);
+    	  if ( clientCache.containsKey(requestToCache.getCasReferenceId())) {
+  	        clientCache.remove(requestToCache.getCasReferenceId());
+  	      }
+
+        //clientCache.remove(casReferenceId);
         throw new ResourceProcessException(e);
       }
-      return casReferenceId;
+      return requestToCache.getCasReferenceId();
+
     }
   }
 
@@ -1029,6 +1037,8 @@ public abstract class BaseUIMAAsynchrono
 
   private ClientRequest produceNewClientRequestObject() {
     String casReferenceId = idGenerator.nextId();
+    // NOTE(review): unconditional stdout trace of every generated CAS id; looks like leftover debugging
+    System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>> CLIENT ABOUT TO SEND CAS:"+casReferenceId);
     return new ClientRequest(casReferenceId, this);
   }
 
@@ -1056,6 +1066,8 @@ public abstract class BaseUIMAAsynchrono
    */
   protected void handleCollectionProcessCompleteReply(Message message) throws Exception {
     int payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
+    System.out.println("In handleCollectionProcessCompleteReply() ...");
+
     try {
       if (AsynchAEMessage.Exception == payload) {
         ProcessTrace pt = new ProcessTrace_impl();
@@ -1087,7 +1099,228 @@ public abstract class BaseUIMAAsynchrono
       cpcReplySemaphore.release();
     }
   }
+  /** Handles a CollectionProcessComplete (CpC) reply: reports a failure to listeners and logs it, or runs cleanup(); cpcReplySemaphore is released either way. */
+  protected void handleCollectionProcessCompleteReply(UimaASClientMessage message) throws Exception {
+	    int payload = message.payload();
+	    System.out.println("In handleCollectionProcessCompleteReply() ..."); // NOTE(review): unconditional stdout trace; looks like leftover debugging
+	    try {
+	      if (AsynchAEMessage.Exception == payload) {
+	        ProcessTrace pt = new ProcessTrace_impl();
+	        UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
+	        Exception exception = retrieveExceptionFromMessage(message);
+	        // Record the failure on the status object and pass it to notifyListeners (with a null CAS)
+	        status.addEventStatus("CpC", "Failed", exception);
+	        notifyListeners(null, status, AsynchAEMessage.CollectionProcessComplete);
+	        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+	          UIMAFramework.getLogger(CLASS_NAME).logrb(
+	                  Level.INFO,
+	                  CLASS_NAME.getName(),
+	                  "handleCollectionProcessCompleteReply",
+	                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+	                  "UIMAJMS_received_exception_msg_INFO",
+	                  new Object[] { message.messageFrom(),
+	                    getBrokerURI(),  
+	                    message.casReferenceId(), exception });
+	        }
+	      } else {
+	        // After receiving CPC reply there may be cleanup to do. Delegate this
+	        // to platform specific implementation (ActiveMQ or WAS)
+	        cleanup();
+	      }
+	    } catch (Exception e) {
+	      throw e; // rethrown unchanged; the finally block below still runs
+	    } finally {
+	      // Release the semaphore acquired in collectionProcessingComplete()
+	      cpcReplySemaphore.release();
+	    }
+	  }
 
+  private Exception retrieveExceptionFromMessage(UimaASClientMessage message) throws Exception { // Extracts the Exception carried by a reply, or wraps the reply text in a UimaEEServiceException
+	    Exception exception = null;
+	    try {
+	    	if ( message.payload() == AsynchAEMessage.BinaryPayload ) { // NOTE(review): visible callers only call this when payload() equals AsynchAEMessage.Exception — confirm this branch is reachable
+	    		// TODO (from the original author): deserializing the payload as an Exception is still to be done.
+	    		// NOTE(review): this branch and the else branch cast a UimaASClientMessage to JMS message types — confirm that is valid.
+		        exception = (Exception) ((ObjectMessage) message).getObject();
+	    		
+	    	} else {
+		        exception = new UimaEEServiceException(((TextMessage) message).getText());
+	    	}
+/*
+	    	if (message instanceof ObjectMessage
+	              && ((ObjectMessage) message).getObject() instanceof Exception) {
+	        exception = (Exception) ((ObjectMessage) message).getObject();
+	      } else if (message instanceof TextMessage) {
+	        exception = new UimaEEServiceException(((TextMessage) message).getText());
+	      }
+	*/
+	    } catch( Exception e) { // any failure above (including a failed cast) is logged and replaced by a wrapper exception
+	      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+	              "retrieveExceptionFromMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+	              "UIMAEE_exception__WARNING", e);   
+	      exception = new UimaEEServiceException("UIMA AS client is unable to de-serialize Exception from a remote service",e);
+	    }
+	    return exception;
+	  }
+  private void handleProcessReplyFromSynchronousCall(ClientRequest cachedRequest, UimaASClientMessage message)
+          throws Exception {
+    // Store the reply on the cached request, then call wakeUpSendThread() for that request
+    cachedRequest.setMessage(message);
+    wakeUpSendThread(cachedRequest); // presumably unblocks the thread waiting in the synchronous send — confirm in wakeUpSendThread()
+  }
+  
+  /**
+   * Handles a reply to a GetMeta request. A reply to a ping (sent after a timeout) re-dispatches CASes held on the
+   * pending dispatch list and returns; any other reply reports a returned exception or stores the service metadata.
+   * @param message
+   *          - client message carrying the GetMeta reply (payload type, optional serialization method, metadata)
+   * 
+   * @throws Exception if handling the reply fails; exceptions are rethrown unchanged
+   */
+  
+  protected void handleMetadataReply(UimaASClientMessage message) throws Exception {
+	   serviceDelegate.cancelDelegateGetMetaTimer();
+	    serviceDelegate.setState(Delegate.OK_STATE);
+	    // check if the reply msg contains replyTo destination. It will be
+	    // added by the Cas Multiplier to the getMeta reply
+
+	/*
+	 * !!!!!!!!!!!!!!!!!!!!!!! ENABLE CODE BELOW IN JMS SPECIFIC IMPL
+	!!!!!!!!!!!!!!!!!!!!!!!!! MOVE THIS CODE TO JMS SPECIFIC CODE     
+	    if (message.replyTo() != null) { 
+	      serviceDelegate.setFreeCasDestination(message.getJMSReplyTo());
+	    }
+	 */
+
+	    // Check if this is a reply for a Ping sent in response to a timeout
+	    if (serviceDelegate.isAwaitingPingReply()) {
+	      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+	        UIMAFramework.getLogger(CLASS_NAME).logrb(
+	                Level.INFO,
+	                CLASS_NAME.getName(),
+	                "handleMetadataReply",
+	                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+	                "UIMAJMS_rcvd_ping_reply__INFO",
+	                new Object[] { 
+	                  message.messageFrom(),
+	                  message.serverIP()});
+	      }
+	      //  reset the state of the service. The client received its ping reply  
+	      serviceDelegate.resetAwaitingPingReply();
+	      String casReferenceId = null;
+	      if (serviceDelegate.getCasPendingReplyListSize() > 0 || serviceDelegate.getCasPendingDispatchListSize() > 0) {
+	    	  serviceDelegate.restartTimerForOldestCasInOutstandingList();
+	          //	We got a reply to GetMeta ping. Send all CASes that have been
+	          //    placed on the pending dispatch queue to a service.
+	    	  while( serviceDelegate.getState()==Delegate.OK_STATE && (casReferenceId = serviceDelegate.removeOldestFromPendingDispatchList()) != null ) { // stops early if the delegate leaves OK_STATE
+	    	        ClientRequest cachedRequest = (ClientRequest) clientCache.get(casReferenceId);
+	    	        if (cachedRequest != null) {
+	    	          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+	    	            UIMAFramework.getLogger(CLASS_NAME).logrb(
+	    	                    Level.INFO,
+	    	                    CLASS_NAME.getName(),
+	    	                    "handleMetadataReply",
+	    	                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+	    	                    "UIMAJMS_dispatch_delayed_cas__INFO",
+	    	                    new Object[] { casReferenceId, String.valueOf(cachedRequest.cas.hashCode())});
+	    	          }
+	    	          sendCAS(cachedRequest.getCAS(), cachedRequest, null);
+	    	        }
+	    	  }
+	      } else {
+	        ProcessTrace pt = new ProcessTrace_impl();
+	        UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
+	        notifyListeners(null, status, AsynchAEMessage.GetMeta);
+	      }
+	      // Handled Ping reply
+	      return; // NOTE(review): returns before the try/finally below, so getMetaSemaphore is not released on this path
+	    }
+	    int payload = message.payload();
+	    removeFromCache(uniqueIdentifier);
+
+	    try {
+	      if (AsynchAEMessage.Exception == payload) {
+	        ProcessTrace pt = new ProcessTrace_impl();
+	        UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
+	        Exception exception = retrieveExceptionFromMessage(message);
+	        clientSideJmxStats.incrementMetaErrorCount();
+	        status.addEventStatus("GetMeta", "Failed", exception);
+	        notifyListeners(null, status, AsynchAEMessage.GetMeta);
+	        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+	          UIMAFramework.getLogger(CLASS_NAME).logrb(
+	                  Level.INFO,
+	                  CLASS_NAME.getName(),
+	                  "handleMetadataReply",
+	                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+	                  "UIMAJMS_received_exception_msg_INFO",
+	                  new Object[] { message.messageFrom(),
+	                      getBrokerURI(),
+	                      message.casReferenceId(), exception });
+	        }
+	        abort = true; // a failed GetMeta sets the abort flag and clears the initialized flag
+	        initialized = false;
+	      } else {
+	        // Check serialization supported by the service against client configuration.
+	        // If the client is configured to use Binary serialization *but* the service
+	        // doesn't support it, change the client serialization to xmi. Old services will
+	        // not return in a reply the type of serialization supported which implies "xmi".
+	        // New services *always* return "binary" or "compressedBinaryXXX" 
+	        // as a default serialization. The client
+	        // however may still want to serialize messages using xmi though.
+	        if (!message.serializationSpecified()) {
+	          // Dealing with an old service here, check if there is a mismatch with the
+	          // client configuration. If the client is configured with binary serialization
+	          // override this and change serialization to "xmi".
+	          if (getSerialFormat() != SerialFormat.XMI) {
+	            // Override configured serialization
+	            setSerialFormat(SerialFormat.XMI);
+	            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+	                    "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+	                    "UIMAJMS_client_serialization_ovveride__WARNING", new Object[] {});
+	          }
+	        } else {
+	          //final int c = message.serialization();
+	          if (getSerialFormat() != SerialFormat.XMI) {
+	            // don't override if XMI - because the remote may have different type system
+	              setSerialFormat((message.serializationMethod() == AsynchAEMessage.XMI_SERIALIZATION)    ? SerialFormat.XMI :
+	                              (message.serializationMethod() == AsynchAEMessage.BINARY_SERIALIZATION) ? SerialFormat.BINARY :
+	                                                                            SerialFormat.COMPRESSED_FILTERED); // any other method value maps to COMPRESSED_FILTERED
+	          }
+	        }
+	        /*
+	        String meta = ((TextMessage) message).getText();
+	        System.out.println(meta);
+	        ByteArrayInputStream bis = new ByteArrayInputStream(meta.getBytes());
+	        XMLInputSource in1 = new XMLInputSource(bis, null);
+	        // Adam - store ResouceMetaData in field so we can return it from getMetaData().
+	        resourceMetadata = (ProcessingResourceMetaData) UIMAFramework.getXMLParser()
+	                .parseResourceMetaData(in1);
+	        */
+	        resourceMetadata = (ProcessingResourceMetaData) message.asObject(AsynchAEMessage.AEMetadata);
+	        // if remote delegate, save type system 
+	        if (brokerURI != null && !brokerURI.startsWith("vm:")) { // test if remote
+	          setRemoteTypeSystem(AggregateAnalysisEngineController_impl.getTypeSystemImpl(resourceMetadata));
+	        }
+	        casMultiplierDelegate = resourceMetadata.getOperationalProperties().getOutputsNewCASes();
+	        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+	          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+	                  "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+	                  "UIMAJMS_handling_meta_reply_FINEST",
+	                  new Object[] { message.messageFrom(), resourceMetadata });
+	        }
+	        //  check the state of the client 
+	        if ( running && asynchManager != null ) {
+	          //  Merge the metadata only if the client is still running
+	          asynchManager.addMetadata(resourceMetadata);
+	        }
+	      }
+
+	    } catch (Exception e) {
+	      throw e; // rethrown unchanged
+	    } finally {
+	      getMetaSemaphore.release(); // released on the non-ping path whether handling succeeded or failed
+	    }
+  }
   /**
    * Handles response to GetMeta Request. Deserializes ResourceMetaData and initializes CasManager.
    * 
@@ -1208,7 +1441,7 @@ public abstract class BaseUIMAAsynchrono
         resourceMetadata = (ProcessingResourceMetaData) UIMAFramework.getXMLParser()
                 .parseResourceMetaData(in1);
         // if remote delegate, save type system 
-        if (!brokerURI.startsWith("vm:")) { // test if remote
+        if (brokerURI != null && !brokerURI.startsWith("vm:")) { // test if remote
           setRemoteTypeSystem(AggregateAnalysisEngineController_impl.getTypeSystemImpl(resourceMetadata));
         }
         casMultiplierDelegate = resourceMetadata.getOperationalProperties().getOutputsNewCASes();
@@ -1411,7 +1644,150 @@ public abstract class BaseUIMAAsynchrono
 		    }
 
   }
-
+  /**
+   * Handles a reply to a Process CAS request delivered as a UimaASClientMessage. Looks up the cached
+   * request by CAS reference id, then routes the reply: exception payload, synchronous caller,
+   * asynchronous completion, or child CAS from a Cas Multiplier. Does nothing if the client is not running.
+   * 
+   * @param message
+   *          - client message holding the reply properties (CAS reference id, payload type, ...)
+   * @param doNotify - forwarded to completeProcessingReply() for asynchronous replies; pt - trace passed on to the callbacks
+   * @throws Exception
+   */
+  protected void handleProcessReply(UimaASClientMessage message, boolean doNotify, ProcessTrace pt)
+          throws Exception {
+	    if (!running) {
+	        return;
+	      }
+	      int payload = -1;
+	      String casReferenceId = message.asString(AsynchAEMessage.CasReference);
+	    
+	//      beforeProcessReply(casReferenceId);
+	      
+	      // Determine the type of payload in the message (XMI,Cas Reference,Exception,etc)
+	      if (message.contains(AsynchAEMessage.Payload)) {
+	        payload = message.asInt(AsynchAEMessage.Payload);
+	      }
+	      // Fetch entry from the client cache for a cas id returned from the service
+	      // The client cache maintains an entry for every outstanding CAS sent to the
+	      // service.
+	      ClientRequest cachedRequest = null;
+
+	      if (casReferenceId != null) {
+	    	 
+	        cachedRequest =  clientCache.get(casReferenceId);
+	        if (cachedRequest != null && casReferenceId.equals(cachedRequest.getCasReferenceId())) {
+	      	  // invoke 2nd callback to the application if necessary. Makes  sure the callback is done once
+	            notifyApplication(cachedRequest, 
+	          		  pt, 
+	          		  casReferenceId, 
+	          		  message.asString(AsynchAEMessage.InputCasReference),
+	          		  message.asString(AsynchAEMessage.ServerIP), 
+	          		  message.asString(AsynchAEMessage.UimaASProcessPID));
+
+	      	    // Received a reply, decrement number of outstanding CASes. An outstanding CAS is one
+	            // for which we have not received a reply.
+	            decrementOutstandingCasCounter();
+	          }
+	        
+	        // turn off Process timer and remove CAS from outstanding (waiting for reply) list
+	        serviceDelegate.removeCasFromOutstandingList(casReferenceId);
+	        
+
+	  /*
+	        // Below is for debugging phase only. REMOVE BEFORE COMMIT!!!!!!!!!!!!!
+	        CAS cas = cachedRequest.getCAS();
+	        FSIterator<AnnotationFS> it = 
+	      		  cas.getAnnotationIndex().iterator();
+	        while( it.hasNext()) {
+	      	  AnnotationFS afs = it.next();
+	      	  System.out.println("..... Feature Covered Text:"+afs.getCoveredText());
+	        }
+	        // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+	    */
+	      }
+	      if (AsynchAEMessage.Exception == payload) {
+            Object exception = message.asObject(AsynchAEMessage.ErrorCause);
+            if ( exception != null && exception instanceof Exception ) {
+             	String parentCasId = message.asString(AsynchAEMessage.InputCasReference);
+            	handleException((Exception)exception, parentCasId, cachedRequest, true); // NOTE(review): cachedRequest can still be null here (no id or cache miss) and this overload dereferences it
+            }
+	        return;
+	      }
+	      // cachedRequest is only null if we are receiving child CASes from a 
+	      // Cas Multiplier. Otherwise, we drop the message as it is out of band
+	      if ( cachedRequest == null && !casMultiplierDelegate ) {
+	      	// most likely a reply came in after the thread was interrupted
+	      	return;
+	      }
+	      
+	      
+	      // Without a CAS reference id the reply cannot be matched to a request; drop it.
+	      // (Replies with an Exception payload have already been handled and returned above.)
+	      if (casReferenceId == null) {
+	        return;
+	      }
+
+	      if (message instanceof TextMessage // NOTE(review): JMS type test on a UimaASClientMessage — confirm this can ever be true
+	              && UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+	        UIMAFramework.getLogger(CLASS_NAME).logrb(
+	                Level.FINEST,
+	                CLASS_NAME.getName(),
+	                "handleProcessReply",
+	                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+	                "UIMAJMS_handling_process_reply_FINEST",
+	                new Object[] { message.asString(AsynchAEMessage.MessageFrom),
+	                    message.asString(AsynchAEMessage.CasReference),
+	                    message.toString() + ((TextMessage) message).getText() });
+	      }
+
+	      if (cachedRequest != null) {
+		      beforeProcessReply(casReferenceId);
+
+	    	  // Store the total latency for this CAS. The departure time is set right before the CAS
+	        // is sent to a service.
+	        cachedRequest.setTimeWaitingForReply(System.nanoTime() - cachedRequest.getCASDepartureTime());
+	    	  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+	          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+	                  "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+	                  "UIMAJMS_cas_reply_rcvd_FINE", new Object[] { casReferenceId, String.valueOf(cachedRequest.getCAS().hashCode())});
+	        }
+
+	        // If the CAS was sent from a synchronous API sendAndReceive(), wake up the thread that
+	        // sent the CAS and process the reply
+	        if (cachedRequest.isSynchronousInvocation()) {
+	          handleProcessReplyFromSynchronousCall(cachedRequest, message);
+	          System.out.println("handleProcessReply() - handling process reply for sync call"); // NOTE(review): stdout debug trace
+	        } else {
+//	          deserializeAndCompleteProcessingReply(casReferenceId, message, cachedRequest, pt, doNotify);
+	          System.out.println("handleProcessReply() - handling process reply for async call #"+replyCount++); // NOTE(review): stdout debug trace; also increments replyCount as a side effect
+	          completeProcessingReply( doNotify, message, pt);
+	          
+	        }
+	      } else if (message.contains(AsynchAEMessage.InputCasReference)) {
+	      	int command = message.asInt(AsynchAEMessage.Command);
+	      	if (AsynchAEMessage.ServiceInfo != command) { // ServiceInfo messages are ignored on this path
+	              System.out.println("handleProcessReply() - handling process reply for cas multiplier");
+
+	      		handleProcessReplyFromCasMultiplier(message, casReferenceId, payload);
+	      	
+	      	}
+	      } else {
+	        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+	          // Most likely expired message. Already handled as timeout. Discard the message and move on
+	          // to the next
+	          UIMAFramework.getLogger(CLASS_NAME).logrb(
+	                  Level.INFO,
+	                  CLASS_NAME.getName(),
+	                  "handleProcessReply",
+	                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+	                  "UIMAJMS_received_expired_msg_INFO",
+	                  new Object[] { message.asString(AsynchAEMessage.MessageFrom),
+	                      message.asString(AsynchAEMessage.CasReference) });
+	        }
+	      }
+	  
+  }
   /**
    * Handles response to Process CAS request. If the message originated in a service that is running
    * in a separate jvm (remote), deserialize the CAS and notify the application of the completed
@@ -1424,6 +1800,8 @@ public abstract class BaseUIMAAsynchrono
    */
   protected void handleProcessReply(Message message, boolean doNotify, ProcessTrace pt)
           throws Exception {
+	   System.out.println(".............. Client.handleProcessReply() 1");
+
     if (!running) {
       return;
     }
@@ -1522,6 +1900,79 @@ public abstract class BaseUIMAAsynchrono
       }
     }
   }
+  private void handleProcessReplyFromCasMultiplier(UimaASClientMessage message, String casReferenceId,
+          int payload ) throws Exception { // NOTE(review): payload is referenced only by the commented-out code at the end of this method
+    // Handles a message carrying a CAS that was generated by a Cas Multiplier: asks the service
+    // to release that CAS, then checks that the input (parent) CAS id named in the message
+    // still has an entry in the client's cache.
+    // Fetch the input CAS Reference Id from which the CAS being processed was generated from
+    String inputCasReferenceId = message.asString(AsynchAEMessage.InputCasReference);
+    // Fetch the destination for Free CAS notification
+    @SuppressWarnings("unchecked")
+	BlockingQueue<DirectMessage> releaseCASQueue =
+    		( BlockingQueue<DirectMessage>) message.asObject(AsynchAEMessage.FreeCASQueue);
+    // send request to a remote CAS multiplier to release a CAS
+    getServiceReference().releaseCAS(casReferenceId, releaseCASQueue); // issued before the cache check below, so it happens even for an unknown input CAS
+   
+    System.out.println("............ Client Sent ReleaseCAS Request for CAS:"+casReferenceId); // NOTE(review): stdout debug trace
+    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(
+                Level.FINEST,
+                CLASS_NAME.getName(),
+                "handleProcessReplyFromCasMultiplier",
+                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAJMS_client_sending_release_cas_FINEST",
+                new Object[] { releaseCASQueue,
+                    message.asString(AsynchAEMessage.CasReference) });
+      }
+
+    // Fetch an entry from the client cache for a given input CAS id. This would be an id
+    // of the CAS that the client sent out to the service.
+    // NOTE(review): when the entry is found nothing further happens yet; the completion logic at the end is commented out.
+    ClientRequest inputCasCachedRequest = (ClientRequest) clientCache.get(inputCasReferenceId);
+    if (inputCasCachedRequest == null) {
+    	System.out.println(".... Client Sent FREE CAS request for CAS:"+casReferenceId+" Input CAS:"+inputCasReferenceId+" Entry Not Found in Client Cache"); // NOTE(review): stdout debug trace
+      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+        // Most likely expired message. Already handled as timeout. Discard the message and move on
+        // to the next
+        UIMAFramework.getLogger(CLASS_NAME).logrb(
+                Level.INFO,
+                CLASS_NAME.getName(),
+                "handleProcessReplyFromCasMultiplier",
+                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAJMS_received_expired_msg_INFO",
+                new Object[] { message.asString(AsynchAEMessage.MessageFrom),
+                    message.asString(AsynchAEMessage.InputCasReference) });
+      }
+      return;
+    }
+    /*
+     * 
+     * UNCOMMENT THIS
+    if (inputCasCachedRequest.isSynchronousInvocation()) {
+    	// with synchronous invocation, child CASes are thrown away. With sync API, the UIMA-AS client
+    	// is not using callbacks. 
+    	if ( casReferenceId.equals(inputCasCachedRequest.getCasReferenceId())) {
+    	      handleProcessReplyFromSynchronousCall(inputCasCachedRequest, message);
+    	} else {
+    		return;
+    	}
+    }
+    CAS cas = null;
+    if (message instanceof TextMessage) {
+      cas = deserializeCAS(((TextMessage) message).getText(), SHADOW_CAS_POOL);
+    } else {
+      long bodyLength = ((BytesMessage) message).getBodyLength();
+      byte[] serializedCas = new byte[(int) bodyLength];
+      ((BytesMessage) message).readBytes(serializedCas);
+      cas = deserializeCAS(serializedCas, SHADOW_CAS_POOL);
+
+    }
+
+    completeProcessingReply(cas, casReferenceId, payload, true, message, inputCasCachedRequest,
+            null);
+      */      
+  }
 
   private void handleProcessReplyFromCasMultiplier(Message message, String casReferenceId,
           int payload /* , ClientRequest inputCasCachedRequest */) throws Exception {
@@ -1530,6 +1981,9 @@ public abstract class BaseUIMAAsynchrono
     // exists in the client's cache.
     // Fetch the input CAS Reference Id from which the CAS being processed was generated from
     String inputCasReferenceId = message.getStringProperty(AsynchAEMessage.InputCasReference);
+    // Fetch the destination for Free CAS notification
+ //   Destination freeCASNotificationDestination = message.getJMSReplyTo();
+
     try {
   	  String nodeIP = message.getStringProperty(AsynchAEMessage.ServerIP);
 	      String pid = message.getStringProperty(AsynchAEMessage.UimaASProcessPID);
@@ -1732,14 +2186,21 @@ public abstract class BaseUIMAAsynchrono
       decrementOutstandingCasCounter();
     }
   }
-  
+  protected void handleException(Exception exception,String inputCasReferenceId, ClientRequest cachedRequest, boolean doNotify)
+          throws Exception {
+    // Convenience overload: reads the CAS id from cachedRequest and delegates to the five-argument handleException().
+    String casReferenceId = cachedRequest.getCasReferenceId(); // NOTE(review): throws NullPointerException if cachedRequest is null — confirm callers never pass null
+    handleException(exception, casReferenceId, inputCasReferenceId, cachedRequest, doNotify);
+  }
   protected void handleException(Message message, ClientRequest cachedRequest, boolean doNotify)
           throws Exception {
     
     Exception exception = retrieveExceptionFromMessage(message);
-    String casReferenceId = message.getStringProperty(AsynchAEMessage.CasReference);
+//    String casReferenceId = message.getStringProperty(AsynchAEMessage.CasReference);
     String inputCasReferenceId = message.getStringProperty(AsynchAEMessage.InputCasReference);
-    handleException(exception, casReferenceId, inputCasReferenceId, cachedRequest, doNotify);
+//    handleException(exception, casReferenceId, inputCasReferenceId, cachedRequest, doNotify);
+    handleException(exception, inputCasReferenceId, cachedRequest, true); // NOTE(review): passes literal true; the doNotify parameter is no longer used in this method — confirm intent
+
   }
 
     private void dumpMetrics(List<AnalysisEnginePerformanceMetrics> ml, String casReferenceId ) {
@@ -1769,6 +2230,7 @@ public abstract class BaseUIMAAsynchrono
     }
     return ml;
     }
+    /*
   private void completeProcessingReply(CAS cas, String casReferenceId, int payload,
           boolean doNotify, Message message, ClientRequest cachedRequest, ProcessTrace pt)
           throws Exception {
@@ -1837,7 +2299,186 @@ public abstract class BaseUIMAAsynchrono
       }
     }
   }
+*/
+    /**
+     * Completes processing of a Process reply delivered as a {@code UimaASClientMessage}.
+     * <p>
+     * The pending request is looked up in the client cache by the reply's CasReference
+     * property. Only XMI, binary and CAS-reference payloads are handled; any other payload
+     * is ignored. With {@code doNotify} set, a status object is built and listeners are
+     * notified (with per-component metrics when the reply carries them). Otherwise the
+     * metrics carried by the reply are appended to the request's metrics list. Finally the
+     * CAS is released for non-synchronous requests and the cache entry is removed.
+     *
+     * @param doNotify true to notify listeners; false when synchronous sendAndReceive() was used
+     * @param message the reply message
+     * @param pt process trace for the status object; a new one is created when null
+     * @throws Exception if reading the message or notifying listeners fails
+     */
+    private void completeProcessingReply( boolean doNotify, UimaASClientMessage message, ProcessTrace pt)
+            throws Exception {
+      String casReferenceId = message.asString(AsynchAEMessage.CasReference);
+      ClientRequest cachedRequest = (ClientRequest) clientCache.get(casReferenceId);
+      int payload = message.asInt(AsynchAEMessage.Payload);
+
+      boolean casPayload = AsynchAEMessage.XMIPayload == payload
+              || AsynchAEMessage.BinaryPayload == payload
+              || AsynchAEMessage.CASRefID == payload;
+      if (!casPayload) {
+        return;
+      }
+      if (cachedRequest == null) {
+        // No request is cached under this CAS reference id, so there is no CAS to hand to
+        // listeners, nothing to release and nothing to remove. Without this guard the
+        // code below (including the finally block) fails with a NullPointerException.
+        return;
+      }
+      if (pt == null) {
+        pt = new ProcessTrace_impl();
+      }
+      try {
+        // TODO (original author's note, 08/24/2016): timing stats are not logged on this
+        // path yet. The logTimingInfo() visible in this class takes a JMS Message, not a
+        // UimaASClientMessage, so the call stays disabled until that is resolved.
+        if (doNotify) {
+          CAS cas = cachedRequest.getCAS();
+          String inputCasReferenceId = message.asString(AsynchAEMessage.InputCasReference);
+          UimaASProcessStatusImpl status;
+          if (inputCasReferenceId != null
+                  && inputCasReferenceId.equals(cachedRequest.getCasReferenceId())) {
+            status = new UimaASProcessStatusImpl(pt, cas, casReferenceId, inputCasReferenceId);
+          } else {
+            status = new UimaASProcessStatusImpl(pt, cas, casReferenceId);
+          }
+          if (message.contains(AsynchAEMessage.CASPerComponentMetrics)) {
+            // Listeners must be notified when metrics are present as well; this mirrors
+            // the JMS-message overload of completeProcessingReply.
+            List<AnalysisEnginePerformanceMetrics> ml =
+                    getMetrics(message.asString(AsynchAEMessage.CASPerComponentMetrics),
+                            casReferenceId,
+                            UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST));
+            notifyListeners(cas, status, AsynchAEMessage.Process, ml);
+          } else {
+            notifyListeners(cas, status, AsynchAEMessage.Process);
+          }
+        } else if (casReferenceId != null
+                && message.contains(AsynchAEMessage.CASPerComponentMetrics)
+                && cachedRequest.getComponentMetricsList() != null) {
+          // Synchronous sendAndReceive() was used: copy the metrics into the request's list
+          cachedRequest.getComponentMetricsList().addAll(
+                  UimaSerializer.deserializePerformanceMetrics(
+                          message.asString(AsynchAEMessage.CASPerComponentMetrics)));
+        }
+      } finally {
+        // Don't release the CAS if the application uses the synchronous API
+        if (!cachedRequest.isSynchronousInvocation() && cachedRequest.getCAS() != null) {
+          cachedRequest.getCAS().release();
+        }
+        removeFromCache(casReferenceId);
+      }
+    }
+    /**
+     * Completes processing of a Process reply received as a JMS message.
+     * <p>
+     * Only XMI, binary and CAS-reference payloads are handled; any other payload is
+     * ignored. Timing info is logged first. With {@code doNotify} set, a status object is
+     * built and listeners are notified (with per-component metrics when the reply carries
+     * them). Otherwise the request is re-read from the client cache and the metrics carried
+     * by the reply are appended to its metrics list. Finally the CAS is released for
+     * non-synchronous requests of a remote service and the cache entry is removed.
+     *
+     * @param cas the CAS associated with the reply; may be null
+     * @param casReferenceId id of the CAS the reply refers to
+     * @param payload payload type of the reply (an AsynchAEMessage payload constant)
+     * @param doNotify true to notify listeners; false when synchronous sendAndReceive() was used
+     * @param message the JMS reply message
+     * @param cachedRequest the cached request matching the reply
+     * @param pt process trace for the status object; a new one is created when null
+     * @throws Exception if reading the message, logging timing info or notifying listeners fails
+     */
+    private void completeProcessingReply(CAS cas, String casReferenceId, int payload,
+            boolean doNotify,Message message, ClientRequest cachedRequest, ProcessTrace pt)
+            throws Exception {
+      boolean casPayload = AsynchAEMessage.XMIPayload == payload
+              || AsynchAEMessage.BinaryPayload == payload
+              || AsynchAEMessage.CASRefID == payload;
+      if (!casPayload) {
+        return;
+      }
+      if (pt == null) {
+        pt = new ProcessTrace_impl();
+      }
+      try {
+        // Log stats and populate ProcessTrace object
+        logTimingInfo(message, pt, cachedRequest);
+        if (doNotify) {
+          String inputCasReferenceId = message.getStringProperty(AsynchAEMessage.InputCasReference);
+          UimaASProcessStatusImpl status;
+          if (inputCasReferenceId != null
+                  && inputCasReferenceId.equals(cachedRequest.getCasReferenceId())) {
+            status = new UimaASProcessStatusImpl(pt, cas, casReferenceId, inputCasReferenceId);
+          } else {
+            status = new UimaASProcessStatusImpl(pt, cas, casReferenceId);
+          }
+          if (message.propertyExists(AsynchAEMessage.CASPerComponentMetrics)) {
+            List<AnalysisEnginePerformanceMetrics> ml =
+                    getMetrics(message.getStringProperty(AsynchAEMessage.CASPerComponentMetrics),
+                            casReferenceId,
+                            UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST));
+            // Add CAS identifier to enable matching replies with requests
+            notifyListeners(cas, status, AsynchAEMessage.Process, ml);
+          } else {
+            // Add CAS identifier to enable matching replies with requests
+            notifyListeners(cas, status, AsynchAEMessage.Process);
+          }
+        } else if (casReferenceId != null
+                && message.propertyExists(AsynchAEMessage.CASPerComponentMetrics)) {
+          // Synchronous sendAndReceive() was used: re-read the request from the cache
+          cachedRequest = (ClientRequest) clientCache.get(casReferenceId);
+          if (cachedRequest != null && cachedRequest.getComponentMetricsList() != null) {
+            cachedRequest.getComponentMetricsList().addAll(
+                    UimaSerializer.deserializePerformanceMetrics(
+                            message.getStringProperty(AsynchAEMessage.CASPerComponentMetrics)));
+          }
+        }
+      } finally {
+        // Don't release the CAS if the application uses the synchronous API.
+        // cachedRequest may have been replaced by a null cache lookup above; the null
+        // check keeps a NullPointerException here from masking the primary exception
+        // and from skipping the cache cleanup below.
+        if (remoteService && cachedRequest != null && !cachedRequest.isSynchronousInvocation()
+                && cas != null) {
+          cas.release();
+        }
+        removeFromCache(casReferenceId);
+      }
+    }
+	/**
+	 * Tells whether the pending message's type equals {@code AsynchAEMessage.Process}.
+	 */
+	private boolean isProcessRequest(PendingMessage pm) {
+		final int type = pm.getMessageType();
+		return AsynchAEMessage.Process == type;
+	}
+	/**
+	 * Tells whether the pending message's type equals {@code AsynchAEMessage.GetMeta}.
+	 */
+	private boolean isGetMetaRequest(PendingMessage pm) {
+		final int type = pm.getMessageType();
+		return AsynchAEMessage.GetMeta == type;
+	}
+
+  /**
+   * Performs client-side bookkeeping for a pending message (judging by the name, just
+   * before it is dispatched; confirm against callers).
+   * <p>
+   * For a Process request the CAS departure time is stamped on the cached request and the
+   * CAS is added to the service delegate's list of CASes pending reply. For a GetMeta
+   * request the GetMeta timer is started when a positive GetMeta timeout is configured.
+   * Other message types are ignored.
+   *
+   * @param pm the pending message
+   */
+  public void beforeDispatch(PendingMessage pm) {
+    if (!isProcessRequest(pm)) {
+      if (isGetMetaRequest(pm) && getServiceDelegate().getGetMetaTimeout() > 0) {
+        getServiceDelegate().startGetMetaRequestTimer();
+      }
+      return;
+    }
+
+    String casKey = pm.getPropertyAsString(AsynchAEMessage.CasReference);
+    ClientRequest pendingRequest = (ClientRequest) getCache().get(casKey);
+    pendingRequest.setCASDepartureTime(System.nanoTime());
+
+    // Add the cas to a list of CASes pending reply. Also start the timer if necessary
+    getServiceDelegate().addCasToOutstandingList(
+            pendingRequest.getCasReferenceId(),
+            pendingRequest.getCAS().hashCode(),
+            timerPerCAS); // true=timer per cas
+
+    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+      Object[] logArgs = new Object[] {
+              pm.getPropertyAsString(AsynchAEMessage.CasReference),
+              String.valueOf(pendingRequest.getCAS().hashCode()),
+              getServiceDelegate().toString() };
+      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+              "sendCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+              "UIMAJMS_cas_added_to_pending_FINE", logArgs);
+    }
+  }
+  /**
+   * Receives a {@code DirectMessage}, wraps it in a {@code UimaASClientDirectMessage} and
+   * passes it to {@code onMessage(UimaASClientMessage)}. Messages arriving while the client
+   * is not running are ignored.
+   *
+   * @param message the message to process
+   */
+  public void onMessage(DirectMessage message) {
+    if (this.running) {
+      UimaASClientDirectMessage wrapped = new UimaASClientDirectMessage(message);
+      onMessage(wrapped);
+    }
+    // otherwise the client is stopping: drop the message
+  }
+  /**
+   * Processes a service reply by dispatching on its command: GetMeta, Process and
+   * CollectionProcessComplete replies are passed to their handlers; any other command is
+   * ignored. Nothing is rethrown to the caller: any Throwable raised while handling the
+   * reply is logged at WARNING level.
+   *
+   * @param message the reply message
+   */
+  protected void onMessage( UimaASClientMessage message ) {
+    try {
+      switch (message.command()) {
+        case AsynchAEMessage.GetMeta:    // received GetMeta reply from a service
+          logReplyReceived("UIMAJMS_received_meta_reply_FINE", message);
+          handleMetadataReply(message);
+          break;
+        case AsynchAEMessage.Process:    // received Process reply from a service
+          logReplyReceived("UIMAJMS_received_process_reply_FINE", message);
+          handleProcessReply(message, true, null);
+          break;
+        case AsynchAEMessage.CollectionProcessComplete:    // received CPC reply from a service
+          logReplyReceived("UIMAJMS_received_cpc_reply_FINE", message);
+          handleCollectionProcessCompleteReply(message);
+          break;
+      }
+    } catch (Throwable t) {
+      // Report through the framework logger instead of printStackTrace(), using the same
+      // bundle and key as the JMS-message variant of onMessage().
+      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+                "onMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAEE_exception__WARNING", t);
+      }
+    }
 
+  }
+
+  /** Logs at FINE level, under the given resource key, that a reply was received. */
+  private void logReplyReceived(String resourceKey, UimaASClientMessage message) {
+    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
+              JmsConstants.JMS_LOG_RESOURCE_BUNDLE, resourceKey,
+              new Object[] { message.messageFrom() });
+    }
+  }
   private void logTimingInfo(Message message, ProcessTrace pt, ClientRequest cachedRequest)
           throws Exception {
     clientSideJmxStats.incrementTotalNumberOfCasesProcessed();
@@ -2069,7 +2710,28 @@ public abstract class BaseUIMAAsynchrono
     uimaSerializer.deserializeCasFromBinary(aSerializedCAS, cas);
     return cas;
   }
+	/**
+	 * Deserializes the CAS carried by the JMS message stored in the given request.
+	 * A {@code TextMessage} body is deserialized from text, honoring the SentDeltaCas
+	 * property (false when absent); any other message is cast to {@code BytesMessage}
+	 * and deserialized from its bytes.
+	 *
+	 * @param cachedRequest request holding the message
+	 * @return the deserialized CAS, or null when the request holds no message
+	 * @throws Exception if reading the message or deserialization fails
+	 */
+	private CAS deserializeCAS(ClientRequest cachedRequest) throws Exception {
+		Message reply = cachedRequest.getMessage();
+		if (reply == null) {
+			return null;
+		}
+
+		// The delta flag defaults to false when the reply does not carry the property
+		boolean sentAsDelta = reply.propertyExists(AsynchAEMessage.SentDeltaCas)
+				&& reply.getBooleanProperty(AsynchAEMessage.SentDeltaCas);
+
+		if (reply instanceof TextMessage) {
+			String xmi = ((TextMessage) reply).getText();
+			return deserializeCAS(xmi, cachedRequest, sentAsDelta);
+		}
 
+		BytesMessage bytesReply = (BytesMessage) reply;
+		byte[] serializedCas = new byte[(int) bytesReply.getBodyLength()];
+		bytesReply.readBytes(serializedCas);
+		return deserializeCAS(serializedCas, cachedRequest);
+	}
   /**
    * Listener method receiving JMS Messages from the response queue.
    * 
@@ -2078,6 +2740,19 @@ public abstract class BaseUIMAAsynchrono
 	  if ( !this.running) {
 		  return;    // ignore the message if the client is stopping
 	  }
+	  /*
+	   
+	   UimaMessage um = new JmsMessage(message);
+	   MessageHandler mh = MessageHandlers.createHandler(this);
+	   mh.addCommandHandler(CommandFactory.get(CommandFactory.JMS).newProcessCASCommand());
+	   mh.addCommandHandler(CommandFactory.get(CommandFactory.JMS).newProcessChildCASCommand());
+	   mh.addCommandHandler(CommandFactory.get(CommandFactory.JMS).newGetMetaCommand());
+	   mh.addCommandHandler(CommandFactory.get(CommandFactory.JMS).newCPCCommand());
+	   mh.addCommandHandler(CommandFactory.get(CommandFactory.JMS).newServiceInfoCommand());
+	   
+	   mh.handle(um);
+	   
+	   */
     // Process message in a separate thread. Previously the message was processed in ActiveMQ dispatch thread.
     // This onMessage() method is called by ActiveMQ code from a critical region protected with a lock. The lock 
     // is only released if this method returns. Running in a dispatch thread caused a hang when an application
@@ -2107,6 +2782,8 @@ public abstract class BaseUIMAAsynchrono
           }
 
           int command = message.getIntProperty(AsynchAEMessage.Command);
+          
+          System.out.println(" >>>>>>>>>> Received Reply for Command:"+command);
           if (AsynchAEMessage.CollectionProcessComplete == command) {
             if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
               UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
@@ -2120,7 +2797,9 @@ public abstract class BaseUIMAAsynchrono
                       JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_meta_reply_FINE",
                       new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
             }
+            System.out.println(">>>>>> Handling GetMeta Reply");
             handleMetadataReply(message);
+            System.out.println(">>>>>> Done Handling GetMeta Reply");
           } else if (AsynchAEMessage.Process == command) {
             if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
               UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
@@ -2148,10 +2827,92 @@ public abstract class BaseUIMAAsynchrono
         
       }
     });
-
+	  
   }
 
   /**
+   * Listener method receiving JMS Messages from the response queue.
+   * 
+   */
+  /*
+  public void onMessage(final Message message) {
+	  if ( !this.running) {
+		  return;    // ignore the message if the client is stopping
+	  }
+    // Process message in a separate thread. Previously the message was processed in ActiveMQ dispatch thread.
+    // This onMessage() method is called by ActiveMQ code from a critical region protected with a lock. The lock 
+    // is only released if this method returns. Running in a dispatch thread caused a hang when an application
+    // decided to call System.exit() in any of its callback listener methods. The UIMA AS client adds a 
+    // ShutdownHook to the JVM to enable orderly shutdown which includes stopping JMS Consumer, JMS Producer
+    // and finally stopping JMS Connection. The ShutdownHook support was added to the client in case the 
+    // application doesn't call client's stop() method. Now, the hang was caused by the fact that the dispatch
+    // thread was used to call System.exit() which in turn executed client's ShutdownHook code. The ShutdownHook
+    // code runs in a separate thread, but the JVM blocks the dispatch thread until the ShutdownHook 
+    // finishes. It never will though, since the ShutdownHook is calling ActiveMQSession.close() which tries to enter
+    // the same critical region that the dispatch thread is still stuck in. DEADLOCK.
+    // The code below uses a simple FixedThreadPool Executor with a single thread. This thread is reused instead
+    // of creating one on the fly.
+    exec.execute( new Runnable() {
+      
+      public void run() {
+        try {
+
+          
+          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "onMessage",
+                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_msg_FINEST",
+                    new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
+          }
+          if (!message.propertyExists(AsynchAEMessage.Command)) {
+            return;
+          }
+
+          int command = message.getIntProperty(AsynchAEMessage.Command);
+          if (AsynchAEMessage.CollectionProcessComplete == command) {
+            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
+                      JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_cpc_reply_FINE",
+                      new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
+            }
+            handleCollectionProcessCompleteReply(message);
+          } else if (AsynchAEMessage.GetMeta == command) {
+            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
+                      JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_meta_reply_FINE",
+                      new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
+            }
+            handleMetadataReply(message);
+          } else if (AsynchAEMessage.Process == command) {
+            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
+                      JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_process_reply_FINE",
+                      new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
+            }
+            handleProcessReply(message, true, null);
+          } else if (AsynchAEMessage.ServiceInfo == command) {
+            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+                      "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                      "UIMAJMS_received_service_info_FINEST",
+                      new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
+            }
+            handleServiceInfo(message);
+          }
+        } catch (Exception e) {
+          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+                    "onMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                    "UIMAEE_exception__WARNING", e);
+          }
+
+        }
+        
+      }
+    });
+
+  }
+*/
+  /**
    * Gets the ProcessingResourceMetadata for the asynchronous AnalysisEngine.
    */
   public ProcessingResourceMetaData getMetaData() throws ResourceInitializationException {
@@ -2207,6 +2968,8 @@ public abstract class BaseUIMAAsynchrono
     }
 
     ClientRequest cachedRequest = produceNewClientRequestObject();
+    clientCache.put(cachedRequest.getCasReferenceId(), cachedRequest);
+
     cachedRequest.setSynchronousInvocation();
 //    cachedRequest.setTargetServiceId(targetServiceId);
     //	save application provided List where the performance stats will be copied
@@ -2371,9 +3134,38 @@ public abstract class BaseUIMAAsynchrono
     try {
       // Process reply in the send thread
       Message message = cachedRequest.getMessage();
-      if (message != null) {
-        deserializeAndCompleteProcessingReply(casReferenceId, message, cachedRequest, pt, false);
+	    if (message == null) {
+	    	throw new IllegalArgumentException("Client Recv'd Invalid Reply Message for CAS:"+casReferenceId+" The JMS Message is Missing ");
+	    }
+
+	    
+	   	 CAS cas = null;
+    	 int payload = AsynchAEMessage.None;
+      // Process reply in the send thread
+      if (transport.equals(Transport.JMS)) {
+    	 if (!running) {
+    	    return casReferenceId;
+    	 }
+
+	    System.out.println("............ Client.hashCode():"+this.hashCode()+" Recv'd Reply For CAS:"+casReferenceId+" Deserializing");
+    	 long t1 = System.nanoTime();
+    	 cas = deserializeCAS(cachedRequest);
+    	 cachedRequest.setDeserializationTime(System.nanoTime() - t1);
+
+    	payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
+        completeProcessingReply(cas, casReferenceId, payload, false, message, cachedRequest, pt);
+    	  
+//    	  deserializeAndCompleteProcessingReply(casReferenceId, message, cachedRequest, pt, false);
+
+      } else {
+    	  cas = cachedRequest.getCAS();
+          completeProcessingReply(cas, casReferenceId, payload, false, message, cachedRequest, pt);
+
       }
+      
+//      if (message != null) {
+//        deserializeAndCompleteProcessingReply(casReferenceId, message, cachedRequest, pt, false);
+//      }
     } catch (ResourceProcessException rpe) {
       throw rpe;
     } catch (Exception e) {
@@ -2620,6 +3412,8 @@ public abstract class BaseUIMAAsynchrono
     
     List<AnalysisEnginePerformanceMetrics> componentMetricsList;
     
+    private UimaASClientMessage clientMessage;
+
     // flag to indicate if the remote service acknowledged receiving a CAS for processing
     private volatile boolean receivedServiceACK=false;
     
@@ -2632,7 +3426,16 @@ public abstract class BaseUIMAAsynchrono
 	public void setTargetServiceId(String serviceTargetId) {
 		this.targetServiceId = serviceTargetId;
 	}
-
+    /**
+     * Tells whether the given client equals the engine recorded in this request's
+     * {@code uimaEEEngine} field.
+     *
+     * @param client the client to compare; null yields false
+     * @return true if {@code client.equals(uimaEEEngine)}
+     */
+    public boolean isThisClientTheCasOwner(BaseUIMAAsynchronousEngineCommon_impl client) {
+    	// The unconditional stdout debug line was dropped; it also failed with a
+    	// NullPointerException when either side was null.
+    	return client != null && client.equals(uimaEEEngine);
+    }
+    /**
+     * Returns the {@code UimaASClientMessage} stored on this request via
+     * {@code setMessage(UimaASClientMessage)}, or null if none has been set.
+     */
+    public UimaASClientMessage getClientMessage() {
+        return this.clientMessage;
+    }
+    /**
+     * Stores the given {@code UimaASClientMessage} on this request; it is read back with
+     * {@code getClientMessage()}.
+     *
+     * @param message the client message to store
+     */
+    public void setMessage(UimaASClientMessage message) {
+        clientMessage = message;
+    }
 	public boolean receivedServiceACK() {
 		return receivedServiceACK;
 	}

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java Mon Feb 26 18:54:11 2018
@@ -33,7 +33,7 @@ import org.apache.uima.aae.message.UIMAM
 import org.apache.uima.cas.SerialFormat;
 
 public class JmsMessageContext implements MessageContext {
-  private static final Class CLASS_NAME = JmsMessageContext.class;
+  private static final Class<?> CLASS_NAME = JmsMessageContext.class;
 
   private Message message;
 

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/PendingMessage.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/PendingMessage.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/PendingMessage.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/PendingMessage.java Mon Feb 26 18:54:11 2018
@@ -18,18 +18,12 @@
  */
 package org.apache.uima.adapter.jms.message;
 
-import java.util.HashMap;
-
-public class PendingMessage extends HashMap<Object, Object> {
- 
-private static final long serialVersionUID = 3512718154731557413L;
-private int messageType;
-
-  public PendingMessage(int aMessageType) {
-    messageType = aMessageType;
-  }
-
-  public int getMessageType() {
-    return messageType;
-  }
+/**
+ * A message with a type code and a set of properties keyed by String.
+ * NOTE(review): presumably a message queued by the client until it is dispatched;
+ * confirm against the implementations.
+ */
+public interface PendingMessage {
+
+	/** Returns this message's type code. */
+	int getMessageType();
+
+	/** Returns the property stored under the given key. */
+	Object getProperty(String propertyKey);
+
+	/** Stores a property under the given key. */
+	void addProperty(String propertyKey, Object property);
+
+	/** Returns the property stored under the given key as a String. */
+	String getPropertyAsString(String key);
+
+	/** Returns the property stored under the given key as an int. */
+	int getPropertyAsInt(String key);
+
+	/** Returns the property stored under the given key as a byte array. */
+	byte[] getPropertyAsBytesArray(String key);
 }