You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/11/10 22:06:50 UTC

svn commit: r834666 - /incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java

Author: cwiklik
Date: Tue Nov 10 21:06:50 2009
New Revision: 834666

URL: http://svn.apache.org/viewvc?rev=834666&view=rev
Log:
UIMA-1643 Modified to support recovery from lost connection to a broker. Before sending a request the code checks the state of a connection and enters a loop retrying a connection every 5ms.

Modified:
    incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java

Modified: incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java?rev=834666&r1=834665&r2=834666&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java Tue Nov 10 21:06:50 2009
@@ -24,6 +24,7 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 
+import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.InvalidDestinationException;
@@ -36,13 +37,19 @@
 import org.apache.uima.aae.client.UimaASProcessStatus;
 import org.apache.uima.aae.client.UimaASProcessStatusImpl;
 import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.aae.delegate.Delegate;
+import org.apache.uima.aae.error.ServiceShutdownException;
 import org.apache.uima.aae.message.AsynchAEMessage;
 import org.apache.uima.aae.message.UimaMessageValidator;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientRequest;
+import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.SharedConnection;
 import org.apache.uima.adapter.jms.message.PendingMessage;
+import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
 import org.apache.uima.cas.CAS;
+import org.apache.uima.jms.error.handler.BrokerConnectionException;
 import org.apache.uima.util.Level;
