You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2011/02/22 03:55:35 UTC

svn commit: r1073208 - in /uima/uima-as/trunk: uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java

Author: cwiklik
Date: Tue Feb 22 02:55:34 2011
New Revision: 1073208

URL: http://svn.apache.org/viewvc?rev=1073208&view=rev
Log:
UIMA-2065 Added shutdown hook to enable clean shutdown

Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
    uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=1073208&r1=1073207&r2=1073208&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Tue Feb 22 02:55:34 2011
@@ -43,7 +43,9 @@ import javax.naming.InitialContext;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.RedeliveryPolicy;
 import org.apache.activemq.command.ActiveMQBytesMessage;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -225,16 +227,27 @@ public class BaseUIMAAsynchronousEngine_
   }
 
 	public void stop() {
-		if (!running) {
-			return;
-		}
-		super.stop();
 		synchronized (connectionMux) {
-			running = false;
+      super.doStop();
+      if (!running) {
+        return;
+      }
+      running = false;
 			if (super.serviceDelegate != null) {
 				// Cancel all timers and purge lists
 				super.serviceDelegate.cleanup();
 			}
+      if (sender != null) {
+        sender.doStop();
+      }
+      if (initialized) {
+        try {
+           consumerSession.close();
+           ((ActiveMQMessageConsumer)consumer).stop();
+           consumer.close();
+        } catch (Exception exx) {}
+      }
+
 			try {
 				// SharedConnection object manages a single JMS connection to
 				// the broker. If the client is scaled out in the same JVM, the
@@ -267,9 +280,6 @@ public class BaseUIMAAsynchronousEngine_
 				} finally {
 					sharedConnectionSemaphore.release();
 				}
-				if (sender != null) {
-					sender.doStop();
-				}
 				// Undeploy all containers
 				undeploy();
 				if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(
@@ -279,13 +289,6 @@ public class BaseUIMAAsynchronousEngine_
 							JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
 							"UIMAJMS_undeployed_containers__INFO");
 				}
-				if (initialized) {
-					try {
-						consumerSession.close();
-						consumer.close();
-					} catch (JMSException exx) {
-					}
-				}
 				// unregister client
 				if (jmxManager != null) {
 					jmxManager.unregisterMBean(clientJmxObjectName);
@@ -543,6 +546,10 @@ public class BaseUIMAAsynchronousEngine_
    */
   public synchronized void initialize(Map anApplicationContext)
           throws ResourceInitializationException {
+    // Add ShutdownHook to make sure the connection to the
+    // broker is always closed on process exit.
+    Runtime.getRuntime().addShutdownHook(
+            new Thread(new UimaASShutdownHook(this)));
     // Check the version of uimaj that UIMA AS was built with, against the UIMA Core version. If not the same throw Exception
     if (!UimaAsVersion.getUimajFullVersionString().equals(UimaVersion.getFullVersionString())) {
       UIMAFramework.getLogger(CLASS_NAME).logrb(

Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1073208&r1=1073207&r2=1073208&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Tue Feb 22 02:55:34 2011
@@ -29,6 +29,8 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
@@ -195,6 +197,8 @@ public abstract class BaseUIMAAsynchrono
 
   protected static SharedConnection sharedConnection = null;
 
+  private ExecutorService exec = Executors.newFixedThreadPool(1);
+  
   abstract public String getEndPointName() throws Exception;
 
   abstract protected TextMessage createTextMessage() throws Exception;
@@ -409,7 +413,7 @@ public abstract class BaseUIMAAsynchrono
     }
   }
 
-  public void stop() {
+  public void doStop() {
     synchronized (stopMux) {
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "stop",
@@ -420,7 +424,8 @@ public abstract class BaseUIMAAsynchrono
         return;
       }
 
-      running = false;
+      exec.shutdownNow();
+      
       casQueueProducerReady = false;
       if (serviceDelegate != null) {
         serviceDelegate.cancelDelegateTimer();
@@ -509,12 +514,12 @@ public abstract class BaseUIMAAsynchrono
             // Every thread requesting a CAS adds an entry to this
             // queue.
             CasQueueEntry entry = threadQueue.take();
-            if (!running) {
-              return; // client API has been stopped
-            }
             CAS cas = null;
             long startTime = System.nanoTime();
             // Wait for a free CAS instance
+            if (!running || asynchManager == null) {
+              return; // client API has been stopped
+            }
             if (remoteService) {
               cas = asynchManager.getNewCas("ApplicationCasPoolContext");
             } else {
@@ -1312,8 +1317,9 @@ public abstract class BaseUIMAAsynchrono
     if (exception != null && cachedRequest != null) {
       cachedRequest.setException(exception);
       if (exception instanceof AnalysisEngineProcessException
-              || (exception.getCause() != null && (exception.getCause() instanceof AnalysisEngineProcessException || exception
-                      .getCause() instanceof ServiceShutdownException))) {
+              || (exception.getCause() != null && 
+                      (exception.getCause() instanceof AnalysisEngineProcessException || 
+                       exception.getCause() instanceof ServiceShutdownException))) {
         // Indicate that this is a process exception.
         cachedRequest.setProcessException();
       }
@@ -1649,58 +1655,78 @@ public abstract class BaseUIMAAsynchrono
    * Listener method receiving JMS Messages from the response queue.
    * 
    */
-  public void onMessage(Message message) {
-    try {
-
+  public void onMessage(final Message message) {
+    // Process message in a separate thread. Previously the message was processed in ActiveMQ dispatch thread.
+    // This onMessage() method is called by ActiveMQ code from a critical region protected with a lock. The lock 
+    // is only released if this method returns. Running in a dispatch thread caused a hang when an application
+    // decided to call System.exit() in any of its callback listener methods. The UIMA AS client adds a 
+    // ShutdownHook to the JVM to enable orderly shutdown which includes stopping JMS Consumer, JMS Producer
+    // and finally stopping JMS Connection. The ShutdownHook support was added to the client in case the 
+    // application doesn't call client's stop() method. Now, the hang was caused by the fact that the dispatch
+    // thread was used to call System.exit() which in turn executed client's ShutdownHook code. The ShutdownHook
+    // code runs in a separate thread, but the JVM blocks the dispatch thread until the ShutdownHook 
+    // finishes. It never will though, since the ShutdownHook is calling ActiveMQSession.close() which tries to enter
+    // the same critical region that the dispatch thread is still stuck in. DEADLOCK.
+    // The code below uses a simple FixedThreadPool Executor with a single thread. This thread is reused instead
+    // of creating one on the fly.
+    exec.execute( new Runnable() {
       
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "onMessage",
-                JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_msg_FINEST",
-                new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
-      }
-      if (!message.propertyExists(AsynchAEMessage.Command)) {
-        return;
-      }
+      public void run() {
+        try {
+
+          
+          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "onMessage",
+                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_msg_FINEST",
+                    new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
+          }
+          if (!message.propertyExists(AsynchAEMessage.Command)) {
+            return;
+          }
+
+          int command = message.getIntProperty(AsynchAEMessage.Command);
+          if (AsynchAEMessage.CollectionProcessComplete == command) {
+            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
+                      JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_cpc_reply_FINE",
+                      new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
+            }
+            handleCollectionProcessCompleteReply(message);
+          } else if (AsynchAEMessage.GetMeta == command) {
+            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
+                      JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_meta_reply_FINE",
+                      new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
+            }
+            handleMetadataReply(message);
+          } else if (AsynchAEMessage.Process == command) {
+            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
+                      JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_process_reply_FINE",
+                      new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
+            }
+            handleProcessReply(message, true, null);
+          } else if (AsynchAEMessage.ServiceInfo == command) {
+            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+                      "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                      "UIMAJMS_received_service_info_FINEST",
+                      new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
+            }
+            handleServiceInfo(message);
+          }
+        } catch (Exception e) {
+          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+                    "onMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                    "UIMAEE_exception__WARNING", e);
+          }
 
-      int command = message.getIntProperty(AsynchAEMessage.Command);
-      if (AsynchAEMessage.CollectionProcessComplete == command) {
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
-                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_cpc_reply_FINE",
-                  new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
-        }
-        handleCollectionProcessCompleteReply(message);
-      } else if (AsynchAEMessage.GetMeta == command) {
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
-                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_meta_reply_FINE",
-                  new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
-        }
-        handleMetadataReply(message);
-      } else if (AsynchAEMessage.Process == command) {
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
-                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_process_reply_FINE",
-                  new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
-        }
-        handleProcessReply(message, true, null);
-      } else if (AsynchAEMessage.ServiceInfo == command) {
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
-                  "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                  "UIMAJMS_received_service_info_FINEST",
-                  new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
         }
-        handleServiceInfo(message);
-      }
-    } catch (Exception e) {
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
-                "onMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAEE_exception__WARNING", e);
+        
       }
+    });
 
