You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/02/26 18:54:13 UTC

svn commit: r1825401 [2/11] - in /uima/uima-as/branches/uima-as-3: uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ uimaj-as-activemq/src/main/java/org/apache/uima...

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Mon Feb 26 18:54:11 2018
@@ -54,9 +54,11 @@ import org.apache.uima.aae.UimaAsPriorit
 //import org.apache.uima.aae.UimaASCredentials;
 import org.apache.uima.aae.UimaAsThreadFactory;
 import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
 //import org.apache.uima.aae.UimaAsThreadFactory.UsedFor;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
 import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
 import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
 import org.apache.uima.aae.controller.Endpoint;
 import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
@@ -68,6 +70,7 @@ import org.apache.uima.aae.message.Async
 import org.apache.uima.aae.message.MessageWrapper;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.activemq.JmsOutputChannel.BrokerConnectionEntry;
+import org.apache.uima.as.client.Listener;
 import org.apache.uima.resource.ResourceInitializationException;
 import org.apache.uima.util.Level;
 import org.springframework.core.task.TaskExecutor;
@@ -80,12 +83,15 @@ import org.springframework.jms.support.d
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 public class UimaDefaultMessageListenerContainer extends DefaultMessageListenerContainer implements
-        ExceptionListener {
+        Listener, ExceptionListener {
   private static final Class<?> CLASS_NAME = UimaDefaultMessageListenerContainer.class;
   public static final String PROCESS_SELECTOR_SUFFIX = "(Command=2000 OR Command=2002)";
   public static final String CM_PROCESS_SELECTOR_SUFFIX = "(Command=2000 OR Command=2002 OR Command=2005)";
   public static final String GETMETA_SELECTOR_SUFFIX = "(Command=2001)";
   
+  private Transport transport = Transport.JMS;
+  private Type type = Type.Unknown;
+  
   public static final int HIGH_PRIORITY = 9;
   
   private String destinationName = "";
@@ -153,12 +159,23 @@ public class UimaDefaultMessageListenerC
     UIMAFramework.getLogger(CLASS_NAME).setLevel(Level.WARNING);
     __listenerRef = this;
     setRecoveryInterval(400);  // increase connection recovery to 30 sec
-    setAcceptMessagesWhileStopping(true);
+    //setAcceptMessagesWhileStopping(true);
     setExceptionListener(this);
     threadGroup = new ThreadGroup("ListenerThreadGroup_"
             + Thread.currentThread().getThreadGroup().getName());
   }
-
+  public Transport getTransport() {
+	  return transport;
+  }
+  public void setType(Type t) {
+	  this.type = t;
+  }
+  public Type getType() {
+	  return type;
+  }
+  protected void handleListenerException(Throwable t) {
+	  t.printStackTrace();
+  }
   public UimaDefaultMessageListenerContainer(boolean freeCasQueueListener) {
     this();
     this.freeCasQueueListener = freeCasQueueListener;
@@ -183,7 +200,38 @@ public class UimaDefaultMessageListenerC
    */
   protected void refreshConnectionUntilSuccessful() {
 	 // System.out.println("............refreshConnectionUntilSuccessful() called");
+	  while (isRunning()) {
 
+			try {
+				
+				if (sharedConnectionEnabled()) {
+					refreshSharedConnection();
+				} else {
+					Connection con = createConnection();
+					JmsUtils.closeConnection(con);
+				}
+				// Use UIMA-AS custom Destination Resolver to create a new temp queue for this reply listener
+				if( Type.Reply.equals(type) || Type.FreeCAS.equals(type)) {
+					getDestinationResolver().resolveDestinationName(getSharedConnection().createSession(false, Session.AUTO_ACKNOWLEDGE),"",false);
+				}
+				logger.info(getType().name()+" Listener Successfully refreshed JMS Connection to broker: "+getBrokerUrl()+" Endpoint:"+getDestination());
+				logListenerFailure = true;
+				break;
+			}
+			catch (Exception ex) {
+				
+				if (ex instanceof JMSException) {
+					if (logListenerFailure && ex.getCause() instanceof ConnectException) {
+						
+						logger.info(getType().name()+" Listener lost connection to broker: "+getBrokerUrl()+"- Retrying until successfull ...");
+						logListenerFailure = false;
+					} else {
+						invokeExceptionListener((JMSException) ex);
+					}
+				}
+			}
+	  }
+	  /*
 	  boolean doLogFailureMsg = true;
     try {
     	// Only one listener thread should enter to recover lost connection.
@@ -244,30 +292,11 @@ public class UimaDefaultMessageListenerC
        	            		}
     	                   }
     	                   
-    	                       	            	/*  
-    	               	if ( getMessageListener() instanceof JmsInputChannel ) {
-    	            		System.out.println("------------------- Creating new listener for the temp queue");
-    	            		try {
-    	                	((JmsInputChannel)getMessageListener()).createListenerOnTempQueue(getConnectionFactory());
-    	                		System.out.println("------------------- New listener on temp queue is ready");
-    	            		} catch( Exception e) {
-    	                		System.out.println("------------------- Error while creating new listener on temp queue");
-    	            	          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-    	            	              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-    	            	                      "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-    	            	                      "UIMAJMS_exception__WARNING", e);
-    	            	            }
-    	            			
-    	            		}
-    	            	}
-*/
+
     	               
     	               }
     	               
-    	  /*
-    	               String delegateKey = ((AggregateAnalysisEngineController) controller)
-    	                .lookUpDelegateKey(endpoint.getEndpoint());
-    		*/
+ 
     		        //}
     		        break;
     		      }
@@ -293,6 +322,7 @@ public class UimaDefaultMessageListenerC
     	}
     } catch( IllegalStateException e ) {
     }
+    */
   }
   protected void recoverAfterListenerSetupFailure() {
 	  if ( !terminating ) {
@@ -323,6 +353,7 @@ public class UimaDefaultMessageListenerC
   /**
    * Stops this Listener
    */
+  /*
   private void handleListenerFailure() {
     // If shutdown already, nothing to do
     if (awaitingShutdown) {
@@ -363,12 +394,13 @@ public class UimaDefaultMessageListenerC
       }
     }
   }
-
+*/
   /**
    * Handles failure on a temp queue
    * 
    * @param t
    */
+  /*
   private void handleTempQueueFailure(Throwable t) {
     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
       if ( controller != null ) {
@@ -463,7 +495,7 @@ public class UimaDefaultMessageListenerC
     } else {
     }
   }
-
+*/
   private ErrorHandler fetchGetMetaErrorHandler() {
     ErrorHandler handler = null;
     Iterator it = controller.getErrorHandlerChain().iterator();
@@ -483,6 +515,7 @@ public class UimaDefaultMessageListenerC
    * 
    * @param t
    */
+  /*
   private void handleQueueFailure(Throwable t) {
 	//  System.out.println("............handleQueueFailure() called");
     final String endpointName = (getDestination() == null) ? ""
@@ -598,7 +631,7 @@ public class UimaDefaultMessageListenerC
     }
 
   }
-
+*/
   /**
    * This method is called by Spring when a listener fails
    */
@@ -609,9 +642,10 @@ public class UimaDefaultMessageListenerC
     }
     // If shutdown already, nothing to do
 	    // If controller is stopping no need to recover the connection
-    if (awaitingShutdown || terminating || (controller != null && controller.isStopped()) ) {
+    if (!super.isRunning() || awaitingShutdown || terminating || (controller != null && controller.isStopped()) ) {
       return;
     }
+    /*
     if ( controller != null ) {
       controller.changeState(ServiceState.FAILED);
     }
@@ -676,10 +710,12 @@ public class UimaDefaultMessageListenerC
       }
       failed = true;
     }
+    */
   }
   public Endpoint getEndpoint() {
 	  return endpoint;
   }
+  /*
   private void terminate(Throwable t) {
     // ****************************************
     // terminate the service
@@ -695,7 +731,7 @@ public class UimaDefaultMessageListenerC
       controller.stop();
     }
   }
-
+  
   protected void handleListenerException(Throwable t) {
 	 // System.out.println("............handleListenerException(Throwable t)");
 	  
@@ -703,17 +739,7 @@ public class UimaDefaultMessageListenerC
     if (awaitingShutdown) {
       return;
     }
-    /*
-    String endpointName = (getDestination() == null) ? ""
-            : ((ActiveMQDestination) getDestination()).getPhysicalName();
-
-    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
-              "handleListenerException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-              "UIMAJMS_jms_listener_failed_WARNING",
-              new Object[] { endpointName, getBrokerUrl(), t });
-    }
-    */
+   
     super.handleListenerException(t);
   }
 
@@ -731,14 +757,16 @@ public class UimaDefaultMessageListenerC
     super.setConnectionFactory(connectionFactory);
   }
 
-
+*/
   public boolean isGetMetaListener() {
-	  
+	  return Type.GetMeta.equals(type);
+	  /*
     return getMessageSelector() != null
             && __listenerRef.getMessageSelector().endsWith(GETMETA_SELECTOR_SUFFIX);
 //    && __listenerRef.getMessageSelector().endsWith("(Command=2001)");
+ */
   }
-  
+  /*
   private boolean isActiveMQDestination() {
     return getDestination() != null && getDestination() instanceof ActiveMQDestination;
   }
@@ -762,24 +790,26 @@ public class UimaDefaultMessageListenerC
       }
     }
   }
-
+*/
   /**
    * Intercept Spring call to increment number of consumer threads. If the value > 1, don't
    * propagate to Spring. A new listener will be injected and it will use provided number of
    * consumer threads.
    **/
