You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/05/07 21:27:44 UTC

svn commit: r1831129 [1/5] - in /uima/uima-as/branches/uima-as-3: uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ uimaj-as-activemq/src/main/java/org/apache/uima/...

Author: cwiklik
Date: Mon May  7 21:27:43 2018
New Revision: 1831129

URL: http://svn.apache.org/viewvc?rev=1831129&view=rev
Log:
UIMA-5501

Added:
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsProcessListener.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTopLevelTestAggregate.xml
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAS.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessServiceInfoResponseCommand.java
Modified:
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/PriorityMessageHandler.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsMessageListenerBuilder.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/UimaAsJmsServiceBuilder.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/AbstractUimaASDeployer.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/UimaAsServiceDeployer.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/UimaAsDirectServiceDeployer.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/UimaAsJmsServiceDeployer.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpAnnotator.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateAnnotator.xml
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_CMAggregateWithCollocatedCM.xml
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_ComplexAggregate.xml
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotator.xml
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_PersonTitleAnnotator.xml
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_TopLevelComplexAggregateCM.xml
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/CasMultiplierAggregateWith2Multipliers.xml
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsPriorityBasedThreadFactory.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CommandFactory.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasRequestCommand.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasResponseCommand.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectInputChannel.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectListener.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessageContext.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectOutputChannel.java
    uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Mon May  7 21:27:43 2018
