You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/11/16 16:53:32 UTC

svn commit: r1846728 - /uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java

Author: cwiklik
Date: Fri Nov 16 16:53:32 2018
New Revision: 1846728

URL: http://svn.apache.org/viewvc?rev=1846728&view=rev
Log:
UIMA-5906 fixed cause of NPE and added logging if there is an exception

Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java?rev=1846728&r1=1846727&r2=1846728&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java Fri Nov 16 16:53:32 2018
@@ -50,142 +50,138 @@ import org.apache.uima.util.Level;
 import org.apache.uima.util.impl.ProcessTrace_impl;
 
 /**
- * Initializes JMS session and creates JMS MessageProducer to be used for sending messages to a
- * given destination. It extends BaseMessageSender which starts the worker thread and is tasked with
- * sending messages. The application threads share a common 'queue' with the worker thread. The
- * application threads add messages to the pendingMessageList 'queue' and the worker thread consumes
- * them.
+ * Initializes JMS session and creates JMS MessageProducer to be used for
+ * sending messages to a given destination. It extends BaseMessageSender which
+ * starts the worker thread and is tasked with sending messages. The application
+ * threads share a common 'queue' with the worker thread. The application
+ * threads add messages to the pendingMessageList 'queue' and the worker thread
+ * consumes them.
  * 
  */
 public class ActiveMQMessageSender extends BaseMessageSender {
-  private static final Class<?> CLASS_NAME = ActiveMQMessageSender.class;
+	private static final Class<?> CLASS_NAME = ActiveMQMessageSender.class;
 
-  private volatile Connection connection = null;
+	private volatile Connection connection = null;
 
-  private Session session = null;
+	private Session session = null;
 
-  private MessageProducer producer = null;
+	private MessageProducer producer = null;
 
-  private String destinationName = null;
-
-  private ConcurrentHashMap<Destination, MessageProducer> producerMap = 
-		  new ConcurrentHashMap<Destination, MessageProducer>();
-
-  public ActiveMQMessageSender(Connection aConnection, String aDestinationName,
-          BaseUIMAAsynchronousEngineCommon_impl engine) throws Exception {
-    super(engine);
-    connection = aConnection;
-    destinationName = aDestinationName;
-  }
-
-  public synchronized MessageProducer getMessageProducer(Destination destination) throws Exception {
-    if (producerMap.containsKey(destination)) {
-      return (MessageProducer) producerMap.get(destination);
-    }
-    createSession();
-    MessageProducer mProducer = session.createProducer(destination);
-    mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-    producerMap.put(destination, mProducer);
-    return mProducer;
-  }
-  /**
-   * This is called when a new Connection is created after broker is restarted
-   */
-  public void setConnection(Connection aConnection) {
-    connection = aConnection;
-    cleanup();
-    try {
-      initializeProducer();
-    } catch( Exception e) {
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
-                "setConnection", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAEE_exception__WARNING", e);
-      }
-    }
-    
-  }
-  private String getBrokerURL() {
-    try {
-      return ((ActiveMQConnection) connection).getBrokerInfo().getBrokerURL();
-    } catch (Exception ex) { /* handle silently. */
-    }
-    return "";
-  }
-
-  private void createSession() throws Exception {
-    String broker = getBrokerURL();
-    try {
-      if (session == null || engine.producerInitialized == false) {
-        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      }
-    } catch (JMSException e) {
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-                "createSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAJMS_client_failed_creating_session_INFO",
-                new Object[] { destinationName, broker });
-      }
-      if (connection == null) {
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-                  "createSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                  "UIMAJMS_client_connection_not_ready_INFO", new Object[] { broker });
-        }
-      } else if (((ActiveMQConnection) connection).isClosed()
-              || ((ActiveMQConnection) connection).isClosing()) {
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-          UIMAFramework.getLogger(CLASS_NAME)
-                  .logrb(Level.INFO, CLASS_NAME.getName(), "createSession",
-                          JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                          "UIMAJMS_client_connection_closed_INFO",
-                          new Object[] { destinationName, broker });
-        }
-      }
-      throw e;
-    } catch (Exception e) {
-      throw e;
-    }
-  }
-  /**
-   * Returns the full name of the destination queue
-   */
-  protected String getDestinationEndpoint() throws Exception {
-    return ((ActiveMQDestination) producer.getDestination()).getPhysicalName();
-  }
-
-  /**
-   * Creates a jms session object used to instantiate message producer
-   */
-  protected void initializeProducer() throws Exception {
-    createSession();
-    producer = getMessageProducer(session.createQueue(destinationName));
-  }
-
-
-  /**
-   * Returns jsm MessageProducer
-   */
-  public MessageProducer getMessageProducer() {
-    if ( engine.running && engine.producerInitialized == false  ) {
-      try {
-        SharedConnection con = engine.lookupConnection(getBrokerURL());
-        if ( con != null ) {
-          setConnection(con.getConnection());
-          initializeProducer();
-          engine.producerInitialized = true;
-        }
-      } catch( Exception e) {
-        e.printStackTrace();
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
-                "getMessageProducer", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAEE_exception__WARNING", e);
-        }
-      }
-    } 
-    return producer;
-  }
+	private String destinationName = null;
+
+	private ConcurrentHashMap<Destination, MessageProducer> producerMap = new ConcurrentHashMap<Destination, MessageProducer>();
+
+	public ActiveMQMessageSender(Connection aConnection, String aDestinationName,
+			BaseUIMAAsynchronousEngineCommon_impl engine) throws Exception {
+		super(engine);
+		connection = aConnection;
+		destinationName = aDestinationName;
+	}
+
+	public synchronized MessageProducer getMessageProducer(Destination destination) throws Exception {
+		if (producerMap.containsKey(destination)) {
+			return (MessageProducer) producerMap.get(destination);
+		}
+		createSession();
+		MessageProducer mProducer = session.createProducer(destination);
+		mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+		producerMap.put(destination, mProducer);
+		return mProducer;
+	}
+
+	/**
+	 * This is called when a new Connection is created after broker is restarted
+	 */
+	public void setConnection(Connection aConnection) {
+		connection = aConnection;
+		cleanup();
+		try {
+			initializeProducer();
+		} catch (Exception e) {
+			if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "setConnection",
+						UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+			}
+		}
+
+	}
+
+	private String getBrokerURL() {
+		try {
+			return ((ActiveMQConnection) connection).getBrokerInfo().getBrokerURL();
+		} catch (Exception ex) { /* handle silently. */
+		}
+		return "";
+	}
+
+	private void createSession() throws Exception {
+		String broker = getBrokerURL();
+		try {
+			if (session == null || engine.producerInitialized == false) {
+				session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			}
+		} catch (JMSException e) {
+			if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "createSession",
+						JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_failed_creating_session_INFO",
+						new Object[] { destinationName, broker });
+			}
+			if (connection == null) {
+				if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "createSession",
+							JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_connection_not_ready_INFO",
+							new Object[] { broker });
+				}
+			} else if (((ActiveMQConnection) connection).isClosed() || ((ActiveMQConnection) connection).isClosing()) {
+				if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "createSession",
+							JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_connection_closed_INFO",
+							new Object[] { destinationName, broker });
+				}
+			}
+			throw e;
+		} catch (Exception e) {
+			throw e;
+		}
+	}
+
+	/**
+	 * Returns the full name of the destination queue
+	 */
+	protected String getDestinationEndpoint() throws Exception {
+		return ((ActiveMQDestination) producer.getDestination()).getPhysicalName();
+	}
+
+	/**
+	 * Creates a jms session object used to instantiate message producer
+	 */
+	protected void initializeProducer() throws Exception {
+		createSession();
+		producer = getMessageProducer(session.createQueue(destinationName));
+	}
+
+	/**
+	 * Returns jsm MessageProducer
+	 */
+	public MessageProducer getMessageProducer() {
+		if (engine.running && engine.producerInitialized == false) {
+			try {
+				SharedConnection con = engine.lookupConnection(getBrokerURL());
+				if (con != null) {
+					setConnection(con.getConnection());
+					initializeProducer();
+					engine.producerInitialized = true;
+				}
+			} catch (Exception e) {
+				e.printStackTrace();
+				if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "getMessageProducer",
+							UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+				}
+			}
+		}
+		return producer;
+	}
 
 	public TextMessage createTextMessage() throws Exception {
 		synchronized (ActiveMQMessageSender.class) {
@@ -208,14 +204,14 @@ public class ActiveMQMessageSender exten
 
 	}
 
-  public BytesMessage createBytesMessage() throws Exception {
+	public BytesMessage createBytesMessage() throws Exception {
 		synchronized (ActiveMQMessageSender.class) {
-		    if (session == null) {
-		        //	Force initialization of Producer
-		          initializeProducer();
-		    }
-		    BytesMessage msg = null;
-		    try {
+			if (session == null) {
+				// Force initialization of Producer
+				initializeProducer();
+			}
+			BytesMessage msg = null;
+			try {
 				msg = session.createBytesMessage();
 			} catch (IllegalStateException e) {
 				// stale Session
@@ -226,203 +222,202 @@ public class ActiveMQMessageSender exten
 			return msg;
 		}
 
- //   return session.createBytesMessage();
-  }
+		// return session.createBytesMessage();
+	}
 
-  /**
-   * Cleanup any jms resources used by the worker thread
-   */
-  protected void cleanup() { 
-    try {
-      if (session != null) {
-        session.close();
-        session = null;
-      }
-      if (producer != null) {
-        producer.close();
-        producer = null;
-      }
-    } catch (Exception e) {
-      // Ignore we are shutting down
-    } finally {
-      producerMap.clear();
-    }
-  }
-  protected void dispatchMessage(PendingMessage pm, BaseUIMAAsynchronousEngineCommon_impl engine, boolean casProcessRequest ) throws Exception {
-      SharedConnection sc = 
-    		  engine.lookupConnection(engine.getBrokerURI());
-      ClientRequest cacheEntry = null;
-      boolean doCallback = false;
-      boolean addTimeToLive = true;
-      Session jmsSession = null;
-      
-      // Check the environment for existence of NoTTL tag. If present,
-      // the deployer of the service wants to disable message expiration.
-      if (System.getProperty("NoTTL") != null) {
-        addTimeToLive = false;
-      }
-      try {
-//    	  long t1 = System.currentTimeMillis();
-    	  jmsSession = sc.getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
-          
-          // Request JMS Message from the concrete implementation
-          Message message = null;
-          // Determine if this a CAS Process Request
-//          boolean casProcessRequest = isProcessRequest(pm);
-          // Only Process request can be serialized as binary
-          if (casProcessRequest && (engine.getSerialFormat() != SerialFormat.XMI)) {
-            message = jmsSession.createBytesMessage();
-          } else {
-            message = jmsSession.createTextMessage();
-          }
-          //  get the producer initialized from a valid connection
-         // producer = getMessageProducer();
-          
-          Destination d = null;
-          String selector = null;
-          // UIMA-AS ver 2.10.0 + sends Free Cas request to a service targeted queue
-          // instead of a temp queue. Regular queues can be recovered in case of
-          // a broker restart. The test below will be true for UIMA-AS v. 2.10.0 +.
-          // Code in JmsOutputChannel will add the selector if the service is a CM.
-          if (pm.get(AsynchAEMessage.TargetingSelector) != null) {
-        	  selector = (String)pm.get(AsynchAEMessage.TargetingSelector);
-          }
-          if ( selector == null && (pm.getMessageType() == AsynchAEMessage.ReleaseCAS || pm.getMessageType() == AsynchAEMessage.Stop) ) {
-        	  d = (Destination)pm.get(AsynchAEMessage.Destination);
-              
-          } else {
-              d = jmsSession.createQueue(destinationName);
-          }
-          MessageProducer mProducer = jmsSession.createProducer(d);
-          mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-          //System.out.println(">>>>>>> Time to create and initialize JMS Sesssion:"+(System.currentTimeMillis()-t1));
-          super.initializeMessage(pm, message);
-    	  String destination = ((ActiveMQDestination) d).getPhysicalName();
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(
-                    Level.FINE,
-                    CLASS_NAME.getName(),
-                    "run",
-                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_sending_msg_to_endpoint__FINE",
-                    new Object[] {
-                        UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message
-                                .getIntProperty(AsynchAEMessage.Command)),
-                        UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, message
-                                .getIntProperty(AsynchAEMessage.MessageType)), destination });
-          }
-          if (casProcessRequest) {
-            cacheEntry = (ClientRequest) engine.getCache().get(
-                    pm.get(AsynchAEMessage.CasReference));
-            if (cacheEntry != null) {
-            //    CAS cas = cacheEntry.getCAS();
-                // enable logging 
-                if (System.getProperty("UimaAsCasTracking") != null) {
-                  message.setStringProperty("UimaAsCasTracking", "enable");
-                }
-                // Target specific service instance if targeting for the CAS is provided
-                // by the client application
-    			if ( cacheEntry.getTargetServiceId() != null ) {
- //   				System.out.println("------------Client Sending CAS to Service Instance With Id:"+cacheEntry.getTargetServiceId());;
-    				message.setStringProperty(UimaAsynchronousEngine.TargetSelectorProperty, cacheEntry.getTargetServiceId());
-    			}
-         	   // Use Process Timeout value for the time-to-live property in the
-              // outgoing JMS message. When this time is exceeded
-              // while the message sits in a queue, the JMS Server will remove it from
-              // the queue. What happens with the expired message depends on the
-              // configuration. Most JMS Providers create a special dead-letter queue
-              // where all expired messages are placed. NOTE: In ActiveMQ expired msgs in the DLQ
-              // are not auto evicted yet and accumulate taking up memory.
-              long timeoutValue = cacheEntry.getProcessTimeout();
-
-              if (timeoutValue > 0 && addTimeToLive ) {
-                // Set high time to live value
-                message.setJMSExpiration(10 * timeoutValue);
-              }
-              if (pm.getMessageType() == AsynchAEMessage.Process) {
-                cacheEntry.setCASDepartureTime(System.nanoTime());
-              }
-              cacheEntry.setCASDepartureTime(System.nanoTime());
-      
-              doCallback = true;
-              
-          } else {
-              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-
-                  UIMAFramework.getLogger(CLASS_NAME).logrb(
-                          Level.WARNING,
-                          CLASS_NAME.getName(),
-                          "run",
-                          JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                          "UIMAJMS_failed_cache_lookup__WARNING",
-                          new Object[] {
-                         	 pm.get(AsynchAEMessage.CasReference),
-                              UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message
-                                      .getIntProperty(AsynchAEMessage.Command)),
-                              UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, message
-                                      .getIntProperty(AsynchAEMessage.MessageType)), destination });
-                }
-             }
-         	 
-          }
-          // start timers
-          if( casProcessRequest ) { 
-         	 CAS cas = cacheEntry.getCAS();
-
-
-            // Add the cas to a list of CASes pending reply. Also start the timer if necessary
-            engine.serviceDelegate.addCasToOutstandingList(cacheEntry.getCasReferenceId(), cas.hashCode(), engine.timerPerCAS); // true=timer per cas
-            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-         	   UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-                     "sendCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                     "UIMAJMS_cas_added_to_pending_FINE", new Object[] { cacheEntry.getCasReferenceId(), String.valueOf(cas.hashCode()), engine.serviceDelegate.toString()});
-            }
+	/**
+	 * Cleanup any jms resources used by the worker thread
+	 */
+	protected void cleanup() {
+		try {
+			if (session != null) {
+				session.close();
+				session = null;
+			}
+			if (producer != null) {
+				producer.close();
+				producer = null;
+			}
+		} catch (Exception e) {
+			// Ignore we are shutting down
+		} finally {
+			producerMap.clear();
+		}
+	}
 