+  /*
   public void setConcurrentConsumers(int concurrentConsumers) {
     cc = concurrentConsumers;
     if (this.freeCasQueueListener) {
       super.setConcurrentConsumers(concurrentConsumers);
     }
   }
-
+*/
   /**
    * Intercept Spring call to inject application Pojo listener. Don't propagate the listener up to
    * Spring just yet. If more than one consumer thread is used, a different listener will be
    * injected.
    **/
+  /*
   public void setMessageListener(Object messageListener) {
     ml = messageListener;
     if (this.freeCasQueueListener || targetedListener ) {
@@ -791,10 +821,12 @@ public class UimaDefaultMessageListenerC
   public void afterPropertiesSet() {
     afterPropertiesSet(true);
   }
+  */
   /**
    * Called by Spring and some Uima AS components when all properties have been set. This method
    * spins a thread in which the listener is initialized.
    */
+  /*
   public void afterPropertiesSet(final boolean propagate) {
     if (endpoint != null) {
 			// Override the prefetch size. The dd2spring always sets this to 1 which 
@@ -938,12 +970,13 @@ public class UimaDefaultMessageListenerC
     });
     t.start();
   }
-
+*/
   /**
    * Inject instance of this listener into the InputChannel
    * 
    * @throws Exception
    */
+  /*
   private void connectWithInputChannel() throws Exception {
     Object pojoListener = getPojoListener();
 
@@ -961,7 +994,7 @@ public class UimaDefaultMessageListenerC
       ((ModifiableListener) pojoListener).setListener(__listenerRef);
     }
   }
-
+*/
   public String getDestinationName() {
 
     return destinationName;
@@ -975,7 +1008,9 @@ public class UimaDefaultMessageListenerC
   }
 
   public String getBrokerUrl() {
-    return ((ActiveMQConnectionFactory) connectionFactory).getBrokerURL();
+		return ((ActiveMQConnectionFactory)super.getConnectionFactory()).getBrokerURL();
+
+//    return ((ActiveMQConnectionFactory) connectionFactory).getBrokerURL();
   }
 
   /*
@@ -984,28 +1019,17 @@ public class UimaDefaultMessageListenerC
    * deployment descriptor and create a new one with rewritten Broker URL. We will inject the
    * prefetch policy to the new CF based on what is found in the CF in the deployment descriptor.
    */
-
+/*
   public void setConnectionFactory(ConnectionFactory aConnectionFactory)  {
     connectionFactory = aConnectionFactory;
-    /*
-    if ( System.getProperty("uima.as.broker.credentials.file") != null ) {
-    	UimaASCredentials credentials = new UimaASCredentials();
-    	try {
-        	credentials.readCredentials(System.getProperty("uima.as.broker.credentials.file"));
-    	} catch( IOException e) {
-    		throw new RuntimeException(e);
-    	}
-    	((ActiveMQConnectionFactory)connectionFactory).setUserName(credentials.getUsername());
-        ((ActiveMQConnectionFactory)connectionFactory).setPassword(credentials.getPassword());
-    }
-    */
+   
     ConnectionFactoryIniter cfIniter =
             new ConnectionFactoryIniter((ActiveMQConnectionFactory)connectionFactory);
     cfIniter.whiteListPackages();
     ((ActiveMQConnectionFactory)connectionFactory).setTrustAllPackages(true);
     super.setConnectionFactory(connectionFactory);
   }
-
+*/
   public void setDestinationResolver(DestinationResolver resolver) {
     ((TempDestinationResolver) resolver).setListener(this);
     super.setDestinationResolver(resolver);
@@ -1057,8 +1081,17 @@ public class UimaDefaultMessageListenerC
 
   public void setDestination(Destination aDestination) {
     super.setDestination(aDestination);
+   
+    System.out.println("............................. "+endpoint+" Destination:"+aDestination.toString());
+    if ( Type.FreeCAS.equals(type)) {
+    	((JmsOutputChannel)controller.getOutputChannel(ENDPOINT_TYPE.JMS)).setFreeCasQueue(aDestination);
+    }
     if (endpoint != null) {
       endpoint.setDestination(aDestination);
+      if (aDestination instanceof TemporaryQueue ) {
+    	  endpoint.setTempReplyDestination(true);
+      }
+      /*
       //  Get the prefetch size. If > 1, it has been previously overriden. The override is done in
       // the code since dd2spring alwys sets the prefetch on a reply queue to 1. This may slow down
       // a throughput of a service.
@@ -1079,10 +1112,28 @@ public class UimaDefaultMessageListenerC
           ((JmsInputChannel) pojoListener).setListenerContainer(this);
         }
       }
+      */
       endpoint.setServerURI(getBrokerUrl());
     }
   }
+  public void start() {
+	  if ( isRunning()) {
+		  return;
+	  }
+	  int consumerThreadCount=-1;
+	  if ( getTaskExecutor() instanceof ThreadPoolTaskExecutor) {
+		  ((ThreadPoolTaskExecutor)getTaskExecutor()).initialize();
+		  // if this listener is a handling Process requests, the prestartAllCoreThreads() below
+		  // will force initialization of AEs if this is a primitive service.
+		  ((ThreadPoolTaskExecutor)getTaskExecutor()).getThreadPoolExecutor().prestartAllCoreThreads();
+	  }
+	  super.afterPropertiesSet();
+	  super.initialize();
+	  super.start();
+	  
+      System.out.println(">>>>>>> Listener Service:"+controller.getComponentName()+" Broker URL:"+getBrokerUrl()+" Endpoint:"+__listenerRef.getEndpoint()+" ConsumerThreadCount:"+consumerThreadCount);
 
+  }
   private Object getPojoListener() {
     Object pojoListener = null;
     if (ml != null) {
@@ -1098,6 +1149,7 @@ public class UimaDefaultMessageListenerC
   }
 
   public void onException(JMSException arg0) {
+	  /*
     if (awaitingShutdown) {
       return;
     }
@@ -1121,7 +1173,7 @@ public class UimaDefaultMessageListenerC
     if ( getDestination() != null && ((ActiveMQDestination)getDestination()).isTemporary() ) {
       handleTempQueueFailure(arg0);
     }
-  
+  */
   }
 
   public void setTargetEndpoint(Endpoint anEndpoint) {
@@ -1129,7 +1181,8 @@ public class UimaDefaultMessageListenerC
   }
 
   public boolean isFreeCasQueueListener() {
-    return freeCasQueueListener;
+	  return Type.FreeCAS.equals(type);
+    //return freeCasQueueListener;
   }
 
   protected void setModifiedTaskExecutor(TaskExecutor taskExecutor) {
@@ -1185,6 +1238,7 @@ public class UimaDefaultMessageListenerC
                   "UIMAJMS_exception__WARNING", t);
 	  }
   }
+  /*
   public void shutdownTaskExecutor(ThreadPoolExecutor tpe, boolean stopImmediate) throws InterruptedException {
     tpe.awaitTermination(50, TimeUnit.MILLISECONDS);
     
@@ -1203,10 +1257,12 @@ public class UimaDefaultMessageListenerC
   public void destroy() {
 	  destroy(true); 
   }
+  */
   /**
    * Spins a shutdown thread and stops Sprint and ActiveMQ threads.
    * 
    */
+  /*
   public void destroy(final boolean stopImmediate) {
 	  
     if (awaitingShutdown) {
@@ -1432,26 +1488,9 @@ public class UimaDefaultMessageListenerC
           super.setTaskExecutor(es);
       }
     }
-    /*
-    else {
-        UimaAsThreadFactory tf = new UimaAsThreadFactory(threadGroup);
-        tf.setDaemon(true);
-        if ( isFreeCasQueueListener()) {
-          tf.setThreadNamePrefix(controller.getComponentName()+" - FreeCASRequest Thread");
-        } else if ( isGetMetaListener()  ) {
-          tf.setThreadNamePrefix(super.getBeanName()+" - Thread");
-        } else if ( getDestination() != null && getMessageSelector() != null ) {
-          tf.setThreadNamePrefix(controller.getComponentName() + " Process Thread");
-        } else if ( endpoint != null && endpoint.isTempReplyDestination() ) {
-          tf.setThreadNamePrefix(super.getBeanName()+" - Thread");
-        } else { 
-          throw new Exception("Unknown Context Detected in setUimaASThreadPoolExecutor()");
-        }
-        
-    }
-    */
+   
   }
