You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2020/06/10 13:15:10 UTC

[uima-async-scaleout] 26/34: UIMA-5501 refactored to use pluggable endpoints

This is an automated email from the ASF dual-hosted git repository.

cwiklik pushed a commit to branch uima-as-3
in repository https://gitbox.apache.org/repos/asf/uima-async-scaleout.git

commit eec56f664844869f6072105109f0a2ee0728e767
Author: cwiklik <cwiklik>
AuthorDate: Thu Nov 29 17:29:40 2018 +0000

    UIMA-5501 refactored to use pluggable endpoints
---
 .../adapter/jms/client/ActiveMQMessageSender.java  | 673 ++++++++++-----------
 .../client/BaseUIMAAsynchronousEngine_impl.java    |  35 +-
 .../apache/uima/as/dispatcher/LocalDispatcher.java |  81 ++-
 .../apache/uima/ee/test/TestUimaASNoErrors.java    |   9 +-
 4 files changed, 439 insertions(+), 359 deletions(-)

diff --git a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
index 0ec5a1f..a95be45 100644
--- a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
+++ b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
@@ -51,142 +51,138 @@ import org.apache.uima.util.Level;
 import org.apache.uima.util.impl.ProcessTrace_impl;
 
 /**
- * Initializes JMS session and creates JMS MessageProducer to be used for sending messages to a
- * given destination. It extends BaseMessageSender which starts the worker thread and is tasked with
- * sending messages. The application threads share a common 'queue' with the worker thread. The
- * application threads add messages to the pendingMessageList 'queue' and the worker thread consumes
- * them.
+ * Initializes JMS session and creates JMS MessageProducer to be used for
+ * sending messages to a given destination. It extends BaseMessageSender which
+ * starts the worker thread and is tasked with sending messages. The application
+ * threads share a common 'queue' with the worker thread. The application
+ * threads add messages to the pendingMessageList 'queue' and the worker thread
+ * consumes them.
  * 
  */
 public class ActiveMQMessageSender extends BaseMessageSender {
-  private static final Class<?> CLASS_NAME = ActiveMQMessageSender.class;
-
-  private volatile Connection connection = null;
-
-  private Session session = null;
-
-  private MessageProducer producer = null;
-
-  private String destinationName = null;
-
-  private ConcurrentHashMap<Destination, MessageProducer> producerMap = 
-		  new ConcurrentHashMap<Destination, MessageProducer>();
-
-  public ActiveMQMessageSender(Connection aConnection, String aDestinationName,
-          BaseUIMAAsynchronousEngineCommon_impl engine) throws Exception {
-    super(engine);
-    connection = aConnection;
-    destinationName = aDestinationName;
-  }
-
-  public synchronized MessageProducer getMessageProducer(Destination destination) throws Exception {
-    if (producerMap.containsKey(destination)) {
-      return (MessageProducer) producerMap.get(destination);
-    }
-    createSession();
-    MessageProducer mProducer = session.createProducer(destination);
-    mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-    producerMap.put(destination, mProducer);
-    return mProducer;
-  }
-  /**
-   * This is called when a new Connection is created after broker is restarted
-   */
-  public void setConnection(Connection aConnection) {
-    connection = aConnection;
-    cleanup();
-    try {
-      initializeProducer();
-    } catch( Exception e) {
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
-                "setConnection", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAEE_exception__WARNING", e);
-      }
-    }
-    
-  }
-  private String getBrokerURL() {
-    try {
-      return ((ActiveMQConnection) connection).getBrokerInfo().getBrokerURL();
-    } catch (Exception ex) { /* handle silently. */
-    }
-    return "";
-  }
-
-  private void createSession() throws Exception {
-    String broker = getBrokerURL();
-    try {
-      if (session == null || engine.producerInitialized == false) {
-        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      }
-    } catch (JMSException e) {
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-                "createSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAJMS_client_failed_creating_session_INFO",
-                new Object[] { destinationName, broker });
-      }
-      if (connection == null) {
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-                  "createSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                  "UIMAJMS_client_connection_not_ready_INFO", new Object[] { broker });
-        }
-      } else if (((ActiveMQConnection) connection).isClosed()
-              || ((ActiveMQConnection) connection).isClosing()) {
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-          UIMAFramework.getLogger(CLASS_NAME)
-                  .logrb(Level.INFO, CLASS_NAME.getName(), "createSession",
-                          JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                          "UIMAJMS_client_connection_closed_INFO",
-                          new Object[] { destinationName, broker });
-        }
-      }
-      throw e;
-    } catch (Exception e) {
-      throw e;
-    }
-  }
-  /**
-   * Returns the full name of the destination queue
-   */
-  protected String getDestinationEndpoint() throws Exception {
-    return ((ActiveMQDestination) producer.getDestination()).getPhysicalName();
-  }
-
-  /**
-   * Creates a jms session object used to instantiate message producer
-   */
-  protected void initializeProducer() throws Exception {
-    createSession();
-    producer = getMessageProducer(session.createQueue(destinationName));
-  }
-
-
-  /**
-   * Returns jsm MessageProducer
-   */
-  public MessageProducer getMessageProducer() {
-    if ( engine.running && engine.producerInitialized == false  ) {
-      try {
-        SharedConnection con = engine.lookupConnection(getBrokerURL());
-        if ( con != null ) {
-          setConnection(con.getConnection());
-          initializeProducer();
-          engine.producerInitialized = true;
-        }
-      } catch( Exception e) {
-        e.printStackTrace();
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
-                "getMessageProducer", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAEE_exception__WARNING", e);
-        }
-      }
-    } 
-    return producer;
-  }
+	private static final Class<?> CLASS_NAME = ActiveMQMessageSender.class;
+
+	private volatile Connection connection = null;
+
+	private Session session = null;
+
+	private MessageProducer producer = null;
+
+	private String destinationName = null;
+
+	private ConcurrentHashMap<Destination, MessageProducer> producerMap = new ConcurrentHashMap<Destination, MessageProducer>();
+
+	public ActiveMQMessageSender(Connection aConnection, String aDestinationName,
+			BaseUIMAAsynchronousEngineCommon_impl engine) throws Exception {
+		super(engine);
+		connection = aConnection;
+		destinationName = aDestinationName;
+	}
+
+	public synchronized MessageProducer getMessageProducer(Destination destination) throws Exception {
+		if (producerMap.containsKey(destination)) {
+			return (MessageProducer) producerMap.get(destination);
+		}
+		createSession();
+		MessageProducer mProducer = session.createProducer(destination);
+		mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+		producerMap.put(destination, mProducer);
+		return mProducer;
+	}
+
+	/**
+	 * This is called when a new Connection is created after broker is restarted
+	 */
+	public void setConnection(Connection aConnection) {
+		connection = aConnection;
+		cleanup();
+		try {
+			initializeProducer();
+		} catch (Exception e) {
+			if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "setConnection",
+						UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+			}
+		}
+
+	}
+
+	private String getBrokerURL() {
+		try {
+			return ((ActiveMQConnection) connection).getBrokerInfo().getBrokerURL();
+		} catch (Exception ex) { /* handle silently. */
+		}
+		return "";
+	}
+
+	private void createSession() throws Exception {
+		String broker = getBrokerURL();
+		try {
+			if (session == null || engine.producerInitialized == false) {
+				session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			}
+		} catch (JMSException e) {
+			if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "createSession",
+						JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_failed_creating_session_INFO",
+						new Object[] { destinationName, broker });
+			}
+			if (connection == null) {
+				if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "createSession",
+							JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_connection_not_ready_INFO",
+							new Object[] { broker });
+				}
+			} else if (((ActiveMQConnection) connection).isClosed() || ((ActiveMQConnection) connection).isClosing()) {
+				if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "createSession",
+							JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_connection_closed_INFO",
+							new Object[] { destinationName, broker });
+				}
+			}
+			throw e;
+		} catch (Exception e) {
+			throw e;
+		}
+	}
+
+	/**
+	 * Returns the full name of the destination queue
+	 */
+	protected String getDestinationEndpoint() throws Exception {
+		return ((ActiveMQDestination) producer.getDestination()).getPhysicalName();
+	}
+
+	/**
+	 * Creates a jms session object used to instantiate message producer
+	 */
+	protected void initializeProducer() throws Exception {
+		createSession();
+		producer = getMessageProducer(session.createQueue(destinationName));
+	}
+
+	/**
+	 * Returns JMS MessageProducer
+	 */
+	public MessageProducer getMessageProducer() {
+		if (engine.running && engine.producerInitialized == false) {
+			try {
+				SharedConnection con = engine.lookupConnection(getBrokerURL());
+				if (con != null) {
+					setConnection(con.getConnection());
+					initializeProducer();
+					engine.producerInitialized = true;
+				}
+			} catch (Exception e) {
+				e.printStackTrace();
+				if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "getMessageProducer",
+							UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+				}
+			}
+		}
+		return producer;
+	}
 
 	public TextMessage createTextMessage() throws Exception {
 		synchronized (ActiveMQMessageSender.class) {
@@ -209,14 +205,14 @@ public class ActiveMQMessageSender extends BaseMessageSender {
 
 	}
 
-  public BytesMessage createBytesMessage() throws Exception {
+	public BytesMessage createBytesMessage() throws Exception {
 		synchronized (ActiveMQMessageSender.class) {
-		    if (session == null) {
-		        //	Force initialization of Producer
-		          initializeProducer();
-		    }
-		    BytesMessage msg = null;
-		    try {
+			if (session == null) {
+				// Force initialization of Producer
+				initializeProducer();
+			}
+			BytesMessage msg = null;
+			try {
 				msg = session.createBytesMessage();
 			} catch (IllegalStateException e) {
 				// stale Session
@@ -227,203 +223,202 @@ public class ActiveMQMessageSender extends BaseMessageSender {
 			return msg;
 		}
 
- //   return session.createBytesMessage();
-  }
-
-  /**
-   * Cleanup any jms resources used by the worker thread
-   */
-  protected void cleanup() { 
-    try {
-      if (session != null) {
-        session.close();
-        session = null;
-      }
-      if (producer != null) {
-        producer.close();
-        producer = null;
-      }
-    } catch (Exception e) {
-      // Ignore we are shutting down
-    } finally {
-      producerMap.clear();
-    }
-  }
-  protected void dispatchMessage(PendingMessage pm, BaseUIMAAsynchronousEngineCommon_impl engine, boolean casProcessRequest ) throws Exception {
-      SharedConnection sc = 
-    		  engine.lookupConnection(engine.getBrokerURI());
-      ClientRequest cacheEntry = null;
-      boolean doCallback = false;
-      boolean addTimeToLive = true;
-      Session jmsSession = null;
-      
-      // Check the environment for existence of NoTTL tag. If present,
-      // the deployer of the service wants to disable message expiration.
-      if (System.getProperty("NoTTL") != null) {
-        addTimeToLive = false;
-      }
-      try {
-//    	  long t1 = System.currentTimeMillis();
-    	  jmsSession = sc.getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
-          
-          // Request JMS Message from the concrete implementation
-          Message message = null;
-          // Determine if this a CAS Process Request
-//          boolean casProcessRequest = isProcessRequest(pm);
-          // Only Process request can be serialized as binary
-          if (casProcessRequest && (engine.getSerialFormat() != SerialFormat.XMI)) {
-            message = jmsSession.createBytesMessage();
-          } else {
-            message = jmsSession.createTextMessage();
-          }
-          //  get the producer initialized from a valid connection
-         // producer = getMessageProducer();
-          
-          Destination d = null;
-          String selector = null;
-          // UIMA-AS ver 2.10.0 + sends Free Cas request to a service targeted queue
-          // instead of a temp queue. Regular queues can be recovered in case of
-          // a broker restart. The test below will be true for UIMA-AS v. 2.10.0 +.
-          // Code in JmsOutputChannel will add the selector if the service is a CM.
-          if (pm.getPropertyAsString(AsynchAEMessage.TargetingSelector) != null) {
-        	  selector = (String)pm.getPropertyAsString(AsynchAEMessage.TargetingSelector);
-          }
-          if ( selector == null && (pm.getMessageType() == AsynchAEMessage.ReleaseCAS || pm.getMessageType() == AsynchAEMessage.Stop) ) {
-        	  d = (Destination)pm.getProperty(AsynchAEMessage.Destination);
-              
-          } else {
-              d = jmsSession.createQueue(destinationName);
-          }
-          MessageProducer mProducer = jmsSession.createProducer(d);
-          mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-          //System.out.println(">>>>>>> Time to create and initialize JMS Sesssion:"+(System.currentTimeMillis()-t1));
-          super.initializeMessage(pm, message);
-    	  String destination = ((ActiveMQDestination) d).getPhysicalName();
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(
-                    Level.FINE,
-                    CLASS_NAME.getName(),
-                    "run",
-                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_sending_msg_to_endpoint__FINE",
-                    new Object[] {
-                        UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message
-                                .getIntProperty(AsynchAEMessage.Command)),
-                        UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, message
-                                .getIntProperty(AsynchAEMessage.MessageType)), destination });
-          }
-          if (casProcessRequest) {
-            cacheEntry = (ClientRequest) engine.getCache().get(
-                    pm.getPropertyAsString(AsynchAEMessage.CasReference));
-            if (cacheEntry != null) {
-            //    CAS cas = cacheEntry.getCAS();
-                // enable logging 
-                if (System.getProperty("UimaAsCasTracking") != null) {
-                  message.setStringProperty("UimaAsCasTracking", "enable");
-                }
-                // Target specific service instance if targeting for the CAS is provided
-                // by the client application
-    			if ( cacheEntry.getTargetServiceId() != null ) {
- //   				System.out.println("------------Client Sending CAS to Service Instance With Id:"+cacheEntry.getTargetServiceId());;
-    				message.setStringProperty(UimaAsynchronousEngine.TargetSelectorProperty, cacheEntry.getTargetServiceId());
-    			}
-         	   // Use Process Timeout value for the time-to-live property in the
-              // outgoing JMS message. When this time is exceeded
-              // while the message sits in a queue, the JMS Server will remove it from
-              // the queue. What happens with the expired message depends on the
-              // configuration. Most JMS Providers create a special dead-letter queue
-              // where all expired messages are placed. NOTE: In ActiveMQ expired msgs in the DLQ
-              // are not auto evicted yet and accumulate taking up memory.
-              long timeoutValue = cacheEntry.getProcessTimeout();
-
-              if (timeoutValue > 0 && addTimeToLive ) {
-                // Set high time to live value
-                message.setJMSExpiration(10 * timeoutValue);
-              }
-              if (pm.getMessageType() == AsynchAEMessage.Process) {
-                cacheEntry.setCASDepartureTime(System.nanoTime());
-              }
-              cacheEntry.setCASDepartureTime(System.nanoTime());
-      
-              doCallback = true;
-              
-          } else {
-              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-
-                  UIMAFramework.getLogger(CLASS_NAME).logrb(
-                          Level.WARNING,
-                          CLASS_NAME.getName(),
-                          "run",
-                          JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                          "UIMAJMS_failed_cache_lookup__WARNING",
-                          new Object[] {
-                         	 pm.getPropertyAsString(AsynchAEMessage.CasReference),
-                              UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message
-                                      .getIntProperty(AsynchAEMessage.Command)),
-                              UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, message
-                                      .getIntProperty(AsynchAEMessage.MessageType)), destination });
-                }
-             }
-         	 
-          }
-          // start timers
-          if( casProcessRequest ) { 
-         	 CAS cas = cacheEntry.getCAS();
-
-
-            // Add the cas to a list of CASes pending reply. Also start the timer if necessary
-            engine.serviceDelegate.addCasToOutstandingList(cacheEntry.getCasReferenceId(), cas.hashCode(), engine.timerPerCAS); // true=timer per cas
-            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-         	   UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-                     "sendCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                     "UIMAJMS_cas_added_to_pending_FINE", new Object[] { cacheEntry.getCasReferenceId(), String.valueOf(cas.hashCode()), engine.serviceDelegate.toString()});
-            }
-
-          
-          } else if ( pm.getMessageType() == AsynchAEMessage.GetMeta &&
-                  engine.serviceDelegate.getGetMetaTimeout() > 0 ) {
-            // timer for PING has been started in sendCAS()
-            if ( !engine.serviceDelegate.isAwaitingPingReply()) {
-              engine.serviceDelegate.startGetMetaRequestTimer();
-            } 
-          } else {
-         	 doCallback = false;  // dont call onBeforeMessageSend() callback on CPC
-          }
-          //  Dispatch asynchronous request to Uima AS service
-          mProducer.send(message);
-          
-          if ( doCallback ) {
-            UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(),cacheEntry.getCAS(),
-                    cacheEntry.getCasReferenceId());
-            // Notify engine before sending a message
-            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-                UIMAFramework.getLogger(CLASS_NAME).logrb(
-                        Level.FINE,
-                        CLASS_NAME.getName(),
-                        "run",
-                        JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                        "UIMAJMS_calling_onBeforeMessageSend__FINE",
-                        new Object[] {
-                          pm.getPropertyAsString(AsynchAEMessage.CasReference),
-                          String.valueOf(cacheEntry.getCAS().hashCode())
-                        });
-              }  
-            // Note the callback is a misnomer. The callback is made *after* the send now
-            // Application receiving this callback can consider the CAS as delivere to a queue
-            engine.onBeforeMessageSend(status);
-          
-          
-          }
-      } finally {
-    	  if ( jmsSession != null ) {
-    		  try {
-    			  jmsSession.close(); 
-    		  } catch( Exception eee) {
-    			  
-    		  }
-    	  }
-      }
-
-
-  }
+		// return session.createBytesMessage();
+	}
+
+	/**
+	 * Cleanup any jms resources used by the worker thread
+	 */
+	protected void cleanup() {
+		try {
+			if (session != null) {
+				session.close();
+				session = null;
+			}
+			if (producer != null) {
+				producer.close();
+				producer = null;
+			}
+		} catch (Exception e) {
+			// Ignore we are shutting down
+		} finally {
+			producerMap.clear();
+		}
+	}
+
+	protected void dispatchMessage(PendingMessage pm, BaseUIMAAsynchronousEngineCommon_impl engine,
+			boolean casProcessRequest) throws Exception {
+		SharedConnection sc = engine.lookupConnection(engine.getBrokerURI());
+		ClientRequest cacheEntry = null;
+		boolean doCallback = false;
+		boolean addTimeToLive = true;
+		Session jmsSession = null;
+
+		// Check the environment for existence of NoTTL tag. If present,
+		// the deployer of the service wants to disable message expiration.
+		if (System.getProperty("NoTTL") != null) {
+			addTimeToLive = false;
+		}
+		try {
+			// long t1 = System.currentTimeMillis();
+			jmsSession = sc.getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+			// Request JMS Message from the concrete implementation
+			Message message = null;
+			// Determine if this a CAS Process Request
+			// boolean casProcessRequest = isProcessRequest(pm);
+			// Only Process request can be serialized as binary
+			if (casProcessRequest && (engine.getSerialFormat() != SerialFormat.XMI)) {
+				message = jmsSession.createBytesMessage();
+			} else {
+				message = jmsSession.createTextMessage();
+			}
+			// get the producer initialized from a valid connection
+			// producer = getMessageProducer();
+
+			Destination d = null;
+			String selector = null;
+			// UIMA-AS ver 2.10.0 + sends Free Cas request to a service targeted queue
+			// instead of a temp queue. Regular queues can be recovered in case of
+			// a broker restart. The test below will be true for UIMA-AS v. 2.10.0 +.
+			// Code in JmsOutputChannel will add the selector if the service is a CM.
+			if (pm.getPropertyAsString(AsynchAEMessage.TargetingSelector) != null) {
+				selector = (String) pm.getPropertyAsString(AsynchAEMessage.TargetingSelector);
+			}
+			if (selector == null && (pm.getMessageType() == AsynchAEMessage.ReleaseCAS
+					|| pm.getMessageType() == AsynchAEMessage.Stop)) {
+				d = (Destination) pm.getProperty(AsynchAEMessage.Destination);
+
+			} else {
+				d = jmsSession.createQueue(destinationName);
+			}
+			MessageProducer mProducer = jmsSession.createProducer(d);
+			mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+			// System.out.println(">>>>>>> Time to create and initialize JMS
+			// Sesssion:"+(System.currentTimeMillis()-t1));
+			super.initializeMessage(pm, message);
+			String destination = ((ActiveMQDestination) d).getPhysicalName();
+			if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "run",
+						JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE",
+						new Object[] {
+								UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command,
+										message.getIntProperty(AsynchAEMessage.Command)),
+								UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType,
+										message.getIntProperty(AsynchAEMessage.MessageType)),
+								destination });
+			}
+			if (casProcessRequest) {
+				cacheEntry = (ClientRequest) engine.getCache()
+						.get(pm.getPropertyAsString(AsynchAEMessage.CasReference));
+				if (cacheEntry != null) {
+					// CAS cas = cacheEntry.getCAS();
+					// enable logging
+					if (System.getProperty("UimaAsCasTracking") != null) {
+						message.setStringProperty("UimaAsCasTracking", "enable");
+					}
+					// Target specific service instance if targeting for the CAS is provided
+					// by the client application
+					if (cacheEntry.getTargetServiceId() != null) {
+						// System.out.println("------------Client Sending CAS to Service Instance With
+						// Id:"+cacheEntry.getTargetServiceId());;
+						message.setStringProperty(UimaAsynchronousEngine.TargetSelectorProperty,
+								cacheEntry.getTargetServiceId());
+					}
+					// Use Process Timeout value for the time-to-live property in the
+					// outgoing JMS message. When this time is exceeded
+					// while the message sits in a queue, the JMS Server will remove it from
+					// the queue. What happens with the expired message depends on the
+					// configuration. Most JMS Providers create a special dead-letter queue
+					// where all expired messages are placed. NOTE: In ActiveMQ expired msgs in the
+					// DLQ
+					// are not auto evicted yet and accumulate taking up memory.
+					long timeoutValue = cacheEntry.getProcessTimeout();
+
+					if (timeoutValue > 0 && addTimeToLive) {
+						// Set high time to live value
+						message.setJMSExpiration(10 * timeoutValue);
+					}
+					if (pm.getMessageType() == AsynchAEMessage.Process) {
+						cacheEntry.setCASDepartureTime(System.nanoTime());
+					}
+					cacheEntry.setCASDepartureTime(System.nanoTime());
+
+					doCallback = true;
+
+				} else {
+					if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+
+						UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "run",
+								JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_failed_cache_lookup__WARNING",
+								new Object[] { pm.getPropertyAsString(AsynchAEMessage.CasReference),
+										UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command,
+												message.getIntProperty(AsynchAEMessage.Command)),
+										UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType,
+												message.getIntProperty(AsynchAEMessage.MessageType)),
+										destination });
+					}
+					return; // no cache entry, done here
+				}
+
+			}
+			// start timers
+			if (casProcessRequest) {
+				CAS cas = cacheEntry.getCAS();
+
+				// Add the cas to a list of CASes pending reply. Also start the timer if
+				// necessary
+				engine.serviceDelegate.addCasToOutstandingList(cacheEntry.getCasReferenceId(), cas.hashCode(),
+						engine.timerPerCAS); // true=timer per cas
+				if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendCAS",
+							JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_cas_added_to_pending_FINE",
+							new Object[] { cacheEntry.getCasReferenceId(), String.valueOf(cas.hashCode()),
+									engine.serviceDelegate.toString() });
+				}
+
+			} else if (pm.getMessageType() == AsynchAEMessage.GetMeta
+					&& engine.serviceDelegate.getGetMetaTimeout() > 0) {
+				// timer for PING has been started in sendCAS()
+				if (!engine.serviceDelegate.isAwaitingPingReply()) {
+					engine.serviceDelegate.startGetMetaRequestTimer();
+				}
+			} else {
+				doCallback = false; // dont call onBeforeMessageSend() callback on CPC
+			}
+			// Dispatch asynchronous request to Uima AS service
+			mProducer.send(message);
+
+			if (doCallback) {
+				UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(), cacheEntry.getCAS(),
+						cacheEntry.getCasReferenceId());
+				// Notify engine before sending a message
+				if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "run",
+							JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_calling_onBeforeMessageSend__FINE",
+							new Object[] { pm.getPropertyAsString(AsynchAEMessage.CasReference),
+									String.valueOf(cacheEntry.getCAS().hashCode()) });
+				}
+				// Note the callback is a misnomer. The callback is made *after* the send now
+				// Application receiving this callback can consider the CAS as delivered to a
+				// queue
+				engine.onBeforeMessageSend(status);
+
+			}
+		} catch (Exception e) {
+			if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "dispatchMessage",
+						UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+			}
+		} finally {
+			if (jmsSession != null) {
+				try {
+					jmsSession.close();
+				} catch (Exception eee) {
+
+				}
+			}
+		}
+
+	}
 }
\ No newline at end of file
diff --git a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
index f0623fb..d5ffa93 100644
--- a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
+++ b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
@@ -70,9 +70,14 @@ import org.apache.uima.aae.client.UimaAsynchronousEngine;
 import org.apache.uima.aae.controller.AnalysisEngineController;
 import org.apache.uima.aae.controller.ControllerCallbackListener;
 import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.definition.connectors.Endpoints;
+import org.apache.uima.aae.definition.connectors.UimaAsEndpoint;
+import org.apache.uima.aae.definition.connectors.UimaAsEndpoint.EndpointType;
 import org.apache.uima.aae.error.UimaASMetaRequestTimeout;
 import org.apache.uima.aae.jmx.JmxManager;
 import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.MessageProcessor;
 import org.apache.uima.aae.message.UIMAMessage;
 import org.apache.uima.aae.service.AsynchronousUimaASService;
 import org.apache.uima.aae.service.UimaASService;
@@ -495,12 +500,28 @@ public class BaseUIMAAsynchronousEngine_impl extends BaseUIMAAsynchronousEngineC
     if ( dispatchThread == null ) {
   	  // make sure we are in the running state. The local consumer depends on it
       	running = true;
-      
+      	UimaAsEndpoint clientEndpoint;
+      	try {
+      		
+      		MessageProcessor messageProcessor = 
+      				new ClientMessageProcessor();
+      		
+          	clientEndpoint =
+          			Endpoints.newEndpoint(EndpointType.Direct, EndpointType.Direct.getName()+"Client", messageProcessor);
+          	// create client consumers and connect them to the service
+          	// producers
+          	service.connect(clientEndpoint);
+          	clientEndpoint.start();
+      	} catch( Exception e) {
+      		throw new ResourceInitializationException(e);
+      	}
+
     	// start message consumer to handle replies  
-    	startLocalConsumer(anApplicationContext);
+   // 	startLocalConsumer(anApplicationContext);
     	// start dispatcher in its own thread. It will fetch messages from a shared 'pendingMessageQueue'
       	LocalDispatcher dispatcher =
-      			new LocalDispatcher(this, service, pendingMessageQueue);
+      			new LocalDispatcher(this, service, pendingMessageQueue, clientEndpoint);
+      	
       	dispatchThread = new Thread(dispatcher,"LocalDispatcher");
       	dispatchThread.start();
     }
@@ -1690,4 +1711,12 @@ public class BaseUIMAAsynchronousEngine_impl extends BaseUIMAAsynchronousEngineC
 	}
 	  
   }
+  private class ClientMessageProcessor implements MessageProcessor { // MessageProcessor passed to Endpoints.newEndpoint(...) for the Direct client endpoint
+	// Unwraps the raw message and hands it to the enclosing engine's onMessage(); the cast throws ClassCastException if the raw message is not a DirectMessage.
+	@Override
+	public void process(MessageContext message) throws Exception {
+		onMessage((DirectMessage)message.getRawMessage());
+	}
+	  
+  }
 }
diff --git a/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java b/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java
index f3e03a4..a8fcd6a 100644
--- a/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java
+++ b/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java
@@ -24,7 +24,12 @@ import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.UIMAEE_Constants;
 import org.apache.uima.aae.client.UimaASProcessStatus;
 import org.apache.uima.aae.client.UimaASProcessStatusImpl;
+import org.apache.uima.aae.definition.connectors.UimaAsConsumer.ConsumerType;
+import org.apache.uima.aae.definition.connectors.UimaAsEndpoint;
+import org.apache.uima.aae.definition.connectors.UimaAsEndpoint.EndpointType;
 import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.UimaAsOrigin;
 import org.apache.uima.aae.service.UimaASService;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl;
@@ -39,37 +44,89 @@ public class LocalDispatcher implements Runnable  {
 	private BlockingQueue<PendingMessage> messageQueue = null;
 	private BaseUIMAAsynchronousEngineCommon_impl client;
 	private UimaASService service;
-
+	private UimaAsEndpoint clientEndpoint;
+	
 	public LocalDispatcher(BaseUIMAAsynchronousEngineCommon_impl client, UimaASService service,
-			BlockingQueue<PendingMessage> pendingMessageQueue) {
+			BlockingQueue<PendingMessage> pendingMessageQueue, UimaAsEndpoint clientEndpoint) {
 		this.service = service;
 		this.client = client;
 		this.messageQueue = pendingMessageQueue;
+		this.clientEndpoint = clientEndpoint;
 	}
-
+	
 	private boolean reject(PendingMessage pm) {
 		return false;
 	}
 
 	private void dispatch(PendingMessage pm) throws Exception {
 		boolean doCallback = false;
-
+		StringBuilder serviceUri = new StringBuilder("direct").append(":").append(service.getName());
+		
 		switch (pm.getMessageType()) {
 		case AsynchAEMessage.GetMeta:
-			service.sendGetMetaRequest();
+			serviceUri.append(":").append(ConsumerType.GetMetaRequest.name()).toString();
+			MessageContext getMetaMessage =
+				clientEndpoint.newMessageBuilder().
+					newGetMetaRequestMessage(new UimaAsOrigin("Client", EndpointType.Direct))
+//					newGetMetaRequestMessage(new UimaAsOrigin("direct:Client", EndpointType.Direct))
+					.withPayload(AsynchAEMessage.None)
+					.build();
+			clientEndpoint.dispatch(getMetaMessage, serviceUri.toString());
+			
 			System.out.println("LocalDispatcher.dispatch()-dispatched getMeta Request");
 			break;
 
 		case AsynchAEMessage.Process:
 			doCallback = true;
-			service.process((CAS) pm.getProperty(AsynchAEMessage.CAS), pm.getPropertyAsString(AsynchAEMessage.CasReference));
-			System.out.println("LocalDispatcher.dispatch()-dispatched Process Request");
+			serviceUri.append(":").append(ConsumerType.ProcessCASRequest.name()).toString();
+			
+			MessageContext processMessage =
+					clientEndpoint.newMessageBuilder().
+						newProcessCASRequestMessage(new UimaAsOrigin("Client", EndpointType.Direct))
+//						newProcessCASRequestMessage(new UimaAsOrigin("direct:Client", EndpointType.Direct))
+						.withPayload(AsynchAEMessage.CASRefID)
+						.withCasReferenceId(pm.getPropertyAsString(AsynchAEMessage.CasReference))
+						.build();
+
+			service.add2Cache((CAS) pm.getProperty(AsynchAEMessage.CAS), processMessage, pm.getPropertyAsString(AsynchAEMessage.CasReference));
+
+			clientEndpoint.dispatch(processMessage, serviceUri.toString());
+				
+			System.out.println("LocalDispatcher.dispatch()-dispatched process Request");
+
+				
 			break;
 
 		case AsynchAEMessage.CollectionProcessComplete:
-			service.collectionProcessComplete();
+			serviceUri.append(":").append(ConsumerType.CpcRequest.name()).toString();
+
+			MessageContext cpcMessage =
+			   clientEndpoint.newMessageBuilder().
+				  newCpCRequestMessage(new UimaAsOrigin("Client", EndpointType.Direct))
+//				  newCpCRequestMessage(new UimaAsOrigin("direct:Client", EndpointType.Direct))
+				  .withPayload(AsynchAEMessage.None)
+				  .build();
+		    clientEndpoint.dispatch(cpcMessage, serviceUri.toString());
+
 			System.out.println("LocalDispatcher.dispatch()-dispatched CPC Request");
 			break;
+
+		case AsynchAEMessage.ReleaseCAS:
+			serviceUri.append(":").append(ConsumerType.FreeCASRequest.name()).toString();
+
+			MessageContext freeCASMessage =
+			   clientEndpoint.newMessageBuilder().
+				  newCpCRequestMessage(new UimaAsOrigin("Client", EndpointType.Direct))
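+//				  newCpCRequestMessage(new UimaAsOrigin("direct:Client", EndpointType.Direct)) -- NOTE(review): this ReleaseCAS case routes to ConsumerType.FreeCASRequest yet builds the message with newCpCRequestMessage, same as the CPC case; looks like a copy/paste slip -- confirm the intended builder call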
+				  .withPayload(AsynchAEMessage.CASRefID)
+				  .withCasReferenceId(pm.getPropertyAsString(AsynchAEMessage.CasReference))
+				  .build();
+		    clientEndpoint.dispatch(freeCASMessage, serviceUri.toString());
+
+			System.out.println("LocalDispatcher.dispatch()-dispatched Free CAS Request");
+			break;
+
+		
 		}
         if ( doCallback ) {
             UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(),(CAS)pm.getProperty(AsynchAEMessage.CAS),
@@ -99,9 +156,9 @@ public class LocalDispatcher implements Runnable  {
 		while (client.isRunning()) {
 			PendingMessage pm = null;
 			try {
-				System.out.println("LocalDispatcher.run()- waiting for new message ... queue hashcode:"+messageQueue.hashCode());
+				System.out.println("Client LocalDispatcher.run()- waiting for new message ... queue hashcode:"+messageQueue.hashCode());
 				pm = messageQueue.take();
-				System.out.println("LocalDispatcher.run()-got new message to dispatch");
+				System.out.println("Client LocalDispatcher.run()-got new message to dispatch");
 			} catch (InterruptedException e) {
 				
 				return;
@@ -122,12 +179,8 @@ public class LocalDispatcher implements Runnable  {
 					}
 				}
 				try {
-					System.out.println(".................... calling LocalDispatch.beforeDispatch()");
 					client.beforeDispatch(pm);
-					
-					System.out.println(".................... calling LocalDispatch.dispatch()");
 					dispatch(pm);
-					System.out.println(".................... LocalDispatch.dispatch() returned");
 				} catch (Exception e) {
 					if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
 						UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "run",
diff --git a/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java b/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java
index 1976e2f..2e966a5 100644
--- a/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java
+++ b/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java
@@ -26,6 +26,7 @@ import java.io.FileWriter;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -1229,7 +1230,7 @@ public class TestUimaASNoErrors extends BaseTestSupport {
 	    initialize(uimaAsClient, appCtx);
 	    waitUntilInitialized();
 
-	    for (int i = 0; i < 500; i++) {
+	    for (int i = 0; i < 2; i++) {
 	      CAS cas = uimaAsClient.getCAS();
 	      cas.setDocumentText("Some Text");
 	      System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
@@ -1441,6 +1442,8 @@ public class TestUimaASNoErrors extends BaseTestSupport {
 
 	@Test
 	public void testDeployAsyncAggregateServiceOverJava() throws Exception {
+//		URL url = TestUimaASNoErrors.class.getResource("/Deploy_AsyncAggregate.xml");
+		
 		testDeployAsyncAggregateService(Transport.Java);
 	}
 
@@ -1457,7 +1460,7 @@ public class TestUimaASNoErrors extends BaseTestSupport {
 
 		addExceptionToignore(org.apache.uima.aae.error.UimaEEServiceException.class);
 
-		runTest(appCtx, uimaAsClient, "tcp://localhost:61616", "TopLevelTaeQueue", 200, PROCESS_LATCH);
+		runTest(appCtx, uimaAsClient, "tcp://localhost:61616", "TopLevelTaeQueue", 2, PROCESS_LATCH);
 	}
 
 	 @Test
@@ -1476,7 +1479,7 @@ public class TestUimaASNoErrors extends BaseTestSupport {
 	    
 	       System.setProperty("NoOpBroker", "tcp::/localhost:61616");
 	       System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000");
-//	       deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
+	      // deployService(uimaAsClient, relativePath + "/Deploy_NoOpAnnotator.xml");
 			deployJmsService(uimaAsClient, relativePath + "/Deploy_NoOpAnnotatorUsingPlaceholder.xml");
 
 	       Map<String, Object> appCtx = defaultContext("TopLevelTaeQueue");