You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2016/10/14 19:30:40 UTC

svn commit: r1764952 - in /uima/uima-as/trunk: uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/ uimaj-as-a...

Author: cwiklik
Date: Fri Oct 14 19:30:40 2016
New Revision: 1764952

URL: http://svn.apache.org/viewvc?rev=1764952&view=rev
Log:
UIMA-5123 refactored code that deals with recovery after broker restart

Added:
    uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml   (with props)
    uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml   (with props)
    uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml   (with props)
Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
    uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
    uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
    uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties
    uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
    uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java
    uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Fri Oct 14 19:30:40 2016
@@ -19,12 +19,9 @@
 
 package org.apache.uima.adapter.jms.activemq;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
@@ -43,7 +40,9 @@ import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageProducer;
 import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.AsyncCallback;
 import org.apache.activemq.ConnectionFailedException;
 import org.apache.activemq.advisory.ConsumerEvent;
 import org.apache.activemq.advisory.ConsumerListener;
@@ -96,7 +95,7 @@ public class JmsEndpointConnection_impl
 
   private volatile boolean retryEnabled;
 
-  private AnalysisEngineController controller = null;
+  protected AnalysisEngineController controller = null;
 
   private volatile boolean connectionAborted = false;
 
@@ -277,7 +276,7 @@ public class JmsEndpointConnection_impl
 		            				  conn.close();
 		            			  } catch( Exception ee) {}
 		            		  }
-		            		//  if ( logConnectionProblem ) {
+		            		  if ( jex.getCause() != null && logConnectionProblem ) {
 		            			  logConnectionProblem = false;   // log once
 			            		  // Check if unable to connect to the broker and retry ...
 		            			  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
@@ -289,8 +288,15 @@ public class JmsEndpointConnection_impl
 		            		          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
 		            		                  "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
 		            		                  "UIMAJMS_exception__WARNING", jex);
+		            		          
+		            		          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+		            		                  "openChannel", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+		            		                  "UIMAEE_service_lost_connectivity_WARNING",
+		            		                  new Object[] { controller.getComponentName(), brokerUri});
+		            		          
+		            		          
 		            		        }
-		            	//	  } 
+		            		  } 
 		            		 this.wait(1000);  // wait between retries 
 		            	  } catch ( Exception ee) {
 		            		  ee.printStackTrace();
@@ -668,7 +674,12 @@ public class JmsEndpointConnection_impl
         }
         logMessageSize(aMessage, msgSize, destinationName);
         synchronized (producer) {
-          producer.send((Destination) delegateEndpoint.getDestination(), aMessage);
+            // create amq async callback listener to detect jms msg delivery problems
+        	AsyncCallback onComplete = createAMQCallbackListener(command, aMessage);
+        	// if the msg cannot be delivered due to invalid destination, the send does
+        	// not fail since we are using AMQ async sends. To detect delivery issues
+        	// we use callback listener where such conditions are detected and handled
+        	((ActiveMQMessageProducer)producer).send((Destination) delegateEndpoint.getDestination(), aMessage, onComplete);
         }
       } else {
         destinationName = ((ActiveMQQueue) producer.getDestination()).getPhysicalName();
@@ -678,22 +689,27 @@ public class JmsEndpointConnection_impl
                   new Object[] { destinationName });
         }
         logMessageSize(aMessage, msgSize, destinationName);
-  	  // If in ParallelStep its possible to receive a reply from one of the delegates in parallel 
-  	  // step *before* a CAS is dispatched to all of the delegates. This can cause a problem
-  	  // as replies are merged which causes the CAS to be in an inconsistent state.
-  	  // The following code calls dispatchCasToParallelDelegate() which count down
-  	  // a java latch. The same latch is used when receiving replies. If the latch is non zero
-  	  // the code blocks a thread from performing deserialization.
-  	  if ( msgType == AsynchAEMessage.Request && command == AsynchAEMessage.Process ) {
+   	    // If in ParallelStep its possible to receive a reply from one of the delegates in parallel 
+  	    // step *before* a CAS is dispatched to all of the delegates. This can cause a problem
+  	    // as replies are merged which causes the CAS to be in an inconsistent state.
+  	    // The following code calls dispatchCasToParallelDelegate() which count down
+  	    // a java latch. The same latch is used when receiving replies. If the latch is non zero
+  	    // the code blocks a thread from performing deserialization.
+  	    if ( msgType == AsynchAEMessage.Request && command == AsynchAEMessage.Process ) {
   		  String casReferenceId = aMessage.getStringProperty(AsynchAEMessage.CasReference);
   		  CasStateEntry casStateEntry = controller.getLocalCache().lookupEntry(casReferenceId);
   		  if ( casStateEntry.getNumberOfParallelDelegates() > 0) {
   			  casStateEntry.dispatchedCasToParallelDelegate();
   		  }
-  	  }
+  	    }
 
         synchronized (producer) {
-          producer.send(aMessage);
+            // create amq async callback listener to detect jms msg delivery problems
+        	AsyncCallback onComplete = createAMQCallbackListener(command, aMessage);
+        	// if the msg cannot be delivered due to invalid destination, the send does
+        	// not fail since we are using AMQ async sends. To detect delivery issues
+        	// we use callback listener where such conditions are detected and handled
+        	((ActiveMQMessageProducer)producer).send(aMessage, onComplete);
         }
 
       }
@@ -709,7 +725,7 @@ public class JmsEndpointConnection_impl
       // to find inactive sessions.
 	  lastDispatchTimestamp.set(System.currentTimeMillis());
       // Succeeded sending the CAS
-      return true;
+	  return true;
     } catch (Exception e) {
     	
     	 // if a client terminates with an outstanding request, the service will not
@@ -805,6 +821,20 @@ public class JmsEndpointConnection_impl
     return false;
   }
 
+  public AsyncCallback createAMQCallbackListener(int command, Message aMessage) throws Exception {
+  	String cid="";
+  	CasStateEntry casStateEntry = null;
+  	AsyncCallback onComplete = null;
+  	if ( command == AsynchAEMessage.Process) {
+  		cid = aMessage.getStringProperty(AsynchAEMessage.CasReference);
+		casStateEntry = controller.getLocalCache().lookupEntry(cid);
+		onComplete = new UimaAsAsyncCallbackListener(casStateEntry);
+  	} else {
+		onComplete = new UimaAsAsyncCallbackListener(command);
+  	}
+  	return onComplete;
+  }
+  
   private void logMessageSize(Message aMessage, long msgSize, String destinationName) {
     if (UIMAFramework.getLogger().isLoggable(Level.FINE)) {
       boolean isReply = false;
@@ -921,4 +951,72 @@ public class JmsEndpointConnection_impl
 //    brokerDestinations.getConnectionTimer().stopTimer();
   }
 
+  private class UimaAsAsyncCallbackListener implements AsyncCallback {
+	CasStateEntry casState=null;
+	int command;
+	
+	public UimaAsAsyncCallbackListener(int command) {
+		this.command = command;
+	}
+	public UimaAsAsyncCallbackListener( CasStateEntry casState ) {
+		this.casState = casState;
+	}
+	public void onException(JMSException exception) {
+		if ( casState != null ) {
+			
+			if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+						"UimaAsAsyncCallbackListener.onException()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+						"UIMAEE_unable_to_deliver_msg__INFO",
+						new Object[] { controller.getComponentName(), casState.getCasReferenceId(),exception.getMessage() });
+		    }
+			casState.setDeliveryToClientFailed();
+			if ( casState.isSubordinate()) {
+				try {
+					
+					String inputCasId = casState.getInputCasReferenceId();
+					if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+						UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+								"UimaAsAsyncCallbackListener.onException()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+								"UIMAEE_force_cas_abort__INFO",
+								new Object[] { controller.getComponentName(), "parent", inputCasId });
+				    }
+					
+					
+					CasStateEntry parentCasStateEntry = controller.getLocalCache().lookupEntry(inputCasId);
+					//parentCasStateEntry.setDeliveryToClientFailed();
+					parentCasStateEntry.setFailed();
+					controller.addAbortedCasReferenceId(inputCasId);
+					if ( controller instanceof AggregateAnalysisEngineController ) {
+						List<AnalysisEngineController> controllers = 
+								((AggregateAnalysisEngineController)controller).getChildControllerList();
+						for( AnalysisEngineController ctrl : controllers) {
+							ctrl.addAbortedCasReferenceId(inputCasId);
+						}
+					}
+					
+				} catch( Exception e) {
+					e.printStackTrace();
+				}
+				controller.releaseNextCas(casState.getCasReferenceId());
+			}
+			
+			if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+							"UimaAsAsyncCallbackListener.onException()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+							"UIMAEE_release_cas_req__FINE",
+							new Object[] { controller.getComponentName(), casState.getCasReferenceId() });
+			}
+		} else {
+	          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+	                  "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+	                  "UIMAEE_service_delivery_exception__WARNING",new Object[] { controller.getComponentName(), "", endpointName});
+
+		}
+	}
+
+	public void onSuccess() {
+	}
+	  
+  }
 }

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Fri Oct 14 19:30:40 2016
@@ -27,12 +27,14 @@ import java.util.concurrent.CopyOnWriteA
 import java.util.concurrent.CountDownLatch;
 
 import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.uima.UIMAFramework;
