You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/04/10 16:06:19 UTC
svn commit: r763936 -
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
Author: cwiklik
Date: Fri Apr 10 14:06:18 2009
New Revision: 763936
URL: http://svn.apache.org/viewvc?rev=763936&view=rev
Log:
UIMA-1109 Modified to support orderly shutdown of Uima AS service
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=763936&r1=763935&r2=763936&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Fri Apr 10 14:06:18 2009
@@ -25,11 +25,13 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.InputChannel;
@@ -387,17 +389,6 @@
}
public void abort()
{
-// if ( messageListener != null )
-// {
-// UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-// "abort", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopping_jms_transport__INFO",
-// new Object[] { endpointName });
-// try
-// {
-// messageListener.shutdown();
-// }
-// catch( Exception e) {}
-// }
}
private String decodeIntToString( String aTypeToDecode, int aValueToDecode )
{
@@ -690,7 +681,9 @@
this.messageListener = messageListener;
System.setProperty("BrokerURI", messageListener.getBrokerUrl());
brokerURL = messageListener.getBrokerUrl();
- listenerContainerList.add(messageListener);
+ if ( !listenerContainerList.contains(messageListener) ) {
+ listenerContainerList.add(messageListener);
+ }
this.messageListener = messageListener;
if ( getController() != null )
{
@@ -771,43 +764,75 @@
((PrimitiveAnalysisEngineController)getController()).getServiceInfo().setBrokerURL(brokerURL);
}
}
- public void stop() throws Exception
- {
+ private void stopChannel(UimaDefaultMessageListenerContainer mL) throws Exception {
+ String eName = mL.getEndpointName();
+ if (eName != null) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "stop",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopping_jms_transport__INFO",
+ new Object[] { eName });
+ }
+ }
+ mL.stop();
+ System.out.println("Stopped Channel:"+mL.getDestinationName());
+ }
+ private boolean doCloseChannel(UimaDefaultMessageListenerContainer mL, int channelsToClose) {
+ // Check if we are closing just the input channel
+ if ( channelsToClose == InputChannel.InputChannels ) {
+ // Fetch the listener object
+ ActiveMQDestination destination = (ActiveMQDestination)mL.getListenerEndpoint();
+ // if this is a listener on a temp queue return false. We need to keep all temp
+ // channels open to receive replies and/or notifications to free CASes
+ // Keep the temp reply channel open
+ if ( destination != null && destination.isTemporary() ) {
+ return false;
+ }
+ }
+ return true;
+ }
+ public void stop() throws Exception {
+ stop(InputChannel.CloseAllChannels );
+ listenerContainerList.clear();
+ failedListenerMap.clear();
+ }
+ public synchronized void stop(int channelsToClose) throws Exception {
- for( int i=0; i < listenerContainerList.size(); i++ )
- {
- final UimaDefaultMessageListenerContainer mL =
- (UimaDefaultMessageListenerContainer) listenerContainerList.get(i);
- if ( mL != null && mL.isRunning() )
- {
- stopped = true;
- String eName = mL.getEndpointName();
- if ( eName != null )
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "stop", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopping_jms_transport__INFO",
- new Object[] { eName /* mL.getEndpointName()*/ /*endpointName */});
- }
- }
- mL.closeConnection();
- mL.stop();
- }
- else
- {
- if ( getController() != null )
- {
+ List<UimaDefaultMessageListenerContainer> listenersToRemove =
+ new ArrayList<UimaDefaultMessageListenerContainer>();
+ for( Object listenerObject : listenerContainerList) {
+ final UimaDefaultMessageListenerContainer mL = (UimaDefaultMessageListenerContainer)listenerObject;
+ if (mL != null && mL.isRunning() && doCloseChannel(mL, channelsToClose) ) {
+ System.out.println("JmsInputChannel.stop()-Stopping Input Channel:"+mL.getDestination()+" Selector:"+mL.getMessageSelector());
+ stopChannel(mL);
+ // Just in case check if the container still in the list. If so, add it to
+ // another list that container listeners that have been stopped and need
+ // to be removed from the listenerContainerList. Removing the listener from
+ // the listenerContainerList in this iterator loop is not working. If for
+ // example the iterator has two elements, after the first remove from the
+ // listenerContainerList, the iterator stops event though there is still
+ // one element left. Process removal of listeners outside of the iterator
+ // loop
+ if ( listenerContainerList.contains(mL) ) {
+ listenersToRemove.add(mL);
+ }
+ } else {
+ if (getController() != null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "stop", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_transport_not_stopped__INFO",
- new Object[] { getController().getComponentName() });
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "stop",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_transport_not_stopped__INFO",
+ new Object[] { getController().getComponentName() });
}
- }
- }
+ }
+ }
}
- messageListener = null;
- controller = null;
- handler = null;
+ // Remove listeners from the listenerContainerList
+ for( UimaDefaultMessageListenerContainer mL : listenersToRemove) {
+ listenerContainerList.remove(mL);
+ }
+ listenersToRemove.clear();
+ if ( channelsToClose == InputChannel.CloseAllChannels ) {
+ stopped = true;
+ }
}
public boolean isStopped()
{
@@ -826,12 +851,10 @@
new UimaDefaultMessageListenerContainer();
ActiveMQConnectionFactory f = new ActiveMQConnectionFactory(failedListener.getBrokerUrl());
newListener.setConnectionFactory(f);
-// newListener.setConnectionFactory(failedListener.getConnectionFactory());
newListener.setMessageListener(this);
newListener.setController(getController());
TempDestinationResolver resolver = new TempDestinationResolver();
-// resolver.setConnectionFactory((ActiveMQConnectionFactory)failedListener.getConnectionFactory());
resolver.setConnectionFactory(f);
resolver.setListener(newListener);
newListener.setDestinationResolver(resolver);
@@ -862,50 +885,75 @@
Object clone = ((Endpoint_impl) endpoint).clone();
newListener.setTargetEndpoint((Endpoint)clone);
}
-
- private UimaDefaultMessageListenerContainer getListenerForEndpoint( String anEndpointName ) {
+ /**
+ * Given an endpoint name returns all listeners attached to this endpoint. There can be multiple
+ * listeners on an endpoint each with a different selector to receive targeted messages like GetMeta
+ * and Process.
+ *
+ * @param anEndpointName - name of the endpoint that is used to find associated listener(s)
+ *
+ * @return - list of listeners
+ */
+ private UimaDefaultMessageListenerContainer[] getListenersForEndpoint( String anEndpointName ) {
+ List<UimaDefaultMessageListenerContainer> listeners = new ArrayList<UimaDefaultMessageListenerContainer>();
for( int i=0; i < listenerContainerList.size(); i++ ) {
UimaDefaultMessageListenerContainer mListener =
(UimaDefaultMessageListenerContainer) listenerContainerList.get(i);
- if ( mListener.getDestination() != null && mListener.getDestination().toString().equals( anEndpointName)) {
- return mListener;
+ if ( mListener.getDestinationName() != null && mListener.getDestinationName().equals( anEndpointName)) {
+ listeners.add(mListener);
+ } else if ( mListener.getDestination() != null && mListener.getDestination().toString().equals( anEndpointName)) {
+ listeners.add(mListener);
}
}
+ if ( listeners.size() > 0 ) {
+ UimaDefaultMessageListenerContainer[] listenerArray = new UimaDefaultMessageListenerContainer[listeners.size()];
+ listeners.toArray(listenerArray);
+ return listenerArray;
+ }
return null;
}
- public synchronized void destroyListener( String anEndpointName, String aDelegateKey ) {
- final UimaDefaultMessageListenerContainer mListener =
- getListenerForEndpoint(anEndpointName);
- if ( mListener == null ) {
- System.out.println("--- Listener For Endpoint: "+aDelegateKey+" Not Found");
+ /**
+ *
+ */
+ public void destroyListener( final String anEndpointName, String aDelegateKey ) {
+ // check if delegate listener has already been placed in the failed listeners list
+ // If so, nothing else to do here
+ if ( failedListenerMap.containsKey(aDelegateKey) ){
return;
}
- if ( !mListener.isRunning() ) {
- return; // Already Stopped
+ // Fetch all associated listeners.
+ final UimaDefaultMessageListenerContainer[] mListeners = getListenersForEndpoint(anEndpointName);
+ if ( mListeners == null ) {
+ return;
}
-
- try {
+ // Stop each listener
+ for( final UimaDefaultMessageListenerContainer mListener : mListeners ) {
+ if ( !mListener.isRunning() ) {
+ continue; // Already Stopped
+ }
+
+ try {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) && mListener.getDestination() != null) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "destroyListener", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stop_listener__INFO",
- new Object[] { mListener.getDestination().toString() });
+ "destroyListener", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stop_listener__INFO",
+ new Object[] { mListener.getDestination().toString() });
}
- System.out.println("++++ Stopping Listener ...");
- mListener.closeConnection();
- mListener.stop();
- System.out.println("++++ Destroying Listener ...");
- new Thread() {
+ // Spin a thread that will stop the listener and wait for its shutdown
+ Thread stopThread = new Thread("InputChannelStopThread") {
public void run() {
- mListener.destroy();
+ mListener.stop();
+ // wait until the listener shutsdown
+ while( mListener.isRunning());
+ System.out.println("Thread:"+Thread.currentThread().getId()+"++++ Listener on Queue:"+anEndpointName+" Has Been Stopped...");
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) && mListener.getDestination() != null) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "destroyListener", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopped_listener_INFO",
+ new Object[] {controller.getComponentName(), mListener.getDestination().toString() });
+ }
}
};
- while( mListener.isRunning());
- System.out.println("Thread:"+Thread.currentThread().getId()+"++++ Listener on Queue:"+anEndpointName+" Has Been Stopped...");
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) && mListener.getDestination() != null) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "destroyListener", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopped_listener_INFO",
- new Object[] {controller.getComponentName(), mListener.getDestination().toString() });
- }
+ stopThread.start();
+
if ( getController() != null ) {
Endpoint endpoint = ((AggregateAnalysisEngineController)getController()).lookUpEndpoint(aDelegateKey, false);
endpoint.setStatus(Endpoint.FAILED);
@@ -918,20 +966,22 @@
}
}
}
- //}
- } catch( Exception e) {
- e.printStackTrace();
+ //}
+ } catch( Exception e) {
+ e.printStackTrace();
+ }
}
}
public boolean isFailed(String aDelegateKey) {
return failedListenerMap.containsKey(aDelegateKey);
}
public boolean isListenerForDestination( String anEndpointName) {
- UimaDefaultMessageListenerContainer mListener =
- getListenerForEndpoint(anEndpointName);
- if ( mListener == null ) {
+ UimaDefaultMessageListenerContainer[] mListeners =
+ getListenersForEndpoint(anEndpointName);
+ if ( mListeners == null ) {
return false;
}
return true;
}
+
}