+import org.apache.uima.util.ProcessTrace;
 import org.apache.uima.util.impl.ProcessTrace_impl;
 
 /**
@@ -89,7 +96,10 @@
   // Returns the name of the destination
   protected abstract String getDestinationEndpoint() throws Exception;
 
+  public abstract void setConnection(Connection connection);
+  
   private MessageProducer producer = null;
+  
 
   public BaseMessageSender(BaseUIMAAsynchronousEngineCommon_impl anEngine) {
     messageQueue = anEngine.pendingMessageQueue;
@@ -129,7 +139,77 @@
   public boolean failed() {
     return workerThreadFailed;
   }
-
+  /**
+   * This method determines if a given request message should be rejected or not. Rejected in a sense
+   * that it will not be sent to the destination due to the fact the broker connection is down.
+   */
+  private boolean reject(PendingMessage pm ) {
+    return reject(pm, new BrokerConnectionException("Unable To Deliver Message To Destination. Connection To Broker "+engine.sharedConnection.getBroker()+" Has Been Lost"));
+  }
+  
+  /**
+   * This method determines if a given request message should be rejected or not. Rejected in a sense
+   * that it will not be sent to the destination due to the fact the broker connection is down.
+   */
+  private boolean reject( PendingMessage pm, Exception e ) {
+    boolean rejectRequest = false;
+    //  If the connection to a broker was lost, notify the client
+    //  and reject the request unless this is getMeta Ping request.
+    if ( !engine.sharedConnection.isConnectionValid() ) {
+      String messageKind = "";
+      if (pm.getMessageType() == AsynchAEMessage.GetMeta ) {
+        messageKind = "GetMeta";
+      } else if (pm.getMessageType() == AsynchAEMessage.Process ) {
+        messageKind = "Process";
+      } else if (pm.getMessageType() == AsynchAEMessage.CollectionProcessComplete ) {
+        messageKind = "CollectionProcessComplete";
+      }
+      rejectRequest = true;
+      try {
+        //  Is this is a Process request
+        if (pm.getMessageType() == AsynchAEMessage.Process) {
+          //  fetch the cache entry for this CAS
+          ClientRequest cacheEntry = (ClientRequest) engine.getCache().get(
+                  pm.get(AsynchAEMessage.CasReference));
+          //  We are rejecting any Process requests until connection to broker
+          //  is recovered
+          cacheEntry.setProcessException();
+          //  if the request was via synchronous API dont notify listeners
+          //  instead the code will throw exception to the client
+          boolean notifyListener = (cacheEntry.isSynchronousInvocation() == false);
+          //  handle rejected request
+          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
+                    "reject", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                    "UIMAJMS_client_rejected_process_request_broker_down__INFO", new Object[] { messageKind });
+          }
+          //  Sets the state of a remote delegate to handle subsequent requests
+          engine.serviceDelegate.setState(Delegate.TIMEOUT_STATE);
+          //  handle exception but dont rethrow the exception
+          engine.handleException(e, cacheEntry.getCasReferenceId(), null, cacheEntry, notifyListener, false);
+        } else {
+          //  Dont handle GetMeta Ping. Let it flow through. The Ping will be done once the connection is recovered
+          if ( !engine.serviceDelegate.isAwaitingPingReply()) {
+            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
+                      "reject", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                      "UIMAJMS_client_rejected_nonprocess_request_broker_down__INFO", new Object[] { messageKind });
+            }
+            engine.handleNonProcessException(e);
+          } else if ( pm.getMessageType() == AsynchAEMessage.GetMeta ){
+            rejectRequest = false;   // Don't reject GetMeta Ping 
+          }
+        }
+      } catch( Exception ex ) {
+        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+                  "reject", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                  "UIMAEE_exception__WARNING", new Object[] { ex });
+        }
+      }
+    }
+    return rejectRequest;
+  }
   /**
    * Initializes jms message producer and starts the main thread. This thread waits for messages
    * enqueued by application threads. The application thread adds a jms message to the
@@ -171,11 +251,11 @@
 
     engine.onProducerInitialized();
 
-    producer = getMessageProducer();
 
     // Wait for messages from application threads. The uima ee client engine
     // will call doStop() which sets the global flag 'done' to true.
     PendingMessage pm = null;
+    ClientRequest cacheEntry = null;
     while (!done) {
       // Remove the oldest message from the shared 'queue'
       // // Wait for a new message
@@ -186,66 +266,104 @@
       if (done) {
         break; // done in this loop
       }
-
-      try {
-        // Request JMS Message from the concrete implementation
-        Message message = null;
-        // Determine if this a CAS Process Request
-        boolean casProcessRequest = isProcessRequest(pm);
-        // Only Process request can be serialized as binary
-        if (casProcessRequest && engine.getSerializationStrategy().equals("binary")) {
-          message = createBytesMessage();
-        } else {
-          message = createTextMessage();
-        }
-
-        initializeMessage(pm, message);
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(
-                  Level.FINE,
-                  CLASS_NAME.getName(),
-                  "run",
-                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                  "UIMAJMS_sending_msg_to_endpoint__FINE",
-                  new Object[] {
-                      UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message
-                              .getIntProperty(AsynchAEMessage.Command)),
-                      UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, message
-                              .getIntProperty(AsynchAEMessage.MessageType)), destination });
-        }
-        if (casProcessRequest) {
-          ClientRequest cacheEntry = (ClientRequest) engine.getCache().get(
-                  pm.get(AsynchAEMessage.CasReference));
-          if (cacheEntry != null) {
-            // Use Process Timeout value for the time-to-live property in the
-            // outgoing JMS message. When this time is exceeded
-            // while the message sits in a queue, the JMS Server will remove it from
-            // the queue. What happens with the expired message depends on the
-            // configuration. Most JMS Providers create a special dead-letter queue
-            // where all expired messages are placed. NOTE: In ActiveMQ expired msgs in the DLQ
-            // are not auto evicted yet and accumulate taking up memory.
-            long timeoutValue = cacheEntry.getProcessTimeout();
-
-            if (timeoutValue > 0 && addTimeToLive ) {
-              // Set high time to live value
-              //producer.setTimeToLive(10 * timeoutValue);
-              message.setJMSExpiration(10 * timeoutValue);
-            }
-            if (pm.getMessageType() == AsynchAEMessage.Process) {
-              cacheEntry.setCASDepartureTime(System.nanoTime());
+      //  Check if the request should be rejected. If the connection to the broker is invalid and the request
+      //  is not GetMeta Ping, reject the request after the connection is made. The reject() method created
+      //  an exception and notified client of the fact that the request could not continue. The ping is a 
+      //  special case that we dont reject even though the broker connection has been lost. It is allow to
+      //  fall through and will be sent as soon as the connection is recovered.
+      boolean rejectRequest = reject(pm);
+      //  blocks until the connection is re-established with a broker
+      engine.recoverSharedConnectionIfClosed();
+      //  get the producer initialized from a valid connection
+      producer = getMessageProducer();
+      //  Check if the request should be rejected. It would be the case if the connection was invalid and
+      //  subsequently recovered. If it was invalid, we went through error handling and the request is stale.
+      if ( !rejectRequest && engine.running) {
+          if ( engine.serviceDelegate.isAwaitingPingReply() &&
+                pm.getMessageType() == AsynchAEMessage.GetMeta ){
+            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
+                      "run", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                      "UIMAJMS_client_dispatching_getmeta_ping__INFO", new Object[] { });
             }
-            UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(),
-                    cacheEntry.getCasReferenceId());
-            // Notify engine before sending a message
-            engine.onBeforeMessageSend(status);
-          }
-        }
+           }
+           try {
+             // Request JMS Message from the concrete implementation
+             Message message = null;
+             // Determine if this a CAS Process Request
+             boolean casProcessRequest = isProcessRequest(pm);
+             // Only Process request can be serialized as binary
+             if (casProcessRequest && engine.getSerializationStrategy().equals("binary")) {
+               message = createBytesMessage();
+             } else {
+               message = createTextMessage();
+             }
+
+             initializeMessage(pm, message);
+             if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+               UIMAFramework.getLogger(CLASS_NAME).logrb(
+                       Level.FINE,
+                       CLASS_NAME.getName(),
+                       "run",
+                       JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                       "UIMAJMS_sending_msg_to_endpoint__FINE",
+                       new Object[] {
+                           UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message
+                                   .getIntProperty(AsynchAEMessage.Command)),
+                           UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, message
+                                   .getIntProperty(AsynchAEMessage.MessageType)), destination });
+             }
+             if (casProcessRequest) {
+               cacheEntry = (ClientRequest) engine.getCache().get(
+                       pm.get(AsynchAEMessage.CasReference));
+               if (cacheEntry != null) {
+                 // Use Process Timeout value for the time-to-live property in the
+                 // outgoing JMS message. When this time is exceeded
+                 // while the message sits in a queue, the JMS Server will remove it from
+                 // the queue. What happens with the expired message depends on the
+                 // configuration. Most JMS Providers create a special dead-letter queue
+                 // where all expired messages are placed. NOTE: In ActiveMQ expired msgs in the DLQ
+                 // are not auto evicted yet and accumulate taking up memory.
+                 long timeoutValue = cacheEntry.getProcessTimeout();
+
+                 if (timeoutValue > 0 && addTimeToLive ) {
+                   // Set high time to live value
+                   message.setJMSExpiration(10 * timeoutValue);
+                 }
+                 if (pm.getMessageType() == AsynchAEMessage.Process) {
+                   cacheEntry.setCASDepartureTime(System.nanoTime());
+                 }
+                 cacheEntry.setCASDepartureTime(System.nanoTime());
+                 UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(),
+                         cacheEntry.getCasReferenceId());
+                 // Notify engine before sending a message
+                 engine.onBeforeMessageSend(status);
+               }
+             }
+             // start timers
+             if( casProcessRequest ) { 
+               // Add the cas to a list of CASes pending reply. Also start the timer if necessary
+               engine.serviceDelegate.addCasToOutstandingList(cacheEntry.getCasReferenceId());
+             } else if ( pm.getMessageType() == AsynchAEMessage.GetMeta &&
+                     engine.serviceDelegate.getGetMetaTimeout() > 0 ) {
+               // timer for PING has been started in sendCAS()
+               if ( !engine.serviceDelegate.isAwaitingPingReply()) {
+                 engine.serviceDelegate.startGetMetaRequestTimer();
+               } 
+             } 
+             //  Dispatch asynchronous request to Uima AS service
+             producer.send(message);
+             
+           } catch (Exception e) {
+             if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+               UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+                       "run", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                       "UIMAEE_exception__WARNING", new Object[] { e });
+             }
+             reject(pm,e);
+           }
 
-        producer.send(message);
-      } catch (Exception e) {
-        handleException(e, destination);
       }
-
     }
     try {
       cleanup();
@@ -293,6 +411,7 @@
               "handleException", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
               "UIMAEE_exception__WARNING", new Object[] { e });
     }
+    engine.recoverSharedConnectionIfClosed();
     // Notify the engine that there was an exception.
     engine.onException(e, aDestination);