-
+*/
   private boolean isPrimitiveService() {
 	  return controller != null && controller instanceof PrimitiveAnalysisEngineController &&
 			  controller.getInputChannel() != null;
@@ -1462,6 +1501,7 @@ public class UimaDefaultMessageListenerC
    */
   public void setTaskExecutor(TaskExecutor aTaskExecutor) {
     taskExecutor = aTaskExecutor;
+    super.setTaskExecutor(aTaskExecutor);
   }
 
   public TaskExecutor getTaskExecutor() {
@@ -1476,6 +1516,7 @@ public class UimaDefaultMessageListenerC
    * 
    * @throws Exception
    */
+  /*
   private void initializeTaskExecutor(int consumers) throws Exception {
     // TaskExecutor is only used with primitives
     if (controller instanceof PrimitiveAnalysisEngineController) {
@@ -1493,10 +1534,18 @@ public class UimaDefaultMessageListenerC
     	threadPoolExecutor.prestartAllCoreThreads();
     }
   }
+  */
   public void delegateStop() {
     super.stop();
   }
+  /*
   public void stop() throws JmsException {
     destroy();
   }
+  */
+  @Override
+  public String getName() {
+  	// TODO Auto-generated method stub
+  	return null;
+  }
 }

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java Mon Feb 26 18:54:11 2018
@@ -44,6 +44,7 @@ import org.apache.uima.adapter.jms.JmsCo
 import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientRequest;
 import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.SharedConnection;
 import org.apache.uima.adapter.jms.message.PendingMessage;
+import org.apache.uima.adapter.jms.message.PendingMessageImpl;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.cas.SerialFormat;
 import org.apache.uima.util.Level;
@@ -284,11 +285,11 @@ public class ActiveMQMessageSender exten
           // instead of a temp queue. Regular queues can be recovered in case of
           // a broker restart. The test below will be true for UIMA-AS v. 2.10.0 +.
           // Code in JmsOutputChannel will add the selector if the service is a CM.
-          if (pm.get(AsynchAEMessage.TargetingSelector) != null) {
-        	  selector = (String)pm.get(AsynchAEMessage.TargetingSelector);
+          if (pm.getPropertyAsString(AsynchAEMessage.TargetingSelector) != null) {
+        	  selector = (String)pm.getPropertyAsString(AsynchAEMessage.TargetingSelector);
           }
           if ( selector == null && (pm.getMessageType() == AsynchAEMessage.ReleaseCAS || pm.getMessageType() == AsynchAEMessage.Stop) ) {
-        	  d = (Destination)pm.get(AsynchAEMessage.Destination);
+        	  d = (Destination)pm.getProperty(AsynchAEMessage.Destination);
               
           } else {
               d = jmsSession.createQueue(destinationName);
@@ -313,7 +314,7 @@ public class ActiveMQMessageSender exten
           }
           if (casProcessRequest) {
             cacheEntry = (ClientRequest) engine.getCache().get(
-                    pm.get(AsynchAEMessage.CasReference));
+                    pm.getPropertyAsString(AsynchAEMessage.CasReference));
             if (cacheEntry != null) {
             //    CAS cas = cacheEntry.getCAS();
                 // enable logging 
@@ -356,7 +357,7 @@ public class ActiveMQMessageSender exten
                           JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                           "UIMAJMS_failed_cache_lookup__WARNING",
                           new Object[] {
-                         	 pm.get(AsynchAEMessage.CasReference),
+                         	 pm.getPropertyAsString(AsynchAEMessage.CasReference),
                               UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message
                                       .getIntProperty(AsynchAEMessage.Command)),
                               UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, message
@@ -403,7 +404,7 @@ public class ActiveMQMessageSender exten
                         JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                         "UIMAJMS_calling_onBeforeMessageSend__FINE",
                         new Object[] {
-                          pm.get(AsynchAEMessage.CasReference),
+                          pm.getPropertyAsString(AsynchAEMessage.CasReference),
                           String.valueOf(cacheEntry.getCAS().hashCode())
                         });
               }  

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Mon Feb 26 18:54:11 2018
@@ -20,10 +20,15 @@
 package org.apache.uima.adapter.jms.client;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -74,13 +79,24 @@ import org.apache.uima.aae.error.UimaASM
 import org.apache.uima.aae.jmx.JmxManager;
 import org.apache.uima.aae.message.AsynchAEMessage;
 import org.apache.uima.aae.message.UIMAMessage;
+import org.apache.uima.aae.service.AsynchronousUimaASService;
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.aae.service.UimaAsServiceRegistry;
+import org.apache.uima.aae.service.builder.UimaAsDirectServiceBuilder;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.activemq.ConnectionFactoryIniter;
 import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer;
 import org.apache.uima.adapter.jms.activemq.UimaEEAdminSpringContext;
 import org.apache.uima.adapter.jms.message.PendingMessage;
+import org.apache.uima.adapter.jms.message.PendingMessageImpl;
 import org.apache.uima.adapter.jms.service.Dd2spring;
 import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
+import org.apache.uima.as.client.DirectMessage;
+import org.apache.uima.as.deployer.ServiceDeployers;
+import org.apache.uima.as.deployer.ServiceDeployers.Protocol;
+import org.apache.uima.as.deployer.ServiceDeployers.Provider;
+import org.apache.uima.as.deployer.UimaAsServiceDeployer;
+import org.apache.uima.as.dispatcher.LocalDispatcher;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.cas.SerialFormat;
 import org.apache.uima.impl.UimaVersion;
@@ -90,11 +106,15 @@ import org.apache.uima.resource.Resource
 import org.apache.uima.resource.ResourceInitializationException;
 import org.apache.uima.resource.ResourceManager;
 import org.apache.uima.resource.ResourceProcessException;
+import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
 import org.apache.uima.util.Level;
+import org.apache.xmlbeans.XmlDocumentProperties;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationEvent;
 import org.springframework.context.ApplicationListener;
 import org.springframework.context.support.FileSystemXmlApplicationContext;
+import org.xml.sax.XMLReader;
+import org.xml.sax.helpers.XMLReaderFactory;
 
 public class BaseUIMAAsynchronousEngine_impl extends BaseUIMAAsynchronousEngineCommon_impl
         implements UimaAsynchronousEngine, MessageListener, ControllerCallbackListener, ApplicationListener<ApplicationEvent>{
@@ -138,7 +158,12 @@ public class BaseUIMAAsynchronousEngine_
   
   protected static Lock globalLock = new ReentrantLock();
   
-  //private String serviceTargetSelector = null;
+  protected UimaASService service = null;
+  
+  ExecutorService consumerService = null;
+  private int casPoolSize = 1;
+  
+  private Thread dispatchThread;
   
   protected volatile boolean stopped = false;
   public BaseUIMAAsynchronousEngine_impl() {
@@ -148,7 +173,11 @@ public class BaseUIMAAsynchronousEngine_
     " UIMA-AS Version " + UimaAsVersion.getVersionString());
   }
 
-
+  protected void beforeProcessReply(String casReferenceId) {
+	  if ( service != null ) {
+		  service.removeFromCache(casReferenceId);
+	  }
+  }
   protected TextMessage createTextMessage() throws ResourceInitializationException {
     return new ActiveMQTextMessage();
   }
@@ -352,20 +381,30 @@ public class BaseUIMAAsynchronousEngine_
   }
 	public void stop() {
 		try {
-			  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(
-						Level.INFO)) {
-			     UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO,
-							CLASS_NAME.getName(), "stop",
-							JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-							"UIMAJMS_stopping_as_client_INFO");
-		      }
-			  stopConnection();
+			  if ( brokerURI != null && !brokerURI.equals("java")) {
+				  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(
+							Level.INFO)) {
+				     UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO,
+								CLASS_NAME.getName(), "stop",
+								JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+								"UIMAJMS_stopping_as_client_INFO");
+			      }
+				  stopConnection();
+			  }
+
 
 		      super.doStop();
 		      if (!running) {
 		        return;
 		      }
 		      running = false;
+		      if ( consumerService != null ) {
+		    	  DirectMessage poisonPillMsg = new DirectMessage().withCommand(AsynchAEMessage.Stop);
+		    	  ((AsynchronousUimaASService)service).getReplyQueue().put(poisonPillMsg);
+		    	  // stop thread pool for local queue listener
+		    	 // consumerService.shutdown();
+		    	  consumerService.shutdown();
+		      }
 			  if (super.serviceDelegate != null) {
 				// Cancel all timers and purge lists
 				super.serviceDelegate.cleanup();
@@ -378,6 +417,9 @@ public class BaseUIMAAsynchronousEngine_
 //				stopConnection();
 				// Undeploy all containers
 				undeploy();
+				if ( dispatchThread != null ) {
+					dispatchThread.interrupt();
+				}
 		 	    clientCache.clear();
 				if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(
 								Level.INFO)) {
@@ -407,7 +449,7 @@ public class BaseUIMAAsynchronousEngine_
 		} 
 	}
 
-  protected void setCPCMessage(Message msg) throws Exception {
+  public void setCPCMessage(Message msg) throws Exception {
     msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName());
     msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
     msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
@@ -419,6 +461,183 @@ public class BaseUIMAAsynchronousEngine_
       ((TextMessage) msg).setText("");
     }
   }