-          
-          } else if ( pm.getMessageType() == AsynchAEMessage.GetMeta &&
-                  engine.serviceDelegate.getGetMetaTimeout() > 0 ) {
-            // timer for PING has been started in sendCAS()
-            if ( !engine.serviceDelegate.isAwaitingPingReply()) {
-              engine.serviceDelegate.startGetMetaRequestTimer();
-            } 
-          } else {
-         	 doCallback = false;  // dont call onBeforeMessageSend() callback on CPC
-          }
-          //  Dispatch asynchronous request to Uima AS service
-          mProducer.send(message);
-          
-          if ( doCallback ) {
-            UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(),cacheEntry.getCAS(),
-                    cacheEntry.getCasReferenceId());
-            // Notify engine before sending a message
-            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-                UIMAFramework.getLogger(CLASS_NAME).logrb(
-                        Level.FINE,
-                        CLASS_NAME.getName(),
-                        "run",
-                        JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                        "UIMAJMS_calling_onBeforeMessageSend__FINE",
-                        new Object[] {
-                          pm.get(AsynchAEMessage.CasReference),
-                          String.valueOf(cacheEntry.getCAS().hashCode())
-                        });
-              }  
-            // Note the callback is a misnomer. The callback is made *after* the send now
-            // Application receiving this callback can consider the CAS as delivere to a queue
-            engine.onBeforeMessageSend(status);
-          
-          
-          }
-      } finally {
-    	  if ( jmsSession != null ) {
-    		  try {
-    			  jmsSession.close(); 
-    		  } catch( Exception eee) {
-    			  
-    		  }
-    	  }
-      }
+	protected void dispatchMessage(PendingMessage pm, BaseUIMAAsynchronousEngineCommon_impl engine,
+			boolean casProcessRequest) throws Exception {
+		SharedConnection sc = engine.lookupConnection(engine.getBrokerURI());
+		ClientRequest cacheEntry = null;
+		boolean doCallback = false;
+		boolean addTimeToLive = true;
+		Session jmsSession = null;
+
+		// Check the environment for existence of NoTTL tag. If present,
+		// the deployer of the service wants to disable message expiration.
+		if (System.getProperty("NoTTL") != null) {
+			addTimeToLive = false;
+		}
+		try {
+			// long t1 = System.currentTimeMillis();
+			jmsSession = sc.getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+			// Request JMS Message from the concrete implementation
+			Message message = null;
+			// Determine if this a CAS Process Request
+			// boolean casProcessRequest = isProcessRequest(pm);
+			// Only Process request can be serialized as binary
+			if (casProcessRequest && (engine.getSerialFormat() != SerialFormat.XMI)) {
+				message = jmsSession.createBytesMessage();
+			} else {
+				message = jmsSession.createTextMessage();
+			}
+			// get the producer initialized from a valid connection
+			// producer = getMessageProducer();
 
+			Destination d = null;
+			String selector = null;
+			// UIMA-AS ver 2.10.0 + sends Free Cas request to a service targeted queue
+			// instead of a temp queue. Regular queues can be recovered in case of
+			// a broker restart. The test below will be true for UIMA-AS v. 2.10.0 +.
+			// Code in JmsOutputChannel will add the selector if the service is a CM.
+			if (pm.get(AsynchAEMessage.TargetingSelector) != null) {
+				selector = (String) pm.get(AsynchAEMessage.TargetingSelector);
+			}
+			if (selector == null && (pm.getMessageType() == AsynchAEMessage.ReleaseCAS
+					|| pm.getMessageType() == AsynchAEMessage.Stop)) {
+				d = (Destination) pm.get(AsynchAEMessage.Destination);
 
-  }
+			} else {
+				d = jmsSession.createQueue(destinationName);
+			}
+			MessageProducer mProducer = jmsSession.createProducer(d);
+			mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+			// System.out.println(">>>>>>> Time to create and initialize JMS
+			// Sesssion:"+(System.currentTimeMillis()-t1));
+			super.initializeMessage(pm, message);
+			String destination = ((ActiveMQDestination) d).getPhysicalName();
+			if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "run",
+						JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE",
+						new Object[] {
+								UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command,
+										message.getIntProperty(AsynchAEMessage.Command)),
+								UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType,
+										message.getIntProperty(AsynchAEMessage.MessageType)),
+								destination });
+			}
+			if (casProcessRequest) {
+				cacheEntry = (ClientRequest) engine.getCache().get(pm.get(AsynchAEMessage.CasReference));
+				if (cacheEntry != null) {
+					// CAS cas = cacheEntry.getCAS();
+					// enable logging
+					if (System.getProperty("UimaAsCasTracking") != null) {
+						message.setStringProperty("UimaAsCasTracking", "enable");
+					}
+					// Target specific service instance if targeting for the CAS is provided
+					// by the client application
+					if (cacheEntry.getTargetServiceId() != null) {
+						// System.out.println("------------Client Sending CAS to Service Instance With
+						// Id:"+cacheEntry.getTargetServiceId());;
+						message.setStringProperty(UimaAsynchronousEngine.TargetSelectorProperty,
+								cacheEntry.getTargetServiceId());
+					}
+					// Use Process Timeout value for the time-to-live property in the
+					// outgoing JMS message. When this time is exceeded
+					// while the message sits in a queue, the JMS Server will remove it from
+					// the queue. What happens with the expired message depends on the
+					// configuration. Most JMS Providers create a special dead-letter queue
+					// where all expired messages are placed. NOTE: In ActiveMQ expired msgs in the
+					// DLQ
+					// are not auto evicted yet and accumulate taking up memory.
+					long timeoutValue = cacheEntry.getProcessTimeout();
+
+					if (timeoutValue > 0 && addTimeToLive) {
+						// Set high time to live value
+						message.setJMSExpiration(10 * timeoutValue);
+					}
+					if (pm.getMessageType() == AsynchAEMessage.Process) {
+						cacheEntry.setCASDepartureTime(System.nanoTime());
+					}
+					cacheEntry.setCASDepartureTime(System.nanoTime());
+
+					doCallback = true;
+
+				} else {
+					if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+
+						UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "run",
+								JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_failed_cache_lookup__WARNING",
+								new Object[] { pm.get(AsynchAEMessage.CasReference),
+										UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command,
+												message.getIntProperty(AsynchAEMessage.Command)),
+										UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType,
+												message.getIntProperty(AsynchAEMessage.MessageType)),
+										destination });
+					}
+					return;  // no cacheEntry so just return
+				}
+
+			}
+			// start timers
+			if (casProcessRequest) {
+				CAS cas = cacheEntry.getCAS();
+
+				// Add the cas to a list of CASes pending reply. Also start the timer if
+				// necessary
+				engine.serviceDelegate.addCasToOutstandingList(cacheEntry.getCasReferenceId(), cas.hashCode(),
+						engine.timerPerCAS); // true=timer per cas
+				if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendCAS",
+							JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_cas_added_to_pending_FINE",
+							new Object[] { cacheEntry.getCasReferenceId(), String.valueOf(cas.hashCode()),
+									engine.serviceDelegate.toString() });
+				}
+
+			} else if (pm.getMessageType() == AsynchAEMessage.GetMeta
+					&& engine.serviceDelegate.getGetMetaTimeout() > 0) {
+				// timer for PING has been started in sendCAS()
+				if (!engine.serviceDelegate.isAwaitingPingReply()) {
+					engine.serviceDelegate.startGetMetaRequestTimer();
+				}
+			} else {
+				doCallback = false; // dont call onBeforeMessageSend() callback on CPC
+			}
+			// Dispatch asynchronous request to Uima AS service
+			mProducer.send(message);
+
+			if (doCallback) {
+				UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(), cacheEntry.getCAS(),
+						cacheEntry.getCasReferenceId());
+				// Notify engine before sending a message
+				if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "run",
+							JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_calling_onBeforeMessageSend__FINE",
+							new Object[] { pm.get(AsynchAEMessage.CasReference),
+									String.valueOf(cacheEntry.getCAS().hashCode()) });
+				}
+				// Note the callback is a misnomer. The callback is made *after* the send now
+				// Application receiving this callback can consider the CAS as delivere to a
+				// queue
+				engine.onBeforeMessageSend(status);
+
+			}
+		} catch( Exception e) {
+            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+                        "run", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                        "UIMAEE_exception__WARNING", e);
+            }
+		} finally {
+			if (jmsSession != null) {
+				try {
+					jmsSession.close();
+				} catch (Exception eee) {
+
+				}
+			}
+		}
+
+	}
 }
\ No newline at end of file