You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/11/02 18:21:11 UTC
svn commit: r1538234 - in /uima/uima-as/trunk:
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/
uimaj-as-core/src/main/java/org/apache/uima/aae/
uimaj-as-core/src/main/java/org/apache/uima/aae/controller/
uimaj-as-jms/src/main/resou...
Author: cwiklik
Date: Sat Nov 2 17:21:10 2013
New Revision: 1538234
URL: http://svn.apache.org/r1538234
Log:
UIMA-3383 Refactored code related to handling of Quiesce request. Both 'q' and CTRL-C initiated quiesce now wait for notification from InProcessCache when it becomes empty. Only when the cache is empty the process is allowed to exit.
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java?rev=1538234&r1=1538233&r2=1538234&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java Sat Nov 2 17:21:10 2013
@@ -23,12 +23,8 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InvalidClassException;
-import java.net.Socket;
import org.apache.uima.UIMAFramework;
-import org.apache.uima.UimaContext;
-import org.apache.uima.UimaContextAdmin;
-import org.apache.uima.aae.UimaASApplicationEvent;
import org.apache.uima.aae.UimaASApplicationExitEvent;
import org.apache.uima.aae.UimaAsVersion;
import org.apache.uima.aae.controller.AnalysisEngineController;
@@ -395,7 +391,7 @@ public class UIMA_Service implements App
} else if ( event instanceof UimaASApplicationExitEvent ) {
System.out.println("Service Wrapper Received UimaASApplicationEvent. Message:"+event.getSource());
} else if ( event instanceof ContextStoppedEvent ){ // Spring has been shutdown
- System.exit(0);
+
}
}
@@ -490,7 +486,6 @@ public class UIMA_Service implements App
}
static class ServiceShutdownHook extends Thread {
-
public SpringContainerDeployer serviceDeployer;
public ServiceShutdownHook(SpringContainerDeployer serviceDeployer) {
@@ -501,18 +496,19 @@ public class UIMA_Service implements App
try {
AnalysisEngineController topLevelController = serviceDeployer.getTopLevelController();
if (topLevelController != null && !topLevelController.isStopped() ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"run", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_caught_signal__INFO", new Object[] { topLevelController.getComponentName() });
- serviceDeployer.undeploy(SpringContainerDeployer.QUIESCE_AND_STOP);
- }
+ serviceDeployer.undeploy(SpringContainerDeployer.QUIESCE_AND_STOP);
+ }
} catch( Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"run", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", e);
}
- }
+ }
}
+
}
}
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java?rev=1538234&r1=1538233&r2=1538234&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java Sat Nov 2 17:21:10 2013
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.Semaphore;
import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController;
import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.controller.EventSubscriber;
import org.apache.uima.aae.error.AsynchAEException;
@@ -64,6 +65,18 @@ public class InProcessCache implements I
int size = 0;
+ private BaseAnalysisEngineController controller;
+
+ /**
+ Register controller to call when the cache becomes empty.
+ This call is made when the controller enters quiesce
+ state. In this state the controller waits for
+ the cache to send notification when all CASes have been
+ processed.
+ **/
+ public void registerController(BaseAnalysisEngineController ctrl) {
+ controller = ctrl;
+ }
public void registerCallbackWhenCacheEmpty(EventSubscriber aController) {
registerCallbackWhenCacheEmpty(aController, 0);
}
@@ -271,7 +284,10 @@ public class InProcessCache implements I
((EventSubscriber) callbackListeners.get(i)).onCacheEmpty();
}
}
-
+ if (cache.size() == 0 && controller != null ) {
+ // unblock the controller waiting in quiesceAndStop
+ controller.notifyOnCacheEmpty();
+ }
}
// never called 5/2013
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java?rev=1538234&r1=1538233&r2=1538234&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java Sat Nov 2 17:21:10 2013
@@ -50,7 +50,6 @@ public class AnalysisEngineInstancePoolW
lock.acquireUninterruptibly();
// Call destroy() on AE on checkin if the UIMA AS process is in quiesce mode
if ( destroyAEInstanceIfFree ) {
- System.out.println("........... AnalysisEngineInstancePool.checkin() - Thread:"+Thread.currentThread().getId()+" calling destroy() on AE checkin");
anAnalysisEngine.destroy();
} else {
aeInstanceMap.put(Thread.currentThread().getId(), anAnalysisEngine);
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1538234&r1=1538233&r2=1538234&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Sat Nov 2 17:21:10 2013
@@ -105,6 +105,9 @@ public abstract class BaseAnalysisEngine
private static final String JMS_PROVIDER_HOME = "ACTIVEMQ_HOME";
public static enum ServiceState { INITIALIZING, RUNNING, DISABLED, STOPPING, FAILED };
public static final boolean NO_RECOVERY = true;
+ // Semaphore use only when quiesceAndStop is called
+ // When the cache becomes empty the semaphore is released.
+ private Semaphore quiesceSemaphore = new Semaphore(0);
protected ServiceState currentState = ServiceState.INITIALIZING;
@@ -1895,7 +1898,6 @@ public abstract class BaseAnalysisEngine
}
}
}
-
if (daemonServiceExecutor != null) {
daemonServiceExecutor.shutdown();
}
@@ -1907,8 +1909,8 @@ public abstract class BaseAnalysisEngine
if (getOutputChannel() != null) {
getOutputChannel().cancelTimers();
}
+
if (this instanceof PrimitiveAnalysisEngineController) {
-
getControllerLatch().release();
// Stops the input channel of this service
stopInputChannels(InputChannel.CloseAllChannels, shutdownNow);
@@ -1946,7 +1948,6 @@ public abstract class BaseAnalysisEngine
}
}
}
-
getInProcessCache().releaseAllCASes();
getLocalCache().clear();
releasedAllCASes = true;
@@ -2019,7 +2020,13 @@ public abstract class BaseAnalysisEngine
super.destroy();
}
-
+ /**
+ * This method is called by InProcessCache when the cache becomes empty while the controller
+ * is in Quiesce mode.
+ */
+ public void notifyOnCacheEmpty() {
+ quiesceSemaphore.release();
+ }
/**
* Stops input channel(s) and waits for CASes still in play to complete processing. When the
* InProcessCache becomes empty, initiate the service shutdown.
@@ -2030,6 +2037,10 @@ public abstract class BaseAnalysisEngine
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_stop__INFO",
new Object[] { getComponentName() });
}
+ System.out.println("Quiescing UIMA-AS Service. Remaining Number of CASes to Process:"+getInProcessCache().getSize());
+ // Register callback when the inProcessCache becomes empty
+ getInProcessCache().registerController(this);
+
if (!isStopped() && !callbackReceived) {
getControllerLatch().release();
// To support orderly shutdown, the service must first stop its input channel and
@@ -2065,12 +2076,22 @@ public abstract class BaseAnalysisEngine
}
}
stopInputChannels(InputChannel.InputChannels, true);
- // close JMS connection
- stop(false); // wait for any remaining CASes in flight to finish
+
+ try {
+ if ( !getInProcessCache().isEmpty() ) {
+ // acquire semaphore and wait for the InProcessCache to call notifyOnCacheEmpty()
+ // on this controller when the cache becomes empty.
+ quiesceSemaphore.acquire();
+ }
+ System.out.println("UIMA-AS Service is Stopping, All CASes Have Been Processed");
+ } catch( InterruptedException e) {
+
+ }
+ stop(false);
}
}
}
-
+
protected void stopDelegateTimers() {
Iterator<Delegate> it = delegates.iterator();
while (it.hasNext()) {
Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties?rev=1538234&r1=1538233&r2=1538234&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties Sat Nov 2 17:21:10 2013
@@ -238,3 +238,7 @@ UIMAJMS_start_inactivity_monitor__INFO=C
UIMAJMS_discard_msg__INFO=Component: {0} Message From: {1} IP: {2} Message Type: {3} Command: {4} CasId: {5}
UIMAJMS_show_stats__INFO=Component: {0} Stats for CasId:{1}\n\t{2}
UIMAJMS_session_open__FINE=Service: {0} Opened Session To Queue:{1} Broker:{2}
+UIMAJMS_invalid_broker_url__WARNING=Unexpected State - the Broker URL is NULL
+UIMAJMS_shared_connections__INFO=Shared Connection: {0}
+UIMAJMS_retrying_jms_connection__WARNING=Connection Invalid - Retrying Connection to Broker:{0} Every 5 Seconds
+