You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2008/10/22 18:22:05 UTC

svn commit: r707119 - in /incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq: JmsEndpointConnection_impl.java JmsInputChannel.java JmsOutputChannel.java UimaDefaultMessageListenerContainer.java

Author: eae
Date: Wed Oct 22 09:22:04 2008
New Revision: 707119

URL: http://svn.apache.org/viewvc?rev=707119&view=rev
Log: (empty)

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=707119&r1=707118&r2=707119&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Wed Oct 22 09:22:04 2008
@@ -45,6 +45,8 @@
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.pool.PooledConnectionFactory;
 import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.InputChannel;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
 import org.apache.uima.aae.controller.AnalysisEngineController;
 import org.apache.uima.aae.controller.Endpoint;
 import org.apache.uima.aae.error.AsynchAEException;
@@ -53,6 +55,7 @@
 import org.apache.uima.aae.message.AsynchAEMessage;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.util.Level;
+import org.springframework.jms.JmsException;
 import org.springframework.util.Assert;
 
 public class JmsEndpointConnection_impl implements ConsumerListener
@@ -93,6 +96,10 @@
 
 	private boolean isReplyEndpoint;
 	
+	private volatile boolean  failed = false;
+	
+	private Object recoveryMux = new Object();
+	
 	public JmsEndpointConnection_impl(Map aConnectionMap, Endpoint anEndpoint)
 	{
 		connectionMap = aConnectionMap;
@@ -218,6 +225,9 @@
 		}
 		catch ( Exception e)
 		{
+      if ( e instanceof JMSException ) {
+        handleJmsException( (JMSException)e );
+      }
 			throw new AsynchAEException(e);
 		}
 		