@@ -57,6 +59,7 @@ import org.apache.uima.adapter.jms.JmsCo
 import org.apache.uima.adapter.jms.message.JmsMessageContext;
 import org.apache.uima.util.Level;
 import org.springframework.jms.listener.SessionAwareMessageListener;
+import org.springframework.jms.support.destination.DestinationResolver;
 
 /**
  * Thin adapter for receiving JMS messages from Spring. It delegates processing of all messages to
@@ -1041,7 +1044,61 @@ public class JmsInputChannel implements
       }
     }
   }
-
+  public void createListenerOnTempQueue(ConnectionFactory cf, boolean isFreeCasDestination ) throws Exception {
+	  TempDestinationResolver resolver = new TempDestinationResolver(controller.getComponentName());
+	  UimaDefaultMessageListenerContainer connector = new UimaDefaultMessageListenerContainer(true);
+	  connector.setConnectionFactory(cf);
+	  resolver.setListener(connector);
+	  connector.setConcurrentConsumers(1);
+	  connector.setDestinationResolver(resolver);
+	  connector.setController(getController());
+	  connector.setMessageListener(this);
+	  connector.initializeContainer();
+	  connector.getDestination();
+	  connector.afterPropertiesSet(false);
+	  UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO,getController().getComponentName()+"-JmsInputChannel.createListenerOnTempQueue()-starting new Listener" );
+	  connector.start();
+	  boolean log = true;
+	  synchronized (mux) {
+		  while (connector.getListenerEndpoint() == null) {
+			  try {
+				  if ( log ) {
+					  log = false;
+					  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+						  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+								  "createListenerOnTempQueue", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+								  "UIMAJMS_temp_destination_not_available_retrying__INFO",
+								  new Object[] { getController().getComponentName(), "5"});
+					  }
+				  }
+				  mux.wait(5000);
+				  
+			  } catch (InterruptedException e) {
+			  }
+		  }
+
+	  }
+
+	  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+		  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+				  "createListenerOnTempQueue", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+				  "UIMAJMS_temp_destination_available__INFO",
+				  new Object[] { getController().getComponentName(), connector.getListenerEndpoint(), isFreeCasDestination});
+	  }
+
+	  if ( isFreeCasDestination ) {
+		  ((JmsOutputChannel) getController().getOutputChannel())
+          .setFreeCasQueue(connector.getListenerEndpoint());
+	  }
+      setListenerContainer(connector);
+
+	  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
+		  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
+				  "createListenerOnTempQueue", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+				  "UIMAJMS_activated_fcq__CONFIG",
+				  new Object[] { getController().getComponentName(), connector.getEndpointName() });
+	  }
+  }
   public void createListener(String aDelegateKey, Endpoint endpointToUpdate) throws Exception {
     if (getController() instanceof AggregateAnalysisEngineController) {
       Delegate delegate = ((AggregateAnalysisEngineController) getController())
@@ -1056,7 +1113,7 @@ public class JmsInputChannel implements
         newListener.setMessageListener(this);
         newListener.setController(getController());
 
-        TempDestinationResolver resolver = new TempDestinationResolver();
+        TempDestinationResolver resolver = new TempDestinationResolver(controller.getComponentName());
         resolver.setConnectionFactory(f);
         resolver.setListener(newListener);
         newListener.setDestinationResolver(resolver);

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Fri Oct 14 19:30:40 2016
@@ -530,9 +530,12 @@ public class JmsOutputChannel implements
                 //  Only one thread at a time is allowed here.
                 synchronized( masterEndpoint ) {
                   if ( masterEndpoint.getStatus() == Endpoint.FAILED ) {
+                	  
+                	String name =  anEndpoint.getDestination().toString();
                     //  Returns InputChannel if the Reply Listener for the delegate has previously failed.
                     //  If the listener hasnt failed the getReplyInputChannel returns null
-                    InputChannel iC = getAnalysisEngineController().getReplyInputChannel(anEndpoint.getDelegateKey());
+//                    InputChannel iC = getAnalysisEngineController().getReplyInputChannel(anEndpoint.getDelegateKey());
+                    InputChannel iC = getAnalysisEngineController().getReplyInputChannel(anEndpoint.getDestination().toString());
                     if ( iC != null ) { 
                       try {
                         // Create a new Listener, new Temp Queue and associate the listener with the Input Channel
@@ -778,7 +781,7 @@ public class JmsOutputChannel implements
 
 
   public void sendReply(CacheEntry entry, Endpoint anEndpoint) throws AsynchAEException {
-    try {
+	  try {
       anEndpoint.setReplyEndpoint(true);
       if (anEndpoint.isRemote()) {
         if (anEndpoint.getSerialFormat() == SerialFormat.XMI) {
@@ -1670,7 +1673,6 @@ public class JmsOutputChannel implements
         // produced by the CAS Multiplier. The client will treat this CAS
         // differently from the input CAS.
         tm.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
-
         isRequest = true;
         // Save the id of the parent CAS
         tm.setStringProperty(AsynchAEMessage.InputCasReference, getTopParentCasReferenceId(entry

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java Fri Oct 14 19:30:40 2016
@@ -55,7 +55,7 @@ import org.springframework.jms.support.d
 public class SpringContainerDeployer implements ControllerCallbackListener {
   private static final Class CLASS_NAME = SpringContainerDeployer.class;
 
-  private static final int MAX_PREFETCH_FOR_CAS_NOTIFICATION_Q = 10;
+  public static final int MAX_PREFETCH_FOR_CAS_NOTIFICATION_Q = 10;
 
   public static final int QUIESCE_AND_STOP = 1000;
 

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java Fri Oct 14 19:30:40 2016
@@ -33,6 +33,13 @@ public class TempDestinationResolver imp
 
   private Object mutex = new Object();
 
+  private String serviceName = "";
+  
+  public TempDestinationResolver() {
+  }
+  public TempDestinationResolver(String name) {
+	  serviceName = name;
+  }
   /**
    * This method is called by the Spring listener code. It creates a single temp queue for all
    * listener instances. If the Spring listener is configured with more than one concurrentConsumer,
@@ -41,7 +48,6 @@ public class TempDestinationResolver imp
    */
   public Destination resolveDestinationName(Session session, String destinationName,
           boolean pubSubDomain) throws JMSException {
-
     synchronized (mutex) {
       if (destination == null) {
         destination = session.createTemporaryQueue();

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Fri Oct 14 19:30:40 2016
@@ -19,11 +19,9 @@
 
 package org.apache.uima.adapter.jms.activemq;
 
-import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.ConnectException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +36,7 @@ import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
+import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 import javax.jms.TemporaryQueue;
 
@@ -125,6 +124,7 @@ public class UimaDefaultMessageListenerC
   //	 on listener failure log once and retry silently
   private volatile boolean logListenerFailure=true;
   
+  private static CountDownLatch recoveryLatch = new CountDownLatch(4);
   public UimaDefaultMessageListenerContainer() {
     super();
     // reset global static. This only effects unit testing as services are deployed 
@@ -132,7 +132,7 @@ public class UimaDefaultMessageListenerC
     terminating = false;
     UIMAFramework.getLogger(CLASS_NAME).setLevel(Level.WARNING);
     __listenerRef = this;
-    setRecoveryInterval(30000);  // increase connection recovery to 30 sec
+    setRecoveryInterval(400);  // increase connection recovery to 30 sec
     setAcceptMessagesWhileStopping(true);
     setExceptionListener(this);
     threadGroup = new ThreadGroup("ListenerThreadGroup_"
@@ -166,7 +166,63 @@ public class UimaDefaultMessageListenerC
     		          tcon = createConnection();
     		          JmsUtils.closeConnection(tcon);
     		        }
-    		        logger.info("Successfully refreshed JMS Connection");
+    		        String ctrlName = "";
+    		        if ( controller != null ) {
+    		        	ctrlName = "Controller: "+controller.getComponentName();
+    		        }
+    		        if ( super.getMessageSelector() != null ) {
+        		        logger.info(ctrlName+" Successfully refreshed JMS Connection - Selector "+super.getMessageSelector()+" Instance hashcode:"+this.hashCode());
+    		        	
+    		        } else {
+        		        logger.info(ctrlName+" Successfully refreshed JMS Connection ");
+    		        	
+    		        }
+    		        //if (controller != null && controller instanceof AggregateAnalysisEngineController) {
+    					//	If endpoint not set, this is a temp reply queue listener.
+    	               if (getDestination() != null && ((ActiveMQDestination)getDestination()).isTemporary()) {
+    	                   destroy();
+    	            	   logger.info("Controller:"+controller.getComponentName()+"... Destroyed Listener on a temp queue:"+getDestination());
+//    	                   ((JmsOutputChannel)controller.getOutputChannel()).setFreeCasQueue(getDestination());
+    	            	 
+    	            	   if ( freeCasQueueListener ) {
+       	            		logger.info("Controller:"+controller.getComponentName()+" ------------------- Creating new listener for the FreeCas temp queue");
+       	            		try {
+       	                	    ((JmsInputChannel)getMessageListener()).createListenerOnTempQueue(getConnectionFactory(), true);
+       	                		logger.info("Controller:"+controller.getComponentName()+"------------------- New listener on FreeCas temp queue is ready");
+       	            		} catch( Exception e) {
+       	            	          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+       	            	              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+       	            	                      "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+       	            	                      "UIMAJMS_exception__WARNING", e);
+       	            	            }
+       	            		}
+    	                   }
+    	                   
+    	                       	            	/*  
+    	               	if ( getMessageListener() instanceof JmsInputChannel ) {
+    	            		System.out.println("------------------- Creating new listener for the temp queue");
+    	            		try {
+    	                	((JmsInputChannel)getMessageListener()).createListenerOnTempQueue(getConnectionFactory());
+    	                		System.out.println("------------------- New listener on temp queue is ready");
+    	            		} catch( Exception e) {
+    	                		System.out.println("------------------- Error while creating new listener on temp queue");
+    	            	          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+    	            	              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+    	            	                      "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+    	            	                      "UIMAJMS_exception__WARNING", e);
+    	            	            }
+    	            			
+    	            		}
+    	            	}
+*/
+    	               
+    	               }
+    	               
+    	  /*
+    	               String delegateKey = ((AggregateAnalysisEngineController) controller)
+    	                .lookUpDelegateKey(endpoint.getEndpoint());
+    		*/
+    		        //}
     		        break;
     		      }
     		      catch (Exception ex) {
@@ -186,8 +242,8 @@ public class UimaDefaultMessageListenerC
     		        }
     		      }
     		     // sleepInbetweenRecoveryAttempts();
-    		      setRecoveryInterval(10);
-    		    }	    
+   // 		      setRecoveryInterval(10);
+    		}	    
     	}
     } catch( IllegalStateException e ) {
     }