@@ -181,7 +181,9 @@ public class JmsEndpointConnection_impl
           ServiceShutdownException, ConnectException {
 	  synchronized (lock) {
 		    try {
-
+if ( !controller.isPrimitive() && isReplyEndpoint ) {
+	System.out.println("JmsOutputChannel.openChannel()");
+}
 		        // If replying to http request, reply to a queue managed by this service broker using tcp
 		        // protocol
 		        if (isReplyEndpoint && brokerUri.startsWith("http")) {

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Mon May  7 21:27:43 2018
@@ -935,18 +935,26 @@ public class JmsInputChannel implements
     }
   }
   private void stopListener(final UimaDefaultMessageListenerContainer mL) throws Exception {
+	 // Thread.currentThread().dumpStack();
 	  System.out.println(".... "+getController().getComponentName()+" Stopping Listener Type:"+mL.getType());
 	  mL.setTerminating();
 	  mL.setAcceptMessagesWhileStopping(false);
 
 	  mL.stop();
-	  System.out.println(".... "+getController().getComponentName()+" Stopping Listener - Calling destroy()");
+	  System.out.println(".... "+getController().getComponentName()+" Stopping "+mL.getType()+" Listener - Calling destroy()");
 	  System.out.println(".... "+getController().getComponentName()+" Stopping Listener - Done Calling destroy()");
 
 	  if ( mL.getTaskExecutor() instanceof ThreadPoolTaskExecutor ) {
 		  System.out.println(".... "+getController().getComponentName()+" Stopping ThreadPoolTaskExecutor");
 		  ThreadPoolTaskExecutor tpe = ((ThreadPoolTaskExecutor)mL.getTaskExecutor());
-		  tpe.destroy();
+		  
+		  try {
+			  tpe.shutdown();
+			  
+		  } catch( Exception e) {
+			  e.printStackTrace();
+		  }
+
 
 	  } else {
 		  System.out.println(".... "+getController().getComponentName()+" ActiveConsumerCount:"+mL.getActiveConsumerCount());
@@ -969,6 +977,7 @@ public class JmsInputChannel implements
 		 
 		  
 	  }
+	 
 	  System.out.println(".... "+getController().getComponentName()+" Stopping Listener - Stopped Task Executor");
 	  mL.shutdown();
 

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Mon May  7 21:27:43 2018
@@ -461,8 +461,15 @@ public class JmsOutputChannel implements
           
         }
         */
-        brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
-
+        /*
+        if ( anEndpoint.isFreeCasEndpoint() && anEndpoint.isCasMultiplier() && anEndpoint.isReplyEndpoint()) {
+            brokerConnectionURL = anEndpoint.getServerURI();
+        } else {
+            brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
+        }
+        */
+        brokerConnectionURL = anEndpoint.getServerURI();
+        
         String key = getLookupKey(anEndpoint);
         String destination = getDestinationName(anEndpoint);
 
@@ -1063,14 +1070,17 @@ public class JmsOutputChannel implements
     	if ( destination == null ) {
     		destination = anEndpoint.getDestination();
     	}
-    	System.out.println(".......... Service:"+getAnalysisEngineController().getComponentName()+" replying with GetMeta to reply queue:"+destination);
 
-    	
+    	  if ( analysisEngineController instanceof AggregateAnalysisEngineController ) {
+      		  System.out.println("Aggregate Controller replying with GetMeta");
+      	  }
+     
       anEndpoint.setReplyEndpoint(true);
       // Initialize JMS connection to given endpoint
       JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
+  	System.out.println(".......... Service:"+getAnalysisEngineController().getComponentName()+" replying with GetMeta to reply queue:"+destination+" Broker:"+endpointConnection.getServerUri());
 
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "sendReply",
                 JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_produce_txt_msg__FINE",
                 new Object[] {});

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/PriorityMessageHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/PriorityMessageHandler.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/PriorityMessageHandler.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/PriorityMessageHandler.java Mon May  7 21:27:43 2018
@@ -49,7 +49,7 @@ import org.springframework.jms.listener.
 public class PriorityMessageHandler implements SessionAwareMessageListener {
 
 	private PriorityBlockingQueue<MessageWrapper> queue =
-			new PriorityBlockingQueue<MessageWrapper>();
+			new PriorityBlockingQueue<>();
 	private Semaphore targetedListenerSemaphore = null;
 	private Semaphore processListenerSemaphore = null;
 	

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java Mon May  7 21:27:43 2018
@@ -24,6 +24,7 @@ import javax.jms.JMSException;
 import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.springframework.jms.support.destination.DestinationResolver;
 
 public class TempDestinationResolver implements DestinationResolver {
@@ -52,15 +53,20 @@ public class TempDestinationResolver imp
   public Destination resolveDestinationName(Session session, String destinationName,
           boolean pubSubDomain) throws JMSException {
 	  System.out.println("************ resolveDestinationName() Controller:"+serviceName+" Endpoint:"+endpoint+"************************");
-
-	  synchronized (mutex) {
-      if (destination == null) {
-        destination = session.createTemporaryQueue();
-        if (listener != null) {
-          listener.setDestination(destination);
-        }
-      }
-    }
+	  try {
+		  synchronized (mutex) {
+		    //  if (destination == null) {
+			  if ( listener.getDestination() == null || ((ActiveMQDestination)listener.getDestination()).isTemporary() ) {
+		        destination = session.createTemporaryQueue();
+		        if (listener != null) {
+		          listener.setDestination(destination);
+		        }
+		      }
+		  }
+	  } catch( Exception e) {
+		  e.printStackTrace();
+		  throw e;
+	  }
     return destination;
   }
 

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Mon May  7 21:27:43 2018
@@ -79,6 +79,7 @@ import org.springframework.jms.listener.
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
 import org.springframework.jms.listener.SessionAwareMessageListener;
 import org.springframework.jms.support.JmsUtils;
+import org.springframework.jms.support.destination.CachingDestinationResolver;
 import org.springframework.jms.support.destination.DestinationResolver;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
@@ -168,6 +169,7 @@ public class UimaDefaultMessageListenerC
 	  return transport;
   }
   public void setType(Type t) {
+	  System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>> Listener.setType() - assigning type:"+t);
 	  this.type = t;
   }
   public Type getType() {
@@ -180,7 +182,29 @@ public class UimaDefaultMessageListenerC
     this();
     this.freeCasQueueListener = freeCasQueueListener;
   }
-  
+  protected void refreshDestination() {
+		String destName = getDestinationName();
+		Destination d = getDestination();
+		if (destName != null) {
+			System.out.println(">>>>>>>>>>>>>>> refreshDestination() - destination:"+d+" DestName:"+destName+" Type:"+type);
+			DestinationResolver destResolver = getDestinationResolver();
+			if (destResolver instanceof CachingDestinationResolver) {
+				((CachingDestinationResolver) destResolver).removeFromCache(destName);
+			}
+			if( Type.Reply.equals(type) || Type.FreeCAS.equals(type)) {
+				try {
+					ActiveMQConnection c = (ActiveMQConnection)getSharedConnection();
+					Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+					getDestinationResolver().resolveDestinationName(s,"",false);
+				} catch( Exception e) {
+					e.printStackTrace();
+				}
+
+			}
+
+		}
+	//super.refreshDestination();
+  }
   public void setTargetedListener() {
 	  targetedListener = true;
   }
@@ -198,39 +222,38 @@ public class UimaDefaultMessageListenerC
    * Overriden Spring's method that tries to recover from lost connection. We dont 
    * want to recover when the service is stopping.
    */
-  protected void refreshConnectionUntilSuccessful() {
-	 // System.out.println("............refreshConnectionUntilSuccessful() called");
-	  while (isRunning()) {
-
-			try {
-				
-				if (sharedConnectionEnabled()) {
-					refreshSharedConnection();
-				} else {
-					Connection con = createConnection();
-					JmsUtils.closeConnection(con);
-				}
-				// Use UIMA-AS custom Destination Resolver to create a new temp queue for this reply listener
-				if( Type.Reply.equals(type) || Type.FreeCAS.equals(type)) {
-					getDestinationResolver().resolveDestinationName(getSharedConnection().createSession(false, Session.AUTO_ACKNOWLEDGE),"",false);
-				}
-				logger.info(getType().name()+" Listener Successfully refreshed JMS Connection to broker: "+getBrokerUrl()+" Endpoint:"+getDestination());
-				logListenerFailure = true;
-				break;
-			}
-			catch (Exception ex) {
-				
-				if (ex instanceof JMSException) {
-					if (logListenerFailure && ex.getCause() instanceof ConnectException) {
-						
-						logger.info(getType().name()+" Listener lost connection to broker: "+getBrokerUrl()+"- Retrying until successfull ...");
-						logListenerFailure = false;
-					} else {
-						invokeExceptionListener((JMSException) ex);
-					}
-				}
-			}
-	  }
+//  protected void refreshConnectionUntilSuccessful() {
+//	 System.out.println("............refreshConnectionUntilSuccessful() called - Listener Hashcode:"+this.hashCode()+ " Thread Name:"+Thread.currentThread().getName());;
+//	  while (isRunning()) {
+//
+//			try {
+//				
+//				if (sharedConnectionEnabled()) {
+//					refreshSharedConnection();
+//				} 
+//				// Use UIMA-AS custom Destination Resolver to create a new temp queue for this reply listener
+//				if( Type.Reply.equals(type) || Type.FreeCAS.equals(type)) {
+//					getDestinationResolver().resolveDestinationName(getSharedConnection().createSession(false, Session.AUTO_ACKNOWLEDGE),"",false);
+//					Thread.currentThread().dumpStack();
+//				}
+//				logger.info(getType().name()+" Listener Successfully refreshed JMS Connection to broker: "+getBrokerUrl()+" Endpoint:"+getDestination());
+//				logListenerFailure = true;
+//				break;
+//			}
+//			catch (Exception ex) {
+//				
+//				if (ex instanceof JMSException) {
+//					if (logListenerFailure && ex.getCause() instanceof ConnectException) {
+//						
+//						logger.info(getType().name()+" Listener lost connection to broker: "+getBrokerUrl()+"- Retrying until successfull ...");
+//						logListenerFailure = false;
+//					} else {
+//						ex.printStackTrace();
+//						invokeExceptionListener((JMSException) ex);
+//					}
+//				}
+//			}
+//	  }
 	  /*
 	  boolean doLogFailureMsg = true;
     try {
@@ -323,12 +346,12 @@ public class UimaDefaultMessageListenerC
     } catch( IllegalStateException e ) {
     }
     */
-  }
-  protected void recoverAfterListenerSetupFailure() {
-	  if ( !terminating ) {
-		  super.recoverAfterListenerSetupFailure();
-	  }
-  }
+  //}
+//  protected void recoverAfterListenerSetupFailure() {
+//	  if ( !terminating ) {
+//		  super.recoverAfterListenerSetupFailure();
+//	  }
+//  }
 
   public void setTerminating() {
     terminating = true;
@@ -635,16 +658,18 @@ public class UimaDefaultMessageListenerC
   /**
    * This method is called by Spring when a listener fails
    */
-  protected void handleListenerSetupFailure(Throwable t, boolean alreadyHandled) {
-    if ( t.getCause() instanceof InterruptedException ) {
-//  	  System.out.println("............handleListenerFailure(Throwable t, boolean alreadyHandled) called - Cause:"+t);
-  	  return;
-    }
-    // If shutdown already, nothing to do
-	    // If controller is stopping no need to recover the connection
-    if (!super.isRunning() || awaitingShutdown || terminating || (controller != null && controller.isStopped()) ) {
-      return;
-    }
+//  protected void handleListenerSetupFailure(Throwable t, boolean alreadyHandled) {
+//    if ( t.getCause() instanceof InterruptedException ) {
+////  	  System.out.println("............handleListenerFailure(Throwable t, boolean alreadyHandled) called - Cause:"+t);
+//  	  return;
+//    }
+//    // If shutdown already, nothing to do
+//	    // If controller is stopping no need to recover the connection
+//    if (!super.isRunning() || awaitingShutdown || terminating || (controller != null && controller.isStopped()) ) {
+//      return;
+//    }
+//    
+    
     /*
     if ( controller != null ) {
       controller.changeState(ServiceState.FAILED);
@@ -711,7 +736,7 @@ public class UimaDefaultMessageListenerC
       failed = true;
     }
     */
-  }
+//  }
   public Endpoint getEndpoint() {
 	  return endpoint;
   }
@@ -1120,18 +1145,21 @@ public class UimaDefaultMessageListenerC
 	  if ( isRunning()) {
 		  return;
 	  }
-	  int consumerThreadCount=-1;
-	  if ( getTaskExecutor() instanceof ThreadPoolTaskExecutor) {
+	//  int consumerThreadCount=-1;
+	  /*
+	  if ( !Type.ProcessCAS.equals(type) && getTaskExecutor() instanceof ThreadPoolTaskExecutor) {
 		  ((ThreadPoolTaskExecutor)getTaskExecutor()).initialize();
 		  // if this listener is a handling Process requests, the prestartAllCoreThreads() below
 		  // will force initialization of AEs if this is a primitive service.
 		  ((ThreadPoolTaskExecutor)getTaskExecutor()).getThreadPoolExecutor().prestartAllCoreThreads();
 	  }
+	  
+	  */
 	  super.afterPropertiesSet();
 	  super.initialize();
 	  super.start();
 	  
-      System.out.println(">>>>>>> Listener Service:"+controller.getComponentName()+" Broker URL:"+getBrokerUrl()+" Endpoint:"+__listenerRef.getEndpoint()+" ConsumerThreadCount:"+consumerThreadCount);
+      System.out.println(">>>>>>> Listener Service:"+controller.getComponentName()+" Broker URL:"+getBrokerUrl()+" Endpoint:"+__listenerRef.getEndpoint()+" ConsumerThreadCount:"+getConcurrentConsumers()+ " Type:"+getType());
 
   }
   private Object getPojoListener() {
@@ -1500,14 +1528,26 @@ public class UimaDefaultMessageListenerC
    * Called by Spring to inject TaskExecutor
    */
   public void setTaskExecutor(TaskExecutor aTaskExecutor) {
-    taskExecutor = aTaskExecutor;
+	  
+  taskExecutor = aTaskExecutor;
+    /*
+    if ( Type.ProcessCAS.equals(getType())) {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setCorePoolSize(1);
+        executor.setMaxPoolSize(1);
+        executor.setQueueCapacity(1);
+        executor.initialize();
+        super.setTaskExecutor(executor); 
+    } else {
+        super.setTaskExecutor(aTaskExecutor);
+    }
+    */
     super.setTaskExecutor(aTaskExecutor);
   }
-
   public TaskExecutor getTaskExecutor() {
-	return taskExecutor;
+
+	  return taskExecutor;
   }
-  
   /**
    * This method initializes ThreadPoolExecutor with a custom ThreadPool. Each thread produced by
    * the ThreadPool is used to first initialize an instance of the AE before the thread is added to

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Mon May  7 21:27:43 2018
@@ -109,6 +109,7 @@ import org.apache.uima.resource.Resource
 import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
 import org.apache.uima.util.Level;
 import org.apache.xmlbeans.XmlDocumentProperties;
+import org.apache.xmlbeans.XmlOptions;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationEvent;
 import org.springframework.context.ApplicationListener;
@@ -165,14 +166,25 @@ public class BaseUIMAAsynchronousEngine_
   
   private Thread dispatchThread;
   
+  private Transport transport;
+  
   protected volatile boolean stopped = false;
   public BaseUIMAAsynchronousEngine_impl() {
+	  this(Transport.JMS);  // default
+	  /*
 	  super();
     UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO,
             "UIMA Version " + UIMAFramework.getVersionString() +
     " UIMA-AS Version " + UimaAsVersion.getVersionString());
+    */
+  }
+  public BaseUIMAAsynchronousEngine_impl(Transport transport) {
+	super();
+	this.transport = transport;
+    UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO,
+            "UIMA Version " + UIMAFramework.getVersionString() +
+    " UIMA-AS Version " + UimaAsVersion.getVersionString());
   }
-
   protected void beforeProcessReply(String casReferenceId) {
 	  if ( service != null ) {
 		  service.removeFromCache(casReferenceId);
@@ -698,13 +710,7 @@ public class BaseUIMAAsynchronousEngine_
 		// Create a singleton shared connection object
 		SharedConnection sharedConnection = sharedConnections.get(aBrokerURI);
 		sharedConnection.setConnectionFactory(connectionFactory);
-/*
-		new SharedConnection(
-				connectionFactory,
-				//new ActiveMQConnectionFactory(aBrokerURI),
-				aBrokerURI);
-		sharedConnection.setSemaphore(semaphore);
-	*/
+
 		// Add AMQ specific connection validator
 		sharedConnection
 				.setConnectionValidator(connectionValidator);
@@ -798,7 +804,7 @@ public class BaseUIMAAsynchronousEngine_
   private void addPrefetch(ActiveMQConnection aConnection) {
     ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
     prefetchPolicy.setQueuePrefetch(5);
-    ((ActiveMQConnection) aConnection).setPrefetchPolicy(prefetchPolicy);
+    aConnection.setPrefetchPolicy(prefetchPolicy);
   }
 
   protected SharedConnection validateConnection(String aBrokerURI) throws Exception {
@@ -971,13 +977,17 @@ public class BaseUIMAAsynchronousEngine_
     reset();
     Properties performanceTuningSettings = new Properties();
     fetchRequiredProperties(anApplicationContext, performanceTuningSettings);
-    transport = (Transport)anApplicationContext.get(UimaAsynchronousEngine.ClientTransport);
+    //transport = (Transport)anApplicationContext.get(UimaAsynchronousEngine.ClientTransport);
     if ( Transport.JMS.equals(transport)) {
     	initializeJMS(anApplicationContext);
     } else if ( Transport.Java.equals(transport)) {
     	if ( service == null ) {
     		service = UimaAsServiceRegistry.getInstance().lookupByEndpoint(endpoint);
     	}
+    	if ( !(service instanceof AsynchronousUimaASService ) ) {
+    		// WRONG SERVICE TYPE FOR THIS CLIENT
+    		throw new ResourceInitializationException(); 
+    	}
     	brokerURI = "java";
     	initializeLocal(anApplicationContext);
     	
@@ -1145,7 +1155,7 @@ public class BaseUIMAAsynchronousEngine_
   }
 	private AnalysisEngineDeploymentDescriptionDocument parseDD(String descriptorPath) throws Exception {
 		org.apache.xmlbeans.XmlOptions options = new org.apache.xmlbeans.XmlOptions();
-		
+		options.setValidateOnSet();
 		
 		XMLReader xmlReader = XMLReaderFactory.createXMLReader();
 		xmlReader.setFeature("http://xml.org/sax/features/external-general-entities", false);
@@ -1165,23 +1175,13 @@ public class BaseUIMAAsynchronousEngine_
 		 provider = UimaAsDirectServiceBuilder.resolvePlaceholder(provider);
 		 System.out.println("...... provider() - "+provider);
 
-		 if (Provider.JAVA.get().equals(provider)) {
-			 return Provider.JAVA;
-		 } else if (Provider.ACTIVEMQ.get().equals(provider)) {
-			 return Provider.ACTIVEMQ;
+		 if (org.apache.uima.as.deployer.ServiceDeployers.Provider.JAVA.get().equals(provider)) {
+			 return org.apache.uima.as.deployer.ServiceDeployers.Provider.JAVA;
+		 } else if (org.apache.uima.as.deployer.ServiceDeployers.Provider.ACTIVEMQ.get().equals(provider)) {
+			 return org.apache.uima.as.deployer.ServiceDeployers.Provider.ACTIVEMQ;
 		 } else {
-			 throw new RuntimeException("Invalid provider attribute value in Deployment Descriptor :{"+provider+"} please check <deployment> element. Expected \"java\" or \"activemq\"");
+			 throw new IllegalArgumentException("Invalid provider attribute value in Deployment Descriptor :{"+provider+"} please check <deployment> element. Expected \"java\" or \"activemq\"");
 		 }
-
-		 /*
-		 if (Provider.JAVA.get().equals(provider)) {
-			 return Provider.JAVA;
-		 } else if (Provider.ACTIVEMQ.get().equals(provider)) {
-			 return Provider.ACTIVEMQ;
-		 } else {
-			 throw new RuntimeException("Invalid provider attribute value in Deployment Descriptor :{"+provider+"} please check <deployment> element. Expected \"java\" or \"activemq\"");
-		 }
-		 */
 	}
 	private Protocol protocol( AnalysisEngineDeploymentDescriptionDocument dd) {
 	     String protocol =
@@ -1190,23 +1190,14 @@ public class BaseUIMAAsynchronousEngine_
 		 protocol = UimaAsDirectServiceBuilder.resolvePlaceholder(protocol);
 		 
 		 System.out.println("...... protocol() - "+protocol);
-	     if (Protocol.JAVA.get().equalsIgnoreCase(protocol) ) {
-			 return Protocol.JAVA;
-		 } else if (Protocol.JMS.get().equalsIgnoreCase(protocol)  ) {
-			 return Protocol.JMS;
+	     if (org.apache.uima.as.deployer.ServiceDeployers.Protocol.JAVA.get().equalsIgnoreCase(protocol) ) {
+			 return org.apache.uima.as.deployer.ServiceDeployers.Protocol.JAVA;
+		 } else if (org.apache.uima.as.deployer.ServiceDeployers.Protocol.JMS.get().equalsIgnoreCase(protocol)  ) {
+			 return org.apache.uima.as.deployer.ServiceDeployers.Protocol.JMS;
 		 } else {
 			 throw new RuntimeException("Invalid protocol attribute value in Deployment Descriptor :{"+protocol+"} please check <deployment> element. Expected \"java\" or \"jms\"");
 		 }
 
-		 /*
-	     if (Protocol.JAVA.get().equals(protocol)) {
-			 return Protocol.JAVA;
-		 } else if (Protocol.JMS.get().equals(protocol)) {
-			 return Protocol.JMS;
-		 } else {
-			 throw new RuntimeException("Invalid protocol attribute value in Deployment Descriptor :{"+protocol+"} please check <deployment> element. Expected \"java\" or \"jms\"");
-		 }
-		 */
 	}
   /**
    * First generates a Spring context from a given deploy descriptor and than deploys the context
@@ -1220,38 +1211,81 @@ public class BaseUIMAAsynchronousEngine_
    * @return - a unique spring container id
    * 
    */
-  public String deploy(String deploymentDescriptorPath, Map anApplicationContext) throws Exception {
-	  AnalysisEngineDeploymentDescriptionDocument dd =
-	            parseDD(deploymentDescriptorPath);
-		  
-		  XmlDocumentProperties dp = dd.documentProperties();
-		  System.out.println(dp.getSourceName());
-
-		  // Use factory to create deployer instance for a given protocol and provider
-		  UimaAsServiceDeployer deployer = 
-				  ServiceDeployers.newDeployer(protocol(dd), provider(dd));
-		  
-		  service = deployer.deploy(dd, anApplicationContext);
-		  
-		  UimaAsServiceRegistry.getInstance().register(service);
+	public String deploy(String deploymentDescriptorPath, Map anApplicationContext) throws Exception {
+		// parse provided deployment descriptor and produce object representation for it
+		AnalysisEngineDeploymentDescriptionDocument dd = parseDD(deploymentDescriptorPath);
+/*		
+		ArrayList<?> validationErrors = new ArrayList<>();
+		XmlOptions validationOptions = new XmlOptions();
+		validationOptions.setErrorListener(validationErrors);
+		validationOptions.setLoadLineNumbers(XmlOptions.LOAD_LINE_NUMBERS_END_ELEMENT);
+		validationOptions.setLoadLineNumbers();
+		boolean isValid = dd.validate(validationOptions);
+		if ( !isValid ) {
+			 Iterator<?> iter = validationErrors.iterator();
+			 System.out.println("*** *** *** *** *** *** *** *** *** ***");
+			    while (iter.hasNext())
+			    {
+			        System.out.println("*** DD Validation ERROR>> " + iter.next() + "\n");
+			    }
+			    throw new AsynchAEException("*** ERROR deployment descriptor validation failed");
+		}
+		*/
+		XmlDocumentProperties dp = dd.documentProperties();
+		System.out.println(dp.getSourceName());
+		
+		String protocolOverride = null;
+		if ( anApplicationContext!= null && anApplicationContext.containsKey(UimaAsynchronousEngine.Protocol) ) {
+			protocolOverride = (String)anApplicationContext.get(UimaAsynchronousEngine.Protocol);
+		}
+		String providerOverride = null;
+		if ( anApplicationContext!= null && anApplicationContext.containsKey(UimaAsynchronousEngine.Provider)) {
+			providerOverride = (String)anApplicationContext.get(UimaAsynchronousEngine.Provider);
+		}
+		UimaAsServiceDeployer deployer;
+		// if client does not override provider and protocol, use
+		// the DD settings 
+		if ( protocolOverride == null && providerOverride == null) {
+			// Use factory to create deployer instance for a given 
+			// protocol and provider defined in the DD
+			deployer = 
+					ServiceDeployers.newDeployer(protocol(dd), provider(dd));
+		} else {
+			Protocol deploymentProtocol = null;
+			try {
+				deploymentProtocol =
+						org.apache.uima.as.deployer.ServiceDeployers.Protocol.valueOf(protocolOverride.toUpperCase());
+			} catch( IllegalArgumentException e) {
+				System.out.println("***\n*** ERROR specified  protocol not supported. Only 'java' and 'jms' are expected. You've provided "+protocolOverride+"\n***");
+				throw e;
+			}
+			Provider deploymentProvider = null;
+			try {
+				deploymentProvider =
+						org.apache.uima.as.deployer.ServiceDeployers.Provider.valueOf(providerOverride.toUpperCase());
+
+			} catch( IllegalArgumentException e) {
+				System.out.println("***\n*** ERROR specified provider not supported. Only 'java' and 'activemq' are expected. You've provided "+providerOverride+" \n***");
+				throw e;
 
-		  return service.getId();
+			}
+			deployer = 
+					ServiceDeployers.newDeployer(deploymentProtocol, deploymentProvider);
+		}
 
-  }
-  
-  protected UimaASService getServiceReference()  {
-	  return service;
-  }
+		service = deployer.deploy(dd, anApplicationContext);
+
+		UimaAsServiceRegistry.getInstance().register(service);
+
+		return service.getId();
+
+	}
   
-  private void disposeContextFiles(String ...contextFiles) {
-    for( String contextFile: contextFiles) {
-      File file = new File(contextFile);
-      if ( file.exists()) {
-        file.delete();
-      }
-    }
-  }
-  /**
+	protected UimaASService getServiceReference() {
+		return service;
+	}
+
+	 /**
 	 * 
 	 */
   public String deploy(String[] aDeploymentDescriptorList, Map anApplicationContext)
@@ -1334,8 +1368,8 @@ public class BaseUIMAAsynchronousEngine_
     */
   }
 
-  public void undeploy(String aSpringContainerId) throws Exception {
-    this.undeploy(aSpringContainerId, SpringContainerDeployer.STOP_NOW);
+  public void undeploy(String serviceId) throws Exception {
+    this.undeploy(serviceId, SpringContainerDeployer.STOP_NOW);
   }
 
   /**
@@ -1343,124 +1377,36 @@ public class BaseUIMAAsynchronousEngine_
    * registered in the local registry under a unique id.
    * 
    */
-  public void undeploy(String aSpringContainerId, int stop_level) throws Exception {
-    if (aSpringContainerId == null  ) {
+  public void undeploy(String serviceId, int stop_level) throws Exception {
+    if (serviceId == null  ) {
       return;
     }
     
 //    UimaEEAdminSpringContext adminContext = null;
     final UimaASService deployedService =
-    		UimaAsServiceRegistry.getInstance().lookupById(aSpringContainerId);
+    		UimaAsServiceRegistry.getInstance().lookupById(serviceId);
     if ( deployedService == null ) {
         throw new InvalidContainerException(
                 "Spring Container Does Not Contain Valid UimaEEAdminSpringContext Object");
       }
     switch (stop_level) {
-    case SpringContainerDeployer.QUIESCE_AND_STOP:
-      //((AnalysisEngineController) ctrer).quiesceAndStop();
-        //service.stop();
+    case UimaASService.QUIESCE_AND_STOP:
     	deployedService.quiesce();
 
       break;
-    case SpringContainerDeployer.STOP_NOW:
-     // ((AnalysisEngineController) ctrer).terminate();
-        //service.stop();
+    case UimaASService.STOP_NOW:
         deployedService.stop();
-        
- 
-      break;
+        break;
+    default:
+        throw new UnsupportedOperationException(
+                "Unsupported argument value in the undeploy() call. Please use stop level "
+                        + "UimaASService.QUIESCE_AND_STOP" + " OR " + "UimaASService.STOP_NOW"
+                        + " as an argument to undeploy() method.");
+
+     
   }
     UimaAsServiceRegistry.getInstance().unregister(deployedService);
-    /*
-//    if (!springContainerRegistry.containsKey(aSpringContainerId)) {
-//        return;
-//        // throw new InvalidContainerException("Invalid Spring container Id:" + aSpringContainerId +
-//        // ". Unable to undeploy the Spring container");
-//      }
-//      // Fetch an administrative context which contains a Spring Container
-//      adminContext = (UimaEEAdminSpringContext) springContainerRegistry.get(aSpringContainerId);
-//      if (adminContext == null) {
-//        throw new InvalidContainerException(
-//                "Spring Container Does Not Contain Valid UimaEEAdminSpringContext Object");
-//      }
-      // Fetch instance of the Container from its context
-      ApplicationContext ctx = adminContext.getSpringContainer();
-      // Query the container for objects that implement
-      // ControllerLifecycle interface. These
-      // objects are typically of type AnalysisEngineController or
-      // UimacppServiceController.
-      String[] asyncServiceList = ctx
-              .getBeanNamesForType(org.apache.uima.aae.controller.ControllerLifecycle.class);
-      // Given a valid list of controllers select the first from the list
-      // and
-      // initiate a shutdown. We don't care which controller will be
-      // invoked. In case of
-      // AggregateAnalysisEngineController the terminate event will
-      // propagate all the way
-      // to the top controller in the hierarchy and the shutdown will take
-      // place from there.
-      // If the controller is of kind UimecppServiceController or
-      // PrimitiveAnalysisController
-      // the termination logic will be immediately triggered in the
-      // terminate() method.
-      if (asyncServiceList != null && asyncServiceList.length > 0) {
-        boolean topLevelController = false;
-        ControllerLifecycle ctrer = null;
-        int indx = 0;
-        while (!topLevelController) {
-          ctrer = (ControllerLifecycle) ctx.getBean(asyncServiceList[indx++]);
-          if (ctrer instanceof UimacppServiceController
-                  || ((AnalysisEngineController) ctrer).isTopLevelComponent()) {
-            topLevelController = true;
-          }
-        }
-        // Send a trigger to initiate shutdown.
-        if (ctrer != null) {
-          if (ctrer instanceof AnalysisEngineController &&
-        		  ((AnalysisEngineController) ctrer).getControllerLatch() != null ) {
-            ((AnalysisEngineController) ctrer).getControllerLatch().release();
-          }
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
-              String msg = "++++++++++++++++++++++ calling terminate()-service:"+((AnalysisEngineController) ctrer).getComponentName();
-              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "undeploy",
-                     JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
-                      new Object[] { msg });
-          }
-          switch (stop_level) {
-            case SpringContainerDeployer.QUIESCE_AND_STOP:
-                if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
-                    String msg = "++++++++++++++++++++++ calling quiesceAndStop()";
-                    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "undeploy",
-                           JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
-                            new Object[] { msg });
-                }
-              ((AnalysisEngineController) ctrer).quiesceAndStop();
-
-              break;
-            case SpringContainerDeployer.STOP_NOW:
-                if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
-                    String msg = "++++++++++++++++++++++ calling terminate()";
-                    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "undeploy",
-                           JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
-                            new Object[] { msg });
-                }
-              ((AnalysisEngineController) ctrer).terminate();
-              break;
-          }
-        }
-      }
-      if (ctx instanceof FileSystemXmlApplicationContext) {
-        ((FileSystemXmlApplicationContext) ctx).destroy();
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
-            String msg = "---------------------- Destroying Application Context:"+((FileSystemXmlApplicationContext) ctx).getApplicationName();
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "undeploy",
-                   JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
-                    new Object[] { msg });
-        }
-      }
-      // Remove the container from a local registry
-      springContainerRegistry.remove(aSpringContainerId);
-      */
+
   }
   private void initJMX() throws Exception {
 	  	// Generate unique identifier
@@ -1715,33 +1661,7 @@ public class BaseUIMAAsynchronousEngine_
 	                "UIMAJMS_exception__WARNING", e);
 	      }
 	    }
-	    /*
-	  PendingMessage msg = new PendingMessageImpl(AsynchAEMessage.Stop);
-	     msg.addProperty(AsynchAEMessage.Destination, cmFreeCasQueue);
-		 msg.addProperty(AsynchAEMessage.CasReference, casReferenceId);
-	     try {
-	    	 if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-                 UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-                         "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                         "UIMAJMS_client_sending_stop_to_service__INFO", new Object[] {casReferenceId,cmFreeCasQueue});
-             }
-		     sender.dispatchMessage(msg, this, false);
- 
-	     } catch (Exception ex) {
-	          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-	              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-	                      "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-	                      "UIMAJMS_exception__WARNING",
-	                      ex);
-	          }
-	          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-                  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                          "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                          "UIMAJMS_client_unable_to_send_stop_to_cm__WARNING");
-              }
-	     }
 
-	 */
   }
   protected void dispatchFreeCasRequest(String casReferenceId, Message message) throws Exception {
      PendingMessage msg = new PendingMessageImpl(AsynchAEMessage.ReleaseCAS);

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java Mon May  7 21:27:43 2018
@@ -19,24 +19,22 @@
 
 package org.apache.uima.adapter.jms.service;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
 import java.io.InvalidClassException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
+import java.util.Scanner;
 
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.UimaASApplicationExitEvent;
 import org.apache.uima.aae.UimaAsVersion;
-import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.client.UimaAS;
+import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
 import org.apache.uima.aae.jmx.monitor.BasicUimaJmxMonitorListener;
 import org.apache.uima.aae.jmx.monitor.JmxMonitor;
 import org.apache.uima.aae.jmx.monitor.JmxMonitorListener;
 import org.apache.uima.aae.service.UimaASService;
-import org.apache.uima.aae.service.UimaAsServiceRegistry;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer;
 import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
@@ -60,6 +58,19 @@ public class UIMA_Service implements App
   private JmxMonitor monitor = null;
 
   private Thread monitorThread = null;
+  
+  private void setDefaultBrokerURL(String[] args) {
+	    String brokerURL = getArg("-brokerURL", args);
+	    // Check if broker URL is specified on the command line. If it is not, use the default
+	    // localhost:61616. In either case, set the System property defaultBrokerURL. It will be used
+	    // by Spring Framework to substitute a place holder in Spring xml.
+	    if (brokerURL != "") {
+	      System.setProperty("defaultBrokerURL", brokerURL);
+	      System.out.println(">>> Setting defaultBrokerURL to:" + brokerURL);
+	    } else if ( System.getProperty("defaultBrokerURL") == null) {  
+	      System.setProperty("defaultBrokerURL", "tcp://localhost:61616");
+	    }
+  }
 
   /**
    * Parse command args, run dd2spring on the deployment descriptors to generate Spring context
@@ -74,99 +85,32 @@ public class UIMA_Service implements App
     UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO,
             "UIMA-AS version " + UimaAsVersion.getFullVersionString());
 
-    int nbrOfArgs = args.length;
-    String[] springConfigFileArray;
     String[] deploymentDescriptors = getMultipleArg("-d", args);
     if (deploymentDescriptors.length == 0) {
       // allow multiple args for one key
       deploymentDescriptors = getMultipleArg2("-dd", args);
     }
-    /*
-    String saxonURL = getArg("-saxonURL", args);
-    String xslTransform = getArg("-xslt", args);
-    String uimaAsDebug = getArg("-uimaEeDebug", args);
-
-    if (nbrOfArgs < 1
-            || (args[0].startsWith("-") && (deploymentDescriptors.length == 0
-                    || saxonURL.equals("") || xslTransform.equals("")))) {
-      printUsageMessage();
-      return null;
-    }
-    */
-    String brokerURL = getArg("-brokerURL", args);
-    // Check if broker URL is specified on the command line. If it is not, use the default
-    // localhost:61616. In either case, set the System property defaultBrokerURL. It will be used
-    // by Spring Framework to substitute a place holder in Spring xml.
-    if (brokerURL != "") {
-      System.setProperty("defaultBrokerURL", brokerURL);
-      System.out.println(">>> Setting defaultBrokerURL to:" + brokerURL);
-    } else if ( System.getProperty("defaultBrokerURL") == null) {  // perhaps already set using -D
-      System.setProperty("defaultBrokerURL", "tcp://localhost:61616");
-    }
+    
+    setDefaultBrokerURL(args);
 
     if (System.getProperty(JmsConstants.SessionTimeoutOverride) != null) {
       System.out.println(">>> Setting Inactivity Timeout To: "
               + System.getProperty(JmsConstants.SessionTimeoutOverride));
     }
-    /*
-    if (deploymentDescriptors.length == 0) {
-      // array of context files passed in
-      springConfigFileArray = args;
-    } else {
-      // create a String array of spring context files
-      springConfigFileArray = new String[deploymentDescriptors.length];
-
-      Dd2spring aDd2Spring = new Dd2spring();
-      for (int dd = 0; dd < deploymentDescriptors.length; dd++) {
-        String deploymentDescriptor = deploymentDescriptors[dd];
-
-        File springConfigFile = aDd2Spring.convertDd2Spring(deploymentDescriptor, xslTransform,
-                saxonURL, uimaAsDebug);
-
-        // if any are bad, fail
-        if (null == springConfigFile) {
-          return null;
-        }
-        springConfigFileArray[dd] = springConfigFile.getAbsolutePath();
 
-        // get the descriptor to register with the engine controller
-        String deployDescriptor = "";
-        File afile = null;
-        FileInputStream fis = null;
-        try {
-          afile = new File(deploymentDescriptor);
-          fis = new FileInputStream(afile);
-          byte[] bytes = new byte[(int) afile.length()];
-          fis.read(bytes);
-          deployDescriptor = new String(bytes);
-          // Log Deployment Descriptor
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "main",
-                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_deploy_desc__FINEST",
-                  new Object[] { deployDescriptor });
-        } catch (IOException e) {
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                    "initialize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_exception__WARNING", e);
-          }
-        } finally {
-          if (fis != null) {
-            try {
-              fis.close();
-            } catch (IOException e) {
-            }
-          }
-        }
-      }
-    }
-    
-    return springConfigFileArray;
-    */
     return deploymentDescriptors;
   }
+  /**
+   * @deprecated Spring context files are no longer generated or used in favor of direct parsing of deployment descriptors.
+   * @param springContextFiles
+   * @param listener
+   * @return
+   * @throws Exception
+   */
   public SpringContainerDeployer deploy(String[] springContextFiles, ApplicationListener<ApplicationEvent> listener) throws Exception {
-	    SpringContainerDeployer springDeployer;
+	    SpringContainerDeployer springDeployer = null;
 	    
+	    /*
 	    if ( listener == null ) {
 	    	springDeployer = new SpringContainerDeployer(this);
         } else {
@@ -193,7 +137,7 @@ public class UIMA_Service implements App
 	    FileSystemXmlApplicationContext context = springDeployer.getSpringContext();
 	    context.addApplicationListener(this);
 	    springDeployer.startListeners();
-	    
+	    */
 	    return springDeployer;
 	  
   }
@@ -411,15 +355,11 @@ public class UIMA_Service implements App
 
   /**
    * The main routine for starting the deployment of a UIMA-AS instance. The args are either: 1 or
-   * more "paths" to Spring XML descriptors representing the information needed or some number of
+   * more "paths" to DD XML descriptors representing the information needed or some number of
    * parameters, preceeded by a "-" sign. If the first arg doesn't start with a "-" it is presumed
    * to be the first format.
    * 
-   * For the 2nd style, the arguments are: -saxonURL a-URL-to-the-saxon-jar usually starting with
-   * "file:", -xslt path-to-the-dd2spring.xsl file, -d path-to-UIMA-deployment-descriptor [-d
-   * path-to-another-dd ...] these arguments may be in any order)
-   * 
-   * For the 3rd style, like #2 but with multiple dd-files following a single -dd Useful for calling
+   * For the 2nd style, like #2 but with multiple dd-files following a single -dd Useful for calling
    * from scripts.
    * 
    * @param args
@@ -427,43 +367,18 @@ public class UIMA_Service implements App
   public static void main(String[] args) {
     try {
       UIMA_Service service = new UIMA_Service();
-      /*
-      // parse command args and run dd2spring to generate spring context
-      // files from deployment descriptors
-      String contextFiles[] = service.initialize(args);
-      // If no context files generated there is nothing to do
-      if (contextFiles == null) {
-        return;
-      }
-      */
-      String dd[] = service.initialize(args);
-      /*
-      // Deploy components defined in Spring context files. This method blocks until
-      // the container is fully initialized and all UIMA-AS components are succefully
-      // deployed.
-      SpringContainerDeployer serviceDeployer = service.deploy(contextFiles);
-      if (serviceDeployer == null) {
-        System.out.println(">>> Failed to Deploy UIMA Service. Check Logs for Details");
-        System.exit(0);
-      }
-      
-      // remove temporary spring context files generated from DD
-      for( String contextFile: contextFiles) {
-        File file = new File(contextFile);
-        if ( file.exists()) {
-          file.delete();
-        }
-      }
-      */
-      BaseUIMAAsynchronousEngine_impl engine = 
-    		  new BaseUIMAAsynchronousEngine_impl();
+      // fetch deployment descriptors from the command line
+      String[] dd = service.initialize(args);
+
+      UimaAsynchronousEngine uimaAS = 
+    		  UimaAS.newInstance(Transport.JMS);
       
+      List<String> serviceList = new ArrayList<>();
       for( String deploymentDescriptorPath : dd ) {
-    	  engine.deploy(deploymentDescriptorPath, new HashMap<>());
+    	  serviceList.add(uimaAS.deploy(deploymentDescriptorPath, new HashMap<>()));
       }
       // Add a shutdown hook to catch kill signal and to force quiesce and stop
-      ServiceShutdownHook shutdownHook = new ServiceShutdownHook(engine);
-//      ServiceShutdownHook shutdownHook = new ServiceShutdownHook(serviceDeployer);
+      ServiceShutdownHook shutdownHook = new ServiceShutdownHook(uimaAS);
       Runtime.getRuntime().addShutdownHook(shutdownHook);
       // Check if we should start an optional JMX-based monitor that will provide service metrics
       // The monitor is enabled by existence of -Duima.jmx.monitor.interval=<number> parameter. By
@@ -476,39 +391,43 @@ public class UIMA_Service implements App
         service.startMonitor(Long.parseLong(monitorCheckpointFrequency));
       }
       
-//      AnalysisEngineController topLevelControllor = serviceDeployer.getTopLevelController();
       String prompt = "Press 'q'+'Enter' to quiesce and stop the service or 's'+'Enter' to stop it now.\nNote: selected option is not echoed on the console.";
-  //    if (topLevelControllor != null) {
-        System.out.println(prompt);
+       System.out.println(prompt);
         // Loop forever or until the service is stopped
-        while ( engine.isRunning() ) {
-//            while (!topLevelControllor.isStopped()) {
+        boolean stop = false;
+        while ( !stop) {
+        	Scanner in = null;
+        	try {
+               	in = new Scanner(System.in);
+            	String cmd = in.nextLine();
+            	System.out.println("You've Entered .... "+cmd);
+            	
+            	if ( cmd.equalsIgnoreCase("s")) {
+            		System.out.println("Calling STOP....");
+            		for( String serviceId : serviceList ) {
+            			uimaAS.undeploy(serviceId, UimaASService.STOP_NOW);
+            		}
+            		
+            		stop = true;
+            	//	System.exit(0);
+            	} else if ( cmd.equalsIgnoreCase("q") ) {
+            		for( String serviceId : serviceList ) {
+                   		System.out.println("Calling QUIT....");
+                   	 
+                   		uimaAS.undeploy(serviceId, UimaASService.QUIESCE_AND_STOP);
+            		}
+
+            		stop = true;
+
+            	}
+        	} finally {
+        		if ( in != null ) {
+        			in.close();
+        		}
+        	}
+        }
+		System.exit(0);
 
-          if (System.in.available() > 0) {
-            int c = System.in.read();
-            if (c == 's') {
-              service.stopMonitor();
-              engine.undeploy();
-              //serviceDeployer.undeploy(SpringContainerDeployer.STOP_NOW);
-              System.exit(0);
-            } else if (c == 'q') {
-            	engine.undeploy();
-              //service.stopMonitor();
-             // serviceDeployer.undeploy(SpringContainerDeployer.QUIESCE_AND_STOP);
-              System.exit(0);
-
-            } else if (Character.isLetter(c) || Character.isDigit(c)) {
-              System.out.println(prompt);
-            }
-          }
-          // This is a polling loop. Sleep for 1 sec
-//          try {
-//        	if (!topLevelControllor.isStopped()) 
-//              Thread.sleep(1000);
-//          } catch (InterruptedException ex) {
-//          }
-        } // while
-      //}
     } catch (Exception e) {
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
@@ -520,13 +439,14 @@ public class UIMA_Service implements App
   
   static class ServiceShutdownHook extends Thread {
     public SpringContainerDeployer serviceDeployer;
-    BaseUIMAAsynchronousEngine_impl engine;
+
+    private UimaAsynchronousEngine client;
     
     public ServiceShutdownHook(SpringContainerDeployer serviceDeployer) {
       this.serviceDeployer = serviceDeployer;
     }
-    public ServiceShutdownHook(BaseUIMAAsynchronousEngine_impl engine) {
-        this.engine = engine;
+    public ServiceShutdownHook(UimaAsynchronousEngine client) {
+        this.client = client;
     }
     public void run() {
       try {
@@ -538,7 +458,7 @@ public class UIMA_Service implements App
                 "UIMAJMS_caught_signal__INFO", new Object[] { "TopLevelService" });
 //          "UIMAJMS_caught_signal__INFO", new Object[] { topLevelController.getComponentName() });
       	 // serviceDeployer.undeploy(SpringContainerDeployer.QUIESCE_AND_STOP);
-    	  engine.undeploy();
+    	  client.undeploy();
 
       	  Runtime.getRuntime().halt(0);
     	  //} 

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java Mon May  7 21:27:43 2018
@@ -18,6 +18,7 @@
  */
 package org.apache.uima.adapter.jms.service;
 
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -26,9 +27,12 @@ import java.util.concurrent.ThreadFactor
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.AsynchAECasManager_impl;
 import org.apache.uima.aae.InProcessCache;
 import org.apache.uima.aae.InputChannel.ChannelType;
+import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.aae.UimaAsPriorityBasedThreadFactory;
 import org.apache.uima.aae.UimaAsThreadFactory;
 import org.apache.uima.aae.UimaClassFactory;
 import org.apache.uima.aae.controller.AnalysisEngineController;
@@ -44,14 +48,17 @@ import org.apache.uima.aae.handler.input
 import org.apache.uima.aae.service.AbstractUimaASService;
 import org.apache.uima.aae.service.ScaleoutSpecification;
 import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.activemq.JmsInputChannel;
 import org.apache.uima.adapter.jms.activemq.JmsOutputChannel;
+import org.apache.uima.adapter.jms.activemq.PriorityMessageHandler;
 import org.apache.uima.adapter.jms.activemq.UimaDefaultMessageListenerContainer;
 import org.apache.uima.adapter.jms.service.builder.ActiveMQFactory;
 import org.apache.uima.adapter.jms.service.builder.JmsMessageListenerBuilder;
 import org.apache.uima.as.client.Listener.Type;
 import org.apache.uima.resource.ResourceManager;
 import org.apache.uima.resource.ResourceSpecifier;
+import org.apache.uima.util.Level;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 public class UimaASJmsService extends AbstractUimaASService 
@@ -189,31 +196,76 @@ implements UimaASService {
 		}
 		return true;
 	}
-	private UimaDefaultMessageListenerContainer createListener(Type type, int scaleout) throws Exception{
+	private UimaDefaultMessageListenerContainer createListener(Type type, int consumerCount) throws Exception{
 		if ( inputChannel == null ) {
 			withInputChannel();
 		}
 		if ( outputChannel == null ) {
 			withOutputChannel();
-			outputChannel.setServerURI(getBrokerURL());
 		}
-		ThreadPoolTaskExecutor threadExecutor = new ThreadPoolTaskExecutor();
-		if (controller.isPrimitive() && Type.ProcessCAS.equals(type)) {
+		PriorityMessageHandler h = null;
+		
+		ThreadPoolTaskExecutor jmsListenerThreadExecutor = 
+				new ThreadPoolTaskExecutor();
+		
+		
+		if ( Type.ProcessCAS.equals(type)) {
+			outputChannel.setServerURI(getBrokerURL());
+/*
 			
 			 // Create a Custom Thread Factory. Provide it with an instance of
 		      // PrimitiveController so that every thread can call it to initialize
 		      // the next available instance of a AE.
-		      ThreadFactory tf = new UimaAsThreadFactory().
-		    		  withThreadGroup(Thread.currentThread().getThreadGroup()).
-		    		  withPrimitiveController((PrimitiveAnalysisEngineController) controller).
-		    		  withTerminatedThreadsLatch(latchToCountNumberOfTerminatedThreads).
-		    		  withInitedThreadsLatch(latchToCountNumberOfInitedThreads);
-		      ((UimaAsThreadFactory)tf).setDaemon(true);
-		      // This ThreadExecutor will use custom thread factory instead of defult one
-		      ((ThreadPoolTaskExecutor) threadExecutor).setThreadFactory(tf);
-		}
-		threadExecutor.setCorePoolSize(scaleout);
-		threadExecutor.setMaxPoolSize(scaleout);
+//		      ThreadFactory tf = new UimaAsThreadFactory().
+//		    		  withThreadGroup(Thread.currentThread().getThreadGroup()).
+//		    		  withPrimitiveController((PrimitiveAnalysisEngineController) controller).
+//		    		  withTerminatedThreadsLatch(latchToCountNumberOfTerminatedThreads).
+//		    		  withInitedThreadsLatch(latchToCountNumberOfInitedThreads);
+//		      ((UimaAsThreadFactory)tf).setDaemon(true);
+		      
+		*/      
+		      if ( controller.isPrimitive() ) {
+				  h = new PriorityMessageHandler(consumerCount);
+				  ThreadPoolTaskExecutor threadExecutor = 
+						  new ThreadPoolTaskExecutor();
+	              controller.setThreadFactory(threadExecutor);
+	              
+				  latchToCountNumberOfTerminatedThreads = new CountDownLatch(consumerCount);
+			      // Create a Custom Thread Factory. Provide it with an instance of
+			      // PrimitiveController so that every thread can call it to initialize
+			      // the next available instance of a AE.
+				  ThreadFactory tf = 
+						  new UimaAsPriorityBasedThreadFactory(Thread.currentThread().
+								  getThreadGroup(), controller, latchToCountNumberOfTerminatedThreads)
+				          .withQueue(h.getQueue()).withChannel(controller.getInputChannel(ENDPOINT_TYPE.JMS));
+				     
+				  
+				  ((UimaAsPriorityBasedThreadFactory)tf).setDaemon(true);
+				  // This ThreadExecutor will use custom thread factory instead of default one
+				   threadExecutor.setThreadFactory(tf);
+				   threadExecutor.setCorePoolSize(consumerCount);
+				   threadExecutor.setMaxPoolSize(consumerCount);
+
+				  // Initialize the thread pool
+				  threadExecutor.initialize();
+
+				  // Make sure all threads are started. This forces each thread to call
+				  // PrimitiveController to initialize the next instance of AE
+				  threadExecutor.getThreadPoolExecutor().prestartAllCoreThreads();
+			      // This ThreadExecutor will use custom thread factory instead of default one
+//			      threadExecutor.setThreadFactory(tf);
+			     
+		    	  
+		      }
+			
+		} 
+		jmsListenerThreadExecutor.setCorePoolSize(consumerCount);
+		jmsListenerThreadExecutor.setMaxPoolSize(consumerCount);
+		jmsListenerThreadExecutor.initialize();
+		
+		
+//		threadExecutor.setCorePoolSize(consumerCount);
+//		threadExecutor.setMaxPoolSize(consumerCount);
 		
 		// destination can be NULL if this listener is meant for a 
 		// a temp queue. Such destinations are created on demand 
@@ -232,13 +284,15 @@ implements UimaASService {
 				listenerBuilder.withController(controller)
 		       			.withType(type)
 						.withConectionFactory(factory)
-						.withThreadPoolExecutor(threadExecutor)
-						.withConsumerCount(scaleout)
+						.withThreadPoolExecutor(jmsListenerThreadExecutor)
+						.withConsumerCount(consumerCount)
 						.withInputChannel(inputChannel)
+						.withPriorityMessageHandler(h)
 						.withSelector(getSelector(type))
 						.withDestination(destination)
 						.build();
 		messageListener.setReceiveTimeout(500);
+//		messageListener.setMessageListener(h);
 		return messageListener;
 	}
 	public HandlerBase getMessageHandler(AnalysisEngineController controller) {
@@ -261,14 +315,14 @@ implements UimaASService {
 		}
 		return metaHandler;
 	}
-	public UimaASJmsService build(int scaleout) throws Exception {
+	public UimaASJmsService build(int consumerCount) throws Exception {
 		// First create Connection Factory. This is needed by
 		// JMS listeners.
 		createConnectionFactory();
 		// counts number of initialized threads
-		latchToCountNumberOfInitedThreads = new CountDownLatch(scaleout);
+		latchToCountNumberOfInitedThreads = new CountDownLatch(consumerCount);
 		// counts number of terminated threads
-		latchToCountNumberOfTerminatedThreads = new CountDownLatch(scaleout);
+		latchToCountNumberOfTerminatedThreads = new CountDownLatch(consumerCount);
 		// Add one instance of JmsOutputChannel 
 		if ( controller.getOutputChannel(ENDPOINT_TYPE.JMS) == null ) {
 			withOutputChannel();
@@ -296,9 +350,69 @@ implements UimaASService {
 		
 		// listener to handle process CAS requests
 		UimaDefaultMessageListenerContainer processListener 
-		    = createListener(Type.ProcessCAS, scaleout);
+		    = createListener(Type.ProcessCAS, consumerCount);
 		inputChannel.addListenerContainer(processListener);
 		
+		
+		
+		 
+		
+		
+		  String targetStringSelector = "";
+		  if ( System.getProperty(UimaAsynchronousEngine.TargetSelectorProperty) != null ) {
+			  targetStringSelector = System.getProperty(UimaAsynchronousEngine.TargetSelectorProperty);
+		  } else {
+			  // the default selector is IP:PID 
+			  String ip = InetAddress.getLocalHost().getHostAddress();
+			  targetStringSelector = ip+":"+controller.getPID();
+		  }
+		  UimaDefaultMessageListenerContainer targetedListener = 
+				  new UimaDefaultMessageListenerContainer();
+		  targetedListener.setType(Type.Target);
+		  // setup jms selector
+		  if ( controller.isCasMultiplier()) {
+			  targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+" = '"+targetStringSelector+"' AND"+UimaDefaultMessageListenerContainer.CM_PROCESS_SELECTOR_SUFFIX);//(Command=2000 OR Command=2002)");
+	          } else {
+				  targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+" = '"+targetStringSelector+"' AND"+UimaDefaultMessageListenerContainer.PROCESS_SELECTOR_SUFFIX);//(Command=2000 OR Command=2002)");
+	          }
+		  
+		  // use shared ConnectionFactory
+          targetedListener.setConnectionFactory(processListener.getConnectionFactory());
+          // mark the listener as a 'Targeted' listener
+          targetedListener.setTargetedListener();
+          targetedListener.setController(controller);
+          // there will only be one delivery thread. Its job will be to
+          // add a targeted message to a BlockingQueue. Such thread will block
+          // in an enqueue if a dequeue is not available. This will be prevent
+          // the overwhelming the service with messages.
+  		  ThreadPoolTaskExecutor threadExecutor = new ThreadPoolTaskExecutor();
+		  threadExecutor.setCorePoolSize(1);
+		  threadExecutor.setMaxPoolSize(1);
+		  targetedListener.setTaskExecutor(threadExecutor);
+          targetedListener.setConcurrentConsumers(1);
+		  if ( processListener.getMessageListener() instanceof PriorityMessageHandler ) {
+			  // the targeted listener will use the same message handler as the
+			  // Process listener. This handler will add a message wrapper 
+			  // to enable prioritizing messages. 
+			  targetedListener.setMessageListener(processListener.getMessageListener());
+		  }
+		  // Same queue as the Process queue
+		  targetedListener.setDestination(processListener.getDestination());
+          //registerListener(targetedListener);
+ //         targetedListener.afterPropertiesSet();
+		  threadExecutor.initialize();
+		  
+          //targetedListener.initialize();
+          //targetedListener.start();
+          if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
+            UIMAFramework.getLogger(getClass()).logrb(Level.INFO, getClass().getName(),
+                    "createListenerForTargetedMessages", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                    "UIMAJMS_TARGET_LISTENER__INFO",
+                    new Object[] {targetedListener.getMessageSelector(), controller.getComponentName() });
+          }
+		
+        inputChannel.addListenerContainer(targetedListener);
+		
 		listeners.add(processListener);
 		// listener to handle GetMeta requests
 		UimaDefaultMessageListenerContainer getMetaListener 

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsMessageListenerBuilder.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsMessageListenerBuilder.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsMessageListenerBuilder.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsMessageListenerBuilder.java Mon May  7 21:27:43 2018
@@ -19,6 +19,7 @@
 package org.apache.uima.adapter.jms.service.builder;
 
 import javax.jms.Destination;
+import javax.jms.MessageListener;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -35,6 +36,7 @@ import org.apache.uima.aae.error.ErrorHa
 import org.apache.uima.adapter.jms.activemq.ConcurrentMessageListener;
 import org.apache.uima.adapter.jms.activemq.JmsInputChannel;
 import org.apache.uima.adapter.jms.activemq.JmsOutputChannel;
+import org.apache.uima.adapter.jms.activemq.PriorityMessageHandler;
 import org.apache.uima.adapter.jms.activemq.TempDestinationResolver;
 import org.apache.uima.adapter.jms.activemq.UimaDefaultMessageListenerContainer;
 import org.apache.uima.as.client.Listener.Type;
@@ -53,6 +55,7 @@ public class JmsMessageListenerBuilder {
 	private ThreadPoolTaskExecutor threadExecutor=null;
 	private Type type;
 	private TempDestinationResolver tempQueueDestinationResolver = null;
+	private PriorityMessageHandler priorityHandler = null;
 	
 	public static void main(String[] args) {
 		try {
@@ -168,7 +171,7 @@ public class JmsMessageListenerBuilder {
 			jmsGetMetaMessageListener.initialize();
 			jmsGetMetaMessageListener.start();
 			
-
+// !!!!!!!!!!!!!! WHY replyListener not added to input channel like the two above?
 			replyListener.afterPropertiesSet();
 			replyListener.initialize();
 			replyListener.start();
@@ -227,6 +230,11 @@ public class JmsMessageListenerBuilder {
 		this.selector = selector;
 		return this;
 	}
+	public JmsMessageListenerBuilder withPriorityMessageHandler(PriorityMessageHandler priorityHandler ) {
+		this.priorityHandler = priorityHandler;
+		return this;
+	}
+
 	public JmsMessageListenerBuilder withDestination(Destination destination ) {
 		this.destination = destination;
 		return this;
@@ -258,7 +266,7 @@ public class JmsMessageListenerBuilder {
 	       return (endpoint != null && endpoint.isRemote()  && endpoint.isCasMultiplier() );
 	}
 	public UimaDefaultMessageListenerContainer build() throws Exception{
-		UimaDefaultMessageListenerContainer listener = 
+		UimaDefaultMessageListenerContainer listenerContainer = 
 				new UimaDefaultMessageListenerContainer();
 		/*
 		 * 
@@ -267,17 +275,21 @@ public class JmsMessageListenerBuilder {
 		 */
 		// make sure all required properties are set
 		validate();
+		if ( type != null ) {
+			listenerContainer.setType(type);
+		}
+
 		if ( threadExecutor != null ) {
 			threadExecutor.setThreadNamePrefix(controller.getComponentName()+"-"+type.name()+"Listener-Thread");
-			listener.setTaskExecutor(threadExecutor);
+			listenerContainer.setTaskExecutor(threadExecutor);
 			
 		}
 		
-		listener.setConcurrentConsumers(consumerCount);
-		listener.setController(controller);
+		listenerContainer.setConcurrentConsumers(consumerCount);
+		listenerContainer.setController(controller);
 		
 		if ( selector != null ) {
-			listener.setMessageSelector(selector);
+			listenerContainer.setMessageSelector(selector);
 		}
 		
         if (isRemoteCasMultiplier(endpoint) ) {
@@ -291,16 +303,24 @@ public class JmsMessageListenerBuilder {
     		// is ConcurrentMessageListener which imposes order of replies (parent last) before delegating 
     		// msgs to the inputchannel. When stopping the service, all listeners must be registered with 
     		// an inputchannel which is responsible for shutting down all listeners.
-    		((JmsInputChannel)inputChannel).registerListener(listener);
-            listener.setMessageListener(concurrentListener);
+    		((JmsInputChannel)inputChannel).registerListener(listenerContainer);
+    		listenerContainer.setMessageListener(concurrentListener);
             concurrentListener.setAnalysisEngineController(controller);
         } else {
-    		((JmsInputChannel)inputChannel).registerListener(listener);
-    		listener.setMessageListener(inputChannel);
+    		((JmsInputChannel)inputChannel).registerListener(listenerContainer);
+    		// Message priority handler is an intermediary object between JMS Message Listener
+    		// and an InputChannel. Its main role is to intercept messages and add them to 
+    		// the priority queue shared with an InputChannel. This is done to support processing
+    		// of targeted messages ahead of regular priority (process) msgs.
+    		if ( priorityHandler != null ) {
+        		listenerContainer.setMessageListener(priorityHandler);
+    		} else {
+        		listenerContainer.setMessageListener(inputChannel);
+    		}
         }
 
-		listener.setTargetEndpoint(endpoint);
-		listener.setConnectionFactory(connectionFactory);
+        listenerContainer.setTargetEndpoint(endpoint);
+        listenerContainer.setConnectionFactory(connectionFactory);
 		// is this listener processing replies from a remote service. This can
 		// only be true if the controller is an aggregate. Primitive controller
 		// can only handle requests from remote services. An aggregate can send
@@ -309,25 +329,22 @@ public class JmsMessageListenerBuilder {
 			String e = Type.FreeCAS.equals(type) ? "FreeCASEndpoint" :endpoint.getDelegateKey();
 			TempDestinationResolver resolver = new
 					TempDestinationResolver(controller.getComponentName(), e);
-			resolver.setListener(listener);
+			resolver.setListener(listenerContainer);
 			resolver.setConnectionFactory(connectionFactory);
-			listener.setDestinationResolver(resolver);
-			listener.setDestinationName("");
+			listenerContainer.setDestinationResolver(resolver);
+			listenerContainer.setDestinationName("");
 			if ( Type.FreeCAS.equals(type)) {
-				listener.setBeanName(controller.getComponentName()+"-"+type.name()+"Listener For FreeCas Listener");
+				listenerContainer.setBeanName(controller.getComponentName()+"-"+type.name()+"Listener For FreeCas Listener");
 			} else {
-				listener.setBeanName(controller.getComponentName()+"-"+type.name()+"Listener For Delegate:"+endpoint.getDelegateKey());
+				listenerContainer.setBeanName(controller.getComponentName()+"-"+type.name()+"Listener For Delegate:"+endpoint.getDelegateKey());
 			}
 		} else if ( destination != null ) {
-			listener.setDestinationName(((ActiveMQDestination)destination).getPhysicalName());
-			listener.setDestination(destination);
-			listener.setBeanName(controller.getComponentName()+"-"+type.name()+"Listener");
+			listenerContainer.setDestinationName(((ActiveMQDestination)destination).getPhysicalName());
+			listenerContainer.setDestination(destination);
+			listenerContainer.setBeanName(controller.getComponentName()+"-"+type.name()+"Listener");
 
 		}
 
-		if ( type != null ) {
-			listener.setType(type);
-		}
-		return listener;
+		return listenerContainer;
 	}
 }

Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsProcessListener.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsProcessListener.java?rev=1831129&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsProcessListener.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsProcessListener.java Mon May  7 21:27:43 2018
@@ -0,0 +1,36 @@
+package org.apache.uima.adapter.jms.service.builder;
+
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.adapter.jms.activemq.UimaDefaultMessageListenerContainer;
+
+public class JmsProcessListener {
+
+	AnalysisEngineController controller;
+	
+	JmsProcessListener(AnalysisEngineController controller) {
+		this.controller = controller;
+	}
+	public void create() throws Exception {
+		JmsMessageListenerBuilder listenerBuilder = 
+				new JmsMessageListenerBuilder();
+/*
+		UimaDefaultMessageListenerContainer messageListener =
+				listenerBuilder.withController(controller)
+		       			.withType(type)
+						.withConectionFactory(factory)
+						.withThreadPoolExecutor(threadExecutor)
+						.withConsumerCount(consumerCount)
+						.withInputChannel(inputChannel)
+						.withPriorityMessageHandler(h)
+						.withSelector(getSelector(type))
+						.withDestination(destination)
+						.build();
+		messageListener.setReceiveTimeout(500);
+	*/
+	}
+	public static void main(String[] args) {
+		// TODO Auto-generated method stub
+
+	}
+
+}