You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/11/02 18:21:11 UTC

svn commit: r1538234 - in /uima/uima-as/trunk: uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/ uimaj-as-core/src/main/java/org/apache/uima/aae/ uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ uimaj-as-jms/src/main/resou...

Author: cwiklik
Date: Sat Nov  2 17:21:10 2013
New Revision: 1538234

URL: http://svn.apache.org/r1538234
Log:
UIMA-3383 Refactored code related to handling of Quiesce request. Both 'q' and CTRL-C initiated quiesce now wait for notification from InProcessCache when it becomes empty. Only when the cache is empty the process is allowed to exit.

Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
    uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java?rev=1538234&r1=1538233&r2=1538234&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java Sat Nov  2 17:21:10 2013
@@ -23,12 +23,8 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InvalidClassException;
-import java.net.Socket;
 
 import org.apache.uima.UIMAFramework;
-import org.apache.uima.UimaContext;
-import org.apache.uima.UimaContextAdmin;
-import org.apache.uima.aae.UimaASApplicationEvent;
 import org.apache.uima.aae.UimaASApplicationExitEvent;
 import org.apache.uima.aae.UimaAsVersion;
 import org.apache.uima.aae.controller.AnalysisEngineController;
@@ -395,7 +391,7 @@ public class UIMA_Service implements App
     } else if ( event instanceof UimaASApplicationExitEvent ) {
     	System.out.println("Service Wrapper Received UimaASApplicationEvent. Message:"+event.getSource());
     } else if ( event instanceof ContextStoppedEvent ){ // Spring has been shutdown
-    	System.exit(0);
+    	
     }
   }
 
@@ -490,7 +486,6 @@ public class UIMA_Service implements App
   }
   
   static class ServiceShutdownHook extends Thread {
-
     public SpringContainerDeployer serviceDeployer;
 
     public ServiceShutdownHook(SpringContainerDeployer serviceDeployer) {
@@ -501,18 +496,19 @@ public class UIMA_Service implements App
       try {
       	AnalysisEngineController topLevelController = serviceDeployer.getTopLevelController();
       	if (topLevelController != null && !topLevelController.isStopped() ) {
-      	  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                 "run", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                 "UIMAJMS_caught_signal__INFO", new Object[] { topLevelController.getComponentName() });
-    	    serviceDeployer.undeploy(SpringContainerDeployer.QUIESCE_AND_STOP);
-    	  }
+      	  serviceDeployer.undeploy(SpringContainerDeployer.QUIESCE_AND_STOP);
+    	  } 
       } catch( Exception e) {
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                   "run", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                   "UIMAJMS_exception__WARNING", e);
         }
-      }
+      } 
     }
+
   } 
 }

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java?rev=1538234&r1=1538233&r2=1538234&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java Sat Nov  2 17:21:10 2013
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.Semaphore;
 
 import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController;
 import org.apache.uima.aae.controller.Endpoint;
 import org.apache.uima.aae.controller.EventSubscriber;
 import org.apache.uima.aae.error.AsynchAEException;
@@ -64,6 +65,18 @@ public class InProcessCache implements I
 
   int size = 0;
 
+  private BaseAnalysisEngineController controller;
+  
+  /**
+	  Register controller to call when the cache becomes empty.
+    This call is made when the controller enters quiesce
+    state. In this state the controller waits for 
+    the cache to send notification when all CASes have been 
+    processed.
+  **/
+  public void registerController(BaseAnalysisEngineController ctrl) {
+    controller = ctrl;
+  }
   public void registerCallbackWhenCacheEmpty(EventSubscriber aController) {
     registerCallbackWhenCacheEmpty(aController, 0);
   }
@@ -271,7 +284,10 @@ public class InProcessCache implements I
         ((EventSubscriber) callbackListeners.get(i)).onCacheEmpty();
       }
     }