@@ -243,7 +299,7 @@ public class UimaDefaultMessageListenerC
         } else {
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-                        "handleTempQueueFailure", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                        "handleListenerFailure", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                         "UIMAJMS_unable_to_lookup_input_channel__INFO", queueName);
           }
         }
@@ -291,6 +347,24 @@ public class UimaDefaultMessageListenerC
       
     } catch( Exception exx ) { // shared connection  may not exist yet if a broker is not up
     }
+    if (t instanceof InvalidDestinationException ) {
+    	destroy();
+    	if ( getMessageListener() instanceof JmsInputChannel ) {
+    		try {
+        	//	((JmsInputChannel)getMessageListener()).createListenerOnTempQueue(getConnectionFactory());
+    		} catch( Exception e) {
+    	          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+    	              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+    	                      "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+    	                      "UIMAJMS_exception__WARNING", e);
+    	            }
+    			
+    		}
+    	}
+    	return;
+    }
+
+    
     if ( (conn != null && conn.isTransportFailed() ) || 
             t instanceof javax.jms.IllegalStateException
             && t.getMessage().equals("The Consumer is closed")) {
@@ -340,6 +414,7 @@ public class UimaDefaultMessageListenerC
       }
     } else if (disableListener(t)) {
       handleQueueFailure(t);
+    } else {
     }
   }
 
@@ -442,7 +517,7 @@ public class UimaDefaultMessageListenerC
       }
     
     
-    setRecoveryInterval(0);
+    setRecoveryInterval(1);
 
     // Spin a shutdown thread to terminate listener.
     new Thread() {
@@ -886,7 +961,7 @@ public class UimaDefaultMessageListenerC
                     JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
                      new Object[] { msg });
          }