+  protected boolean isServiceRemote() {
+	  return transport.equals(Transport.JMS);
+//	  return (service instanceof UimaASJmsService);
+//	  return service == null;
+  }
+  private void startLocalConsumer(Map anApplicationContext) {
+
+	  consumerService = Executors.newFixedThreadPool(1);
+	  	  consumerService.execute( new Runnable() {
+		      
+		      public void run() {
+		        try {
+		        	while( running ) {
+			        	DirectMessage message =
+				    			  ((AsynchronousUimaASService)service).getReplyQueue().take();
+			        	if ( message.getAsInt(AsynchAEMessage.Command) == AsynchAEMessage.Stop) {
+				        	System.out.println("BaseUIMAAsynchronousEngine_impl.startLocalConsumer().run() - Direct Consumer Recv'd Stop Msg - Terminating");
+			        		return;
+			        	}
+			        	System.out.println("Client Direct Local Consumer() Recv'd Reply");
+			        	onMessage(message);
+		        	}
+
+		        } catch( InterruptedException e) {
+		        	System.out.println("BaseUIMAAsynchronousEngine_impl.startLocalConsumer().run() - Stopped Direct Consumer");
+		        	return;
+		        	
+		        } catch( Exception e) {
+		        	e.printStackTrace();
+		        	
+		        }
+		      }
+	   });
+	  
+	  
+  }
+  private void initializeLocal(Map anApplicationContext) throws ResourceInitializationException {
+    if ( dispatchThread == null ) {
+  	  // make sure we are in the running state. The local consumer depends on it
+      	running = true;
+      
+    	// start message consumer to handle replies  
+    	startLocalConsumer(anApplicationContext);
+    	// start dispatcher in its own thread. It will fetch messages from a shared 'pendingMessageQueue'
+      	LocalDispatcher dispatcher =
+      			new LocalDispatcher(this, service, pendingMessageQueue);
+      	dispatchThread = new Thread(dispatcher);
+      	dispatchThread.start();
+    }
+
+  }
+  private void initializeJMS(Map anApplicationContext) throws ResourceInitializationException {
+      if (!anApplicationContext.containsKey(UimaAsynchronousEngine.ServerUri)) {
+          throw new ResourceInitializationException();
+        }
+        if (!anApplicationContext.containsKey(UimaAsynchronousEngine.ENDPOINT)) {
+          throw new ResourceInitializationException();
+        }
+        if (anApplicationContext.containsKey(UimaAsynchronousEngine.SERIALIZATION_STRATEGY)) {
+            final String serializationStrategy = (String) anApplicationContext.get(UimaAsynchronousEngine.SERIALIZATION_STRATEGY);
+            // change this to support compressed filitered as the default
+            setSerialFormat((serializationStrategy.equalsIgnoreCase("xmi")) ? SerialFormat.XMI : SerialFormat.BINARY);
+            clientSideJmxStats.setSerialization(getSerialFormat());
+        }
+        if (anApplicationContext.containsKey(UimaAsynchronousEngine.userName)) {
+            amqUser = (String) anApplicationContext
+                    .get(UimaAsynchronousEngine.userName);
+        }
+        if (anApplicationContext.containsKey(UimaAsynchronousEngine.password)) {
+        	amqPassword = (String) anApplicationContext
+        	.get(UimaAsynchronousEngine.password);
+        }
+        
+        brokerURI = (String) anApplicationContext.get(UimaAsynchronousEngine.ServerUri);
+        endpoint = (String) anApplicationContext.get(UimaAsynchronousEngine.ENDPOINT);
+        
+        //  Check if a placeholder is passed in instead of actual broker URL or endpoint. 
+        //  The placeholder has the syntax ${placeholderName} and may be imbedded in text.
+        //  A system property with placeholderName must exist for successful placeholder resolution.
+        //  Throws ResourceInitializationException if placeholder is not in the System properties.
+        brokerURI = replacePlaceholder(brokerURI); 
+        endpoint = replacePlaceholder(endpoint); 
+        // Check if sharedConnection exists. If not create a new one. The sharedConnection
+        // is static and shared by all instances of UIMA AS client in a jvm. The check
+        // is made in a critical section by first acquiring a global static semaphore to
+        // prevent a race condition.
+        try {
+            createSharedConnection(brokerURI);
+            running = true;
+            //  This is done to give the broker enough time to 'finalize' creation of
+            //  temp reply queue. It's been observed (on MAC OS only) that AMQ
+            //  broker QueueSession.createTemporaryQueue() call is not synchronous. Meaning,
+            //  return from createTemporaryQueue() does not guarantee immediate availability
+            //  of the temp queue. It seems like this operation is asynchronous, causing: 
+            //  "InvalidDestinationException: Cannot publish to a deleted Destination..."
+            //  on the service side when it tries to reply to the client.
+            wait(100);
+        } catch( InterruptedException e) {
+      	  
+        } catch (Exception e) {
+          state = ClientState.FAILED;
+          notifyOnInitializationFailure(e);
+          throw new ResourceInitializationException(e);
+    }
+
+  }
+  private void fetchRequiredProperties(Map anApplicationContext, Properties performanceTuningSettings) throws ResourceInitializationException {
+	    ResourceManager rm = null;
+	    if (anApplicationContext.containsKey(Resource.PARAM_RESOURCE_MANAGER)) {
+	      rm = (ResourceManager) anApplicationContext.get(Resource.PARAM_RESOURCE_MANAGER);
+	    } else {
+	      rm = UIMAFramework.newDefaultResourceManager();
+	    }
+	   
+	    if (anApplicationContext.containsKey(UIMAFramework.CAS_INITIAL_HEAP_SIZE)) {
+	      String cas_initial_heap_size = (String) anApplicationContext
+	              .get(UIMAFramework.CAS_INITIAL_HEAP_SIZE);
+	      performanceTuningSettings.put(UIMAFramework.CAS_INITIAL_HEAP_SIZE, cas_initial_heap_size);
+	    }
+	    asynchManager = new AsynchAECasManager_impl(rm);
+
+
+	    clientSideJmxStats.setEndpointName(endpoint);
+	    
+
+	    if (anApplicationContext.containsKey(UimaAsynchronousEngine.CasPoolSize)) {
+	      casPoolSize = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.CasPoolSize))
+	              .intValue();
+	      clientSideJmxStats.setCasPoolSize(casPoolSize);
+	    }
+
+	    if (anApplicationContext.containsKey(UimaAsynchronousEngine.Timeout)) {
+	      processTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.Timeout))
+	              .intValue();
+	    }
+
+	    if (anApplicationContext.containsKey(UimaAsynchronousEngine.GetMetaTimeout)) {
+	      metadataTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.GetMetaTimeout))
+	              .intValue();
+	    }
+
+	    if (anApplicationContext.containsKey(UimaAsynchronousEngine.CpcTimeout)) {
+	      cpcTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.CpcTimeout))
+	              .intValue();
+	    }
+	    if (anApplicationContext.containsKey(UimaAsynchronousEngine.ApplicationName)) {
+	      applicationName = (String) anApplicationContext.get(UimaAsynchronousEngine.ApplicationName);
+	    }
+
+	    if (anApplicationContext.containsKey(UimaAsynchronousEngine.TimerPerCAS)) {
+	        timerPerCAS = ((Boolean) anApplicationContext.get(UimaAsynchronousEngine.TimerPerCAS))
+	                .booleanValue();
+	    }
+     
+        brokerURI = (String) anApplicationContext.get(UimaAsynchronousEngine.ServerUri);
+        endpoint = (String) anApplicationContext.get(UimaAsynchronousEngine.ENDPOINT);
+        
+        //  Check if a placeholder is passed in instead of actual broker URL or endpoint. 
+        //  The placeholder has the syntax ${placeholderName} and may be imbedded in text.
+        //  A system property with placeholderName must exist for successful placeholder resolution.
+        //  Throws ResourceInitializationException if placeholder is not in the System properties.
+        brokerURI = replacePlaceholder(brokerURI); 
+        endpoint = replacePlaceholder(endpoint); 
+
+	    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
+	      UIMAFramework.getLogger(CLASS_NAME)
+	              .logrb(
+	                      Level.CONFIG,
+	                      CLASS_NAME.getName(),
+	                      "initialize",
+	                      JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+	                      "UIMAJMS_init_uimaee_client__CONFIG",
+	                      new Object[] { brokerURI, 0, casPoolSize, processTimeout, metadataTimeout,
+	                          cpcTimeout,timerPerCAS });
+	    }
+
+}
   protected void setFreeCasMessage(Message msg, String aCasReferenceId, String selector) throws Exception {
 	    msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
 	    msg.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
@@ -750,7 +969,27 @@ public class BaseUIMAAsynchronousEngine_
       throw new ResourceInitializationException(new UIMA_IllegalStateException());
     }
     reset();