@@ -471,83 +481,133 @@
 	{
 		String destinationName = "";
 
-		Exception lastException = null;
-		// Send a message to the destination. Retry 10 times if unable to send.
-		// After 10 tries give up and throw exception
-		for (int i = 0; i < 10; i++)
+		try
 		{
-			try
-			{
-				stopTimer();
+			  stopTimer();
 
-					if (conn == null || producerSession == null || !((ActiveMQSession) producerSession).isRunning())
-					{
-						UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_open_connection_to_endpoint__FINE", new Object[] { getEndpoint() });
-						openChannel();
+				if ( failed || conn == null || producerSession == null || !((ActiveMQSession) producerSession).isRunning())
+				{
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_open_connection_to_endpoint__FINE", new Object[] { getEndpoint() });
+					openChannel();
+					// The connection has been successful. Now check if we need to create a new listener
+					// and a temp queue to receive replies. A new listener will be created only if the
+					// endpoint for the delegate is marked as FAILED. This will be the case if the listener
+					// on the reply queue for the endpoint has failed.
+					synchronized( recoveryMux ) {
+	          if ( controller instanceof AggregateAnalysisEngineController ) {
+	            // Using the queue name lookup the delegate key
+	            String key = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(delegateEndpoint.getEndpoint());
+	            if ( key != null && destination != null && !isReplyEndpoint ) {
+	              // The aggregate has a master list of endpoints which are typically cloned during processing
+	              // This object uses a copy of the master. When a listener fails, the status of the master
+	              // endpoint is changed. To check the status, fetch the master endpoint, check its status
+	              // and if marked as FAILED, create a new listener, a new temp queue and override this
+	              // object endpoint copy destination property. It *IS* a new replyTo temp queue.
+	              Endpoint masterEndpoint = ((AggregateAnalysisEngineController)controller).lookUpEndpoint(key, false);
+	              if ( masterEndpoint.getStatus() == Endpoint.FAILED ) {
+	                // Create a new Listener Object to receive replies
+	                createListener(key);
+	                destination = (Destination)masterEndpoint.getDestination();
+	                delegateEndpoint.setDestination(destination);              
+	                // Override the reply destination. A new listener has been created along with a new temp queue for replies.
+	                aMessage.setJMSReplyTo(destination);
+	              }
+	            }
+	          }
 					}
-					//	Send a reply to a queue provided by the client
-					if ( isReplyEndpoint && delegateEndpoint.getDestination() != null  )
+				}
+				//	Send a reply to a queue provided by the client
+				if ( isReplyEndpoint && delegateEndpoint.getDestination() != null  )
+				{
+					destinationName = ((ActiveMQDestination)delegateEndpoint.getDestination()).getPhysicalName();
+					if ( UIMAFramework.getLogger().isLoggable(Level.FINE))
 					{
-						destinationName = ((ActiveMQDestination)delegateEndpoint.getDestination()).getPhysicalName();
-						if ( UIMAFramework.getLogger().isLoggable(Level.FINE))
-						{
-	            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", new Object[] {destinationName });
-						}
-						synchronized(producer)
-						{
-              producer.send((Destination)delegateEndpoint.getDestination(), aMessage);
-						}
+            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", new Object[] {destinationName });
 					}
-					else
+					synchronized(producer)
 					{
-						destinationName = ((ActiveMQQueue) producer.getDestination()).getPhysicalName();
-            if ( UIMAFramework.getLogger().isLoggable(Level.FINE))
-            {
-              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", new Object[] {destinationName });
-            }
-            synchronized(producer)
-            {
-              producer.send(aMessage);
-            }
+             producer.send((Destination)delegateEndpoint.getDestination(), aMessage);
 					}
+				}
+				else
+				{
+					destinationName = ((ActiveMQQueue) producer.getDestination()).getPhysicalName();
+           if ( UIMAFramework.getLogger().isLoggable(Level.FINE))
+           {
+             UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", new Object[] {destinationName });
+           }
+           synchronized(producer)
+           {
+             producer.send(aMessage);
+           }
+				}
 					
 				if (startTimer)
 				{
 					startTimer(connectionCreationTimestamp);
 				}
-				lastException = null;
 				return true;
-			}
-			catch ( Exception e)
-			{
-				lastException = e;
-				//	If the controller has been stopped no need to send messages
-				if ( controller.isStopped())
-				{
-					return true;
-				}
-				else
-				{
-	        e.printStackTrace();
-	        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_not_ableto_send_msg_INFO", new Object[] { controller.getComponentName(), destinationName, i+1, 10 });
-				}
-			}
-			try
+		}
+		catch ( Exception e)
+		{
+			//	If the controller has been stopped no need to send messages
+			if ( controller.isStopped())
 			{
-				wait(50);
+				return true;
 			}
-			catch ( Exception ex)
+			else
 			{
+			  if ( e instanceof JMSException ) {
+			    handleJmsException( (JMSException)e );
+			  }
+			    
+			  e.printStackTrace();
+	       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", new Object[] { controller.getComponentName(), e});
 			}
 		}
-		if ( lastException != null )
-		{
-			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", new Object[] { controller.getComponentName(), lastException});
-		}
 		stopTimer();
 		return false;
 	}
 
+	/**
+	 * This method is called during recovery of failed connection. It is only called if the endpoint
+	 * associated with a given delegate is marked as FAILED. It is marked that way when a listener
+	 * attached to the reply queue fails. This method creates a new listener and a new temp queue.
+	 * 
+	 * @param delegateKey
+	 * @throws Exception
+	 */
+	private void createListener(String delegateKey) throws Exception {
+    if ( !(controller instanceof AggregateAnalysisEngineController) ) {
+      return; // no-op unless the controller is an aggregate
+    }
+    //  Ask the controller for the reply InputChannel of this delegate and let it build the new listener
+    InputChannel replyChannel = controller.getReplyInputChannel(delegateKey);
+    replyChannel.createListener(delegateKey);
+	}
+
+	private synchronized void handleJmsException( JMSException ex) { // marks this connection as failed; ex is not inspected by the active code
+	    if ( failed ) {
+	      return;   // Already marked failed
+	    }
+	    failed = true; // send() checks this flag and calls openChannel() again when it is set
+/*	  Disabled listener-teardown logic, kept for reference only:
+	    try {
+	    if ( controller instanceof AggregateAnalysisEngineController ) {
+        String delegateKey = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(delegateEndpoint.getEndpoint());
+	      if ( delegateEndpoint.getDestination() != null ) {
+	        InputChannel iC = ((AggregateAnalysisEngineController)controller).getInputChannel(delegateEndpoint.getDestination().toString());
+	        iC.destroyListener(delegateEndpoint.getDestination().toString(), delegateKey);
+	      } else {
+	        InputChannel iC = ((AggregateAnalysisEngineController)controller).getInputChannel(delegateEndpoint.getEndpoint());
+	        iC.destroyListener(delegateEndpoint.getEndpoint(), delegateKey);
+	      }
+	    }
+	  } catch( Exception e) {
+	    e.printStackTrace();
+	  }
+*/	  
+	}
 	public void onConsumerEvent(ConsumerEvent arg0)
 	{
 		if (controller != null)

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=707119&r1=707118&r2=707119&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Wed Oct 22 09:22:04 2008
@@ -22,22 +22,21 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
-import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Session;
 
-import mx4j.tools.adaptor.http.GetAttributeCommandProcessor;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.InputChannel;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
 import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.Endpoint_impl;
 import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
 import org.apache.uima.aae.error.InvalidMessageException;
 import org.apache.uima.aae.handler.Handler;
@@ -94,6 +93,9 @@
 	
 	private Object mux = new Object();
 	
+	private ConcurrentHashMap<String, UimaDefaultMessageListenerContainer> failedListenerMap = 
+	  new ConcurrentHashMap<String, UimaDefaultMessageListenerContainer>();
+	
 	public AnalysisEngineController getController()
 	{
 		return controller;
@@ -236,7 +238,8 @@
 			if ( command != AsynchAEMessage.Process && 
 				 command != AsynchAEMessage.GetMeta && 
 				 command != AsynchAEMessage.ReleaseCAS && 
-				 command != AsynchAEMessage.Stop && 
+         command != AsynchAEMessage.Stop && 
+         command != AsynchAEMessage.Ping && 
 				 command != AsynchAEMessage.CollectionProcessComplete )
 			{
 				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
@@ -270,7 +273,8 @@
 			int command = aMessage.getIntProperty(AsynchAEMessage.Command);
 			if ( command == AsynchAEMessage.GetMeta ||
 				 command == AsynchAEMessage.CollectionProcessComplete ||
-				 command == AsynchAEMessage.Stop ||
+         command == AsynchAEMessage.Stop ||
+         command == AsynchAEMessage.Ping ||
 				 command == AsynchAEMessage.ReleaseCAS)
 			{
 				//	Payload not included in GetMeta Request
@@ -316,7 +320,7 @@
 			//	Shutting down 
 			return true;
 		}
-		if ( command == AsynchAEMessage.Process && msgType == AsynchAEMessage.Response )
+		if ( command == AsynchAEMessage.Process && msgType == AsynchAEMessage.Response  )
 		{
 			String casReferenceId = aMessage.getStringProperty(AsynchAEMessage.CasReference);
 			if (!getController().getInProcessCache().entryExists(casReferenceId))
@@ -404,8 +408,10 @@
 				return "CollectionProcessComplete";
 			case AsynchAEMessage.ReleaseCAS:
 				return "ReleaseCAS";
-			case AsynchAEMessage.Stop:
-				return "Stop";
+      case AsynchAEMessage.Stop:
+        return "Stop";
+      case AsynchAEMessage.Ping:
+        return "Ping";
 			}
 			
 		}
@@ -706,7 +712,11 @@
 	public String getInputQueueName()
 	{
 		if ( messageListener != null )
-			return messageListener.getDestinationName();//getEndpointName();
+		  if ( messageListener.getDestination() != null ) {
+		    return messageListener.getDestination().toString();
+		  } else {
+	      return messageListener.getDestinationName();//getEndpointName();
+		  }
 		else
 		{
 			return "";
@@ -777,4 +787,103 @@
   {
     return messageListener.getConcurrentConsumers();
   }
+
+  /**
+   * Creates and starts a replacement for the delegate's failed reply listener, waits for its destination
+   * to be resolved, marks the delegate endpoint OK and points it at that destination.
+   * @throws IllegalStateException if no failed listener was recorded for the delegate key
+   */
+  public void createListener( String aDelegateKey ) throws Exception {
+    UimaDefaultMessageListenerContainer failedListener = failedListenerMap.get(aDelegateKey);
+    if ( failedListener == null ) { // destroyListener() records an entry only when the listener has a connection factory
+      throw new IllegalStateException("No failed listener recorded for delegate: "+aDelegateKey);
+    }
+    UimaDefaultMessageListenerContainer newListener = new UimaDefaultMessageListenerContainer();
+    newListener.setConnectionFactory(failedListener.getConnectionFactory());
+    newListener.setMessageListener(this);
+    newListener.setController(getController());
+    TempDestinationResolver resolver = new TempDestinationResolver();
+    resolver.setConnectionFactory((ActiveMQConnectionFactory)failedListener.getConnectionFactory());
+    newListener.setDestinationResolver(resolver);
+    org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor executor = new org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor();
+    executor.setCorePoolSize(failedListener.getConcurrentConsumers());
+    executor.setMaxPoolSize(failedListener.getMaxConcurrentConsumers());
+    executor.setQueueCapacity(failedListener.getMaxConcurrentConsumers());
+    executor.initialize();
+    newListener.setTaskExecutor(executor);
+    newListener.initialize();
+    newListener.start();
+    //  Wait until the resolver plugs in the destination
+    while (newListener.getDestination() == null) {
+      synchronized( newListener ) {
+        System.out.println(".... Waiting For Destination ...");
+        newListener.wait(100);
+      }
+    }
+    newListener.afterPropertiesSet();
+    //  Get the endpoint object for a given delegate key from the Aggregate
+    Endpoint endpoint = ((AggregateAnalysisEngineController)getController()).lookUpEndpoint(aDelegateKey, false);
+    endpoint.setStatus(Endpoint.OK);
+    //  Override the reply destination and hand the listener its own copy of the endpoint
+    endpoint.setDestination(newListener.getDestination());
+    newListener.setTargetEndpoint((Endpoint)((Endpoint_impl) endpoint).clone());
+  }
+  
+  /** Scans listenerContainerList for the first listener whose destination string equals the given name; null if none. */
+  private UimaDefaultMessageListenerContainer getListenerForEndpoint( String anEndpointName ) {
+    UimaDefaultMessageListenerContainer match = null;
+    for( int idx=0; match == null && idx < listenerContainerList.size(); idx++ ) {
+      UimaDefaultMessageListenerContainer candidate = (UimaDefaultMessageListenerContainer) listenerContainerList.get(idx);
+      boolean resolved = candidate.getDestination() != null;
+      if ( resolved && candidate.getDestination().toString().equals( anEndpointName) ) { match = candidate; }
+    }
+    return match;
+  }
+  /**
+   * Stops and destroys the listener on the given endpoint, marks the delegate's endpoint FAILED,
+   * and records the listener in failedListenerMap, from which createListener() reads its settings.
+   */
+  public void destroyListener( String anEndpointName, String aDelegateKey ) {
+    final UimaDefaultMessageListenerContainer mListener = getListenerForEndpoint(anEndpointName);
+    if ( mListener == null ) {
+      System.out.println("--- Listener For Endpoint: "+aDelegateKey+" Not Found");
+      return;
+    }
+    try {
+        System.out.println("++++ Stopping Listener ...");
+        mListener.stop();
+        System.out.println("++++ Destroying Listener ...");
+        new Thread() {
+          public void run() {
+            mListener.destroy();
+          }
+        }.start(); // the thread has to be started, otherwise destroy() is never invoked
+        while( mListener.isRunning());      
+        System.out.println("++++ Listener on Queue:"+anEndpointName+" Has Been Stopped...");
+        Endpoint endpoint = ((AggregateAnalysisEngineController)getController()).lookUpEndpoint(aDelegateKey, false);
+        endpoint.setStatus(Endpoint.FAILED);
+        if ( mListener.getConnectionFactory() != null) {
+          if ( getController() instanceof AggregateAnalysisEngineController ) {
+            if ( !failedListenerMap.containsKey(aDelegateKey )) {
+              failedListenerMap.put( aDelegateKey, mListener);
+              listenerContainerList.remove(mListener);
+              System.out.println("++++ Saving Connection Factory");
+            }
+          }
+        }
+    } catch( Exception e) {
+      e.printStackTrace();
+    }
+  }
+  public boolean isFailed(String aDelegateKey) { // true when failedListenerMap holds an entry for the delegate key
+    return failedListenerMap.get(aDelegateKey) != null; // a ConcurrentHashMap never stores null values
+  }
+  /**
+   * Reports whether this channel holds a listener whose destination,
+   * rendered via toString(), equals the given endpoint name.
+   */
+  public boolean isListenerForDestination( String anEndpointName) {
+    // getListenerForEndpoint() yields null when nothing matches
+    return getListenerForEndpoint(anEndpointName) != null;
+  }
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=707119&r1=707118&r2=707119&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Wed Oct 22 09:22:04 2008
@@ -559,8 +559,6 @@
 	                        "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_serialized_cas__FINEST",
 	                        new Object[] { getAnalysisEngineController().getComponentName(), anEndpoint.getEndpoint(),aCasReferenceId,serializedCAS  });
 	        }
-	        
-	        
 	        //  Send process request to remote delegate and start timeout timer
 	        sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0);
 			  } else {

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=707119&r1=707118&r2=707119&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Wed Oct 22 09:22:04 2008
@@ -23,19 +23,21 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
+import javax.jms.TemporaryQueue;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.InputChannel;
 import org.apache.uima.aae.UIMAEE_Constants;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
 import org.apache.uima.aae.controller.AnalysisEngineController;
 import org.apache.uima.aae.controller.Endpoint;
-import org.apache.uima.aae.controller.Endpoint_impl;
 import org.apache.uima.aae.error.ErrorHandler;
 import org.apache.uima.aae.error.Threshold;
 import org.apache.uima.aae.error.handler.GetMetaErrorHandler;
@@ -51,14 +53,16 @@
 	private static final Class CLASS_NAME = UimaDefaultMessageListenerContainer.class;
 	private String destinationName="";
 	private Endpoint endpoint;
-	private boolean freeCasQueueListener;
+	private volatile boolean freeCasQueueListener;
 	private AnalysisEngineController controller;
-	private int retryCount = 2;
+	private volatile boolean failed = false;
+	private Object mux = new Object();
 	
 	public UimaDefaultMessageListenerContainer()
 	{
 		super();
-		setRecoveryInterval(60000);
+    setRecoveryInterval(5);
+//    setRecoveryInterval(60000);
 		setAcceptMessagesWhileStopping(false);
 		setExceptionListener(this);
 	}
@@ -71,6 +75,11 @@
 	{
 		controller = aController;
 	}
+	/**
+	 * 
+	 * @param t
+	 * @return
+	 */
 	private boolean disableListener( Throwable t)
 	{
 		System.out.println(t.toString());
@@ -79,119 +88,202 @@
 			return true;
 		return false;
 	}
+	/**
+	 * Stops this Listener
+	 */
+	private synchronized void handleListenerFailure() {
+    try {
+      if ( !(controller instanceof AggregateAnalysisEngineController) ) {
+        return; // no-op for non-aggregate controllers
+      }
+      AggregateAnalysisEngineController aggregate = (AggregateAnalysisEngineController)controller;
+      String delegateKey = aggregate.lookUpDelegateKey(endpoint.getEndpoint());
+      //  The channel is looked up by the destination string when a destination is set, else by the endpoint name
+      String channelName = ( endpoint.getDestination() != null )
+              ? endpoint.getDestination().toString() : endpoint.getEndpoint();
+      InputChannel iC = aggregate.getInputChannel(channelName);
+      iC.destroyListener(channelName, delegateKey);
+    } catch( Exception e) {
+      e.printStackTrace();
+    }
+  }
+	/**
+	 * Handles failure on a temp queue
+	 * @param t
+	 */
+  private void handleTempQueueFailure(Throwable t) {
+    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
+            "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_listener_failed_WARNING",
+            new Object[] {  endpoint.getDestination(), getBrokerUrl(), t });
+    // Check if the failure is due to the failed connection. Spring (and ActiveMQ) don't seem to provide
+    // the cause, just the top level IllegalStateException with a text message, so match on that text.
+    // The literal goes first so that an exception carrying no message cannot cause an NPE here.
+    if ( t instanceof javax.jms.IllegalStateException && "The Consumer is closed".equals(t.getMessage())) {
+      if ( controller instanceof AggregateAnalysisEngineController ) {
+        String delegateKey = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(endpoint.getEndpoint());
+        try {
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
+                  "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopping_listener_INFO",
+                  new Object[] {  controller.getComponentName(), endpoint.getDestination(),delegateKey });
+          // Stop current listener 
+          handleListenerFailure();
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
+                  "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopped_listener_INFO",
+                  new Object[] {  controller.getComponentName(), endpoint.getDestination() });
+        } catch ( Exception e ) {
+          e.printStackTrace();
+        }
+      }
+    } else if ( disableListener(t)) {
+      // Any other failure that disableListener() accepts is handled like a regular queue failure
+      handleQueueFailure(t);
+    }
+  }
+  
+  private ErrorHandler fetchGetMetaErrorHandler() {
+    //  Walk the controller's error handler chain (the Error Handler List provided in the
+    //  deployment descriptor) and return the first GetMetaErrorHandler found in it
+    for ( Iterator chain = controller.getErrorHandlerChain().iterator(); chain.hasNext(); )
+    {
+      //  Each element of the chain is cast to ErrorHandler
+      ErrorHandler candidate = (ErrorHandler)chain.next();
+      if ( candidate instanceof GetMetaErrorHandler )
+      {
+        return candidate;
+      }
+    }
+    //  The chain holds no GetMetaErrorHandler
+    return null;
+  }
+  /**
+   * Handles failures on non-temp queues
+   * @param t
+   */
+  private void handleQueueFailure(Throwable t) {
+    final String endpointName = 
+      (getDestination() == null) ? "" : ((ActiveMQDestination)getDestination()).getPhysicalName(); 
+    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
+            "handleQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_listener_failed_WARNING",
+            new Object[] {  endpointName, getBrokerUrl(), t });
+    boolean terminate = true;
+    //  Check if the failure is severe enough to disable this listener. Whether or not this listener is actually
+    //  disabled depends on the action associated with GetMeta Error Handler. If GetMeta Error Handler is
+    //  configured to terminate the service on failure, this listener will be terminated and the entire service
+    //  will be stopped.
+    if (  disableListener(t) ) {
+      endpoint.setReplyDestinationFailed();
+      //  If this is a listener attached to the Aggregate Controller, use GetMeta Error
+      //  Thresholds defined to determine what to do next after failure. Either terminate
+      //  the service or disable the delegate with which this listener is associated with
+      if ( controller != null && controller instanceof AggregateAnalysisEngineController )
+      {
+        ErrorHandler handler = fetchGetMetaErrorHandler();
+        //  Fetch a Map containing thresholds for GetMeta for each delegate; there is none when no GetMeta handler is configured
+        Map thresholds = ( handler == null ) ? null : handler.getEndpointThresholdMap(); 
+        //  Lookup delegate's key using delegate's endpoint name
+        String delegateKey = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(endpoint.getEndpoint());
+        //  If the delegate has a threshold defined on GetMeta apply Action defined; otherwise terminate stays true
+        if ( delegateKey != null && thresholds != null && thresholds.containsKey(delegateKey))
+        {
+          //  Fetch the Threshold object containing error configuration
+          Threshold threshold = (Threshold) thresholds.get(delegateKey);
+          //  Check if the delegate needs to be disabled
+          if (threshold.getAction().equalsIgnoreCase(ErrorHandler.DISABLE)) {
+            //  The disable delegate method takes a list of delegates
+            List list = new ArrayList();
+            //  Add the delegate to disable to the list
+            list.add(delegateKey);
+            try {
+              System.out.println(">>>> Controller:"+controller.getComponentName()+" Disabling Listener On Queue:"+endpoint.getEndpoint()+". Component's "+delegateKey+" Broker:"+getBrokerUrl()+" is Invalid");
+              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
+                          "handleQueueFailure", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_disabled_delegate_bad_broker__INFO",
+                          new Object[] {  controller.getComponentName(), delegateKey, getBrokerUrl() });
+              //  Remove the delegate from the routing table. 
+              ((AggregateAnalysisEngineController) controller).disableDelegates(list);
+              terminate = false; //just disable the delegate and continue
+            } catch (Exception e) {
+              e.printStackTrace();
+              terminate = true;
+            }
+          }
+        }
+      }
+    }
+    System.out.println("****** Unable To Connect Listener To Broker:"+getBrokerUrl());
+    System.out.println("****** Closing Listener on Queue:"+endpoint.getEndpoint());
+    setRecoveryInterval(0);
+    
+    //  Spin a shutdown thread to terminate listener. 
+    new Thread() {
+      public void run()
+      {
+        try
+        {
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
+                      "handleQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_disable_listener__WARNING",
+                      new Object[] {  endpointName, getBrokerUrl() });
+          
+          shutdown();
+        }
+        catch( Exception e) { e.printStackTrace();}
+      }
+    }.start();
+
+    if ( terminate )
+    {
+      terminate(t);
+    }
+
+  }
+  /**
+	 * Called by Spring when a listener fails. Returns at once if the controller is already stopped; with no endpoint it runs the superclass handling and terminates the service; otherwise it runs the temp-queue or regular-queue failure handling, at most once per container (guarded by mux and the failed flag).
+	 */
 	protected void handleListenerSetupFailure( Throwable t, boolean alreadyHandled )
 	{
-		if ( !(t instanceof javax.jms.IllegalStateException ) )
-		{
-			t.printStackTrace();
-			final String endpointName = 
-				(getDestination() == null) ? "" : ((ActiveMQDestination)getDestination()).getPhysicalName(); 
-				
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
-	                "handleListenerSetupFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_listener_failed_WARNING",
-	                new Object[] {  endpointName, getBrokerUrl(), t });
-
-			boolean terminate = true;
-			if (  disableListener(t) )
-			{
-				if ( endpoint != null )
-				{
-					endpoint.setReplyDestinationFailed();
-					//	If this is a listener attached to the Aggregate Controller, use GetMeta Error
-					//	Thresholds defined to determine what to do next after failure. Either terminate
-					//	the service or disable the delegate with which this listener is associated with
-					if ( controller != null && controller instanceof AggregateAnalysisEngineController )
-					{
-						ErrorHandler handler = null;
-						Iterator it = controller.getErrorHandlerChain().iterator();
-						//	Find the error handler for GetMeta in the Error Handler List provided in the 
-						//	deployment descriptor
-						while ( it.hasNext() )
-						{
-							handler = (ErrorHandler)it.next();
-							if ( handler instanceof GetMetaErrorHandler )
-							{
-								break;
-							}
-						}
-						//	Fetch a Map containing thresholds for GetMeta for each delegate. 
-						java.util.Map thresholds = handler.getEndpointThresholdMap(); 
-						//	Lookup delegate's key using delegate's endpoint name
-						String delegateKey = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(endpoint.getEndpoint());
-						//	If the delegate has a threshold defined on GetMeta apply Action defined 
-						if ( delegateKey != null && thresholds.containsKey(delegateKey))
-						{
-							//	Fetcg the Threshold object containing error configuration
-							Threshold threshold = (Threshold) thresholds.get(delegateKey);
-							//	Check if the delegate needs to be disabled
-							if (threshold.getAction().equalsIgnoreCase(ErrorHandler.DISABLE)) {
-								//	The disable delegate method takes a list of delegates
-								List list = new ArrayList();
-								//	Add the delegate to disable to the list
-								list.add(delegateKey);
-								try {
-									System.out.println(">>>> Controller:"+controller.getComponentName()+" Disabling Listener On Queue:"+endpoint.getEndpoint()+". Component's "+delegateKey+" Broker:"+getBrokerUrl()+" is Invalid");
-									UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
-							                "handleListenerSetupFailure", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_disabled_delegate_bad_broker__INFO",
-							                new Object[] {  controller.getComponentName(), delegateKey, getBrokerUrl() });
-									//	Remove the delegate from the routing table. 
-									((AggregateAnalysisEngineController) controller).disableDelegates(list);
-								} catch (Exception e) {
-									e.printStackTrace();
-								}
-								terminate = false; //just disable the delegate and continue
-							}
-						}
-					}
-				}
-
-				System.out.println("****** Unable To Connect Listener To Broker:"+getBrokerUrl());
-				if ( endpoint != null )
-				{
-					System.out.println("****** Closing Listener on Queue:"+endpoint.getEndpoint());
-				}
-				setRecoveryInterval(0);
-				
-				//	Spin a shutdown thread to terminate listener. 
-				new Thread() {
-					public void run()
-					{
-						try
-						{
-							UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
-					                "handleListenerSetupFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_disable_listener__WARNING",
-					                new Object[] {  endpointName, getBrokerUrl() });
-							
-							shutdown();
-						}
-						catch( Exception e) { e.printStackTrace();}
-					}
-				}.start();
-
-				if ( terminate )
-				{
-					// ****************************************
-					//	terminate the service
-					// ****************************************
-					System.out.println(">>>> Terminating Controller:"+controller.getComponentName()+" Unable To Initialize Listener Due to Invalid Broker URL:"+getBrokerUrl());
-					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
-			                "handleListenerSetupFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_terminate_service_dueto_bad_broker__WARNING",
-			                new Object[] {  controller.getComponentName(), getBrokerUrl() });
-					controller.stop();
-					controller.notifyListenersWithInitializationStatus(new ResourceInitializationException(t));
-				}
-			}
-			else
-			{
-				super.handleListenerSetupFailure(t, false);
-			}
-		}
-		
-		
+	  // If the controller is already stopped there is no need to recover the connection
+	  if ( controller != null && controller.isStopped()) {
+	    return;
+	  }
+	  t.printStackTrace();
+
+	  
+	  if ( endpoint == null ) {
+	    // No endpoint to classify the failure by: run the superclass handling, then terminate the service
+      super.handleListenerSetupFailure(t, false);
+      terminate(t);
+      return;
+	  }
+ 
+	  synchronized( mux ) {
+	      if ( !failed ) {
+	        // Check whether this listener is attached to a temp queue. If so, it is a
+	        // listener on a reply queue, and its failure is handled differently from
+	        // the failure of an input queue listener.
+	        if ( endpoint.isTempReplyDestination()) {
+	          handleTempQueueFailure(t);
+	        } else {
+	          // Handle non-temp queue failure
+	          handleQueueFailure(t);
+	        }
+	      }
+	      failed = true; // later calls skip the failure handling above
+	    }
+	}
+
+	private void terminate(Throwable t) {
+    // Terminates the service: stops the controller and reports t to its listeners wrapped in a
+    // ResourceInitializationException. With no controller attached there is nothing to stop or notify.
+    if ( controller == null ) { return; }
+    System.out.println(">>>> Terminating Controller:"+controller.getComponentName()+" Unable To Initialize Listener Due to Invalid Broker URL:"+getBrokerUrl());
+    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
+                "terminate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_terminate_service_dueto_bad_broker__WARNING",
+                new Object[] {  controller.getComponentName(), getBrokerUrl() });
+    controller.stop();
+    controller.notifyListenersWithInitializationStatus(new ResourceInitializationException(t));
 	}
-
 	protected void handleListenerException( Throwable t )
 	{
+      t.printStackTrace();
       String endpointName = 
         (getDestination() == null) ? "" : ((ActiveMQDestination)getDestination()).getPhysicalName(); 
       
@@ -277,6 +369,13 @@
 		if ( endpoint != null)
 		{
 			endpoint.setDestination(aDestination);
+			if ( aDestination instanceof TemporaryQueue ) {
+			  endpoint.setTempReplyDestination(true);
+			  System.out.println("Resolver Plugged In a Temp Queue:"+aDestination);
+			  if ( getMessageListener() != null && getMessageListener() instanceof InputChannel ) {
+			    ((JmsInputChannel)getMessageListener()).setListenerContainer(this);
+			  }
+			}
 			endpoint.setServerURI(getBrokerUrl());
 		}
 	}
@@ -287,7 +386,8 @@
 
 	public void onException(JMSException arg0)
 	{
-       String endpointName = 
+	  arg0.printStackTrace();
+    String endpointName = 
       (getDestination() == null) ? "" : ((ActiveMQDestination)getDestination()).getPhysicalName(); 
     
     UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),