-    	setRecoveryInterval(0);
+    	setRecoveryInterval(1);
       setAutoStartup(false);
       if ( getSharedConnection() != null ) {
         ActiveMQConnection amqc = (ActiveMQConnection)getSharedConnection();

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Fri Oct 14 19:30:40 2016
@@ -319,7 +319,9 @@ public class BaseUIMAAsynchronousEngine_
     	      if ( amqc != null && !amqc.isClosed() && !amqc.isClosing() && consumerDestination != null && 
     	           consumerDestination instanceof ActiveMQTempDestination ) {
     	        try {
-    	          amqc.deleteTempDestination((ActiveMQTempDestination)consumerDestination);
+    	        	if ( !amqc.isClosed() && !amqc.isTransportFailed()) {
+    	    	          amqc.deleteTempDestination((ActiveMQTempDestination)consumerDestination);
+    	        	}
     	        } catch( Exception e) {
     	          e.printStackTrace();
     	        }
@@ -351,6 +353,7 @@ public class BaseUIMAAsynchronousEngine_
 		        sender.doStop();
 		      }
 			  try {
+				  System.out.println(this.getClass().getName()+".stop() - Stopping UIMA-AS Client");
 				stopConnection();
 				// Undeploy all containers
 				undeploy();

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java Fri Oct 14 19:30:40 2016
@@ -37,6 +37,8 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.Message;
@@ -58,6 +60,8 @@ import org.apache.uima.aae.client.UimaAs
 import org.apache.uima.aae.client.UimaAsynchronousEngine;
 import org.apache.uima.aae.controller.Endpoint;
 import org.apache.uima.aae.error.ServiceShutdownException;
+import org.apache.uima.aae.error.UimaASPingTimeout;
+import org.apache.uima.aae.error.UimaASProcessCasTimeout;
 import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.activemq.JmsInputChannel;
@@ -120,6 +124,358 @@ public class TestUimaASExtended extends
 
 	return b.getDefaultSocketURIString();
     }  
+    
+    
+    /**
+     * Tests error handling of the client. It deploys Aggregate service Cas Multiplier. initializes
+     * the client and sends a CAS for processing. The child CAS is than held in NoOp Annotator for
+     * 30 secs to simulate heavy processing. While the CAS is being processed, a broker is stopped.
+     * The client should timeout after 40 secs and attempt to send 2 more CASes. Since the broker
+     * is down, each of these 2 CASes goes into a retry list while a Connection is being retried.
+     * Both should timeout, and sendAndReceive() should fail due to a timeout.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testClientRecoveryFromBrokerFailure() throws Exception {
+      System.out.println("-------------- testClientRecoveryFromBrokerFailure -------------");
+      System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
+
+      BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+      deployService(uimaAsEngine, relativePath + "/Deploy_AggregateMultiplierWith30SecDelay.xml");
+
+      Map<String, Object> appCtx = 
+         buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(), "TopLevelTaeQueue");
+      appCtx.put(UimaAsynchronousEngine.Timeout, 40000);   // AE will hold the CAS for 30 secs so this needs to be larger
+      appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
+      initialize(uimaAsEngine, appCtx);
+      waitUntilInitialized();
+
+      ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor();
+
+      // schedule a thread that will stop the broker after 10 secs
+      s.schedule(
+    		  new Runnable() {
+
+				@Override
+				public void run() {
+					try {
+						System.out.println("Stopping Broker ...");
+						broker.stop();
+ 				        broker.waitUntilStopped();
+						System.out.println("Broker Stopped...");
+
+					} catch( Exception e) {
+						
+					}
+					
+				}
+    			  
+    		  }
+    		  , 10, TimeUnit.SECONDS);
+      int timeoutCount=0;
+
+      // try to send 3 CASes, each should timeout
+      for (int i = 0; i < 3; i++) {
+          CAS cas = uimaAsEngine.getCAS();
+          cas.setDocumentText("Some Text");
+          try {
+        	  System.out.println("............... Client Sending CAS #"+(i+1));
+              uimaAsEngine.sendAndReceiveCAS(cas);
+            } catch( Exception e) {
+            	if ( e instanceof UimaASProcessCasTimeout ) {
+            		timeoutCount++;
+            		System.out.println("Client .............. "+e.getMessage());
+            		if ( e.getCause() != null && e instanceof UimaASPingTimeout) {
+                		System.out.println("Client .............. "+e.getCause().getMessage());
+            		}
+            	} else if ( e.getCause() instanceof UimaASProcessCasTimeout ) {
+            		timeoutCount++;
+            		System.out.println("Client .............. "+e.getCause().getMessage());
+            		if ( e.getCause().getCause() != null && e.getCause().getCause() instanceof UimaASPingTimeout) {
+                		System.out.println("Client .............. "+e.getCause().getCause().getMessage());
+            		}
+            	} else {
+                	e.printStackTrace();
+            	}
+            	//              System.out.println("Client Received Expected Error on CAS:"+(i+1));
+            } finally {
+              cas.release();
+            }
+      }
+      if ( timeoutCount != 3) {
+          uimaAsEngine.stop();
+    	  fail("Expected 3 Errors Due to Timeout, Instead Got "+timeoutCount+" Timeouts");
+      } else {
+          uimaAsEngine.stop();
+      }
+
+ }
+    
+    @Test
+    public void testBrokerRestartWithAggregateMultiplier() throws Exception {
+      System.out.println("-------------- testBrokerRestartWithAggregateMultiplier -------------");
+      System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
+
+      BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
+      deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml");
+      deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
+      deployService(eeUimaEngine, relativePath + "/Deploy_AggregateMultiplier.xml");
+
+      String burl = broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString();
+      Map<String, Object> appCtx = buildContext(burl, "TopLevelTaeQueue");
+      synchronized(this) {
+    	  this.wait(2000);
+      }
+      broker.stop();
+      broker.waitUntilStopped();
+
+      synchronized(this) {
+    	  this.wait(2000);
+      }
+
+      broker = createBroker();
+      broker.start();
+      broker.waitUntilStarted();
+      synchronized(this) {
+    	  this.wait(2000);
+      }
+
+
+      // reduce the cas pool size and reply window
+      appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize);
+      appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(2));
+      runTest(appCtx, eeUimaEngine,burl,
+              "TopLevelTaeQueue", 1, PROCESS_LATCH);
+      eeUimaEngine.stop();
+ }
+    
+    /**
+     * Tests client and service recovery from broker restart. It deploys CM service, dispatches
+     * a CAS for processing and while the CAS is in process, it bounces a broker. The service
+     * listeners should be restored and the CAS should fail due to invalid destination. Once
+     * the client times out, it should send 2 more CASes which should force client to re-establish
+     * connection with a broker and replies should come back.
+     * 
+     * @throws Exception
+     */
+
+    @Test
+    public void testBrokerRestartWithAggregateMultiplierWhileProcessingCAS() throws Exception {
+      System.out.println("-------------- testBrokerRestartWithAggregateMultiplierWhileProcessingCAS -------------");
+      System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
+
+      BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+      deployService(uimaAsEngine, relativePath + "/Deploy_AggregateMultiplierWith30SecDelay.xml");
+
+      Map<String, Object> appCtx = 
+         buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(), "TopLevelTaeQueue");
+      appCtx.put(UimaAsynchronousEngine.Timeout, 40000);
+      appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
+      initialize(uimaAsEngine, appCtx);
+      waitUntilInitialized();
+
+
+      ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor();
+
+      s.schedule(
+    		  new Runnable() {
+
+				@Override
+				public void run() {
+					try {
+						System.out.println("Stopping Broker ...");
+						broker.stop();
+ 				        broker.waitUntilStopped();
+						System.out.println("Broker Stopped...");
+
+					    broker = createBroker();
+					    broker.start();
+					    broker.waitUntilStarted();
+					    System.out.println("Broker Restarted...");
+
+					
+					} catch( Exception e) {
+						
+					}
+					
+				}
+    			  
+    		  }
+    		  , 10, TimeUnit.SECONDS);
+
+
+      for (int i = 0; i < 3; i++) {
+          CAS cas = uimaAsEngine.getCAS();
+          cas.setDocumentText("Some Text");
+          try {
+        	  System.out.println("............... Client Sending CAS #"+(i+1));
+              uimaAsEngine.sendAndReceiveCAS(cas);
+            } catch( Exception e) {
+            	e.printStackTrace();
+            	//              System.out.println("Client Received Expected Error on CAS:"+(i+1));
+            } finally {
+              cas.release();
+            }
+
+
+      }
+      uimaAsEngine.stop();
+ }
+
+    
+    @Test
+    public void testBrokerRestartWhileProcessingCAS() throws Exception {
+      System.out.println("--------------  testBrokerRestartWhileProcessingCAS -------------");
+      System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
+
+      BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+      
+      deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWith30SecDelay.xml");
+      Map<String, Object> appCtx = 
+      buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(), "NoOpAnnotatorQueue");
+      appCtx.put(UimaAsynchronousEngine.Timeout, 40000);
+      appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
+      initialize(uimaAsEngine, appCtx);
+      waitUntilInitialized();
+
+
+      ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor();
+      
+      s.schedule(
+    		  new Runnable() {
+
+				@Override
+				public void run() {
+					try {
+						System.out.println("Stopping Broker ...");
+						broker.stop();
+ 				        broker.waitUntilStopped();
+						System.out.println("Broker Stopped...");
+
+					    broker = createBroker();
+					    broker.start();
+					    broker.waitUntilStarted();
+					    System.out.println("Broker Restarted...");
+
+					
+					} catch( Exception e) {
+						
+					}
+					
+				}
+    			  
+    		  }
+    		  , 10, TimeUnit.SECONDS);
+
+      for (int i = 0; i < 1; i++) {
+          CAS cas = uimaAsEngine.getCAS();
+          cas.setDocumentText("Some Text");
+          try {
+              uimaAsEngine.sendAndReceiveCAS(cas);
+            } catch( Exception e) {
+            	e.printStackTrace();
+            	//              System.out.println("Client Received Expected Error on CAS:"+(i+1));
+            } finally {
+              cas.release();
+            }
+
+
+      }
+      uimaAsEngine.stop();
+      
+      
+    }
+    /**
+     * This tests if broker keep-alive protocol is working. With AMQ 5.13.2 the test
+     * fails due to broker bug. What happens is that when a jms client uses http
+     * protocol, the connection is made but the keep-alive chat between broker and
+     * client is not causing a timeout and an exception. 
+     * 
+     * The exception is internal to the broker but it also happens within amq
+     * client code. To get to this, a custom spring based listener is deployed 
+     * with some of its exception handling methods overriden to capture an exception. 
+     *  
+     * @throws Exception
+     */
+    @Test
+    public void testServiceWithHttpListeners() throws Exception {
+  	    System.out.println("-------------- testServiceWithHttpListeners -------------");
+  	    // Need java monitor object on which to sleep
+  	    Object waitObject = new Object();
+  	    // Custom spring listener with handleListenerSetupFailure() overriden to 
+  	    // capture AMQ exception.
+  	    TestDefaultMessageListenerContainer c = new TestDefaultMessageListenerContainer();
+  	    c.setConnectionFactory(new ActiveMQConnectionFactory("http://localhost:18888"));
+  	    c.setDestinationName("TestQ");
+  	    c.setConcurrentConsumers(2);
+  	    c.setBeanName("TestBean");
+  	    c.setMessageListener(new JmsInputChannel());
+  	    c.initialize();
+  	    c.start();
+  	    
+  	    if ( c.isRunning() ) {
+  		    System.out.println("... Listener Ready");
+  	    	
+  	    }
+  	    // Keep-alive has a default 30 secs timeout. Sleep for bit longer than that
+  	    // If there is an exception due to keep-alive, an exception handler will be
+  	    // called on the TestDefaultMessageListenerContainer instance where we 
+  	    // capture the error.
+  	    System.out.println("... Waiting for 40 secs");
+  	    try {
+  	    	synchronized(waitObject) {
+  	    		waitObject.wait(40000);
+  	    	}
+  	    	// had there been broker issues relateds to keep-alive the listener's failed
+  	    	// flag would have been set by now. Check it and fail the test 
+  	    	if ( c.failed() ) {
+  		    	fail("Broker Failed - Reason:"+c.getReasonForFailure());
+  	    	} else {
+  	    		System.out.println("Stopping Listener");
+  	    		c.stop();
+
+  	    	}
+  	    } catch( Exception e) {
+  	    	e.printStackTrace();
+  	    	fail(e.getMessage());
+  	    }
+    }
+
+    
+
+    @Test
+    public void testBrokerRestartWithPrimitiveMultiplier() throws Exception {
+      System.out.println("-------------- testBrokerRestartWithPrimitiveMultiplier -------------");
+      System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
+
+      BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
+      
+      deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml");
+     
+      
+      broker.stop();
+      broker.waitUntilStopped();
+
+      broker = createBroker();
+      broker.start();
+      broker.waitUntilStarted();
+
+      String burl = broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString();
+      Map<String, Object> appCtx = 
+      buildContext(burl, "TestMultiplierQueue");
+
+      // reduce the cas pool size and reply window
+      appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize);
+      appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(2));
+      runTest(appCtx, eeUimaEngine,burl,
+              "TestMultiplierQueue", 1, PROCESS_LATCH);
+      
+      eeUimaEngine.stop();
+    }
+
+    
+    
   /*
   public void testContinueOnRetryFailure2() throws Exception {
 	    System.out.println("-------------- testContinueOnRetryFailure -------------");
@@ -147,9 +503,159 @@ public class TestUimaASExtended extends
 	  }
 */
   
