You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/04/10 16:06:19 UTC

svn commit: r763936 - /incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java

Author: cwiklik
Date: Fri Apr 10 14:06:18 2009
New Revision: 763936

URL: http://svn.apache.org/viewvc?rev=763936&view=rev
Log:
UIMA-1109 Modified to support orderly shutdown of Uima AS service

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=763936&r1=763935&r2=763936&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Fri Apr 10 14:06:18 2009
@@ -25,11 +25,13 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.InputChannel;
@@ -387,17 +389,6 @@
 	}
 	public void abort()
 	{
-//		if ( messageListener != null )
-//		{
-//			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-//                    "abort", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopping_jms_transport__INFO",
-//                    new Object[] { endpointName });
-//			try
-//			{
-//				messageListener.shutdown();
-//			}
-//			catch( Exception e) {}
-//		}
 	}
 	private String decodeIntToString( String aTypeToDecode, int aValueToDecode )
 	{
@@ -690,7 +681,9 @@
 		this.messageListener = messageListener;
 		System.setProperty("BrokerURI", messageListener.getBrokerUrl());
 		brokerURL = messageListener.getBrokerUrl();
-		listenerContainerList.add(messageListener);
+		if ( !listenerContainerList.contains(messageListener) ) {
+			listenerContainerList.add(messageListener);
+		}
 		this.messageListener = messageListener;
 		if ( getController() != null )
 		{
@@ -771,43 +764,75 @@
 			((PrimitiveAnalysisEngineController)getController()).getServiceInfo().setBrokerURL(brokerURL);
 		}
 	}
-	public void stop() throws Exception
-	{
+  private void stopChannel(UimaDefaultMessageListenerContainer mL) throws Exception {
+    String eName = mL.getEndpointName();
+    if (eName != null) {
+      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "stop",
+                JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopping_jms_transport__INFO",
+                new Object[] { eName });
+      }
+    }
+    mL.stop();
+    System.out.println("Stopped Channel:"+mL.getDestinationName());
+  }
+  private boolean doCloseChannel(UimaDefaultMessageListenerContainer mL, int channelsToClose) {
+    //  Check if we are closing just the input channel
+    if ( channelsToClose == InputChannel.InputChannels ) {
+      //  Fetch the listener's endpoint (the JMS destination it is attached to)
+      ActiveMQDestination destination = (ActiveMQDestination)mL.getListenerEndpoint();
+      //  if this is a listener on a temp queue return false. We need to keep all temp
+      //  channels open to receive replies and/or notifications to free CASes
+      //  Keep the temp reply channel open
+      if ( destination != null && destination.isTemporary() ) {
+        return false;
+      }
+    }
+    return true;
+  }
+  public void stop() throws Exception {
+    stop(InputChannel.CloseAllChannels );
+    listenerContainerList.clear();
+    failedListenerMap.clear();
+  }
+	public synchronized void stop(int channelsToClose) throws Exception {
 		
-		for( int i=0; i < listenerContainerList.size(); i++ )
-		{
-			final UimaDefaultMessageListenerContainer mL =
-				(UimaDefaultMessageListenerContainer) listenerContainerList.get(i);
-			if ( mL != null && mL.isRunning() )
-			{
-				stopped = true;
-				String eName = mL.getEndpointName();
-				if ( eName != null )
-				{
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-		                    "stop", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopping_jms_transport__INFO",
-		                    new Object[] { eName /* mL.getEndpointName()*/ /*endpointName */});
-          }
-				}
-				mL.closeConnection();
-				mL.stop();
-			}
-			else
-			{
-				if ( getController() != null )
-				{
+	  List<UimaDefaultMessageListenerContainer> listenersToRemove = 
+	    new ArrayList<UimaDefaultMessageListenerContainer>();
+	  for( Object listenerObject : listenerContainerList) {
+	    final UimaDefaultMessageListenerContainer mL = (UimaDefaultMessageListenerContainer)listenerObject; 
+	    if (mL != null && mL.isRunning() && doCloseChannel(mL, channelsToClose) ) {
+        System.out.println("JmsInputChannel.stop()-Stopping Input Channel:"+mL.getDestination()+" Selector:"+mL.getMessageSelector());
+        stopChannel(mL);
+        //  Just in case, check if the container is still in the list. If so, add it to
+        //  another list that contains listeners that have been stopped and need
+        //  to be removed from the listenerContainerList. Removing the listener from
+        //  the listenerContainerList inside this iterator loop does not work. If, for
+        //  example, the iterator has two elements, after the first remove from the
+        //  listenerContainerList, the iterator stops even though there is still
+        //  one element left. Process removal of listeners outside of the iterator
+        //  loop.
+        if ( listenerContainerList.contains(mL) ) {
+          listenersToRemove.add(mL);
+        }
+      } else {
+        if (getController() != null) {
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-		                    "stop", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_transport_not_stopped__INFO",
-		                    new Object[] { getController().getComponentName() });
+            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "stop",
+                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_transport_not_stopped__INFO",
+                    new Object[] { getController().getComponentName() });
           }
-				}
-			}
+        }
+      }
 		}