-    }
   }
 
   /**
@@ -2410,7 +2436,11 @@ public abstract class BaseUIMAAsynchrono
               JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_error_while_sending_msg__WARNING",
               new Object[] { aDestination, aFailure });
     }
-    stop();
+    try {
+      stop();
+    } catch( Exception e) {
+      e.printStackTrace();
+    }
   }
 
   /**
@@ -2609,11 +2639,17 @@ public abstract class BaseUIMAAsynchrono
       synchronized(destroyMux) {
         //  Check if all clients have terminated and only than stop the shared connection
         if (getClientCount() == 0 && connection != null
-                && !((ActiveMQConnection) connection).isClosed()) {
+                && !((ActiveMQConnection) connection).isClosed() 
+                && !((ActiveMQConnection) connection).isClosing()) {
           try {
             stop = true;
             connection.stop();
             connection.close();
+            while( !((ActiveMQConnection) connection).isClosed() ) {
+              try {
+                destroyMux.wait(100);
+              } catch( InterruptedException exx) {}
+            }
           } catch (Exception e) {
             /* ignore */
           }
@@ -2636,4 +2672,20 @@ public abstract class BaseUIMAAsynchrono
       }
     }
   }
+  public class UimaASShutdownHook implements Runnable {
+    UimaAsynchronousEngine asEngine=null;
+    public UimaASShutdownHook( UimaAsynchronousEngine asEngine) {
+      this.asEngine = asEngine;
+    }
+    public void run() {
+      try {
+        if ( asEngine != null ) {
+          asEngine.stop();
+        } 
+      } catch( Exception ex) {
+        ex.printStackTrace();
+      }
+    }
+    
+  }
 }