+    
+    
+    /*
+     * Tests 
+     */
+    @Test
+    public void testSyncClientRecoveryFromBrokerStopAndRestart3() throws Exception  {
+      System.out.println("-------------- testSyncClientRecoveryFromBrokerStopAndRestart -------------");
+      System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
+ 
+      // Instantiate Uima AS Client
+        BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+        //BrokerService broker2 = setupSecondaryBroker(true);
+        // Deploy Uima AS Primitive Service
+        deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
+        
+        Map<String, Object> appCtx = 
+        buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(), "NoOpAnnotatorQueue");
+        appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+        appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+        appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
+        initialize(uimaAsEngine, appCtx);
+        waitUntilInitialized();
+
+
+        broker.stop();
+        broker.waitUntilStopped();
+
+        //System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
+        //broker2 = setupSecondaryBroker(true);
+        broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+        int errorCount = 0;
+        System.out.println("Sending CASes");
+        for (int i = 0; i < 60; i++) {
+            CAS cas = uimaAsEngine.getCAS();
+            cas.setDocumentText("Some Text");
+            try {
+                uimaAsEngine.sendAndReceiveCAS(cas);
+              } catch( Exception e) {
+                System.out.println("Client Received Expected Error on CAS:"+(i+1));
+              } finally {
+                cas.release();
+              }
 
 
- 
+        }
+        uimaAsEngine.stop();
+        
+        /*
+        int errorCount=0;
+        for (int i = 0; i < 20; i++) {
+          
+          if ( i == 5 ) {
+            broker2.stop();
+            broker2.waitUntilStopped();
+          } else if ( i == 10 ) {
+            //  restart the broker 
+            System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
+            broker2 = setupSecondaryBroker(true);
+            
+            broker2.start();
+            broker2.waitUntilStarted();
+
+          }
+          CAS cas = uimaAsEngine.getCAS();
+          cas.setDocumentText("Some Text");
+        //  System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
+          try {
+            uimaAsEngine.sendAndReceiveCAS(cas);
+          } catch( Exception e) {
+            errorCount++;
+            System.out.println("Client Received Expected Error on CAS:"+(i+1));
+          } finally {
+            cas.release();
+          }
+        }
+        
+        uimaAsEngine.stop();
+        super.cleanBroker(broker2);
+
+        broker2.stop();
+
+        //  expecting 5 failures due to broker missing
+        if ( errorCount != 5 ) {
+          fail("Expected 5 failures due to broker down, instead received:"+errorCount+" failures");
+        }
+        broker2.waitUntilStopped();
+*/
+    }
+
+    /*
+    
+    @Test
+    public void testSyncClientRecoveryFromBrokerStopAndRestart2() throws Exception  {
+        broker.stop();
+        broker.waitUntilStopped();
+    	System.out.println("-------------- testSyncClientRecoveryFromBrokerStopAndRestart -------------");
+         // Instantiate Uima AS Client
+          BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+          BrokerService broker2 = setupSecondaryBroker(true);
+          // Deploy Uima AS Primitive Service
+          deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
+          
+          Map<String, Object> appCtx = 
+          buildContext(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue");
+          appCtx.put(UimaAsynchronousEngine.Timeout, 5100);
+          appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+          appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
+          initialize(uimaAsEngine, appCtx);
+          waitUntilInitialized();
+          
+          // Get meta received, bounce the broker now. 
+          broker2.stop();
+          broker2.waitUntilStopped();
+
+          
+//          ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("");
+//          ActiveMQConnection c = (ActiveMQConnection)f.createConnection();
+          //  restart the broker 
+          System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
+          broker2 = setupSecondaryBroker(true);
+          
+          broker2.start();
+          broker2.waitUntilStarted();
+        
+          // new broker is up. Send a few CASes now
+          for( int i=0; i < 5; i++) {
+              CAS cas = uimaAsEngine.getCAS();
+              cas.setDocumentText("Some Text");
+              try {
+                uimaAsEngine.sendAndReceiveCAS(cas);
+              } catch( Exception e) {
+            	  e.printStackTrace();
+            	 // fail("Unexpected exception from sendAndReceive()- test Failed");
+              } finally {
+                cas.release();
+              }
+        	  
+          }
+
+          
+          
+          uimaAsEngine.stop();
+          super.cleanBroker(broker2);
+
+          broker2.stop();
+          broker2.waitUntilStopped();
+
+      }
+
+ */
   
   /**
    * Test use of a JMS Service Adapter. Invoke from a synchronous aggregate to emulate usage from
@@ -636,63 +1142,6 @@ public class TestUimaASExtended extends
 	}
   
 
-  /**
-   * This tests if broker keep-alive protocol is working. With AMQ 5.13.2 the test
-   * fails due to broker bug. What happens is that when a jms client uses http
-   * protocol, the connection is made but the keep-alive chat between broker and
-   * client is not causing a timeout and an exception. 
-   * 
-   * The exception is internal to the broker but it also happens within amq
-   * client code. To get to this, a custom spring based listener is deployed 
-   * with some of its exception handling methods overriden to capture an exception. 
-   *  
-   * @throws Exception
-   */
-  @Test
-  public void testServiceWithHttpListeners() throws Exception {
-	    System.out.println("-------------- testServiceWithHttpListeners -------------");
-	    // Need java monitor object on which to sleep
-	    Object waitObject = new Object();
-	    // Custom spring listener with handleListenerSetupFailure() overriden to 
-	    // capture AMQ exception.
-	    TestDefaultMessageListenerContainer c = new TestDefaultMessageListenerContainer();
-	    c.setConnectionFactory(new ActiveMQConnectionFactory("http://localhost:18888"));
-	    c.setDestinationName("TestQ");
-	    c.setConcurrentConsumers(2);
-	    c.setBeanName("TestBean");
-	    c.setMessageListener(new JmsInputChannel());
-	    c.initialize();
-	    c.start();
-	    
-	    if ( c.isRunning() ) {
-		    System.out.println("... Listener Ready");
-	    	
-	    }
-	    // Keep-alive has a default 30 secs timeout. Sleep for bit longer than that
-	    // If there is an exception due to keep-alive, an exception handler will be
-	    // called on the TestDefaultMessageListenerContainer instance where we 
-	    // capture the error.
-	    System.out.println("... Waiting for 40 secs");
-	    try {
-	    	synchronized(waitObject) {
-	    		waitObject.wait(40000);
-	    	}
-	    	// had there been broker issues relateds to keep-alive the listener's failed
-	    	// flag would have been set by now. Check it and fail the test 
-	    	if ( c.failed() ) {
-		    	fail("Broker Failed - Reason:"+c.getReasonForFailure());
-	    	} else {
-	    		System.out.println("Stopping Listener");
-	    		c.stop();
-
-	    	}
-	    } catch( Exception e) {
-	    	e.printStackTrace();
-	    	fail(e.getMessage());
-	    }
-  }
-
-  
   @Test
   public void testAggregateHttpTunnelling() throws Exception {
     System.out.println("-------------- testAggregateHttpTunnelling -------------");
@@ -1469,15 +1918,18 @@ public class TestUimaASExtended extends
     deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithLongDelay.xml");
     Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
             "NoOpAnnotatorQueueLongDelay");
