You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2011/12/13 15:51:12 UTC
svn commit: r1213723 - in /uima/uima-as/trunk:
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/
uimaj-as-core/src/main/java/org/apache/uima/aae/
uimaj-as-core/src/main/java/org/apache/uima/aae/controller/
uimaj-as-core/src/main/res...
Author: cwiklik
Date: Tue Dec 13 14:51:11 2011
New Revision: 1213723
URL: http://svn.apache.org/viewvc?rev=1213723&view=rev
Log:
UIMA-2309 Refactored code that handles orderly shutdown of uima as service. On shutdown, AE.destroy() is called on the same thread that initialized it.
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/Channel.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePool.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties
uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Tue Dec 13 14:51:11 2011
@@ -840,17 +840,20 @@ public class JmsInputChannel implements
}
}
- private void stopChannel(UimaDefaultMessageListenerContainer mL) throws Exception {
+ private void stopChannel(UimaDefaultMessageListenerContainer mL, boolean shutdownNow) throws Exception {
String eName = mL.getEndpointName();
if (eName != null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "stopChannel",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopping_jms_transport__INFO",
- new Object[] { eName });
+ new Object[] { eName, shutdownNow });
}
}
- mL.stop();
-
+ mL.delegateStop();
+ if (shutdownNow) {
+ mL.destroy(shutdownNow);
+ }
+
String selector = "";
if (mL.getMessageSelector() != null) {
selector = " Selector:" + mL.getMessageSelector();
@@ -877,8 +880,8 @@ public class JmsInputChannel implements
return true;
}
- public void stop() throws Exception {
- stop(InputChannel.CloseAllChannels);
+ public void stop(boolean shutdownNow) throws Exception {
+ stop(InputChannel.CloseAllChannels, shutdownNow);
listenerContainerList.clear();
failedListenerMap.clear();
if ( remoteJMXServer != null ) {
@@ -886,6 +889,12 @@ public class JmsInputChannel implements
remoteJMXServer = null;
}
}
+ public void disconnectListenersFromQueue() throws Exception {
+ for (Object listenerObject : listenerContainerList) {
+ final UimaDefaultMessageListenerContainer mL = (UimaDefaultMessageListenerContainer) listenerObject;
+ stopChannel(mL, false);
+ }
+ }
public void setTerminating() {
if ( listenerContainerList.size() > 0 ) {
// set a global static flag to stop spring's from automatic recovery on lost connection
@@ -905,12 +914,12 @@ public class JmsInputChannel implements
}
}
- public synchronized void stop(int channelsToClose) throws Exception {
- List<UimaDefaultMessageListenerContainer> listenersToRemove = new ArrayList<UimaDefaultMessageListenerContainer>();
+ public synchronized void stop(int channelsToClose, boolean shutdownNow) throws Exception {
+ List<UimaDefaultMessageListenerContainer> listenersToRemove = new ArrayList<UimaDefaultMessageListenerContainer>();
for (Object listenerObject : listenerContainerList) {
final UimaDefaultMessageListenerContainer mL = (UimaDefaultMessageListenerContainer) listenerObject;
if (mL != null && doCloseChannel(mL, channelsToClose)) {
- stopChannel(mL);
+ stopChannel(mL, shutdownNow);
// Just in case check if the container still in the list. If so, add it to
// another list that container listeners that have been stopped and need
// to be removed from the listenerContainerList. Removing the listener from
@@ -1014,7 +1023,7 @@ public class JmsInputChannel implements
}
newListener.afterPropertiesSet();
if ( controller != null && controller.isStopped() ) {
- newListener.stop();
+ newListener.destroy(true); // shutdownNow
// we are aborting, the controller has been stopped
return;
}
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Tue Dec 13 14:51:11 2011
@@ -1671,12 +1671,15 @@ public class JmsOutputChannel implements
long t = System.nanoTime();
getAnalysisEngineController().saveReplyTime(t, "");
}
-
public void stop() {
- stop(Channel.CloseAllChannels);
+ stop(Channel.CloseAllChannels, true);
+}
+
+ public void stop(boolean shutdownNow) {
+ stop(Channel.CloseAllChannels, shutdownNow);
}
- public void stop(int channelsToClose) {
+ public void stop(int channelsToClose, boolean shutdownNow) {
aborting = true;
try {
// Fetch iterator over all Broker Connections. This service may be connected
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Tue Dec 13 14:51:11 2011
@@ -923,15 +923,22 @@ public class UimaDefaultMessageListenerC
// we will start listeners on input queue.
this.setAutoStartup(false);
}
- public void shutdownTaskExecutor(ThreadPoolExecutor tpe) throws InterruptedException {
- tpe.purge();
- tpe.shutdownNow();
+ public void shutdownTaskExecutor(ThreadPoolExecutor tpe, boolean stopImmediate) throws InterruptedException {
+ if ( stopImmediate ) {
+ tpe.purge();
+ tpe.shutdownNow();
+ } else {
+ tpe.shutdown();
+ }
+ }
+ public void destroy() {
+ destroy(true);
}
/**
* Spins a shutdown thread and stops Sprint and ActiveMQ threads.
*
*/
- public void destroy() {
+ public void destroy(final boolean stopImmediate) {
if (awaitingShutdown) {
return;
@@ -948,16 +955,33 @@ public class UimaDefaultMessageListenerC
// delegate stop request to Spring
__listenerRef.delegateStop();
if (taskExecutor != null && taskExecutor instanceof ThreadPoolTaskExecutor) {
- ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().purge();
- ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().shutdownNow();
- } else if (concurrentListener != null) {
- shutdownTaskExecutor(concurrentListener.getTaskExecutor());
+ // Modify task executor to terminate idle threads. While the thread terminates
+ // it calls destroy() method on the pinned instance of AE
+ ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().allowCoreThreadTimeOut(true);
+ ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().setKeepAliveTime(1000, TimeUnit.MILLISECONDS);
+ ((ThreadPoolTaskExecutor) taskExecutor).setWaitForTasksToCompleteOnShutdown(true);
+ ((ThreadPoolTaskExecutor) taskExecutor).shutdown();
+ } else if (concurrentListener != null) {
+ shutdownTaskExecutor(concurrentListener.getTaskExecutor(), stopImmediate);
concurrentListener.stop();
- } else if ( threadPoolExecutor != null ) {
- shutdownTaskExecutor(threadPoolExecutor);
- }
+ } else if ( threadPoolExecutor != null ) {
+ shutdownTaskExecutor(threadPoolExecutor, true);
+ }
}
+ // Close Connection to the broker
+ String controllerName = (__listenerRef.controller == null) ? "" :__listenerRef.controller.getComponentName();
+ __listenerRef.getSharedConnection().close();
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "destroy.run()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_listener_shutdown__INFO", new Object[] {controllerName,__listenerRef.getMessageSelector(),__listenerRef.getBrokerUrl()});
+ }
__listenerRef.shutdown();
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "destroy.run()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_listener_jms_connection_closed__INFO", new Object[] {controllerName,__listenerRef.getMessageSelector()});
+ }
} catch (Exception e) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
"destroy", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
@@ -989,7 +1013,7 @@ public class UimaDefaultMessageListenerC
// 2) ReleaseCAS request
if ( taskExecutor == null ) {
UimaAsThreadFactory tf = new UimaAsThreadFactory(threadGroup);
- tf.setDaemon(true);
+ tf.setDaemon(false);
if ( isFreeCasQueueListener()) {
tf.setThreadNamePrefix(controller.getComponentName()+" - FreeCASRequest Thread");
} else if ( isGetMetaListener() ) {
@@ -1006,6 +1030,20 @@ public class UimaDefaultMessageListenerC
threadPoolExecutor = (ThreadPoolExecutor)es;
super.setTaskExecutor(es);
}
+ } else {
+ UimaAsThreadFactory tf = new UimaAsThreadFactory(threadGroup);
+ tf.setDaemon(false);
+ if ( isFreeCasQueueListener()) {
+ tf.setThreadNamePrefix(controller.getComponentName()+" - FreeCASRequest Thread");
+ } else if ( isGetMetaListener() ) {
+ tf.setThreadNamePrefix(super.getBeanName()+" - Thread");
+ } else if ( getDestination() != null && getMessageSelector() != null ) {
+ tf.setThreadNamePrefix(controller.getComponentName() + " Process Thread");
+ } else if ( endpoint != null && endpoint.isTempReplyDestination() ) {
+ tf.setThreadNamePrefix(super.getBeanName()+" - Thread");
+ } else {
+ throw new Exception("Unknown Context Detected in setUimaASThreadPoolExecutor()");
+ }
}
}
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/Channel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/Channel.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/Channel.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/Channel.java Tue Dec 13 14:51:11 2011
@@ -24,9 +24,9 @@ public interface Channel {
public static final int InputChannels = 1;
- public void stop() throws Exception;
+ public void stop(boolean shutdownNow) throws Exception;
- public void stop(int channelsToStop) throws Exception;
+ public void stop(int channelsToStop, boolean shutdownNow) throws Exception;
public String getName();
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java Tue Dec 13 14:51:11 2011
@@ -53,4 +53,6 @@ public interface InputChannel extends Ch
public void setTerminating();
public void terminate();
+
+ public void disconnectListenersFromQueue() throws Exception;
}
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java Tue Dec 13 14:51:11 2011
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
+import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController_impl;
import org.apache.uima.util.Level;
/**
@@ -44,7 +45,7 @@ public class UimaAsThreadFactory impleme
private String threadNamePrefix=null;
- private boolean isDaemon;
+ private boolean isDaemon=false;
public static AtomicInteger poolIdGenerator = new AtomicInteger();
@@ -70,7 +71,7 @@ public class UimaAsThreadFactory impleme
theThreadGroup = tGroup;
}
public void setDaemon(boolean daemon) {
- isDaemon = daemon;
+ // isDaemon = daemon;
}
public void stop() {
}
@@ -99,6 +100,9 @@ public class UimaAsThreadFactory impleme
if (controller != null && !controller.threadAssignedToAE()) {
// call the controller to initialize next instance of AE. Once initialized this
// AE instance process() method will only be called from this thread
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
+ "UimaAsThreadFactory.run()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_calling_ae_initialize__INFO", new Object[] {controller.getComponentName(),Thread.currentThread().getId()});
controller.initializeAnalysisEngine();
}
// Call given Worker (Runnable) run() method and block. This call block until the
@@ -118,7 +122,18 @@ public class UimaAsThreadFactory impleme
controller.notifyListenersWithInitializationStatus(ex);
}
return;
+ } finally {
+ if ( controller instanceof PrimitiveAnalysisEngineController_impl ) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
+ "UimaAsThreadFactory.run()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_process_thread_exiting__INFO", new Object[] {controller.getComponentName(),Thread.currentThread().getId()});
+ ((PrimitiveAnalysisEngineController_impl)controller).destroyAE();
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
+ "UimaAsThreadFactory.run()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_ae_instance_destroy_called__INFO", new Object[] {controller.getComponentName(),Thread.currentThread().getId()});
+ }
}
+
}
});
} catch (Exception e) {
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Tue Dec 13 14:51:11 2011
@@ -656,7 +656,7 @@ public class AggregateAnalysisEngineCont
}
// Any problems in completeInitialization() is a reason to stop
notifyListenersWithInitializationStatus(ex);
- super.stop();
+ super.stop(true); // shutdown now
}
private void stopListener(String key, Endpoint endpoint) throws Exception {
@@ -3064,7 +3064,7 @@ public class AggregateAnalysisEngineCont
}
public void stop() {
- super.stop();
+ super.stop(true); // shutdown now
this.cleanUp();
}
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePool.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePool.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePool.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePool.java Tue Dec 13 14:51:11 2011
@@ -25,16 +25,6 @@ import org.apache.uima.analysis_engine.A
public interface AnalysisEngineInstancePool {
/**
- * Creates and initializes the AE Pool with intances of AEs provided in the
- * anAnalysisEngineInstanceList
- *
- * @param anAnalysisEngineInstanceList
- * - list of AnalysisEngine instances
- * @throws Exception
- */
- public void intialize(List anAnalysisEngineInstanceList) throws Exception;
-
- /**
* Adds an instance of AnalysisEngine to the pool
*
* @param anAnalysisEngine
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java Tue Dec 13 14:51:11 2011
@@ -22,54 +22,39 @@
package org.apache.uima.aae.controller;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
+import java.util.concurrent.Semaphore;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.analysis_engine.AnalysisEngine;
import org.apache.uima.util.Level;
public class AnalysisEngineInstancePoolWithThreadAffinity implements AnalysisEngineInstancePool {
private static final Class CLASS_NAME = AnalysisEngineInstancePoolWithThreadAffinity.class;
- private boolean allThreadsAlreadyAssigned = false;
-
- private Map aeInstanceMap = new HashMap();
-
- private List aeList = new ArrayList();
-
- private int analysisEnginePoolSize = 0;
-
- public AnalysisEngineInstancePoolWithThreadAffinity(int aePoolSize) {
- analysisEnginePoolSize = aePoolSize;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.uima.aae.controller.AnalysisEngineInstancePool#intialize(java.util.List)
- */
- public void intialize(List anAnalysisEngineInstanceList) throws Exception {
- aeList = anAnalysisEngineInstanceList;
- }
+ private volatile boolean destroyAEInstanceIfFree=false;
+ private Semaphore lock = new Semaphore(1);
+
+ private Map<Long, AnalysisEngine> aeInstanceMap = new HashMap<Long,AnalysisEngine>();
public int size() {
return aeInstanceMap.size();
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.uima.aae.controller.AnalysisEngineInstancePool#checkin(org.apache.uima.analysis_engine
- * .AnalysisEngine)
- */
- public synchronized void checkin(AnalysisEngine anAnalysisEngine) throws Exception {
- aeInstanceMap.put(Thread.currentThread().getId(), anAnalysisEngine);
+ public void checkin(AnalysisEngine anAnalysisEngine) throws Exception {
+ try {
+ lock.acquireUninterruptibly();
+ aeInstanceMap.put(Thread.currentThread().getId(), anAnalysisEngine);
+ } catch( Exception e) {
+ e.printStackTrace();
+ throw e;
+ } finally {
+ lock.release();
+ }
}
public boolean exists() {
@@ -83,39 +68,38 @@ public class AnalysisEngineInstancePoolW
*
* @see org.apache.uima.aae.controller.AnalysisEngineInstancePool#checkout()
**/
- public synchronized AnalysisEngine checkout() throws Exception {
- AnalysisEngine ae = null;
-
- // AEs are instantiated and initialized in the the main thread and placed in the temporary list.
- // First time in the process() method, each thread will remove AE instance from the temporary
- // list
- // and place it in the permanent instanceMap. The key to the instanceMap is the thread name.
- // Each
- // thread will always process a CAS using its own and dedicated AE instance.
- return (AnalysisEngine) aeInstanceMap.remove(Thread.currentThread().getId());
+ public AnalysisEngine checkout() throws Exception {
+ try {
+ lock.acquireUninterruptibly();
+ if ( !exists() ) {
+ throw new AsynchAEException("AE instance not found in AE pool. Most likely due to service quiescing");
+ }
+ // AEs are instantiated and initialized in the the main thread and placed in the temporary list.
+ // First time in the process() method, each thread will remove AE instance from the temporary
+ // list
+ // and place it in the permanent instanceMap. The key to the instanceMap is the thread name.
+ // Each
+ // thread will always process a CAS using its own and dedicated AE instance.
+ return (AnalysisEngine) aeInstanceMap.remove(Thread.currentThread().getId());
+
+ } catch( Exception e) {
+ throw e;
+ } finally {
+ lock.release();
+ }
+
+
}
-
/*
* (non-Javadoc)
*
* @see org.apache.uima.aae.controller.AnalysisEngineInstancePool#destroy()
*/
public void destroy() throws Exception {
-
- Iterator aeInstanceIterator = aeInstanceMap.keySet().iterator();
- int i = 0;
- while (aeInstanceIterator.hasNext()) {
- AnalysisEngine ae = (AnalysisEngine) aeInstanceMap.get((Long) aeInstanceIterator.next());
- ae.destroy();
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "abort",
- UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_destroying_ae__INFO",
- new Object[] { ae.getAnalysisEngineMetaData().getName(), i });
- }
- i++;
- }
- aeInstanceMap.clear();
+ // set the flag so that any AE instance returned from PrimitiveController
+ // will be destroyed.
+ destroyAEInstanceIfFree = true;
}
}
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Tue Dec 13 14:51:11 2011
@@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.management.ObjectName;
@@ -210,6 +211,8 @@ public abstract class BaseAnalysisEngine
protected ConcurrentHashMap<String, UimaMessageListener> messageListeners = new ConcurrentHashMap<String, UimaMessageListener>();
private Exception initException = null;
+
+ private Object lock = new Object();
// Local cache for this controller only. This cache stores state of
// each CAS. The actual CAS is still stored in the global cache. The
@@ -230,7 +233,9 @@ public abstract class BaseAnalysisEngine
// Monitor used in stop() to await a callback from InProcessCache
protected Object callbackMonitor = new Object();
-
+
+ protected Semaphore onEmptyCacheSemaphore = new Semaphore(1);
+
protected volatile boolean awaitingCacheCallbackNotification = false;
protected ConcurrentHashMap<String, String> abortedCasesMap = new ConcurrentHashMap<String, String>();
@@ -1768,11 +1773,11 @@ public abstract class BaseAnalysisEngine
* Stops input channel(s) and initiates a shutdown of all delegates ( if this is an aggregate ).
* At the end sends an Exception to the client and closes an output channel.
*/
- public void stop() {
- this.stop(null, null);
+ public void stop(boolean shutdownNow) {
+ this.stop(null, null,shutdownNow);
}
- public void stop(Throwable cause, String aCasReferenceId) {
+ public void stop(Throwable cause, String aCasReferenceId, boolean shutdownNow ) {
if (!isStopped()) {
setStopped();
}
@@ -1792,11 +1797,11 @@ public abstract class BaseAnalysisEngine
getControllerLatch().release();
// Stops the input channel of this service
- stopInputChannels(InputChannel.CloseAllChannels);
+ stopInputChannels(InputChannel.CloseAllChannels, shutdownNow);
} else {
((AggregateAnalysisEngineController_impl) this).stopTimers();
// Stops ALL input channels of this service including the reply channels
- stopInputChannels(InputChannel.CloseAllChannels);
+ stopInputChannels(InputChannel.CloseAllChannels,shutdownNow);
List<AnalysisEngineController> colocatedControllerList =
((AggregateAnalysisEngineController_impl)this).getChildControllerList();
@@ -1843,7 +1848,6 @@ public abstract class BaseAnalysisEngine
}
}
}
-
getInProcessCache().releaseAllCASes();
releasedAllCASes = true;
@@ -1857,6 +1861,7 @@ public abstract class BaseAnalysisEngine
jmxManagement.destroy();
} catch (Exception e) {
}
+
try {
getInProcessCache().destroy();
} catch (Exception e) {
@@ -1936,12 +1941,35 @@ public abstract class BaseAnalysisEngine
// Stops all input channels of this service, but keep temp reply queue input channels open
// to process replies.
- stopInputChannels(InputChannel.InputChannels);
+ stopReceivingCASes();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
"quiesceAndStop", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_register_onEmpty_callback__INFO", new Object[] { getComponentName() });
}
+ if ( this instanceof PrimitiveAnalysisEngineController_impl &&
+ ((PrimitiveAnalysisEngineController_impl)this).aeInstancePool != null ) {
+ // Since we are quiescing, destroy all AEs that are in AE pool. Those that
+ // are still busy processing will be destroyed when they finish.
+ try {
+ // Sleep for 2secs to allow any CASes that just arrived to reach process
+ // method. There may be CASes in flight that just came in before we
+ // stopped input channel but not yet reached process method. We allow
+ // them to be processed before we clean AE pool below.
+ synchronized(lock) {
+ lock.wait(2000);
+ }
+ // Set a flag on the AEPool manager to destroy any AE instance being returned
+ // to the pool. The AE.destroy() method must be called on the same thread
+ // that initialized the AE instance. Any AE instances already in the pool
+ // will be destroyed when a thread pool is shutdown
+ ((PrimitiveAnalysisEngineController_impl)this).aeInstancePool.destroy();
+ } catch( Exception e) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+ "quiesceAndStop", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_exception__WARNING", e);
+ }
+ }
// Register a callback with the cache. The callback will be made when the cache becomes
// empty
getInProcessCache().registerCallbackWhenCacheEmpty(this,
@@ -1970,8 +1998,9 @@ public abstract class BaseAnalysisEngine
"quiesceAndStop", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_onEmpty_callback_received__INFO", new Object[] { getComponentName() });
}
- getInputChannel().terminate();
- stop();
+ stopInputChannels(InputChannel.InputChannels, true);
+ // close JMS connection
+ stop(false); // wait for any remaining CASes in flight to finish
}
}
}
@@ -2018,14 +2047,14 @@ public abstract class BaseAnalysisEngine
iC.setTerminating();
}
// Stop the inflow of new input CASes
- stopInputChannel();
- if ( iC != null ) {
+ stopInputChannel(true); // shutdownNow
+ if ( iC != null ) {
iC.terminate();
}
stopCasMultipliers();
stopTransportLayer();
if (cause != null && aCasReferenceId != null) {
- this.stop(cause, aCasReferenceId);
+ this.stop(cause, aCasReferenceId, true); // shutdownNow
} else {
this.stop();
}
@@ -2150,11 +2179,11 @@ public abstract class BaseAnalysisEngine
* Stops a listener on the main input channel
*
*/
- protected void stopInputChannel() {
+ protected void stopInputChannel(boolean shutdownNow) {
InputChannel iC = getInputChannel(endpointName);
if (iC != null && !iC.isStopped()) {
try {
- iC.stop();
+ iC.stop(shutdownNow);
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "terminate",
@@ -2171,7 +2200,43 @@ public abstract class BaseAnalysisEngine
iC.setTerminating();
}
}
- protected void stopInputChannels( int channelsToStop) { //, boolean norecovery) {
+ protected void stopReceivingCASes() {
+
+ InputChannel iC = null;
+ setInputChannelForNoRecovery();
+ Iterator<String> it = inputChannelMap.keySet().iterator();
+ while (it.hasNext()) {
+ try {
+ String key = it.next();
+ if (key != null && key.trim().length() > 0) {
+ iC = (InputChannel) inputChannelMap.get(key);
+ if (iC != null) {
+ iC.disconnectListenersFromQueue();
+ }
+ }
+ } catch (Exception e) {
+ if (iC != null) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
+ "stopReceivingCASes", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_unable_to_stop_inputchannel__INFO",
+ new Object[] { getComponentName(), iC.getInputQueueName() });
+ }
+ } else {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "stopReceivingCASes", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_service_exception_WARNING", getComponentName());
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+ "stopReceivingCASes", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_exception__WARNING", e);
+ }
+ }
+ }
+ }
+
+ }
+ protected void stopInputChannels( int channelsToStop, boolean shutdownNow) { //, boolean norecovery) {
InputChannel iC = null;
setInputChannelForNoRecovery();
Iterator it = inputChannelMap.keySet().iterator();
@@ -2186,10 +2251,10 @@ public abstract class BaseAnalysisEngine
&& iC.getServiceInfo().getInputQueueName().startsWith("top_level_input_queue")) {
// This closes both listeners on the input queue: Process Listener and GetMeta
// Listener
- iC.stop(channelsToStop);
+ iC.stop(channelsToStop,shutdownNow);
return; // Just closed input channels. Keep the others open
}
- iC.stop(channelsToStop);
+ iC.stop(channelsToStop,shutdownNow);
}
}
i++;
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java Tue Dec 13 14:51:11 2011
@@ -86,7 +86,7 @@ public class PrimitiveAnalysisEngineCont
// Pool containing instances of AE. The default implementation provides Thread affinity
// meaning each thread executes the same AE instance.
- private AnalysisEngineInstancePool aeInstancePool = null;
+ protected AnalysisEngineInstancePool aeInstancePool = null;
private String abortedCASReferenceId = null;
// Create a shared semaphore to serialize creation of AE instances.
@@ -182,7 +182,7 @@ public class PrimitiveAnalysisEngineCont
return;
}
if (aeInstancePool == null) {
- aeInstancePool = new AnalysisEngineInstancePoolWithThreadAffinity(analysisEnginePoolSize);
+ aeInstancePool = new AnalysisEngineInstancePoolWithThreadAffinity();//analysisEnginePoolSize);
}
if (analysisEngineMetadata == null) {
analysisEngineMetadata = ae.getAnalysisEngineMetaData();
@@ -283,6 +283,10 @@ public class PrimitiveAnalysisEngineCont
// Initialize Cas Manager
if (getCasManagerWrapper() != null) {
try {
+ // Below should always be true. In spring context file AsynchAECasManager_impl
+ // is instantiated and setCasPoolSize() method is called which sets the
+ // initialized state = true. isInitialized() returning true just means that
+ // setCasPoolSize() was called.
if (getCasManagerWrapper().isInitialized()) {
getCasManagerWrapper().addMetadata(getAnalysisEngineMetadata());
if (isTopLevelComponent()) {
@@ -477,7 +481,15 @@ public class PrimitiveAnalysisEngineCont
}
}
}
+ public void destroyAE() {
+ try {
+ AnalysisEngine ae = aeInstancePool.checkout();
+ ae.destroy();
+ } catch( Exception e) {
+ e.printStackTrace();
+ }
+ }
/**
* This is called when a Stop request is received from a client. Add the provided Cas id to the
* list of aborted CASes. The process() method checks this list to determine if it should continue
@@ -1017,7 +1029,7 @@ public class PrimitiveAnalysisEngineCont
}
public void stop() {
- super.stop();
+ super.stop(true); // shutdown now
if (aeInstancePool != null) {
try {
aeInstancePool.destroy();
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties Tue Dec 13 14:51:11 2011
@@ -245,4 +245,10 @@ UIMAEE_service_sending_release_cas_reque
UIMAEE_service_sending_stop_request__FINE = Controller: {0} Sending STOP Request to Delegate: {1}
UIMAEE_service_delivery_exception__WARNING = Controller: {0} Failed To Send Message to Delegates {1} Queue: {2}
UIMAEE_service_delivery_to_client_exception__WARNING = Controller: {0} Failed To Send Message to Clients Queue: {1}
-UIMAEE_service_state__INFO={0}
\ No newline at end of file
+UIMAEE_service_state__INFO={0}
+UIMAEE_service_returned_reply__INFO=Controller: {0} Returned Input CAS: {1}
+UIMAEE_checking_in_ae_to_pool__INFO=Controller: {0} Checking in AE instance to AePoolManager. Thread Id:{1}
+UIMAEE_checked_in_ae_to_pool__INFO=Controller: {0} Checked in AE instance to AePoolManager. Thread Id:{1}
+UIMAEE_calling_ae_initialize__INFO=Controller: {0} Initializing AE instance on Thread Id: {1}
+UIMAEE_process_thread_exiting__INFO=Controller: {0} --------------- Process Thread ID:{1} EXITING
+UIMAEE_ae_instance_destroy_called__INFO=Controller: {0} --------------- AE destroy() Method Call Returned ID:{1}
\ No newline at end of file
Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties Tue Dec 13 14:51:11 2011
@@ -68,7 +68,7 @@ UIMAJMS_connection_closed_to_endpoint__F
UIMAJMS_open_connection_to_endpoint__FINE = Opening Connection To Endpoint: {0}
UIMAJMS_sending_msg_to_endpoint__FINE = Sending Message to Endpoint: {0}
UIMAJMS_service_listening__INFO = {0} Service Starting - Listening for Messages
-UIMAJMS_stopping_jms_transport__INFO = Stopping Service JMS Transport. Service: {0}
+UIMAJMS_stopping_jms_transport__INFO = Stopping Service JMS Transport. Service: {0} ShutdownNow {1}
UIMAJMS_recvd_msg__FINE = Service:{0} Received New Message
UIMAJMS_connector_list__FINE = ActiveMQ Broker Connector List: {0}
UIMAJMS_broker_uri__FINE = Broker URI: {0}
@@ -211,7 +211,8 @@ UIMAJMS_service_not_responding_to_ping__
UIMAJMS_starting_listener__INFO=Controller: {0} Starting Listener on Endpoint: {1} Selector: {2} Broker: {3}
UIMAJMS_caught_signal__INFO= Uima AS Service {0} Caught Kill Signal - Initiating Quiesce and Stop
UIMAJMS_listener_added_after_initialize__WARNING = UIMA AS Already Initialized - Attempt to Add Callback Listener Failed. Add Callback Listener Before calling initialize().
-UIMAJMS_client_interrupted_INFO= UIMA AS Client Thread Interrupted While Waiting For a Reply. CAS: {0} CasHashCode: {1}
+UIMAJMS_client_interrupted_INFO= UIMA AS Client Thread [ID:{0}]Interrupted While Waiting For a Reply. CAS: {1} CasHashCode: {2}
+UIMAJMS_client_canceled_timer_INFO=UIMA AS Client Thread [ID:{0}] Cancelled Timer and Throwing Exception from sendAndReceive for CAS: {1} CasHashCode: {2}
UIMAJMS_failed_cache_lookup__WARNING= UIMA AS Client Failed Cache Look Up For CAS: {0} Command:{1} Message:{2} Destination:{3}
UIMAJMS_calling_onBeforeMessageSend__FINE= UIMA AS Client Calling onBeforeMessageSend - CAS:{0} CasHashCode:{1}
UIMAJMS_completed_onBeforeMessageSend__INFO= UIMA AS Client Completed onBeforeMessageSend - CAS:{0} CasHashCode:{1}
@@ -221,4 +222,7 @@ UIMAJMS_cas_added_to_pending_FINE = UIMA
UIMAJMS_cas_submitted_FINE=UIMA AS sendAndReceive Received CAS:{0} HashCode:{1} For Processing - Forwarding to sendCAS() on Thread:{2}
UIMAJMS_calling_onBeforeProcessCAS_FINE = UIMA AS Client Calling onBeforeMessageProcess For CAS:{0} Hashcode:{1}
UIMAJMS_completed_onBeforeProcessCAS_FINE = UIMA AS Client Completed onBeforeMessageProcess For CAS:{0} Hashcode:{1}
-UIMAJMS_skipping_onBeforeProcessCAS_INFO= UIMA AS Client Not Calling onBeforeMessageProcess For CAS:{0} Hashcode:{1}. Invalid state: Node: {2} IP: {3}
\ No newline at end of file
+UIMAJMS_skipping_onBeforeProcessCAS_INFO= UIMA AS Client Not Calling onBeforeMessageProcess For CAS:{0} Hashcode:{1}. Invalid state: Node IP: {2} PID: {3}
+UIMAJMS_listener_shutdown__INFO = +++++++++++++++ Controller: {0} UIMA AS Listener With Selector {1} shutdown() completed
+UIMAJMS_listener_jms_connection_closed__INFO = +++++++++++++++ Controller: {0} UIMA AS Listener With Selector {1} closed JMS connection to Broker {2}
+