-    Properties performanceTuningSettings = null;
+    Properties performanceTuningSettings = new Properties();
+    fetchRequiredProperties(anApplicationContext, performanceTuningSettings);
+    transport = (Transport)anApplicationContext.get(UimaAsynchronousEngine.ClientTransport);
+    if ( Transport.JMS.equals(transport)) {
+    	initializeJMS(anApplicationContext);
+    } else if ( Transport.Java.equals(transport)) {
+    	if ( service == null ) {
+    		service = UimaAsServiceRegistry.getInstance().lookupByEndpoint(endpoint);
+    	}
+    	brokerURI = "java";
+    	initializeLocal(anApplicationContext);
+    	
+    } else if ( transport == null ){
+    	throw new IllegalArgumentException("Client Transport Not Specified - Add Transport.JMS or Transport.Java to ApplicationContext");
+    } else {
+    	throw new IllegalArgumentException("Unsupported Client Transport Specified - "+transport.toString()+" Expected Transport.JMS or Transport.Java");
+    }
+//    Properties performanceTuningSettings = new Properties();
+//    fetchRequiredProperties(anApplicationContext, performanceTuningSettings);
+ 
+    //Properties performanceTuningSettings = null;
 
     if (!anApplicationContext.containsKey(UimaAsynchronousEngine.ServerUri)) {
       throw new ResourceInitializationException();
@@ -847,33 +1086,7 @@ public class BaseUIMAAsynchronousEngine_
     super.serviceDelegate.setCasProcessTimeout(processTimeout);
     super.serviceDelegate.setGetMetaTimeout(metadataTimeout);
     try {
-      // Generate unique identifier
-      String uuid = UUIDGenerator.generate();
-      // JMX does not allow ':' in the ObjectName so replace these with underscore
-      uuid = uuid.replaceAll(":", "_");
-      uuid = uuid.replaceAll("-", "_");
-      applicationName += "_" + uuid;
-      jmxManager = new JmxManager("org.apache.uima");
-      clientSideJmxStats.setApplicationName(applicationName);
-      clientJmxObjectName = new ObjectName("org.apache.uima:name=" + applicationName);
-      jmxManager.registerMBean(clientSideJmxStats, clientJmxObjectName);
-
-      // Check if sharedConnection exists. If not create a new one. The sharedConnection
-      // is static and shared by all instances of UIMA AS client in a jvm. The check
-      // is made in a critical section by first acquiring a global static semaphore to
-      // prevent a race condition.
-      createSharedConnection(brokerURI);
-      running = true;
-      //  This is done to give the broker enough time to 'finalize' creation of
-      //  temp reply queue. It's been observed (on MAC OS only) that AMQ
-      //  broker QueueSession.createTemporaryQueue() call is not synchronous. Meaning,
-      //  return from createTemporaryQueue() does not guarantee immediate availability
-      //  of the temp queue. It seems like this operation is asynchronous, causing: 
-      //  "InvalidDestinationException: Cannot publish to a deleted Destination..."
-      //  on the service side when it tries to reply to the client.
-      try {
-        wait(100);
-      } catch( InterruptedException e) {}
+    	initJMX();
       sendMetaRequest();
       waitForMetadataReply();
       if (abort || !running) {
@@ -902,7 +1115,8 @@ public class BaseUIMAAsynchronousEngine_
           }
         }
         initialized = true;
-        remoteService = true;
+        //remoteService = true;
+        remoteService =  isServiceRemote();
         // running = true;
 
         for (int i = 0; listeners != null && i < listeners.size(); i++) {
@@ -929,7 +1143,71 @@ public class BaseUIMAAsynchronousEngine_
     super.acquireCpcReadySemaphore();
     state = ClientState.RUNNING;
   }
-
+	private AnalysisEngineDeploymentDescriptionDocument parseDD(String descriptorPath) throws Exception {
+		org.apache.xmlbeans.XmlOptions options = new org.apache.xmlbeans.XmlOptions();
+		
+		
+		XMLReader xmlReader = XMLReaderFactory.createXMLReader();
+		xmlReader.setFeature("http://xml.org/sax/features/external-general-entities", false);
+		xmlReader.setFeature("http://xml.org/sax/features/external-parameter-entities", false);
+		xmlReader.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd",false);
+		xmlReader.setFeature("http://apache.org/xml/features/disallow-doctype-decl",true);
+		options.setLoadUseXMLReader(xmlReader);
+		
+		return AnalysisEngineDeploymentDescriptionDocument.Factory.parse(new File(descriptorPath), options);
+//		return AnalysisEngineDeploymentDescriptionDocument.Factory.parse(new File(descriptorPath));	
+	}
+	
+	private Provider provider( AnalysisEngineDeploymentDescriptionDocument dd) {
+		 String provider =
+			  dd.getAnalysisEngineDeploymentDescription().getDeployment().getProvider();
+		 
+		 provider = UimaAsDirectServiceBuilder.resolvePlaceholder(provider);
+		 System.out.println("...... provider() - "+provider);
+
+		 if (Provider.JAVA.get().equals(provider)) {
+			 return Provider.JAVA;
+		 } else if (Provider.ACTIVEMQ.get().equals(provider)) {
+			 return Provider.ACTIVEMQ;
+		 } else {
+			 throw new RuntimeException("Invalid provider attribute value in Deployment Descriptor :{"+provider+"} please check <deployment> element. Expected \"java\" or \"activemq\"");
+		 }
+
+		 /*
+		 if (Provider.JAVA.get().equals(provider)) {
+			 return Provider.JAVA;
+		 } else if (Provider.ACTIVEMQ.get().equals(provider)) {
+			 return Provider.ACTIVEMQ;
+		 } else {
+			 throw new RuntimeException("Invalid provider attribute value in Deployment Descriptor :{"+provider+"} please check <deployment> element. Expected \"java\" or \"activemq\"");
+		 }
+		 */
+	}
+	private Protocol protocol( AnalysisEngineDeploymentDescriptionDocument dd) {
+	     String protocol =
+			  dd.getAnalysisEngineDeploymentDescription().getDeployment().getProtocol();
+	     
+		 protocol = UimaAsDirectServiceBuilder.resolvePlaceholder(protocol);
+		 
+		 System.out.println("...... protocol() - "+protocol);
+	     if (Protocol.JAVA.get().equalsIgnoreCase(protocol) ) {
+			 return Protocol.JAVA;
+		 } else if (Protocol.JMS.get().equalsIgnoreCase(protocol)  ) {
+			 return Protocol.JMS;
+		 } else {
+			 throw new RuntimeException("Invalid protocol attribute value in Deployment Descriptor :{"+protocol+"} please check <deployment> element. Expected \"java\" or \"jms\"");
+		 }
+
+		 /*
+	     if (Protocol.JAVA.get().equals(protocol)) {
+			 return Protocol.JAVA;
+		 } else if (Protocol.JMS.get().equals(protocol)) {
+			 return Protocol.JMS;
+		 } else {
+			 throw new RuntimeException("Invalid protocol attribute value in Deployment Descriptor :{"+protocol+"} please check <deployment> element. Expected \"java\" or \"jms\"");
+		 }
+		 */
+	}
   /**
    * First generates a Spring context from a given deploy descriptor and than deploys the context
    * into a Spring Container.
@@ -942,34 +1220,29 @@ public class BaseUIMAAsynchronousEngine_
    * @return - a unique spring container id
    * 
    */
-  public String deploy(String aDeploymentDescriptor, Map anApplicationContext) throws Exception {
-	  String springContext = null;
-	  try {
-		springContext = generateSpringContext(aDeploymentDescriptor, anApplicationContext);
-
-        SpringContainerDeployer springDeployer = new SpringContainerDeployer(springContainerRegistry, this);
+  public String deploy(String deploymentDescriptorPath, Map anApplicationContext) throws Exception {
+	  AnalysisEngineDeploymentDescriptionDocument dd =
+	            parseDD(deploymentDescriptorPath);
+		  
+		  XmlDocumentProperties dp = dd.documentProperties();
+		  System.out.println(dp.getSourceName());
+
+		  // Use factory to create deployer instance for a given protocol and provider
+		  UimaAsServiceDeployer deployer = 
+				  ServiceDeployers.newDeployer(protocol(dd), provider(dd));
+		  
+		  service = deployer.deploy(dd, anApplicationContext);
+		  
+		  UimaAsServiceRegistry.getInstance().register(service);
 
-    	String id = springDeployer.deploy(springContext);
-      if ( springDeployer.isInitialized() ) {
-        springDeployer.startListeners();
-      }
-      return id;
-    } catch (Exception e) {
-      running = true;
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                  "main", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                  "UIMAJMS_exception__WARNING", e);
-      }
+		  return service.getId();
 
-      throw e;
-    } finally {
-	  String uimaAsDebug = (String) anApplicationContext.get(UimaAsynchronousEngine.UimaEeDebug);
-	  if ( springContext != null && (null == uimaAsDebug || uimaAsDebug.equals("") ) ) {
-           disposeContextFiles(springContext);
-	  }
-    }
   }