-    appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+    appCtx.put(UimaAsynchronousEngine.Timeout, 300);
     initialize(uimaAsEngine, appCtx);
     waitUntilInitialized();
-
-    for (int i = 0; i < 1; i++) {
+    Object o = new Object();
+    for (int i = 0; i < 6; i++) {
       CAS cas = uimaAsEngine.getCAS();
       cas.setDocumentText("Some Text");
  //     System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
       uimaAsEngine.sendCAS(cas);
+      synchronized(o) {
+    	  o.wait(1000);
+      }
     }
     
     uimaAsEngine.collectionProcessingComplete();
@@ -1882,19 +2334,26 @@ public class TestUimaASExtended extends
   public void testDeployAggregateService() throws Exception {
     System.out.println("-------------- testDeployAggregateService -------------");
     BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
+    
+    
+ //   System.setProperty("BrokerURL", "tcp::/localhost:61616");
+
+    
     System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000");
     deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
     deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotator.xml");
     
     Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
+//   		Map<String, Object> appCtx = buildContext("tcp://localhost:61616",
             "TopLevelTaeQueue");
-    appCtx.put(UimaAsynchronousEngine.Timeout, 0);
+    appCtx.put(UimaAsynchronousEngine.Timeout, 1000);
     appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0);
     
     addExceptionToignore(org.apache.uima.aae.error.UimaEEServiceException.class); 
     