-		messageListener = null;
-		controller = null;
-		handler = null;
+	  // Remove listeners from the listenerContainerList
+	  for( UimaDefaultMessageListenerContainer mL : listenersToRemove) {
+	    listenerContainerList.remove(mL);
+	  }
+	  listenersToRemove.clear();
+	  if ( channelsToClose == InputChannel.CloseAllChannels ) {
+      stopped = true;
+    }
 	}
 	public boolean isStopped()
 	{
@@ -826,12 +851,10 @@
       new UimaDefaultMessageListenerContainer();
     ActiveMQConnectionFactory f = new ActiveMQConnectionFactory(failedListener.getBrokerUrl());
     newListener.setConnectionFactory(f);
-//    newListener.setConnectionFactory(failedListener.getConnectionFactory());
     newListener.setMessageListener(this);
     newListener.setController(getController());
     
     TempDestinationResolver resolver = new TempDestinationResolver();
-//    resolver.setConnectionFactory((ActiveMQConnectionFactory)failedListener.getConnectionFactory());
     resolver.setConnectionFactory(f); 
     resolver.setListener(newListener);
     newListener.setDestinationResolver(resolver);
@@ -862,50 +885,75 @@
     Object clone = ((Endpoint_impl) endpoint).clone();
     newListener.setTargetEndpoint((Endpoint)clone);
   }