+  
+  protected UimaASService getServiceReference()  {
+	  return service;
+  }
+  
   private void disposeContextFiles(String ...contextFiles) {
     for( String contextFile: contextFiles) {
       File file = new File(contextFile);
@@ -983,6 +1256,7 @@ public class BaseUIMAAsynchronousEngine_
 	 */
   public String deploy(String[] aDeploymentDescriptorList, Map anApplicationContext)
           throws Exception {
+	  /*
     if (aDeploymentDescriptorList == null) {
       throw new ResourceConfigurationException(UIMA_IllegalArgumentException.ILLEGAL_ARGUMENT,
               new Object[] { "Null", "DeploymentDescriptorList", "deploy()" });
@@ -1018,11 +1292,29 @@ public class BaseUIMAAsynchronousEngine_
 
 	//      disposeContextFiles(springContextFiles);
     }
-
+*/
+	  return "";
   }
 
   public void undeploy() throws Exception {
-    Iterator containerIterator = springContainerRegistry.keySet().iterator();
+		Iterator<Entry<String, List<UimaASService>>> iterator = 
+				UimaAsServiceRegistry.getInstance().getServiceList().entrySet().iterator();
+		// Need a separate list to hold service ids to prevent ConcurrentModificationException
+		List<String> serviceIdList = new ArrayList<String>();
+		while( iterator.hasNext() ) {
+			Iterator<UimaASService> listIterator = iterator.next().getValue().iterator();
+			while( listIterator.hasNext()) {
+				UimaASService service = listIterator.next();
+				serviceIdList.add(service.getId());
+			}	
+		}
+		// Now undeploy all services
+		for( String serviceId : serviceIdList ) {
+			undeploy(serviceId);
+		}
+		/*
+	  
+	  Iterator containerIterator = springContainerRegistry.keySet().iterator();
     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
         String msg ="undeploying "+springContainerRegistry.size()+" Containers";
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "undeploy",
@@ -1039,6 +1331,7 @@ public class BaseUIMAAsynchronousEngine_
       }
       undeploy(containerId);
     }
+    */
   }
 
   public void undeploy(String aSpringContainerId) throws Exception {
@@ -1055,18 +1348,41 @@ public class BaseUIMAAsynchronousEngine_
       return;
     }
     
-    UimaEEAdminSpringContext adminContext = null;
-    if (!springContainerRegistry.containsKey(aSpringContainerId)) {
-        return;
-        // throw new InvalidContainerException("Invalid Spring container Id:" + aSpringContainerId +
-        // ". Unable to undeploy the Spring container");
-      }
-      // Fetch an administrative context which contains a Spring Container
-      adminContext = (UimaEEAdminSpringContext) springContainerRegistry.get(aSpringContainerId);
-      if (adminContext == null) {
+//    UimaEEAdminSpringContext adminContext = null;
+    final UimaASService deployedService =
+    		UimaAsServiceRegistry.getInstance().lookupById(aSpringContainerId);
+    if ( deployedService == null ) {
         throw new InvalidContainerException(
                 "Spring Container Does Not Contain Valid UimaEEAdminSpringContext Object");
       }
+    switch (stop_level) {
+    case SpringContainerDeployer.QUIESCE_AND_STOP:
+      //((AnalysisEngineController) ctrer).quiesceAndStop();
+        //service.stop();
+    	deployedService.quiesce();
+
+      break;
+    case SpringContainerDeployer.STOP_NOW:
+     // ((AnalysisEngineController) ctrer).terminate();
+        //service.stop();
+        deployedService.stop();
+        
+ 
+      break;
+  }
+    UimaAsServiceRegistry.getInstance().unregister(deployedService);
+    /*
+//    if (!springContainerRegistry.containsKey(aSpringContainerId)) {
+//        return;
+//        // throw new InvalidContainerException("Invalid Spring container Id:" + aSpringContainerId +
+//        // ". Unable to undeploy the Spring container");
+//      }
+//      // Fetch an administrative context which contains a Spring Container
+//      adminContext = (UimaEEAdminSpringContext) springContainerRegistry.get(aSpringContainerId);
+//      if (adminContext == null) {
+//        throw new InvalidContainerException(
+//                "Spring Container Does Not Contain Valid UimaEEAdminSpringContext Object");
+//      }
       // Fetch instance of the Container from its context
       ApplicationContext ctx = adminContext.getSpringContainer();
       // Query the container for objects that implement
@@ -1144,8 +1460,21 @@ public class BaseUIMAAsynchronousEngine_
       }
       // Remove the container from a local registry
       springContainerRegistry.remove(aSpringContainerId);
+      */
   }
+  private void initJMX() throws Exception {
+	  	// Generate unique identifier
+	      String uuid = UUIDGenerator.generate();
+	      // JMX does not allow ':' in the ObjectName so replace these with underscore
+	      uuid = uuid.replaceAll(":", "_");
+	      uuid = uuid.replaceAll("-", "_");
+	      applicationName += "_" + uuid;
+	      jmxManager = new JmxManager("org.apache.uima");
+	      clientSideJmxStats.setApplicationName(applicationName);
+	      clientJmxObjectName = new ObjectName("org.apache.uima:name=" + applicationName);
+	      jmxManager.registerMBean(clientSideJmxStats, clientJmxObjectName);
 
+	  }
   /**
    * Use dd2spring to generate Spring context file from a given deployment descriptor file.
    * 
@@ -1340,9 +1669,56 @@ public class BaseUIMAAsynchronousEngine_
 
 //  private void stopProducingCases(ClientRequest clientCachedRequest) {
   private void stopProducingCases(String casReferenceId, Destination cmFreeCasQueue) {
-	     PendingMessage msg = new PendingMessage(AsynchAEMessage.Stop);
-	     msg.put(AsynchAEMessage.Destination, cmFreeCasQueue);
-		 msg.put(AsynchAEMessage.CasReference, casReferenceId);
+	    try {
+//	      if (clientCachedRequest.getFreeCasNotificationQueue() != null) {
+	      if (cmFreeCasQueue != null) {
+	        TextMessage msg = createTextMessage();
+	        msg.setText("");
+	        msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
+//	        msg.setStringProperty(AsynchAEMessage.CasReference, clientCachedRequest.getCasReferenceId());
+	        msg.setStringProperty(AsynchAEMessage.CasReference, casReferenceId);
+	        msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
+	        msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Stop);
+	        msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
+	        try {
+	          MessageProducer msgProducer = getMessageProducer(cmFreeCasQueue);
+	          if (msgProducer != null) {
+	        	  
+	              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+	                  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+	                          "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+	                          "UIMAJMS_client_sending_stop_to_service__INFO", new Object[] {casReferenceId,cmFreeCasQueue});
+	              }
+	            // Send STOP message to Cas Multiplier Service
+	            msgProducer.send(msg);
+	          } else {
+	              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+	                  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+	                          "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+	                          "UIMAJMS_client_unable_to_send_stop_to_cm__WARNING");
+	              }
+	          }
+
+	        } catch (Exception ex) {
+	          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+	            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+	                    "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+	                    "UIMAJMS_exception__WARNING",
+	                    ex);
+	          }
+	        }
+	      }
+	    } catch (Exception e) {
+	      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+	        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+	                "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+	                "UIMAJMS_exception__WARNING", e);
+	      }
+	    }
+	    /*
+	  PendingMessage msg = new PendingMessageImpl(AsynchAEMessage.Stop);
+	     msg.addProperty(AsynchAEMessage.Destination, cmFreeCasQueue);
+		 msg.addProperty(AsynchAEMessage.CasReference, casReferenceId);
 	     try {
 	    	 if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
                  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
@@ -1365,64 +1741,17 @@ public class BaseUIMAAsynchronousEngine_
               }
 	     }
 
-	  /*
-    try {
-//      if (clientCachedRequest.getFreeCasNotificationQueue() != null) {
-      if (cmFreeCasQueue != null) {
-        TextMessage msg = createTextMessage();
-        msg.setText("");
-        msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
-//        msg.setStringProperty(AsynchAEMessage.CasReference, clientCachedRequest.getCasReferenceId());
-        msg.setStringProperty(AsynchAEMessage.CasReference, casReferenceId);
-        msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
-        msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Stop);
-        msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
-        try {
-          MessageProducer msgProducer = getMessageProducer(cmFreeCasQueue);
-          if (msgProducer != null) {
-        	  
-              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-                  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-                          "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                          "UIMAJMS_client_sending_stop_to_service__INFO", new Object[] {casReferenceId,cmFreeCasQueue});
-              }
-            // Send STOP message to Cas Multiplier Service
-            msgProducer.send(msg);
-          } else {
-              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-                  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                          "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                          "UIMAJMS_client_unable_to_send_stop_to_cm__WARNING");
-              }
-          }
-
-        } catch (Exception ex) {
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                    "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_exception__WARNING",
-                    ex);
-          }
-        }
-      }
-    } catch (Exception e) {
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAJMS_exception__WARNING", e);
-      }
-    }
-    */
+	 */
   }
   protected void dispatchFreeCasRequest(String casReferenceId, Message message) throws Exception {
-     PendingMessage msg = new PendingMessage(AsynchAEMessage.ReleaseCAS);
+     PendingMessage msg = new PendingMessageImpl(AsynchAEMessage.ReleaseCAS);
 //     if ( message.getStringProperty(AsynchAEMessage.TargetingSelector) != null ) {
 //    	 msg.put(AsynchAEMessage.TargetingSelector,message.getStringProperty(AsynchAEMessage.TargetingSelector) );
 //     } else {
 //         msg.put(AsynchAEMessage.Destination, message.getJMSReplyTo());
 //     }
-     msg.put(AsynchAEMessage.Destination, message.getJMSReplyTo());
-	 msg.put(AsynchAEMessage.CasReference, casReferenceId);
+     msg.addProperty(AsynchAEMessage.Destination, message.getJMSReplyTo());
+	 msg.addProperty(AsynchAEMessage.CasReference, casReferenceId);
      sender.dispatchMessage(msg, this, false);
   }
   protected MessageSender getDispatcher() {

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java Mon Feb 26 18:54:11 2018
@@ -23,6 +23,10 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InvalidClassException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.UimaASApplicationExitEvent;
@@ -31,8 +35,11 @@ import org.apache.uima.aae.controller.An
 import org.apache.uima.aae.jmx.monitor.BasicUimaJmxMonitorListener;
 import org.apache.uima.aae.jmx.monitor.JmxMonitor;
 import org.apache.uima.aae.jmx.monitor.JmxMonitorListener;
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.aae.service.UimaAsServiceRegistry;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer;
+import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
 import org.apache.uima.resource.ResourceInitializationException;
 import org.apache.uima.util.Level;
 import org.springframework.context.ApplicationEvent;
@@ -74,6 +81,7 @@ public class UIMA_Service implements App
       // allow multiple args for one key
       deploymentDescriptors = getMultipleArg2("-dd", args);
     }
+    /*
     String saxonURL = getArg("-saxonURL", args);
     String xslTransform = getArg("-xslt", args);
     String uimaAsDebug = getArg("-uimaEeDebug", args);
@@ -84,6 +92,7 @@ public class UIMA_Service implements App
       printUsageMessage();
       return null;
     }
+    */
     String brokerURL = getArg("-brokerURL", args);
     // Check if broker URL is specified on the command line. If it is not, use the default
     // localhost:61616. In either case, set the System property defaultBrokerURL. It will be used
@@ -99,6 +108,7 @@ public class UIMA_Service implements App
       System.out.println(">>> Setting Inactivity Timeout To: "
               + System.getProperty(JmsConstants.SessionTimeoutOverride));
     }