-    runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
-            10, PROCESS_LATCH);
+//    runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
+    runTest(appCtx, eeUimaEngine, "tcp://localhost:61616", "TopLevelTaeQueue",
+            1, PROCESS_LATCH);
   }
   /**
    * Sends total of 10 CASes to async aggregate configured to process 2 CASes at a time.
@@ -2257,7 +2716,7 @@ public class TestUimaASExtended extends
 	    System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000");
 	    deployService(eeUimaEngine, relativePath + "/Deploy_ComplexAggregateWithInnerUimaAggregateCM.xml");
 	    runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
-	            10, PROCESS_LATCH);
+	            1, PROCESS_LATCH);
 	  }
 
   /**
@@ -2932,23 +3391,50 @@ public class TestUimaASExtended extends
   @Test
   public void testClientWithAggregateMultiplier() throws Exception {
     System.out.println("-------------- testClientWithAggregateMultiplier -------------");
+    System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
+
     BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
     deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml");
+    broker.stop();
+    broker.waitUntilStopped();
+
+    //System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
+    //broker2 = setupSecondaryBroker(true);
+    broker = createBroker();
+    broker.start();
+    broker.waitUntilStarted();
+
     deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
     deployService(eeUimaEngine, relativePath + "/Deploy_AggregateMultiplier.xml");
 
-    Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
-            "TopLevelTaeQueue");
+    String burl = broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString();
+    Map<String, Object> appCtx = 
+    buildContext(burl, "TopLevelTaeQueue");
+
+//    Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
+  //          "TopLevelTaeQueue");
+
+broker.stop();
+broker.waitUntilStopped();
+
+//System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
+//broker2 = setupSecondaryBroker(true);
+broker = createBroker();
+broker.start();
+broker.waitUntilStarted();
+
 
     // reduce the cas pool size and reply window
     appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize);
     appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(2));
-    runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
+//    runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
+    runTest(appCtx, eeUimaEngine,burl,
             "TopLevelTaeQueue", 1, PROCESS_LATCH);
   }
   @Test
   public void testClientProcessWithRemoteMultiplier() throws Exception {
     System.out.println("-------------- testClientProcessWithRemoteMultiplier -------------");
+   
     BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
     deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml");
 

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java Fri Oct 14 19:30:40 2016
@@ -353,8 +353,10 @@ public class ActiveMQSupport extends Tes
     System.clearProperty("BrokerURL");
   
     wait(3000);
-    cleanBroker(broker);
-    stopBroker();
+    if ( !broker.isStopped()) {
+        cleanBroker(broker);
+        stopBroker();
+    }
   }
   
   public class UimaASErrorHandler implements ErrorHandler {

Added: uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml?rev=1764952&view=auto
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml (added)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml Fri Oct 14 19:30:40 2016
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+   <!--
+   ***************************************************************
+   * Licensed to the Apache Software Foundation (ASF) under one
+   * or more contributor license agreements.  See the NOTICE file
+   * distributed with this work for additional information
+   * regarding copyright ownership.  The ASF licenses this file
+   * to you under the Apache License, Version 2.0 (the
+   * "License"); you may not use this file except in compliance
+   * with the License.  You may obtain a copy of the License at
+         *
+   *   http://www.apache.org/licenses/LICENSE-2.0
+   * 
+   * Unless required by applicable law or agreed to in writing,
+   * software distributed under the License is distributed on an
+   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   * KIND, either express or implied.  See the License for the
+   * specific language governing permissions and limitations
+   * under the License.
+   ***************************************************************
+   -->
+<analysisEngineDeploymentDescription 
+  xmlns="http://uima.apache.org/resourceSpecifier">
+  
+  <name>NoOp Annotator A</name>
+  <description>Deploys NoOp Annotator Primitive AE</description>
+  
+  <deployment protocol="jms" provider="activemq">
+    <casPool numberOfCASes="5" initialFsHeapSize="500"/> 
+    <service>
+      <inputQueue endpoint="NoOpAnnotatorQueue" brokerURL="${DefaultBrokerURL}" prefetch="1"/>
+      <topDescriptor>
+       <import location="../descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml"/> 
+      </topDescriptor>
+      <analysisEngine>
+      <asyncPrimitiveErrorConfiguration>
+           <!-- <processCasErrors thresholdCount="4" thresholdWindow="10" thresholdAction="terminate" /> -->
+           <collectionProcessCompleteErrors additionalErrorAction="terminate" />
+      </asyncPrimitiveErrorConfiguration>
+      </analysisEngine>
+    
+      
+    </service>
+  </deployment>
+
+</analysisEngineDeploymentDescription>
+

Propchange: uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml?rev=1764952&view=auto
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml (added)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml Fri Oct 14 19:30:40 2016
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+  <!--
+   ***************************************************************
+   * Licensed to the Apache Software Foundation (ASF) under one
+   * or more contributor license agreements.  See the NOTICE file
+   * distributed with this work for additional information
+   * regarding copyright ownership.  The ASF licenses this file
+   * to you under the Apache License, Version 2.0 (the
+   * "License"); you may not use this file except in compliance
+   * with the License.  You may obtain a copy of the License at
+         *
+   *   http://www.apache.org/licenses/LICENSE-2.0
+   * 
+   * Unless required by applicable law or agreed to in writing,
+   * software distributed under the License is distributed on an
+   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   * KIND, either express or implied.  See the License for the
+   * specific language governing permissions and limitations
+   * under the License.
+   ***************************************************************
+   -->
+<analysisEngineDescription xmlns="http://uima.apache.org/resourceSpecifier">
+  <frameworkImplementation>org.apache.uima.java</frameworkImplementation>
+  <primitive>true</primitive>
+  <annotatorImplementationName>org.apache.uima.ae.noop.NoOpAnnotator</annotatorImplementationName>
+  <analysisEngineMetaData>
+    <name>NoOp Annotator</name>
+    <description>Annotator That Does Nothin</description>
+    <version>1.0</version>
+    <vendor>The Apache Software Foundation</vendor>
+    
+    <configurationParameters>
+          <configurationParameter>
+        <name>ErrorFrequency</name>
+        <description>Frequency of Generated Errors</description>
+        <type>Integer</type>
+        <multiValued>false</multiValued>
+        <mandatory>true</mandatory>
+      </configurationParameter>
+
+      <configurationParameter>
+        <name>ProcessDelay</name>
+        <description>Process Delay</description>
+        <type>Integer</type>
+        <multiValued>false</multiValued>
+        <mandatory>true</mandatory>
+      </configurationParameter>
+
+    
+    </configurationParameters>
+    
+    <configurationParameterSettings>
+          <nameValuePair>
+        <name>ErrorFrequency</name>
+        <value>
+          <integer>0</integer>
+        </value>
+      </nameValuePair>
+
+          <nameValuePair>
+        <name>ProcessDelay</name>
+        <value>
+          <integer>30000</integer>
+        </value>
+      </nameValuePair>
+    
+    </configurationParameterSettings>
+    
+    
+    
+    <typeSystemDescription>
+    </typeSystemDescription>
+    
+    <capabilities>
+    </capabilities>
+	
+    <operationalProperties>
+		<modifiesCas>true</modifiesCas>
+		<multipleDeploymentAllowed>true</multipleDeploymentAllowed>
+		<outputsNewCASes>false</outputsNewCASes>
+	</operationalProperties>	  
+  </analysisEngineMetaData>
+</analysisEngineDescription>
+

Propchange: uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml?rev=1764952&view=auto
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml (added)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml Fri Oct 14 19:30:40 2016
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+	<!--
+	 ***************************************************************
+	 * Licensed to the Apache Software Foundation (ASF) under one
+	 * or more contributor license agreements.  See the NOTICE file
+	 * distributed with this work for additional information
+	 * regarding copyright ownership.  The ASF licenses this file
+	 * to you under the Apache License, Version 2.0 (the
+	 * "License"); you may not use this file except in compliance
+	 * with the License.  You may obtain a copy of the License at
+         *
+	 *   http://www.apache.org/licenses/LICENSE-2.0
+	 * 
+	 * Unless required by applicable law or agreed to in writing,
+	 * software distributed under the License is distributed on an
+	 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+	 * KIND, either express or implied.  See the License for the
+	 * specific language governing permissions and limitations
+	 * under the License.
+	 ***************************************************************
+   -->
+   
+<analysisEngineDescription xmlns="http://uima.apache.org/resourceSpecifier">
+  <frameworkImplementation>org.apache.uima.java</frameworkImplementation>
+  <primitive>false</primitive>
+  <delegateAnalysisEngineSpecifiers>
+    
+    <delegateAnalysisEngine key="TestMultiplier">
+      <import location="../multiplier/SimpleCasGeneratorProducing1000Cases.xml"/>
+    </delegateAnalysisEngine>
+
+
+      <delegateAnalysisEngine key="NoOp">
+      <import location="NoOpAnnotatorWith30SecDelay.xml"/>
+    </delegateAnalysisEngine>
+  
+  </delegateAnalysisEngineSpecifiers>
+  <analysisEngineMetaData>
+    <name>Test Aggregate TAE</name>
+    <description>Detects Nothing</description>
+    <configurationParameters/>
+    <configurationParameterSettings/>
+    <flowConstraints>
+      <fixedFlow>
+      
+        <node>TestMultiplier</node>
+        <node>NoOp</node> 
+      </fixedFlow>
+    </flowConstraints>
+    <capabilities>
+      <capability>
+        <inputs/>
+        <outputs>
+        </outputs>
+        <languagesSupported>
+          <language>en</language>
+        </languagesSupported>
+      </capability>
+    </capabilities>
+	<operationalProperties>
+		<modifiesCas>true</modifiesCas>
+		<multipleDeploymentAllowed>true</multipleDeploymentAllowed>
+		<outputsNewCASes>true</outputsNewCASes>
+	</operationalProperties>
+  </analysisEngineMetaData>
+</analysisEngineDescription>

Propchange: uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Fri Oct 14 19:30:40 2016
@@ -145,9 +145,9 @@ public abstract class BaseAnalysisEngine
 
   protected long errorCount = 0;
 
-  protected List inputChannelList = new ArrayList();
+  protected List<InputChannel> inputChannelList = new ArrayList<InputChannel>();
 
-  protected ConcurrentHashMap inputChannelMap = new ConcurrentHashMap();
+  protected ConcurrentHashMap<String, InputChannel> inputChannelMap = new ConcurrentHashMap<String, InputChannel>();
 
   private UimaEEAdminContext adminContext;
 
@@ -1088,7 +1088,7 @@ public abstract class BaseAnalysisEngine
   public void addInputChannel(InputChannel anInputChannel) {
     if (!inputChannelMap.containsKey(anInputChannel.getInputQueueName())) {
       inputChannelMap.put(anInputChannel.getInputQueueName(), anInputChannel);
-      if (inputChannelList.contains(anInputChannel)) {
+      if (!inputChannelList.contains(anInputChannel)) {
         inputChannelList.add(anInputChannel);
       }
     }
@@ -2410,13 +2410,34 @@ public abstract class BaseAnalysisEngine
     return null;
   }
 
-  public InputChannel getReplyInputChannel(String aDelegateKey) {
-    for (int i = 0; inputChannelList != null && i < inputChannelList.size(); i++) {
-      if (((InputChannel) inputChannelList.get(i)).isFailed(aDelegateKey)) {
-        return (InputChannel) inputChannelList.get(i);
+//  public InputChannel getReplyInputChannel(String aDelegateKey) {
+  public InputChannel getReplyInputChannel(String aDestination) {
+	  InputChannel IC = null;
+	  if ( inputChannelMap.containsKey(aDestination) ) {
+		  return inputChannelMap.get(aDestination);
+	  }
+/*
+	  for( InputChannel inputChannel : inputChannelList) {
+//		  if ( inputChannel.get)
+	    	if ( inputChannel.isFailed(aDelegateKey)) {
+	    		System.out.println("BaseAnalysisEngineController.gerReplyInputChannel()-Found InputChannel for Delegate:"+aDelegateKey+" hashCode="+inputChannel.hashCode());
+	    		IC = inputChannel;
+	      }
+			System.out.println("BaseAnalysisEngineController.gerReplyInputChannel()-Next Input Channel - hashcode="+inputChannel.hashCode());
+
+	  }
+	  */
+/*
+	  for (int i = 0; inputChannelList != null && i < inputChannelList.size(); i++) {
+
+    	if (((InputChannel) inputChannelList.get(i)).isFailed(aDelegateKey)) {
+    		System.out.println("BaseAnalysisEngineController.gerReplyInputChannel()-Found InputChannel for Delegate:"+aDelegateKey);
+    		return (InputChannel) inputChannelList.get(i);
       }
+		System.out.println("BaseAnalysisEngineController.gerReplyInputChannel()-Next Input Channel - hashcode="+);
     }
-    return null;
+    */
+    return IC;
 
   }
 

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java Fri Oct 14 19:30:40 2016
@@ -1034,6 +1034,11 @@ public class PrimitiveAnalysisEngineCont
             
               //	Check for delivery failure. The client may have terminated while an input CAS was being processed
             if ( childCasStateEntry.deliveryToClientFailed() ) {
+              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "process",
+                            UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_delivery_to_client_failed_INFO",
+                            new Object[] { getComponentName(), aCasReferenceId });
+              }
               clientUnreachable = true;
               if ( cmOutstandingCASes.containsKey(childCasStateEntry.getCasReferenceId())) {
               	  cmOutstandingCASes.remove(childCasStateEntry.getCasReferenceId());

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java Fri Oct 14 19:30:40 2016
@@ -522,7 +522,10 @@ public class ProcessResponseHandler exte
                         getController().getEventListener());
               } else {
                 // Callback to notify that the cache is empty
-                getController().getEventListener().onCacheEmpty();
+            	  
+            	  // !!!!!!!!!!!!!!! WHY DO WE NEED TO CALL onCacheEmpty() IF CAS IS ABORTED?
+            	  // !!!!!!!!!!!!!!!!!!!!!! ?????????????????????????????????
+//                getController().getEventListener().onCacheEmpty();
               }
             }
 

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties Fri Oct 14 19:30:40 2016
@@ -266,3 +266,6 @@ UIMAEE_service_warmup_start_INFO = Servi
 UIMAEE_service_warmup_success_INFO = Service: {0} Thread: {1} WarmUp Has Finished Successfully - Processed: {2} CASes - Time Spent Warming Up: {3} secs- Ready For Processing
 UIMAEE_warmup_dropping_cas__FINE = Aggregate Warmup Stage - Dropping CAS:{0} Processing took {1}
 UIMAEE_warmup_start_cas__FINE = Aggregate Warmup Stage - Processing CAS id:{0}
