You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2017/07/13 19:31:22 UTC

svn commit: r1801867 - in /uima/uima-as/trunk: uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/

Author: cwiklik
Date: Thu Jul 13 19:31:22 2017
New Revision: 1801867

URL: http://svn.apache.org/viewvc?rev=1801867&view=rev
Log:
UIMA-5477 improved error handling in the UIMA-AS client to recover from lost broker connection

Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
    uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
    uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java?rev=1801867&r1=1801866&r2=1801867&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java Thu Jul 13 19:31:22 2017
@@ -26,6 +26,7 @@ import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
@@ -34,9 +35,18 @@ import org.apache.activemq.ActiveMQConne
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.client.UimaASProcessStatus;
+import org.apache.uima.aae.client.UimaASProcessStatusImpl;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.UimaMessageValidator;
 import org.apache.uima.adapter.jms.JmsConstants;
+import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientRequest;
 import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.SharedConnection;
+import org.apache.uima.adapter.jms.message.PendingMessage;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.cas.SerialFormat;
 import org.apache.uima.util.Level;
+import org.apache.uima.util.impl.ProcessTrace_impl;
 
 /**
  * Initializes JMS session and creates JMS MessageProducer to be used for sending messages to a
@@ -57,7 +67,8 @@ public class ActiveMQMessageSender exten
 
   private String destinationName = null;
 
-  private ConcurrentHashMap<Destination, MessageProducer> producerMap = new ConcurrentHashMap<Destination, MessageProducer>();
+  private ConcurrentHashMap<Destination, MessageProducer> producerMap = 
+		  new ConcurrentHashMap<Destination, MessageProducer>();
 
   public ActiveMQMessageSender(Connection aConnection, String aDestinationName,
           BaseUIMAAsynchronousEngineCommon_impl engine) throws Exception {
@@ -135,6 +146,12 @@ public class ActiveMQMessageSender exten
       throw e;
     }
   }
+  /**
+   * Returns the full name of the destination queue
+   */
+  protected String getDestinationEndpoint() throws Exception {
+    return ((ActiveMQDestination) producer.getDestination()).getPhysicalName();
+  }
 
   /**
    * Creates a jms session object used to instantiate message producer
@@ -144,12 +161,6 @@ public class ActiveMQMessageSender exten
     producer = getMessageProducer(session.createQueue(destinationName));
   }
 
-  /**
-   * Returns the full name of the destination queue
-   */
-  protected String getDestinationEndpoint() throws Exception {
-    return ((ActiveMQDestination) producer.getDestination()).getPhysicalName();
-  }
 
   /**
    * Returns jsm MessageProducer
@@ -236,4 +247,161 @@ public class ActiveMQMessageSender exten
       producerMap.clear();
     }
   }
+  protected void dispatchMessage(PendingMessage pm, BaseUIMAAsynchronousEngineCommon_impl engine, boolean casProcessRequest ) throws Exception {
+      SharedConnection sc = 
+    		  engine.lookupConnection(engine.getBrokerURI());
+      ClientRequest cacheEntry = null;
+      boolean doCallback = false;
+      boolean addTimeToLive = true;
+      Session jmsSession = null;
+      
+      // Check the environment for existence of NoTTL tag. If present,
+      // the deployer of the service wants to disable message expiration.
+      if (System.getProperty("NoTTL") != null) {
+        addTimeToLive = false;
+      }
+      try {
+    	  long t1 = System.currentTimeMillis();
+    	  jmsSession = sc.getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
+          
+          // Request JMS Message from the concrete implementation
+          Message message = null;
+          // Determine if this a CAS Process Request
+//          boolean casProcessRequest = isProcessRequest(pm);
+          // Only Process request can be serialized as binary
+          if (casProcessRequest && (engine.getSerialFormat() != SerialFormat.XMI)) {
+            message = jmsSession.createBytesMessage();
+          } else {
+            message = jmsSession.createTextMessage();
+          }
+          //  get the producer initialized from a valid connection
+         // producer = getMessageProducer();
+          Destination d = jmsSession.createQueue(destinationName);
+          MessageProducer mProducer = jmsSession.createProducer(d);
+          mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+          //System.out.println(">>>>>>> Time to create and initialize JMS Sesssion:"+(System.currentTimeMillis()-t1));
+          super.initializeMessage(pm, message);
+    	  String destination = ((ActiveMQDestination) d).getPhysicalName();
+          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+            UIMAFramework.getLogger(CLASS_NAME).logrb(
+                    Level.FINE,
+                    CLASS_NAME.getName(),
+                    "run",
+                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                    "UIMAJMS_sending_msg_to_endpoint__FINE",
+                    new Object[] {
+                        UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message
+                                .getIntProperty(AsynchAEMessage.Command)),
+                        UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, message
+                                .getIntProperty(AsynchAEMessage.MessageType)), destination });
+          }
+          if (casProcessRequest) {
+            cacheEntry = (ClientRequest) engine.getCache().get(
+                    pm.get(AsynchAEMessage.CasReference));
+            if (cacheEntry != null) {
+            //    CAS cas = cacheEntry.getCAS();
+                // enable logging 
+                if (System.getProperty("UimaAsCasTracking") != null) {
+                  message.setStringProperty("UimaAsCasTracking", "enable");
+                }
+    			   
+         	   // Use Process Timeout value for the time-to-live property in the
+              // outgoing JMS message. When this time is exceeded
+              // while the message sits in a queue, the JMS Server will remove it from
+              // the queue. What happens with the expired message depends on the
+              // configuration. Most JMS Providers create a special dead-letter queue
+              // where all expired messages are placed. NOTE: In ActiveMQ expired msgs in the DLQ
+              // are not auto evicted yet and accumulate taking up memory.
+              long timeoutValue = cacheEntry.getProcessTimeout();
+
+              if (timeoutValue > 0 && addTimeToLive ) {
+                // Set high time to live value
+                message.setJMSExpiration(10 * timeoutValue);
+              }
+              if (pm.getMessageType() == AsynchAEMessage.Process) {
+                cacheEntry.setCASDepartureTime(System.nanoTime());
+              }
+              cacheEntry.setCASDepartureTime(System.nanoTime());
+      
+              doCallback = true;
+              
+          } else {
+              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+
+                  UIMAFramework.getLogger(CLASS_NAME).logrb(
+                          Level.WARNING,
+                          CLASS_NAME.getName(),
+                          "run",
+                          JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                          "UIMAJMS_failed_cache_lookup__WARNING",
+                          new Object[] {
+                         	 pm.get(AsynchAEMessage.CasReference),
+                              UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message
+                                      .getIntProperty(AsynchAEMessage.Command)),
+                              UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, message
+                                      .getIntProperty(AsynchAEMessage.MessageType)), destination });
+                }
+             }
+         	 
+          }
+          // start timers
+          if( casProcessRequest ) { 
+         	 CAS cas = cacheEntry.getCAS();
+
+
+            // Add the cas to a list of CASes pending reply. Also start the timer if necessary
+            engine.serviceDelegate.addCasToOutstandingList(cacheEntry.getCasReferenceId(), cas.hashCode(), engine.timerPerCAS); // true=timer per cas
+            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+         	   UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+                     "sendCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                     "UIMAJMS_cas_added_to_pending_FINE", new Object[] { cacheEntry.getCasReferenceId(), String.valueOf(cas.hashCode()), engine.serviceDelegate.toString()});
+            }
+
+          
+          } else if ( pm.getMessageType() == AsynchAEMessage.GetMeta &&
+                  engine.serviceDelegate.getGetMetaTimeout() > 0 ) {
+            // timer for PING has been started in sendCAS()
+            if ( !engine.serviceDelegate.isAwaitingPingReply()) {
+              engine.serviceDelegate.startGetMetaRequestTimer();
+            } 
+          } else {
+         	 doCallback = false;  // dont call onBeforeMessageSend() callback on CPC
+          }
+          //  Dispatch asynchronous request to Uima AS service
+          mProducer.send(message);
+          
+          if ( doCallback ) {
+            UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(),cacheEntry.getCAS(),
+                    cacheEntry.getCasReferenceId());
+            // Notify engine before sending a message
+            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                UIMAFramework.getLogger(CLASS_NAME).logrb(
+                        Level.FINE,
+                        CLASS_NAME.getName(),
+                        "run",
+                        JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                        "UIMAJMS_calling_onBeforeMessageSend__FINE",
+                        new Object[] {
+                          pm.get(AsynchAEMessage.CasReference),
+                          String.valueOf(cacheEntry.getCAS().hashCode())
+                        });
+              }  
+            // Note the callback is a misnomer. The callback is made *after* the send now
+            // Application receiving this callback can consider the CAS as delivere to a queue
+            engine.onBeforeMessageSend(status);
+          
+          
+          }
+      } finally {
+    	  if ( jmsSession != null ) {
+    		  try {
+    			  jmsSession.close(); 
+    		  } catch( Exception eee) {
+    			  
+    		  }
+    	  }
+      }
+
+
+  }
 }
\ No newline at end of file

Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java?rev=1801867&r1=1801866&r2=1801867&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java Thu Jul 13 19:31:22 2017
@@ -86,7 +86,9 @@ public abstract class BaseMessageSender
 
   // Releases resources
   protected abstract void cleanup() throws Exception;
-
+  
+  protected abstract void dispatchMessage(PendingMessage pm, BaseUIMAAsynchronousEngineCommon_impl engine, boolean casProcessRequest ) throws Exception;
+	  
   // Returns the name of the destination
   protected abstract String getDestinationEndpoint() throws Exception;
 
@@ -306,7 +308,7 @@ public abstract class BaseMessageSender
       }
      
       //  get the producer initialized from a valid connection
-      producer = getMessageProducer();
+//      producer = getMessageProducer();
       //  Check if the request should be rejected. It would be the case if the connection was invalid and
       //  subsequently recovered. If it was invalid, we went through error handling and the request is stale.
       if ( !rejectRequest && engine.running) {
@@ -318,6 +320,24 @@ public abstract class BaseMessageSender
                       "UIMAJMS_client_dispatching_getmeta_ping__INFO", new Object[] { });
             }
            }
+          
+          // NEW CODE 07/12/2017
+          while( engine.running ) {
+        	  try {
+        	      //  blocks until the connection is re-established with a broker
+        	      engine.recoverSharedConnectionIfClosed();
+        	      SharedConnection sc = 
+        	    		  engine.lookupConnection(engine.getBrokerURI());
+        	      dispatchMessage(pm,engine,isProcessRequest(pm));
+        	
+        		  break;
+        	  } catch( Exception exx) {
+        		  
+        	  }
+          }
+          
+          
+        /*  
            try {
              // Request JMS Message from the concrete implementation
              Message message = null;
@@ -329,7 +349,9 @@ public abstract class BaseMessageSender
              } else {
                message = createTextMessage();
              }
-
+             //  get the producer initialized from a valid connection
+             producer = getMessageProducer();
+            
              initializeMessage(pm, message);
              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
                UIMAFramework.getLogger(CLASS_NAME).logrb(
@@ -448,7 +470,7 @@ public abstract class BaseMessageSender
              }
              reject(pm,e);
            }
-
+*/
       }
     }
     try {
@@ -462,7 +484,7 @@ public abstract class BaseMessageSender
     }
   }
 
-  private void initializeMessage(PendingMessage aPm, Message anOutgoingMessage) throws Exception {
+  protected void initializeMessage(PendingMessage aPm, Message anOutgoingMessage) throws Exception {
     // Populate message properties based on outgoing message type
     switch (aPm.getMessageType()) {
       case AsynchAEMessage.GetMeta:

Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1801867&r1=1801866&r2=1801867&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Thu Jul 13 19:31:22 2017
@@ -3264,7 +3264,7 @@ public abstract class BaseUIMAAsynchrono
                           JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_connection_retry__INFO",
                           new Object[] { brokerURL });
                 }
-        		if ( e instanceof JMSException && e.getMessage().endsWith("Connection refused") ) {
+        		if ( e instanceof JMSException && e.getMessage().indexOf("Connection refused") > 0) {
         			log = false;
             		System.out.println("Uima AS Client:"+e.getMessage()+" Retrying every 5 seconds until successfull");