-
+    if (cache.size() == 0 && controller != null ) {
+      // unblock the controller waiting in quiesceAndStop
+      controller.notifyOnCacheEmpty();
+    }
   }
 
   // never called 5/2013

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java?rev=1538234&r1=1538233&r2=1538234&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java Sat Nov  2 17:21:10 2013
@@ -50,7 +50,6 @@ public class AnalysisEngineInstancePoolW
 		  lock.acquireUninterruptibly();
 		  // Call destroy() on AE on checkin if the UIMA AS process is in quiesce mode  
 		  if ( destroyAEInstanceIfFree ) {
-		    System.out.println("........... AnalysisEngineInstancePool.checkin() - Thread:"+Thread.currentThread().getId()+" calling destroy() on AE checkin");
 		    anAnalysisEngine.destroy();
 		  } else {
 	      aeInstanceMap.put(Thread.currentThread().getId(), anAnalysisEngine);

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1538234&r1=1538233&r2=1538234&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Sat Nov  2 17:21:10 2013
@@ -105,6 +105,9 @@ public abstract class BaseAnalysisEngine
   private static final String JMS_PROVIDER_HOME = "ACTIVEMQ_HOME";
   public static enum ServiceState { INITIALIZING, RUNNING, DISABLED, STOPPING, FAILED };
   public static final boolean NO_RECOVERY = true;
+  // Semaphore use only when quiesceAndStop is called
+  // When the cache becomes empty the semaphore is released.
+  private Semaphore quiesceSemaphore = new Semaphore(0);
   
   protected ServiceState currentState = ServiceState.INITIALIZING;
   
@@ -1895,7 +1898,6 @@ public abstract class BaseAnalysisEngine
         }
       }
     }
-
     if (daemonServiceExecutor != null) {
       daemonServiceExecutor.shutdown();
     }
@@ -1907,8 +1909,8 @@ public abstract class BaseAnalysisEngine
     if (getOutputChannel() != null) {
       getOutputChannel().cancelTimers();
     }
+    
     if (this instanceof PrimitiveAnalysisEngineController) {
-
       getControllerLatch().release();
       // Stops the input channel of this service
       stopInputChannels(InputChannel.CloseAllChannels, shutdownNow);
@@ -1946,7 +1948,6 @@ public abstract class BaseAnalysisEngine
          }
        }
     }   
-    
     getInProcessCache().releaseAllCASes();
     getLocalCache().clear();
     releasedAllCASes = true;
@@ -2019,7 +2020,13 @@ public abstract class BaseAnalysisEngine
     super.destroy();
 
   }
-
+  /**
+   * This method is called by InProcessCache when the cache becomes empty while the controller
+   * is in Quiesce mode.
+   */
+  public void notifyOnCacheEmpty() {
+    quiesceSemaphore.release();
+  }
   /**
    * Stops input channel(s) and waits for CASes still in play to complete processing. When the
    * InProcessCache becomes empty, initiate the service shutdown.
@@ -2030,6 +2037,10 @@ public abstract class BaseAnalysisEngine
               UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_stop__INFO",
               new Object[] { getComponentName() });
     }
+    System.out.println("Quiescing UIMA-AS Service. Remaining Number of CASes to Process:"+getInProcessCache().getSize());
+    // Register callback when the inProcessCache becomes empty
+    getInProcessCache().registerController(this);
+    
     if (!isStopped() && !callbackReceived) {
       getControllerLatch().release();
       // To support orderly shutdown, the service must first stop its input channel and
@@ -2065,12 +2076,22 @@ public abstract class BaseAnalysisEngine
         	}
         }
         stopInputChannels(InputChannel.InputChannels, true);  
-        // close JMS connection 
-        stop(false); // wait for any remaining CASes in flight to finish
+       
+        try {
+          if ( !getInProcessCache().isEmpty() ) {
+            // acquire semaphore and wait for the InProcessCache to call notifyOnCacheEmpty()
+            // on this controller when the cache becomes empty.
+            quiesceSemaphore.acquire();
+          }
+          System.out.println("UIMA-AS Service is Stopping, All CASes Have Been Processed");
+        } catch( InterruptedException e) {
+          
+        }
+        stop(false); 
       }
     }
   }
-
+  
   protected void stopDelegateTimers() {
     Iterator<Delegate> it = delegates.iterator();
     while (it.hasNext()) {

Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties?rev=1538234&r1=1538233&r2=1538234&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties Sat Nov  2 17:21:10 2013
@@ -238,3 +238,7 @@ UIMAJMS_start_inactivity_monitor__INFO=C
 UIMAJMS_discard_msg__INFO=Component: {0} Message From: {1} IP: {2} Message Type: {3} Command: {4} CasId: {5}
 UIMAJMS_show_stats__INFO=Component: {0} Stats for CasId:{1}\n\t{2}
 UIMAJMS_session_open__FINE=Service: {0} Opened Session To Queue:{1} Broker:{2}
+UIMAJMS_invalid_broker_url__WARNING=Unexpected State - the Broker URL is NULL
+UIMAJMS_shared_connections__INFO=Shared Connection: {0}
+UIMAJMS_retrying_jms_connection__WARNING=Connection Invalid - Retrying Connection to Broker:{0} Every 5 Seconds
+