+UIMAEE_delivery_to_client_failed_INFO = Service:{0} Unable to Deliver CAS:{1} to Client - Dropping CAS
+UIMAEE_unable_to_deliver_msg__INFO=Service:{0} JMS unable to Deliver CAS:{1} - Error:{2}
+UIMAEE_force_cas_abort__INFO="Service:{0} Forcing {1} CAS:{1} to Abort
\ No newline at end of file

Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Fri Oct 14 19:30:40 2016
@@ -21,6 +21,8 @@ package org.apache.uima.adapter.jms.clie
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.Iterator;
@@ -41,6 +43,7 @@ import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -49,6 +52,8 @@ import javax.jms.ObjectMessage;
 import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.DestinationDoesNotExistException;
+import org.apache.activemq.transport.TransportListener;
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.UIMARuntimeException;
 import org.apache.uima.UIMA_IllegalStateException;
@@ -1498,8 +1503,15 @@ public abstract class BaseUIMAAsynchrono
       TextMessage msg = createTextMessage();
       msg.setText("");
       setReleaseCASMessage(msg, casReferenceId);
-      // Create Message Producer for the Destination
-      MessageProducer msgProducer = getMessageProducer(freeCASNotificationDestination);
+  	MessageProducer msgProducer = null;
+  	try {
+        // Create Message Producer for the Destination
+        msgProducer = getMessageProducer(freeCASNotificationDestination);
+  		
+  	} catch( DestinationDoesNotExistException ee) {
+  		
+  	}
+
       if (msgProducer != null) {
         try {
           // Send FreeCAS message to a Cas Multiplier
@@ -3136,10 +3148,13 @@ public abstract class BaseUIMAAsynchrono
       //System.out.println("------------------------ stop2? "+stop);
       //  This loop attempts to recover broker connection every 5 seconds and ends when all clients 
       //  using this shared object terminate or a connection is recovered
+      boolean log = true;
       while( !stop ) {
-          //System.out.println("------------------------ clientList.size()- "+clientList.size());
         if ( clientList.size() == 0 ) {
           break; // no more active clients - break out of connection recovery
+        } else {
+        	BaseUIMAAsynchronousEngineCommon_impl c =
+        			clientList.get(0);
         }
         try {
           //  Attempt a new connection to a broker
@@ -3154,8 +3169,22 @@ public abstract class BaseUIMAAsynchrono
           }
           break;
         } catch( Exception e) {
-        	e.printStackTrace();
-          synchronized( stateMonitor ) {
+        	
+    		if ( log ) {
+                if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "retryConnectionUntilSuccessfull",
+                          JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_connection_retry__INFO",
+                          new Object[] { brokerURL });
+                }
+        		if ( e instanceof JMSException && e.getMessage().endsWith("Connection refused") ) {
+        			log = false;
+            		System.out.println("Uima AS Client:"+e.getMessage()+" Retrying every 5 seconds until successfull");
+                    
+        	    } else {
+            	    e.printStackTrace();
+        	    }
+    		}
+           synchronized( stateMonitor ) {
             try {
               stateMonitor.wait(5000); // retry every 5 secs
             } catch( InterruptedException ie) {}
@@ -3264,6 +3293,33 @@ public abstract class BaseUIMAAsynchrono
         return false;
       }
   }
+  public static class UimaAsTransportListener implements TransportListener {
+
+	@Override
+	public void onCommand(Object arg0) {
+		// TODO Auto-generated method stub
+		
+	}
+
+	@Override
+	public void onException(IOException arg0) {
+		System.out.println("!!!!!!!!!!!!!!!!!! UimaAsTransportListener.onException() - lost connectipon to broker");
+		
+	}
+
+	@Override
+	public void transportInterupted() {
+		// TODO Auto-generated method stub
+		
+	}
+
+	@Override
+	public void transportResumed() {
+		// TODO Auto-generated method stub
+		
+	}
+	  
+  }
   public static class UimaASShutdownHook implements Runnable {
     UimaAsynchronousEngine asEngine=null;
     public UimaASShutdownHook( UimaAsynchronousEngine asEngine) {

Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java Fri Oct 14 19:30:40 2016
@@ -174,7 +174,7 @@ public class ClientServiceDelegate exten
                       try {
                         clientUimaAsEngine.handleException(new UimaASProcessCasTimeout("Service Not Responding to Ping - CAS:"+de.getCasReferenceId(), new UimaASPingTimeout("Forced Timeout on CAS in PendingDispatch list. The CAS Has Not Been Dispatched since the Service Appears to be Unavailable")), de.getCasReferenceId(), null,cachedRequest, !cachedRequest.isSynchronousInvocation(), false);
                       } catch( Exception ex) {
-                        ex.printStackTrace();
+                        //ex.printStackTrace();
                       }
                     }
                     if ( clientUimaAsEngine.running ) {

Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties Fri Oct 14 19:30:40 2016
@@ -244,4 +244,7 @@ UIMAJMS_retrying_jms_connection__WARNING
 UIMAJMS_service_recvd_new_message__FINE = > service recvd CAS RefId: {0}
 UIMAJMS_sent_ack_message__FINE = < service sent ACK for CAS RefId: {0}
 UIMAJMS_received_service_info_FINEST = Received ServiceInfo message from {0}
-UIMAJMS_debug_msg__FINEST={0}
\ No newline at end of file
+UIMAJMS_debug_msg__FINEST={0}
+UIMAJMS_temp_destination_not_available_retrying__INFO=Service:{0} Unable to refresh temp destination - retrying in {1} seconds until success ...
+UIMAJMS_temp_destination_available__INFO=Service:{0} succesfully refreshed temp destination:{1} - FreeCas Queue:{2}
+UIMAJMS_client_connection_retry__INFO=UIMA-AS Client Unable to Connect to Broker:{0} - Retrying Until Success ...
\ No newline at end of file