You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2010/09/09 15:29:35 UTC

svn commit: r995417 - /uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java

Author: cwiklik
Date: Thu Sep  9 13:29:35 2010
New Revision: 995417

URL: http://svn.apache.org/viewvc?rev=995417&view=rev
Log:
UIMA-1867 Modified dispatch() to detect send failures and mark CAS as undelivered

Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=995417&r1=995416&r2=995417&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Thu Sep  9 13:29:35 2010
@@ -45,6 +45,7 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
+import javax.jms.TemporaryQueue;
 import javax.jms.TextMessage;
 import javax.management.ServiceNotFoundException;
 
@@ -522,14 +523,17 @@ public class JmsOutputChannel implements
                   new Object[] { destination });
         }
         endpointConnection.open();
-        brokerConnectionEntry.getConnectionTimer()
-                .setConnectionCreationTimestamp(System.nanoTime());
-        if ( getAnalysisEngineController() instanceof AggregateAnalysisEngineController ) {
-          Endpoint masterEndpoint = 
-            ((AggregateAnalysisEngineController) getAnalysisEngineController()).lookUpEndpoint(
-                  anEndpoint.getDelegateKey(), false);
-          masterEndpoint.setStatus(Endpoint.OK);
-        }
+        if ( endpointConnection.isOpen()) {
+            brokerConnectionEntry.getConnectionTimer()
+            .setConnectionCreationTimestamp(System.nanoTime());
+            if ( getAnalysisEngineController() instanceof AggregateAnalysisEngineController &&
+            		anEndpoint.getDelegateKey() != null ) {
+            	Endpoint masterEndpoint = 
+            		((AggregateAnalysisEngineController) getAnalysisEngineController()).lookUpEndpoint(
+            				anEndpoint.getDelegateKey(), false);
+            	masterEndpoint.setStatus(Endpoint.OK);
+            }
+        } 
       }
     }
     return endpointConnection;
@@ -1673,7 +1677,7 @@ public class JmsOutputChannel implements
       // on the delegate that we were unable to send a message to. The delegate state is
       // set to FAILED. If there are retries or more CASes to send to this delegate the
       // connection will be retried.
-      if (isRequest) {
+      if (isRequest && anEndpoint.getDelegateKey() != null) {
         // Spin recovery thread to handle send error. After the recovery thread
         // is started the current (process) thread goes back to a thread pool in
         // ThreadPoolExecutor. The recovery thread can than stop the listener and the
@@ -1684,6 +1688,30 @@ public class JmsOutputChannel implements
         Thread t = new Thread(Thread.currentThread().getThreadGroup().getParent(), recoveryThread);
         t.start();
       } else {
+    	  try {
+        	  CasStateEntry casStateEntry = getAnalysisEngineController().
+				getLocalCache().lookupEntry(entry.getCasReferenceId());
+        	  casStateEntry.setDeliveryToClientFailed();   // Mark the CAS, so that later we know that the delivery to client failed
+        	  if ( anEndpoint != null ) {
+				  // Add the reply destination (temp queue) to a dead client map
+            	  Object clientDestination = anEndpoint.getDestination();
+        		  if ( clientDestination != null && clientDestination instanceof TemporaryQueue ) {
+        			  if ( !getAnalysisEngineController().
+                	  		getDeadClientMap().containsKey(clientDestination.toString())) {
+                    	  getAnalysisEngineController().
+                    	  		getDeadClientMap().
+                    	  			put(clientDestination.toString(),clientDestination.toString());
+        			  }
+        		  }
+        	  }
+        	  
+    	  } catch( Exception e ) {
+    		  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+    	                "dispatch", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+    	                "UIMAJMS_exception__WARNING", e);
+    	  }
+    	  
+    	  
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(
                   Level.INFO,
@@ -2542,15 +2570,15 @@ public class JmsOutputChannel implements
           ic.destroyListener(delegate.getEndpoint().getDestination().toString(), endpoint
                   .getDelegateKey());
         }
+        // Set up the error context and handle the failure in the error handler
+        ErrorContext errorContext = new ErrorContext();
+        errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
+        errorContext.add(AsynchAEMessage.CasReference, entry.getCasReferenceId());
+        errorContext.add(AsynchAEMessage.Endpoint, endpoint);
+        errorContext.handleSilently(true); // don't dump the exception to the log
+        // Treat a failure on send as a timeout
+        delegate.handleError(new MessageTimeoutException(), errorContext);
       }
-      // Setup error context and handle failure in the error handler
-      ErrorContext errorContext = new ErrorContext();
-      errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
-      errorContext.add(AsynchAEMessage.CasReference, entry.getCasReferenceId());
-      errorContext.add(AsynchAEMessage.Endpoint, endpoint);
-      errorContext.handleSilently(true); // dont dump exception to the log
-      // Failure on send treat as timeout
-      delegate.handleError(new MessageTimeoutException(), errorContext);
 
     }
   }