-  
-  private UimaDefaultMessageListenerContainer getListenerForEndpoint( String anEndpointName ) {
+  /**
+   * Given an endpoint name returns all listeners attached to this endpoint. There can be multiple
+   * listeners on an endpoint each with a different selector to receive targeted messages like GetMeta
+   * and Process.
+   * 
+   * @param anEndpointName - name of the endpoint that is used to find associated listener(s)
+   * 
+   * @return - array of matching listeners, or null if no listener is attached to the endpoint
+   */
+  private UimaDefaultMessageListenerContainer[] getListenersForEndpoint( String anEndpointName ) {
+    List<UimaDefaultMessageListenerContainer> listeners = new ArrayList<UimaDefaultMessageListenerContainer>();
     for( int i=0; i < listenerContainerList.size(); i++ ) {
       UimaDefaultMessageListenerContainer mListener = 
         (UimaDefaultMessageListenerContainer) listenerContainerList.get(i);
-      if ( mListener.getDestination() != null && mListener.getDestination().toString().equals( anEndpointName)) {
-        return mListener;
+      if ( mListener.getDestinationName() != null &&  mListener.getDestinationName().equals( anEndpointName)) {
+        listeners.add(mListener);
+      } else if ( mListener.getDestination() != null && mListener.getDestination().toString().equals( anEndpointName)) {
+        listeners.add(mListener);
       }
     }
+    if ( listeners.size() > 0 ) {
+      UimaDefaultMessageListenerContainer[] listenerArray = new UimaDefaultMessageListenerContainer[listeners.size()];
+      listeners.toArray(listenerArray);
+      return listenerArray;
+    }
     return null;
   }
-  public synchronized void destroyListener( String anEndpointName, String aDelegateKey ) {
-    final UimaDefaultMessageListenerContainer mListener = 
-      getListenerForEndpoint(anEndpointName);
-    if ( mListener == null ) {
-      System.out.println("--- Listener For Endpoint: "+aDelegateKey+" Not Found");
+  /**
+   * Stops, each in its own thread, every running listener attached to the given endpoint.
+   */
+  public  void destroyListener( final String anEndpointName, String aDelegateKey ) {
+    //  check if delegate listener has already been placed in the failed listeners list
+    //  If so, nothing else to do here
+    if ( failedListenerMap.containsKey(aDelegateKey) ){
       return;
     }
-    if ( !mListener.isRunning() ) {
-      return; // Already Stopped
+    //  Fetch all associated listeners. 
+    final UimaDefaultMessageListenerContainer[] mListeners = getListenersForEndpoint(anEndpointName);
+    if ( mListeners == null ) {
+      return;
     }
-    
-    try {
+    //  Stop each listener
+    for( final UimaDefaultMessageListenerContainer mListener : mListeners ) {
+      if ( !mListener.isRunning() ) {
+        continue; // Already Stopped
+      }
+
+      try {
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) && mListener.getDestination() != null) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-                    "destroyListener", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stop_listener__INFO",
-                    new Object[] { mListener.getDestination().toString() });
+                  "destroyListener", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stop_listener__INFO",
+                  new Object[] { mListener.getDestination().toString() });
         }
-        System.out.println("++++ Stopping Listener ...");
-        mListener.closeConnection();
-        mListener.stop();
-        System.out.println("++++ Destroying Listener ...");
-        new Thread() {
+        //  Spin a thread that will stop the listener and wait for its shutdown
+        Thread stopThread = new Thread("InputChannelStopThread") {
           public void run() {
-            mListener.destroy();
+            mListener.stop();
+            //  wait until the listener shuts down
+            while( mListener.isRunning());      
+            System.out.println("Thread:"+Thread.currentThread().getId()+"++++ Listener on Queue:"+anEndpointName+" Has Been Stopped...");
+            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) && mListener.getDestination() != null) {
+              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+                      "destroyListener", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopped_listener_INFO",
+                      new Object[] {controller.getComponentName(), mListener.getDestination().toString() });
+            }
           }
         };
-        while( mListener.isRunning());      
-        System.out.println("Thread:"+Thread.currentThread().getId()+"++++ Listener on Queue:"+anEndpointName+" Has Been Stopped...");
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) && mListener.getDestination() != null) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-                    "destroyListener", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopped_listener_INFO",
-                    new Object[] {controller.getComponentName(), mListener.getDestination().toString() });
-        }
+        stopThread.start();
+
         if ( getController() != null ) {
           Endpoint endpoint = ((AggregateAnalysisEngineController)getController()).lookUpEndpoint(aDelegateKey, false);
           endpoint.setStatus(Endpoint.FAILED);
@@ -918,20 +966,22 @@
             }
           }
         }
-      //}
-    } catch( Exception e) {
-      e.printStackTrace();
+        //}
+      } catch( Exception e) {
+        e.printStackTrace();
+      }
     }
   }
   public boolean isFailed(String aDelegateKey) {
     return failedListenerMap.containsKey(aDelegateKey);
   }
   public boolean isListenerForDestination( String anEndpointName) {
-    UimaDefaultMessageListenerContainer mListener = 
-      getListenerForEndpoint(anEndpointName);
-    if ( mListener == null ) {
+    UimaDefaultMessageListenerContainer[] mListeners = 
+      getListenersForEndpoint(anEndpointName);
+    if ( mListeners == null ) {
       return false;
     }
     return true;
   }
+
 }