+    /*
     if (deploymentDescriptors.length == 0) {
       // array of context files passed in
       springConfigFileArray = args;
@@ -149,8 +159,10 @@ public class UIMA_Service implements App
         }
       }
     }
+    
     return springConfigFileArray;
-
+    */
+    return deploymentDescriptors;
   }
   public SpringContainerDeployer deploy(String[] springContextFiles, ApplicationListener<ApplicationEvent> listener) throws Exception {
 	    SpringContainerDeployer springDeployer;
@@ -415,6 +427,7 @@ public class UIMA_Service implements App
   public static void main(String[] args) {
     try {
       UIMA_Service service = new UIMA_Service();
+      /*
       // parse command args and run dd2spring to generate spring context
       // files from deployment descriptors
       String contextFiles[] = service.initialize(args);
@@ -422,6 +435,9 @@ public class UIMA_Service implements App
       if (contextFiles == null) {
         return;
       }
+      */
+      String dd[] = service.initialize(args);
+      /*
       // Deploy components defined in Spring context files. This method blocks until
       // the container is fully initialized and all UIMA-AS components are succefully
       // deployed.
@@ -430,6 +446,7 @@ public class UIMA_Service implements App
         System.out.println(">>> Failed to Deploy UIMA Service. Check Logs for Details");
         System.exit(0);
       }
+      
       // remove temporary spring context files generated from DD
       for( String contextFile: contextFiles) {
         File file = new File(contextFile);
@@ -437,8 +454,16 @@ public class UIMA_Service implements App
           file.delete();
         }
       }
+      */
+      BaseUIMAAsynchronousEngine_impl engine = 
+    		  new BaseUIMAAsynchronousEngine_impl();
+      
+      for( String deploymentDescriptorPath : dd ) {
+    	  engine.deploy(deploymentDescriptorPath, new HashMap<>());
+      }
       // Add a shutdown hook to catch kill signal and to force quiesce and stop
-      ServiceShutdownHook shutdownHook = new ServiceShutdownHook(serviceDeployer);
+      ServiceShutdownHook shutdownHook = new ServiceShutdownHook(engine);
+//      ServiceShutdownHook shutdownHook = new ServiceShutdownHook(serviceDeployer);
       Runtime.getRuntime().addShutdownHook(shutdownHook);
       // Check if we should start an optional JMX-based monitor that will provide service metrics
       // The monitor is enabled by existence of -Duima.jmx.monitor.interval=<number> parameter. By
@@ -451,21 +476,25 @@ public class UIMA_Service implements App
         service.startMonitor(Long.parseLong(monitorCheckpointFrequency));
       }
       
-      AnalysisEngineController topLevelControllor = serviceDeployer.getTopLevelController();
+//      AnalysisEngineController topLevelControllor = serviceDeployer.getTopLevelController();
       String prompt = "Press 'q'+'Enter' to quiesce and stop the service or 's'+'Enter' to stop it now.\nNote: selected option is not echoed on the console.";
-      if (topLevelControllor != null) {
+  //    if (topLevelControllor != null) {
         System.out.println(prompt);
         // Loop forever or until the service is stopped
-        while (!topLevelControllor.isStopped()) {
+        while ( engine.isRunning() ) {
+//            while (!topLevelControllor.isStopped()) {
+
           if (System.in.available() > 0) {
             int c = System.in.read();
             if (c == 's') {
               service.stopMonitor();
-              serviceDeployer.undeploy(SpringContainerDeployer.STOP_NOW);
+              engine.undeploy();
+              //serviceDeployer.undeploy(SpringContainerDeployer.STOP_NOW);
               System.exit(0);
             } else if (c == 'q') {
-              service.stopMonitor();
-              serviceDeployer.undeploy(SpringContainerDeployer.QUIESCE_AND_STOP);
+            	engine.undeploy();
+              //service.stopMonitor();
+             // serviceDeployer.undeploy(SpringContainerDeployer.QUIESCE_AND_STOP);
               System.exit(0);
 
             } else if (Character.isLetter(c) || Character.isDigit(c)) {
@@ -473,13 +502,13 @@ public class UIMA_Service implements App
             }
           }
           // This is a polling loop. Sleep for 1 sec
-          try {
-        	if (!topLevelControllor.isStopped()) 
-              Thread.sleep(1000);
-          } catch (InterruptedException ex) {
-          }
+//          try {
+//        	if (!topLevelControllor.isStopped()) 
+//              Thread.sleep(1000);
+//          } catch (InterruptedException ex) {
+//          }
         } // while
-      }
+      //}
     } catch (Exception e) {
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
@@ -491,21 +520,28 @@ public class UIMA_Service implements App
   
   static class ServiceShutdownHook extends Thread {
     public SpringContainerDeployer serviceDeployer;
-
+    BaseUIMAAsynchronousEngine_impl engine;
+    
     public ServiceShutdownHook(SpringContainerDeployer serviceDeployer) {
       this.serviceDeployer = serviceDeployer;
     }
-
+    public ServiceShutdownHook(BaseUIMAAsynchronousEngine_impl engine) {
+        this.engine = engine;
+    }
     public void run() {
       try {
-      	AnalysisEngineController topLevelController = serviceDeployer.getTopLevelController();
-      	if (topLevelController != null && !topLevelController.isStopped() ) {
+    	  
+      	//AnalysisEngineController topLevelController = serviceDeployer.getTopLevelController();
+      	//if (topLevelController != null && !topLevelController.isStopped() ) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                 "run", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAJMS_caught_signal__INFO", new Object[] { topLevelController.getComponentName() });
-      	  serviceDeployer.undeploy(SpringContainerDeployer.QUIESCE_AND_STOP);
+                "UIMAJMS_caught_signal__INFO", new Object[] { "TopLevelService" });
+//          "UIMAJMS_caught_signal__INFO", new Object[] { topLevelController.getComponentName() });
+      	 // serviceDeployer.undeploy(SpringContainerDeployer.QUIESCE_AND_STOP);
+    	  engine.undeploy();
+
       	  Runtime.getRuntime().halt(0);
-    	  } 
+    	  //} 
       } catch( Exception e) {
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),

Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.adapter.jms.service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.uima.aae.AsynchAECasManager_impl;
+import org.apache.uima.aae.InProcessCache;
+import org.apache.uima.aae.InputChannel.ChannelType;
+import org.apache.uima.aae.UimaAsThreadFactory;
+import org.apache.uima.aae.UimaClassFactory;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
+import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
+import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController_impl;
+import org.apache.uima.aae.error.ErrorHandlerChain;
+import org.apache.uima.aae.handler.HandlerBase;
+import org.apache.uima.aae.handler.input.MetadataRequestHandler_impl;
+import org.apache.uima.aae.handler.input.MetadataResponseHandler_impl;
+import org.apache.uima.aae.handler.input.ProcessRequestHandler_impl;
+import org.apache.uima.aae.handler.input.ProcessResponseHandler;
+import org.apache.uima.aae.service.AbstractUimaASService;
+import org.apache.uima.aae.service.ScaleoutSpecification;
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.adapter.jms.activemq.JmsInputChannel;
+import org.apache.uima.adapter.jms.activemq.JmsOutputChannel;
+import org.apache.uima.adapter.jms.activemq.UimaDefaultMessageListenerContainer;
+import org.apache.uima.adapter.jms.service.builder.ActiveMQFactory;
+import org.apache.uima.adapter.jms.service.builder.JmsMessageListenerBuilder;
+import org.apache.uima.as.client.Listener.Type;
+import org.apache.uima.resource.ResourceManager;
+import org.apache.uima.resource.ResourceSpecifier;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+public class UimaASJmsService extends AbstractUimaASService 
+implements UimaASService {
+	ActiveMQConnectionFactory factory = null;
+	private String brokerURL;
+	private String queueName;
+
+	private JmsOutputChannel outputChannel;
+	private JmsInputChannel inputChannel;
+	private CountDownLatch latchToCountNumberOfTerminatedThreads;
+	private CountDownLatch latchToCountNumberOfInitedThreads;
+	
+	ErrorHandlerChain errorHandlerChain;
+	private List<UimaDefaultMessageListenerContainer> listeners = 
+			new ArrayList<>();
+	
+	public static void main(String[] args) {
+		try {
+			
+			String queueName = "PersonTitleAnnotatorQueue";
+			String analysisEngineDescriptor = "C:/uima/releases/testing/uima/uima-as/2.9.0/target/uima-as-2.9.1-SNAPSHOT-bin/apache-uima-as-2.9.1-SNAPSHOT/examples/descriptors/analysis_engine/PersonTitleAnnotator.xml";
+			String brokerURL = "tcp://localhost:61616";
+			UimaASJmsService service = 
+					new UimaASJmsService();
+
+			
+			ScaleoutSpecification spec = 
+					new ScaleoutSpecification();
+			spec.withProcessScaleout(4).withGetMetaScaleout(1).withFreeCasScaleout(1);
+			
+			ErrorHandlerChain errorHandlerChain = null;
+			
+			InProcessCache inProcessCache = new InProcessCache();
+			
+			ResourceManager resourceManager =
+					UimaClassFactory.produceResourceManager();
+			
+			AsynchAECasManager_impl casManager = 
+					new AsynchAECasManager_impl(resourceManager);
+			casManager.setCasPoolSize(4);
+
+			PrimitiveAnalysisEngineController_impl controller =
+					new PrimitiveAnalysisEngineController_impl(null, queueName, analysisEngineDescriptor, casManager, inProcessCache, 1, 4);
+			
+			controller.setErrorHandlerChain(errorHandlerChain);
+						
+
+			service.withConttroller(controller)
+				.withErrorHandlerChain(errorHandlerChain)
+				.withBrokerURL(brokerURL)
+				.withInputChannel()
+				.withInputQueue(queueName)
+				.withOutputChannel()
+			    .build(4);
+			
+			service.start();
+			
+		} catch( Exception e) {
+			e.printStackTrace();
+		}
+
+	}
+	public UimaASJmsService withConttroller(AnalysisEngineController controller) {
+		this.controller = controller;
+		return this;
+	}
+	
+	public UimaASJmsService withInProcessCache(InProcessCache cache) {
+		this.inProcessCache = cache;
+		return this;
+	}
+	public UimaASJmsService withResourceSpecifier(ResourceSpecifier resourceSpecifier) {
+		this.resourceSpecifier = resourceSpecifier;
+		return this;
+	}
+	
+	public UimaASJmsService withBrokerURL(String brokerURL) {
+		this.brokerURL = brokerURL;
+		return this;
+	}
+	public UimaASJmsService withName(String name) {
+		this.name = name;
+		return this;
+	}
+	public UimaASJmsService withInputQueue(String queueName) {
+		this.queueName = queueName;
+		return this;
+	}
+	public UimaASJmsService withErrorHandlerChain(ErrorHandlerChain errorHandlerChain) {
+		this.errorHandlerChain = errorHandlerChain;
+		return this;
+	}
+	
+	private UimaASJmsService withInputChannel(){
+		inputChannel = new JmsInputChannel(ChannelType.REQUEST_REPLY);
+		return this;
+	}
+	private UimaASJmsService withOutputChannel() {
+		outputChannel = new JmsOutputChannel();
+		outputChannel.setController(controller);
+
+		return this;
+	}
+	private void createConnectionFactory() {
+		if ( factory == null ) {
+			factory = ActiveMQFactory.newConnectionFactory(brokerURL, 0);
+			factory.setTrustAllPackages(true);
+		}
+	}
+	public String getBrokerURL() {
+		return brokerURL;
+	}
+	private String getSelector(Type type) {
+		String selector = null;
+		switch(type) {
+		case ProcessCAS:
+			selector = "Command=2000 OR Command=2002";
+			break;
+			
+		case GetMeta:
+			selector = "Command=2001";
+			break;
+			
+		case FreeCAS:
+		case Unknown:
+		case Reply:
+			break;
+		}
+		return selector;  // OK to return NULL. This means no selector will be used
+	}
+	private boolean isTempQueueListener(Type type) {
+		if ( Type.ProcessCAS.equals(type) || Type.GetMeta.equals(type)) {
+			return false;
+		}
+		return true;
+	}
+	private UimaDefaultMessageListenerContainer createListener(Type type, int scaleout) throws Exception{
+		if ( inputChannel == null ) {
+			withInputChannel();
+		}
+		if ( outputChannel == null ) {
+			withOutputChannel();
+			outputChannel.setServerURI(getBrokerURL());
+		}
+		ThreadPoolTaskExecutor threadExecutor = new ThreadPoolTaskExecutor();
+		if (controller.isPrimitive() && Type.ProcessCAS.equals(type)) {
+			
+			 // Create a Custom Thread Factory. Provide it with an instance of
+		      // PrimitiveController so that every thread can call it to initialize
+		      // the next available instance of a AE.
+		      ThreadFactory tf = new UimaAsThreadFactory().
+		    		  withThreadGroup(Thread.currentThread().getThreadGroup()).
+		    		  withPrimitiveController((PrimitiveAnalysisEngineController) controller).
+		    		  withTerminatedThreadsLatch(latchToCountNumberOfTerminatedThreads).
+		    		  withInitedThreadsLatch(latchToCountNumberOfInitedThreads);
+		      ((UimaAsThreadFactory)tf).setDaemon(true);
+		      // This ThreadExecutor will use custom thread factory instead of defult one
+		      ((ThreadPoolTaskExecutor) threadExecutor).setThreadFactory(tf);
+		}
+		threadExecutor.setCorePoolSize(scaleout);
+		threadExecutor.setMaxPoolSize(scaleout);
+		
+		// destination can be NULL if this listener is meant for a 
+		// a temp queue. Such destinations are created on demand 
+		// using destination resolver which is plugged into the 
+		// listener. The resolver creates a temp queue lazily on
+		// listener startup.
+		ActiveMQDestination destination = null;
+		
+		if ( !isTempQueueListener(type) ) {
+			destination = new ActiveMQQueue(queueName);
+		}
+		JmsMessageListenerBuilder listenerBuilder = 
+				new JmsMessageListenerBuilder();
+
+		UimaDefaultMessageListenerContainer messageListener =
+				listenerBuilder.withController(controller)
+		       			.withType(type)
+						.withConectionFactory(factory)
+						.withThreadPoolExecutor(threadExecutor)
+						.withConsumerCount(scaleout)
+						.withInputChannel(inputChannel)
+						.withSelector(getSelector(type))
+						.withDestination(destination)
+						.build();
+		messageListener.setReceiveTimeout(500);
+		return messageListener;
+	}
+	public HandlerBase getMessageHandler(AnalysisEngineController controller) {
+		MetadataRequestHandler_impl metaHandler = new MetadataRequestHandler_impl("MetadataRequestHandler");
+		metaHandler.setController(controller);
+		ProcessRequestHandler_impl processHandler = new ProcessRequestHandler_impl("ProcessRequestHandler");
+		processHandler.setController(controller);
+		metaHandler.setDelegate(processHandler);
+		if ( !controller.isPrimitive() ) {
+			MetadataResponseHandler_impl metaResponseHandler = 
+					new MetadataResponseHandler_impl("MetadataResponseHandler");
+			metaResponseHandler.setController(controller);
+			processHandler.setDelegate(metaResponseHandler);
+			
+			ProcessResponseHandler processResponseHandler = 
+					new ProcessResponseHandler("ProcessResponseHandler");
+			processResponseHandler.setController(controller);
+			metaResponseHandler.setDelegate(processResponseHandler);
+			
+		}
+		return metaHandler;
+	}
+	public UimaASJmsService build(int scaleout) throws Exception {
+		// First create Connection Factory. This is needed by
+		// JMS listeners.
+		createConnectionFactory();
+		// counts number of initialized threads
+		latchToCountNumberOfInitedThreads = new CountDownLatch(scaleout);
+		// counts number of terminated threads
+		latchToCountNumberOfTerminatedThreads = new CountDownLatch(scaleout);
+		// Add one instance of JmsOutputChannel 
+		if ( controller.getOutputChannel(ENDPOINT_TYPE.JMS) == null ) {
+			withOutputChannel();
+			outputChannel.setServerURI(brokerURL);
+			outputChannel.setServiceInputEndpoint(queueName);
+			controller.addOutputChannel(outputChannel);
+		} else {
+			outputChannel = (JmsOutputChannel)controller.getOutputChannel(ENDPOINT_TYPE.JMS);
+			outputChannel.setServiceInputEndpoint(queueName);
+		}
+		// Add one instance of JmsInputChannel
+		if ( controller.getInputChannel(ENDPOINT_TYPE.JMS) == null ) {
+			withInputChannel();   // one input channel instance
+			controller.setInputChannel(inputChannel);
+		} else {
+			inputChannel = (JmsInputChannel)controller.getInputChannel(ENDPOINT_TYPE.JMS);
+		}
+		
+		inputChannel.setController(controller);
+		
+		inputChannel.setMessageHandler(getMessageHandler(controller));
+		
+		// Create service JMS listeners to handle Process, GetMeta and optional FreeCas
+		// requests.
+		
+		// listener to handle process CAS requests
+		UimaDefaultMessageListenerContainer processListener 
+		    = createListener(Type.ProcessCAS, scaleout);
+		inputChannel.addListenerContainer(processListener);
+		
+		listeners.add(processListener);
+		// listener to handle GetMeta requests
+		UimaDefaultMessageListenerContainer getMetaListener 
+	        = createListener(Type.GetMeta, 1);
+		inputChannel.addListenerContainer(getMetaListener);
+		listeners.add(getMetaListener);
+		
+		if ( controller.isCasMultiplier()) {
+			// listener to handle Free CAS requests
+			UimaDefaultMessageListenerContainer freeCasListener 
+		        = createListener(Type.FreeCAS, 1);
+			inputChannel.addListenerContainer(freeCasListener);
+			listeners.add(freeCasListener);
+		}
+		
+		return this;
+	}
+	public void quiesce() throws Exception {
+		controller.quiesceAndStop();
+	}
+
+	public void stop() throws Exception {
+	//	controller.stop();
+		
+		controller.terminate();
+/*
+		for( UimaDefaultMessageListenerContainer listener : listeners ) {
+			listener.setTerminating();
+			listener.stop();
+			// wait for all process threads to exit
+			if ( controller.isPrimitive() && Type.ProcessCAS.equals(listener.getType())) {
+				latchToCountNumberOfTerminatedThreads.await();
+			}
+			if ( listener.getTaskExecutor() != null ) {
+				if ( listener.getTaskExecutor() instanceof ThreadPoolTaskExecutor ) {
+					ThreadPoolTaskExecutor threadExecutor =
+							(ThreadPoolTaskExecutor)listener.getTaskExecutor();
+					threadExecutor.getThreadPoolExecutor().shutdownNow();
+					threadExecutor.shutdown();
+				}
+			}
+//			listener.closeConnection();
+//			listener.destroy();
+			System.out.println("Stopped Process Listener ....");
+		}
+*/
+	}
+	@Override
+	public String getEndpoint() {
+		return queueName;
+	}
+	public ResourceSpecifier getResourceSpecifier( ){
+		return resourceSpecifier;
+	}
+	@Override
+	public String getId() {
+		// TODO Auto-generated method stub
+		return id;
+	}
+	@Override
+	public String getName() {
+		return name;
+	}
+
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/ActiveMQFactory.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/ActiveMQFactory.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/ActiveMQFactory.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/ActiveMQFactory.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.adapter.jms.service.builder;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.command.ActiveMQQueue;
+
+public class ActiveMQFactory {
+	private ActiveMQFactory() {
+		
+	}
+	public static ActiveMQQueue newQueue(String withName) {
+		return new ActiveMQQueue(withName);
+	}
+	
+	public static ActiveMQPrefetchPolicy newPrefetchPolicy(int howMany) {
+		ActiveMQPrefetchPolicy prefetchPolicy =
+				new ActiveMQPrefetchPolicy();
+		prefetchPolicy.setQueuePrefetch(howMany);
+		return prefetchPolicy;
+	}
+	
+	public static ActiveMQConnectionFactory newConnectionFactory(String broker, int prefetch) {
+		ActiveMQConnectionFactory factory = 
+				new ActiveMQConnectionFactory();
+		factory.setBrokerURL(broker);
+		factory.setPrefetchPolicy(newPrefetchPolicy(prefetch));
+
+		return factory;
+	}
+}