You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/02/26 18:54:13 UTC

svn commit: r1825401 [3/11] - in /uima/uima-as/branches/uima-as-3: uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ uimaj-as-activemq/src/main/java/org/apache/uima...

Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsMessageListenerBuilder.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsMessageListenerBuilder.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsMessageListenerBuilder.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsMessageListenerBuilder.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.adapter.jms.service.builder;
+
+import javax.jms.Destination;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.uima.aae.AsynchAECasManager_impl;
+import org.apache.uima.aae.InProcessCache;
+import org.apache.uima.aae.InputChannel;
+import org.apache.uima.aae.InputChannel.ChannelType;
+import org.apache.uima.aae.UimaClassFactory;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController_impl;
+import org.apache.uima.aae.error.ErrorHandlerChain;
+import org.apache.uima.adapter.jms.activemq.ConcurrentMessageListener;
+import org.apache.uima.adapter.jms.activemq.JmsInputChannel;
+import org.apache.uima.adapter.jms.activemq.JmsOutputChannel;
+import org.apache.uima.adapter.jms.activemq.TempDestinationResolver;
+import org.apache.uima.adapter.jms.activemq.UimaDefaultMessageListenerContainer;
+import org.apache.uima.as.client.Listener.Type;
+import org.apache.uima.resource.ResourceManager;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+public class JmsMessageListenerBuilder {
+	private AnalysisEngineController controller;
+	private ActiveMQConnectionFactory connectionFactory;
+	private int consumerCount=1;
+	private InputChannel inputChannel;
+	private Endpoint endpoint;
+	private boolean isReplyListener = false;
+	private String selector=null;
+	private Destination destination=null;  // queue 
+	private ThreadPoolTaskExecutor threadExecutor=null;
+	private Type type;
+	private TempDestinationResolver tempQueueDestinationResolver = null;
+	
+	/**
+	 * Manual test driver. Creates a primitive controller for the PersonTitleAnnotator
+	 * descriptor and starts three listeners against a broker at tcp://localhost:61616:
+	 * a ProcessCAS listener, a GetMeta listener and a Reply listener.
+	 * <p>
+	 * The descriptor location is a developer-specific absolute path, so this method is
+	 * not runnable as-is on another machine. Any exception is printed to stderr.
+	 *
+	 * @param args not used
+	 */
+	public static void main(String[] args) {
+		try {
+			String endpointName = "PersonTitleAnnotatorQueue";
+			String analysisEngineDescriptor = "C:/uima/releases/testing/uima/uima-as/2.9.0/target/uima-as-2.9.1-SNAPSHOT-bin/apache-uima-as-2.9.1-SNAPSHOT/examples/descriptors/analysis_engine/PersonTitleAnnotator.xml";
+			String broker = "tcp://localhost:61616";
+			// JMS message selectors used to split one queue between the two request listeners
+			String processSelector = "Command=2000 OR Command=2002";
+			String getMetaSelector = "Command=2001";
+			int workQueueSize = 1;
+			int processScaleout = 4;
+			int scaleout = 1;
+			
+			System.setProperty("BrokerURI",broker);
+			// no error handlers are configured for this driver
+			ErrorHandlerChain errorHandlerChain = null;
+			
+			InProcessCache inProcessCache = new InProcessCache();
+			
+			ResourceManager resourceManager =
+					UimaClassFactory.produceResourceManager();
+			
+			AsynchAECasManager_impl casManager = 
+					new AsynchAECasManager_impl(resourceManager);
+			casManager.setCasPoolSize(processScaleout);
+			
+			
+			// one input channel per request listener
+			JmsInputChannel processInputChannel = new JmsInputChannel(ChannelType.REQUEST_REPLY);
+			JmsInputChannel getMetaInputChannel = new JmsInputChannel(ChannelType.REQUEST_REPLY);
+			
+			JmsOutputChannel outputChannel = new JmsOutputChannel();
+			outputChannel.setServerURI(broker);
+			PrimitiveAnalysisEngineController_impl controller =
+					new PrimitiveAnalysisEngineController_impl(null, endpointName, analysisEngineDescriptor, casManager, inProcessCache, workQueueSize, scaleout);
+			
+			controller.setOutputChannel(outputChannel);
+			controller.setErrorHandlerChain(errorHandlerChain);
+						
+			
+			// NOTE(review): ActiveMQFactory is not imported by this file; presumably it
+			// lives in this package - confirm.
+			ActiveMQConnectionFactory factory =
+					ActiveMQFactory.newConnectionFactory(broker, 0);
+			
+			factory.setTrustAllPackages(true);
+			// both request listeners consume from the same queue, separated by selector
+			ActiveMQDestination destination = 
+					new ActiveMQQueue(endpointName);
+			JmsMessageListenerBuilder processListenerBuilder = 
+					new JmsMessageListenerBuilder();
+			ThreadPoolTaskExecutor threadExecutor1 = new ThreadPoolTaskExecutor();
+			
+			threadExecutor1.setCorePoolSize(processScaleout);
+			threadExecutor1.setMaxPoolSize(processScaleout);
+
+			UimaDefaultMessageListenerContainer jmsProcessMessageListener =
+			       processListenerBuilder.withController(controller)
+			       			.withType(Type.ProcessCAS)
+							.withConectionFactory(factory)
+							.withThreadPoolExecutor(threadExecutor1)
+							.withConsumerCount(processScaleout)
+							.withInputChannel(processInputChannel)
+							.withSelector(processSelector)
+							.withDestination(destination)
+							.build();
+			
+			JmsMessageListenerBuilder getMetaListenerBuilder = 
+					new JmsMessageListenerBuilder();
+			ThreadPoolTaskExecutor threadExecutor2 = new ThreadPoolTaskExecutor();
+			threadExecutor2.setCorePoolSize(scaleout);
+			threadExecutor2.setMaxPoolSize(scaleout);
+			
+			UimaDefaultMessageListenerContainer jmsGetMetaMessageListener =
+					getMetaListenerBuilder.withController(controller)
+							.withType(Type.GetMeta)
+							.withConectionFactory(factory)
+							.withThreadPoolExecutor(threadExecutor2)
+							.withConsumerCount(scaleout)
+							.withInputChannel(getMetaInputChannel)
+							.withSelector(getMetaSelector)
+							.withDestination(destination)
+							.build();
+
+			ThreadPoolTaskExecutor threadExecutor3 = new ThreadPoolTaskExecutor();
+			threadExecutor3.setCorePoolSize(scaleout);
+			threadExecutor3.setMaxPoolSize(scaleout);
+			TempDestinationResolver resolver = new TempDestinationResolver(controller.getComponentName(),"");
+			resolver.setConnectionFactory(factory);
+			
+			// NOTE(review): this reuses getMetaListenerBuilder, so the selector,
+			// destination and input channel configured for the GetMeta listener are
+			// still set on the builder, and no endpoint is supplied - confirm that
+			// build() tolerates a null endpoint for Type.Reply and that the reuse is
+			// intended rather than a missing "new JmsMessageListenerBuilder()".
+			UimaDefaultMessageListenerContainer replyListener =
+					getMetaListenerBuilder.withController(controller)
+							.withType(Type.Reply)
+							.withConectionFactory(factory)
+							.withThreadPoolExecutor(threadExecutor3)
+							.withConsumerCount(scaleout)
+							.withTempDestinationResolver(resolver)
+							.build();
+			
+			
+			processInputChannel.setController(controller);
+			processInputChannel.addListenerContainer(jmsProcessMessageListener);
+			
+			getMetaInputChannel.setController(controller);
+			getMetaInputChannel.addListenerContainer(jmsGetMetaMessageListener);
+			
+			// the executors are initialized and their core threads pre-started before
+			// any listener is started
+			threadExecutor1.initialize();
+			threadExecutor1.getThreadPoolExecutor().prestartAllCoreThreads();
+			threadExecutor2.initialize();
+			threadExecutor2.getThreadPoolExecutor().prestartAllCoreThreads();
+			threadExecutor3.initialize();
+			threadExecutor3.getThreadPoolExecutor().prestartAllCoreThreads();
+			
+			jmsProcessMessageListener.afterPropertiesSet();
+			jmsProcessMessageListener.initialize();
+			jmsProcessMessageListener.start();
+			
+			jmsGetMetaMessageListener.afterPropertiesSet();
+			jmsGetMetaMessageListener.initialize();
+			jmsGetMetaMessageListener.start();
+			
+
+			replyListener.afterPropertiesSet();
+			replyListener.initialize();
+			replyListener.start();
+			
+/*				
+			synchronized(inProcessCache ) {
+				inProcessCache.wait(5000);
+				System.out.println("Stopping Listeners ....");
+				jmsProcessMessageListener.setTerminating();
+				jmsProcessMessageListener.stop();
+				threadExecutor1.getThreadPoolExecutor().shutdownNow();
+				threadExecutor1.shutdown();
+				jmsProcessMessageListener.stop();
+				jmsProcessMessageListener.closeConnection();
+				jmsProcessMessageListener.destroy();
+				System.out.println("Stopped Process Listener ....");
+				
+				jmsGetMetaMessageListener.setTerminating();
+				jmsGetMetaMessageListener.stop();
+				
+				threadExecutor2.getThreadPoolExecutor().shutdownNow();
+				threadExecutor2.shutdown();
+				jmsGetMetaMessageListener.closeConnection();
+				jmsGetMetaMessageListener.destroy();
+				System.out.println("Stopped GetMeta Listener ....");
+			}
+			*/
+		} catch( Exception e) {
+			e.printStackTrace();
+		}
+
+	}
+
+	/**
+	 * Sets the controller that {@code build()} attaches to the listener and whose
+	 * component name is used in generated thread and bean names.
+	 *
+	 * @param controller the controller the listener will serve
+	 * @return this builder, to allow call chaining
+	 */
+	public JmsMessageListenerBuilder withController(AnalysisEngineController controller) {
+		this.controller = controller;
+		return this;
+	}
+	
+	/**
+	 * Stores a resolver for temporary reply destinations on this builder.
+	 *
+	 * @param resolver the temp destination resolver to remember
+	 * @return this builder, to allow call chaining
+	 */
+	public JmsMessageListenerBuilder withTempDestinationResolver(TempDestinationResolver resolver) {
+		this.tempQueueDestinationResolver = resolver;
+		return this;
+	}
+	/**
+	 * Sets the input channel the listener is registered with. {@code build()} casts
+	 * it to {@link JmsInputChannel}.
+	 *
+	 * @param inputChannel the channel receiving messages from the listener
+	 * @return this builder, to allow call chaining
+	 */
+	public JmsMessageListenerBuilder withInputChannel(InputChannel inputChannel) {
+		this.inputChannel = inputChannel;
+		return this;
+	}
+	/**
+	 * Sets the optional task executor for the listener. When present,
+	 * {@code build()} assigns it a thread name prefix and installs it on the listener.
+	 *
+	 * @param threadExecutor the executor to use, or {@code null} for none
+	 * @return this builder, to allow call chaining
+	 */
+	public JmsMessageListenerBuilder withThreadPoolExecutor(ThreadPoolTaskExecutor threadExecutor) {
+		this.threadExecutor = threadExecutor;
+		return this;
+	}
+	/**
+	 * Sets the endpoint that {@code build()} installs as the listener's target
+	 * endpoint and inspects to detect a remote CAS multiplier.
+	 *
+	 * @param endpoint the delegate endpoint this listener belongs to
+	 * @return this builder, to allow call chaining
+	 */
+	public JmsMessageListenerBuilder withEndpoint(Endpoint endpoint) {
+		this.endpoint = endpoint;
+		return this;
+	}
+	/**
+	 * Sets the optional JMS message selector; it is applied to the listener only
+	 * when it is not {@code null}.
+	 *
+	 * @param selector the selector expression, or {@code null} for none
+	 * @return this builder, to allow call chaining
+	 */
+	public JmsMessageListenerBuilder withSelector(String selector) {
+		this.selector = selector;
+		return this;
+	}
+	/**
+	 * Sets the destination the listener consumes from. {@code build()} uses it only
+	 * for listeners that are neither reply nor FreeCAS listeners.
+	 *
+	 * @param destination the queue to listen on
+	 * @return this builder, to allow call chaining
+	 */
+	public JmsMessageListenerBuilder withDestination(Destination destination) {
+		this.destination = destination;
+		return this;
+	}
+	/**
+	 * Sets the ActiveMQ connection factory handed to the listener, and to the temp
+	 * destination resolver that {@code build()} creates for reply listeners.
+	 *
+	 * @param connectionFactory the factory used to connect to the broker
+	 * @return this builder, to allow call chaining
+	 */
+	public JmsMessageListenerBuilder withConectionFactory(ActiveMQConnectionFactory connectionFactory) {
+		this.connectionFactory = connectionFactory;
+		return this;
+	}
+
+	/**
+	 * Sets the number of concurrent consumers for the listener. The builder
+	 * defaults to one consumer when this is not called.
+	 *
+	 * @param howManyConsumers the concurrent consumer count
+	 * @return this builder, to allow call chaining
+	 */
+	public JmsMessageListenerBuilder withConsumerCount(int howManyConsumers) {
+		this.consumerCount = howManyConsumers;
+		return this;
+	}
+	/**
+	 * Marks the listener as one that handles replies, which makes {@code build()}
+	 * configure it with a temp destination resolver instead of a fixed destination.
+	 *
+	 * @return this builder, to allow call chaining
+	 */
+	public JmsMessageListenerBuilder asReplyListener() {
+		this.isReplyListener = true;
+		return this;
+	}
+	public JmsMessageListenerBuilder withType(Type t) {
+		this.type = t;
+		if ( Type.Reply.equals(t)) {
+			asReplyListener();
+		}
+		return this;
+	}
+	/**
+	 * Checks that the properties {@code build()} cannot work without were supplied,
+	 * so a misconfigured builder fails with a descriptive message instead of a
+	 * NullPointerException or ClassCastException part-way through the build.
+	 *
+	 * @throws IllegalStateException if the controller or the input channel is
+	 *         missing, or the input channel is not a {@link JmsInputChannel}
+	 */
+	private void validate() {
+		if ( controller == null ) {
+			throw new IllegalStateException(
+					"JmsMessageListenerBuilder: controller is required - call withController() before build()");
+		}
+		if ( inputChannel == null ) {
+			throw new IllegalStateException(
+					"JmsMessageListenerBuilder: input channel is required - call withInputChannel() before build()");
+		}
+		// build() registers the listener through a cast to JmsInputChannel
+		if ( !(inputChannel instanceof JmsInputChannel) ) {
+			throw new IllegalStateException(
+					"JmsMessageListenerBuilder: input channel must be a JmsInputChannel but was "
+							+ inputChannel.getClass().getName());
+		}
+	}
+	/**
+	 * Tells whether the given endpoint is a remote delegate that is also a CAS
+	 * multiplier.
+	 *
+	 * @param endpoint the endpoint to inspect, may be {@code null}
+	 * @return {@code true} only for a non-null, remote, CAS multiplier endpoint
+	 */
+	private boolean isRemoteCasMultiplier(Endpoint endpoint) {
+		if ( endpoint == null ) {
+			return false;
+		}
+		return endpoint.isRemote() && endpoint.isCasMultiplier();
+	}
+	public UimaDefaultMessageListenerContainer build() throws Exception{
+		UimaDefaultMessageListenerContainer listener = 
+				new UimaDefaultMessageListenerContainer();
+		/*
+		 * 
+		 * VALIDATE REQUIRED PROPERTIES
+		 * 
+		 */
+		// make sure all required properties are set
+		validate();
+		if ( threadExecutor != null ) {
+			threadExecutor.setThreadNamePrefix(controller.getComponentName()+"-"+type.name()+"Listener-Thread");
+			listener.setTaskExecutor(threadExecutor);
+			
+		}
+		
+		listener.setConcurrentConsumers(consumerCount);
+		listener.setController(controller);
+		
+		if ( selector != null ) {
+			listener.setMessageSelector(selector);
+		}
+		
+        if (isRemoteCasMultiplier(endpoint) ) {
+        	// for remote CM's we need special handling. See description of a 
+        	// possible race condition in ConcurrentMessageListener class.
+    		ThreadGroup tg = Thread.currentThread().getThreadGroup();
+            String prefix = endpoint.getDelegateKey()+" Reply Thread";
+    		ConcurrentMessageListener concurrentListener = 
+    				new ConcurrentMessageListener(consumerCount, (JmsInputChannel)inputChannel, "", tg,prefix);
+    		// register this listener with inputchannel so that we can stop it. The listener on a remote CM 
+    		// is ConcurrentMessageListener which imposes order of replies (parent last) before delegating 
+    		// msgs to the inputchannel. When stopping the service, all listeners must be registered with 
+    		// an inputchannel which is responsible for shutting down all listeners.
+    		((JmsInputChannel)inputChannel).registerListener(listener);
+            listener.setMessageListener(concurrentListener);
+            concurrentListener.setAnalysisEngineController(controller);
+        } else {
+    		((JmsInputChannel)inputChannel).registerListener(listener);
+    		listener.setMessageListener(inputChannel);
+        }
+
+		listener.setTargetEndpoint(endpoint);
+		listener.setConnectionFactory(connectionFactory);
+		// is this listener processing replies from a remote service. This can
+		// only be true if the controller is an aggregate. Primitive controller
+		// can only handle requests from remote services. An aggregate can send
+		// requests and expects replies.
+		if ( isReplyListener || Type.FreeCAS.equals(type)) {
+			String e = Type.FreeCAS.equals(type) ? "FreeCASEndpoint" :endpoint.getDelegateKey();
+			TempDestinationResolver resolver = new
+					TempDestinationResolver(controller.getComponentName(), e);
+			resolver.setListener(listener);
+			resolver.setConnectionFactory(connectionFactory);
+			listener.setDestinationResolver(resolver);
+			listener.setDestinationName("");
+			if ( Type.FreeCAS.equals(type)) {
+				listener.setBeanName(controller.getComponentName()+"-"+type.name()+"Listener For FreeCas Listener");
+			} else {
+				listener.setBeanName(controller.getComponentName()+"-"+type.name()+"Listener For Delegate:"+endpoint.getDelegateKey());
+			}
+		} else if ( destination != null ) {
+			listener.setDestinationName(((ActiveMQDestination)destination).getPhysicalName());
+			listener.setDestination(destination);
+			listener.setBeanName(controller.getComponentName()+"-"+type.name()+"Listener");
+
+		}
+
+		if ( type != null ) {
+			listener.setType(type);
+		}
+		return listener;
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/UimaAsJmsServiceBuilder.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/UimaAsJmsServiceBuilder.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/UimaAsJmsServiceBuilder.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/UimaAsJmsServiceBuilder.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.adapter.jms.service.builder;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.uima.aae.InputChannel;
+import org.apache.uima.aae.InputChannel.ChannelType;
+import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
+import org.apache.uima.aae.OutputChannel;
+import org.apache.uima.aae.UimaClassFactory;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
+import org.apache.uima.aae.controller.ControllerCallbackListener;
+import org.apache.uima.aae.controller.Endpoint_impl;
+import org.apache.uima.aae.error.ErrorHandler;
+import org.apache.uima.aae.error.ErrorHandlerChain;
+import org.apache.uima.aae.error.Threshold;
+import org.apache.uima.aae.error.Thresholds;
+import org.apache.uima.aae.error.handler.CpcErrorHandler;
+import org.apache.uima.aae.error.handler.GetMetaErrorHandler;
+import org.apache.uima.aae.error.handler.ProcessCasErrorHandler;
+import org.apache.uima.aae.handler.Handler;
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.aae.service.builder.AbstractUimaAsServiceBuilder;
+import org.apache.uima.aae.service.delegate.AnalysisEngineDelegate;
+import org.apache.uima.aae.service.delegate.RemoteAnalysisEngineDelegate;
+import org.apache.uima.adapter.jms.activemq.JmsInputChannel;
+import org.apache.uima.adapter.jms.activemq.JmsOutputChannel;
+import org.apache.uima.adapter.jms.activemq.TempDestinationResolver;
+import org.apache.uima.adapter.jms.activemq.UimaDefaultMessageListenerContainer;
+import org.apache.uima.adapter.jms.service.UimaASJmsService;
+import org.apache.uima.analysis_engine.AnalysisEngineDescription;
+import org.apache.uima.as.client.DirectInputChannel;
+import org.apache.uima.as.client.Listener.Type;
+import org.apache.uima.resource.ResourceSpecifier;
+import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
+import org.apache.uima.resourceSpecifier.AsyncPrimitiveErrorConfigurationType;
+import org.apache.uima.resourceSpecifier.CasPoolType;
+import org.apache.uima.resourceSpecifier.CollectionProcessCompleteErrorsType;
+import org.apache.uima.resourceSpecifier.ProcessCasErrorsType;
+import org.apache.uima.resourceSpecifier.ServiceType;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+public class UimaAsJmsServiceBuilder extends AbstractUimaAsServiceBuilder{
+//	private static final String NoParent= "NoParent";
+//	private static enum FlowControllerType {
+//		FIXED
+//	}
+//		static Map<String, Object> ddAEMap = new HashMap<String, Object>();
+	
+	/*
+	private InProcessCache cache;
+	private AsynchAECasManager_impl casManager;
+    private ResourceManager resourceManager;
+    */
+    private int scaleout=1;
+//    private AnalysisEngineController controller;
+//    private List<ControllerStatusListener> listeners = new ArrayList<ControllerStatusListener>();
+//    private ServiceMode mode = ServiceMode.Asynchronous;   // default 
+//    private AnalysisEngineDescription topLevelAEDescriptor;
+    
+	/**
+	 * Placeholder entry point that performs no work. Services are built through
+	 * {@code build(...)} and {@code buildAndDeploy(...)} rather than from here.
+	 *
+	 * @param args not used
+	 */
+	public static void main(String[] args) {
+		// Intentionally empty: nothing is deployed from this method. It used to hold
+		// only unused local strings with developer-specific descriptor paths.
+	}
+	
+	
+	public static InputChannel createInputChannel(ChannelType type) {
+		return new JmsInputChannel(type);
+	}
+
+	
+	/**
+	 * Not implemented: no output channel is created by this method.
+	 *
+	 * @return always {@code null}
+	 */
+	public OutputChannel createOutputChannel() {
+		// no output channel is produced here yet
+		return null;
+	}
+	/**
+	 * Creates and starts the listener that receives replies from a remote delegate
+	 * of an aggregate, and makes sure the aggregate has a JMS input channel and a
+	 * JMS output channel.
+	 * <p>
+	 * The endpoint is updated along the way: its queue name, a unique reply-to
+	 * queue name, the temp-reply-destination flag, the concurrent reply consumer
+	 * count (only when the delegate's reply scaleout is greater than one) and the
+	 * broker URL are all set here.
+	 *
+	 * @param controller the aggregate controller that owns the delegate
+	 * @param endpoint the endpoint describing the remote delegate; modified by this method
+	 * @param remoteDelegate the delegate configuration (broker, queue, prefetch, reply scaleout)
+	 * @throws Exception if the listener or the output channel cannot be created
+	 */
+	protected void addListenerForReplyHandling( AggregateAnalysisEngineController controller, Endpoint_impl endpoint, RemoteAnalysisEngineDelegate remoteDelegate) throws Exception {
+		String brokerURL =  resolvePlaceholder(remoteDelegate.getBrokerURI());
+		int configuredPrefetch = remoteDelegate.getPrefetch();
+		endpoint.setEndpoint(resolvePlaceholder(remoteDelegate.getQueueName())); 
+
+		// With an HTTP broker URL the prefetch must be greater than zero, otherwise
+		// the listener does not get messages. A prefetch of 0 is therefore bumped to 1.
+		int prefetch = (configuredPrefetch == 0) ? 1 : configuredPrefetch;
+
+		// reuse the aggregate's JMS input channel when there is one, otherwise
+		// create it and attach it to the controller
+		JmsInputChannel inputChannel;
+		if (controller.getInputChannel(ENDPOINT_TYPE.JMS) != null) {
+			inputChannel = (JmsInputChannel) controller.getInputChannel(ENDPOINT_TYPE.JMS);
+		} else {
+			inputChannel = new JmsInputChannel(ChannelType.REQUEST_REPLY);
+			Handler messageHandlerChain = getMessageHandler(controller);
+			inputChannel.setMessageHandler(messageHandlerChain);
+			controller.addInputChannel(inputChannel);
+			inputChannel.setController(controller);
+		}
+
+		// make the reply queue name unique
+		String componentName = controller.getComponentName().replaceAll("\\s","_");
+		String qname = "rmtRtrnQ_" + componentName + "_" + endpoint.getDelegateKey() + "_" + UUID.randomUUID();
+		endpoint.setReplyToEndpoint(qname);
+		// remote always replies to a JMS temp queue
+		endpoint.setTempReplyDestination(true);
+
+		ThreadPoolTaskExecutor threadExecutor = new ThreadPoolTaskExecutor();
+		int consumerCount = 1; // default reply queue consumer count
+		boolean singleReceiverThread = false;
+		// check if the DD includes reply queue scaleout for this remote delegate
+		if ( remoteDelegate.getReplyScaleout() > 1 ) {
+			// in this context the scaleout just means how many consumer threads
+			// this listener will start to handle messages arriving into the
+			// temp reply queue.
+			consumerCount = remoteDelegate.getReplyScaleout();
+			endpoint.setConcurrentReplyConsumers(remoteDelegate.getReplyScaleout());
+			// for a remote CM the listener uses a single thread to receive CASes.
+			// This deals with a race condition described in class
+			// ConcurrentMessageListener.
+			singleReceiverThread = endpoint.isCasMultiplier();
+		}
+		int poolSize = singleReceiverThread ? 1 : consumerCount;
+		threadExecutor.setCorePoolSize(poolSize);
+		threadExecutor.setMaxPoolSize(poolSize);
+
+		JmsMessageListenerBuilder replyListenerBuilder = 
+				new JmsMessageListenerBuilder();
+
+		ActiveMQConnectionFactory factory =
+				new ActiveMQConnectionFactory(brokerURL);
+		factory.setTrustAllPackages(true);
+		ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+		prefetchPolicy.setQueuePrefetch(prefetch);
+		factory.setPrefetchPolicy(prefetchPolicy);
+
+		// Need a resolver to create temp reply queue. It will be created automatically
+		// by Spring.
+		TempDestinationResolver resolver = new TempDestinationResolver(controller.getComponentName(),remoteDelegate.getKey());
+		resolver.setConnectionFactory(factory);
+		
+		UimaDefaultMessageListenerContainer replyListener =
+				replyListenerBuilder.withController(controller)
+						.withType(Type.Reply)
+						.withInputChannel(inputChannel)
+						.withConectionFactory(factory)
+						.withThreadPoolExecutor(threadExecutor)
+						.withConsumerCount(consumerCount)
+						.withTempDestinationResolver(resolver)
+						.withEndpoint(endpoint)
+						.build();
+		replyListener.start();
+
+		// there should be one instance of OutputChannel for JMS. Create it, if one does not exist 
+		if ( controller.getOutputChannel(ENDPOINT_TYPE.JMS) == null) {
+			JmsOutputChannel jmsOutputChannel = new JmsOutputChannel();
+			jmsOutputChannel.setController(controller);
+			jmsOutputChannel.setServerURI(brokerURL);
+			jmsOutputChannel.setControllerInputEndpoint("");
+			jmsOutputChannel.setServiceInputEndpoint("");
+			jmsOutputChannel.initialize();
+			controller.addOutputChannel(jmsOutputChannel);
+		}
+		endpoint.setServerURI(brokerURL);
+		System.out.println("......... Service:"+controller.getComponentName()+" Reply Listener Started - Delegate:"+endpoint.getDelegateKey()+" Broker:"+endpoint.getServerURI()+" Endpoint:"+endpoint.getDestination());
+	}
+
+	/**
+	 * Creates the top level controller for the given service, wires it to the
+	 * service wrapper and builds the service.
+	 * <p>
+	 * As a side effect the {@code BrokerURI} system property is set to the
+	 * service's broker URL.
+	 *
+	 * @param doc the parsed deployment descriptor
+	 * @param del the top level analysis engine delegate
+	 * @param service the JMS service wrapper to configure and build
+	 * @param callback listener registered on the top level controller
+	 * @return the {@code service} instance that was passed in
+	 * @throws Exception if any step of the deployment fails
+	 */
+	public UimaASService buildAndDeploy(AnalysisEngineDeploymentDescriptionDocument doc, AnalysisEngineDelegate del,
+			UimaASJmsService service, ControllerCallbackListener callback) throws Exception {
+		// top level CAS pool configuration from the DD
+		CasPoolType casPool = getCasPoolConfiguration(doc);
+
+		super.addEnvironmentVariablesFromDD(doc);
+
+		System.setProperty("BrokerURI", service.getBrokerURL());
+
+		initialize(service, casPool, Transport.JMS); 
+		service.withInProcessCache(super.cache);
+
+		int instanceCount = howManyInstances(doc);
+		AnalysisEngineController topLevelController = createController(del, service.getResourceSpecifier(),
+				service.getName(), null, instanceCount);
+		
+		// callback will be made when initialization succeeds or fails
+		topLevelController.addControllerCallbackListener(callback);
+
+		topLevelController.getServiceInfo().setBrokerURL(service.getBrokerURL());
+		topLevelController.setServiceId(service.getId());
+
+		// fetch service definition from DD; fall back to default error handling
+		// when the DD does not configure it for the analysis engine
+		ServiceType serviceDefinition = getService(doc);
+		AsyncPrimitiveErrorConfigurationType errorConfiguration =
+				(serviceDefinition.getAnalysisEngine() != null
+						&& serviceDefinition.getAnalysisEngine().getAsyncPrimitiveErrorConfiguration() != null)
+				? serviceDefinition.getAnalysisEngine().getAsyncPrimitiveErrorConfiguration()
+				: addDefaultErrorHandling(serviceDefinition);
+
+		service.withConttroller(topLevelController).withErrorHandlerChain(null);
+
+		configureTopLevelService(topLevelController, service, errorConfiguration);
+
+		service.build(instanceCount);
+
+		return service;
+	}
+
+	/**
+	 * For a non-primitive top level controller, installs thresholds under the
+	 * empty-string key of each error handler in the controller's error handler
+	 * chain. Nothing is done for a primitive controller or when no error
+	 * configuration is given.
+	 *
+	 * @param topLevelController the controller whose error handlers are configured
+	 * @param service the service wrapper (not used by this method)
+	 * @param pec the error configuration, may be {@code null}
+	 * @throws Exception declared for callers
+	 */
+	private void configureTopLevelService(AnalysisEngineController topLevelController, UimaASJmsService service,
+			AsyncPrimitiveErrorConfigurationType pec) throws Exception {
+		if (topLevelController.isPrimitive() || pec == null) {
+			return;
+		}
+
+		ErrorHandlerChain chain = topLevelController.getErrorHandlerChain();
+		for (Iterator<ErrorHandler> it = chain.iterator(); it.hasNext(); ) {
+			ErrorHandler handler = it.next();
+			Map<String, Threshold> thresholds = handler.getEndpointThresholdMap();
+			if (handler instanceof ProcessCasErrorHandler) {
+				// use the configured action and retry count, or a default threshold
+				if (pec.getProcessCasErrors() == null) {
+					thresholds.put("", Thresholds.newThreshold());
+				} else {
+					thresholds.put("", Thresholds.getThreshold(pec.getProcessCasErrors().getThresholdAction(),
+							pec.getProcessCasErrors().getMaxRetries()));
+				}
+			} else if (handler instanceof GetMetaErrorHandler) {
+				// NOTE(review): the GetMeta threshold is gated on the
+				// collection-process-complete error configuration - confirm intended.
+				if (pec.getCollectionProcessCompleteErrors() != null) {
+					thresholds.put("", Thresholds.getThreshold("terminate", 0));
+				}
+			} else if (handler instanceof CpcErrorHandler) {
+				thresholds.put("", Thresholds.getThreshold("", 0));
+			}
+		}
+	}
+
+	/**
+	 * Builds and deploys the service described by a deployment descriptor.
+	 * <p>
+	 * The top level analysis engine descriptor is parsed and validated against the
+	 * DD. A service is only created and deployed when that descriptor is an
+	 * {@link AnalysisEngineDescription}.
+	 *
+	 * @param dd the parsed deployment descriptor
+	 * @param callback listener passed on to {@code buildAndDeploy}
+	 * @return the deployed service, or {@code null} when the resource specifier is
+	 *         not an {@link AnalysisEngineDescription}
+	 * @throws Exception if parsing, validation or deployment fails
+	 */
+	public UimaASService build(AnalysisEngineDeploymentDescriptionDocument dd, ControllerCallbackListener callback)
+			throws Exception {
+		// locate and parse the top level AnalysisEngine descriptor
+		ResourceSpecifier resourceSpecifier = UimaClassFactory.produceResourceSpecifier(getAEDescriptorPath(dd));
+		validateDD(dd, resourceSpecifier);
+		ServiceType serviceDefinition = getService(dd);
+
+		// in DD the analysisEngine specification is optional
+		AnalysisEngineDelegate topLevelService;
+		if (serviceDefinition.getAnalysisEngine() != null) {
+			topLevelService = parse(getService(dd).getAnalysisEngine());
+		} else {
+			topLevelService = new AnalysisEngineDelegate("");
+			topLevelService.setResourceSpecifier((AnalysisEngineDescription) resourceSpecifier);
+		}
+
+		String endpoint = resolvePlaceholder(serviceDefinition.getInputQueue().getEndpoint());
+		String brokerURL = resolvePlaceholder(serviceDefinition.getInputQueue().getBrokerURL());
+
+		if (!(resourceSpecifier instanceof AnalysisEngineDescription)) {
+			return null;
+		}
+
+		AnalysisEngineDescription aeDescriptor = (AnalysisEngineDescription) resourceSpecifier;
+		// Create a Top Level Service (TLS) wrapper.
+		UimaASJmsService service = new UimaASJmsService().withName(aeDescriptor.getAnalysisEngineMetaData().getName())
+				.withResourceSpecifier(resourceSpecifier).withBrokerURL(brokerURL).withInputQueue(endpoint);
+
+		this.buildAndDeploy(dd, topLevelService, service, callback);
+		return service;
+	}
+
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/AbstractUimaASDeployer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/AbstractUimaASDeployer.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/AbstractUimaASDeployer.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/AbstractUimaASDeployer.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.deployer;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.uima.aae.UimaASApplicationEvent.EventTrigger;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.ControllerCallbackListener;
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
+
+/**
+ * Base class for UIMA-AS service deployers. It holds the latch on which
+ * {@link #waitUntilInitialized()} blocks and counts that latch down from the
+ * controller initialization callbacks, for failures as well as for success.
+ */
+public abstract class AbstractUimaASDeployer
+		implements UimaAsServiceDeployer, ControllerCallbackListener {
+	/** Counted down whenever an initialization outcome is reported. */
+	final CountDownLatch latch;
+
+	/**
+	 * @param latch
+	 *            latch awaited by {@link #waitUntilInitialized()} and counted
+	 *            down by the initialization callbacks
+	 */
+	protected AbstractUimaASDeployer(CountDownLatch latch) {
+		this.latch = latch;
+	}
+
+	/**
+	 * Deploys the service described by the given deployment descriptor.
+	 *
+	 * @param dd
+	 *            parsed deployment descriptor
+	 * @param deploymentProperties
+	 *            deployment options supplied by the caller
+	 * @return the deployed service
+	 * @throws Exception
+	 *             if the deployment fails
+	 */
+	@Override
+	public abstract UimaASService deploy(AnalysisEngineDeploymentDescriptionDocument dd,
+			Map<String, String> deploymentProperties) throws Exception;
+
+	/**
+	 * Blocks the calling thread until the latch has been counted down to zero.
+	 *
+	 * @throws InterruptedException
+	 *             if the calling thread is interrupted while waiting
+	 */
+	@Override
+	public void waitUntilInitialized() throws InterruptedException {
+		latch.await();
+	}
+
+	/** Intentionally a no-op. */
+	@Override
+	public void notifyOnTermination(String aMessage, EventTrigger cause) {
+	}
+
+	/**
+	 * Prints the failure to the console and counts the latch down, so that a
+	 * thread blocked in {@link #waitUntilInitialized()} is released instead of
+	 * waiting forever for a success notification that will not come.
+	 */
+	@Override
+	public void notifyOnInitializationFailure(AnalysisEngineController aController, Exception e) {
+		try {
+			System.out.println("------- Controller:"+aController.getName()+" Exception During Initialization - Error:\n");
+			e.printStackTrace();
+		} finally {
+			// release waiters even if reporting the failure itself throws
+			latch.countDown();
+		}
+	}
+
+	/** Prints a confirmation to the console and counts the latch down. */
+	@Override
+	public void notifyOnInitializationSuccess(AnalysisEngineController aController) {
+		System.out.println("------- Controller:"+aController.getName()+" Initialized");
+		latch.countDown();
+	}
+
+	/**
+	 * Prints the failure, which would otherwise be lost, and counts the latch
+	 * down so that waiting threads are released.
+	 */
+	@Override
+	public void notifyOnInitializationFailure(Exception e) {
+		if (e != null) {
+			e.printStackTrace();
+		}
+		latch.countDown();
+	}
+
+	/** Intentionally a no-op; the latch is not touched by this callback. */
+	@Override
+	public void notifyOnInitializationSuccess() {
+	}
+
+	/** Intentionally a no-op. */
+	@Override
+	public void notifyOnReconnecting(String aMessage) {
+	}
+
+	/** Intentionally a no-op. */
+	@Override
+	public void notifyOnReconnectionSuccess() {
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/ServiceDeployers.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/ServiceDeployers.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/ServiceDeployers.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/ServiceDeployers.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.deployer;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.uima.as.deployer.direct.UimaAsDirectServiceDeployer;
+import org.apache.uima.as.deployer.jms.UimaAsJmsServiceDeployer;
+
+/**
+ * Concrete factory which creates instances of {@link UimaAsServiceDeployer}
+ * based on the type of protocol and provider. To add a new deployer, add the new
+ * protocol and provider to the enums below, and instantiate the deployer in
+ * {@link #newDeployer(Protocol, Provider)}.
+ */
+public class ServiceDeployers {
+	/** Protocols for which a deployer can be requested. */
+	public enum Protocol {
+		JAVA("java"), JMS("jms");
+
+		private final String protocol;
+
+		Protocol(String dt) {
+			protocol = dt;
+		}
+
+		/** @return the string this constant was declared with */
+		public String get() {
+			return protocol;
+		}
+	}
+
+	/** Providers for which a deployer can be requested. */
+	public enum Provider {
+		JAVA("java"), ACTIVEMQ("activemq");
+
+		private final String provider;
+
+		Provider(String provider) {
+			this.provider = provider;
+		}
+
+		/** @return the string this constant was declared with */
+		public String get() {
+			return provider;
+		}
+	}
+
+	/**
+	 * Creates an instance of a deployer for a given protocol and provider. Each
+	 * deployer is handed a fresh latch with a count of one.
+	 *
+	 * @param protocol
+	 *            protocol the deployer is requested for
+	 * @param provider
+	 *            provider the deployer is requested for
+	 * @return a {@link UimaAsDirectServiceDeployer} for JAVA/JAVA, a
+	 *         {@link UimaAsJmsServiceDeployer} for JMS/ACTIVEMQ, and {@code null}
+	 *         for every other combination
+	 */
+	public static UimaAsServiceDeployer newDeployer(Protocol protocol, Provider provider) {
+		if (protocol == Protocol.JAVA && provider == Provider.JAVA) {
+			return new UimaAsDirectServiceDeployer(new CountDownLatch(1));
+		}
+		if (protocol == Protocol.JMS && provider == Provider.ACTIVEMQ) {
+			return new UimaAsJmsServiceDeployer(new CountDownLatch(1));
+		}
+		// unsupported combination: callers must be prepared for null
+		return null;
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/UimaAsServiceDeployer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/UimaAsServiceDeployer.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/UimaAsServiceDeployer.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/UimaAsServiceDeployer.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.deployer;
+
+import java.util.Map;
+
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
+
+/**
+ * Contract for components which deploy a UIMA-AS service from a deployment
+ * descriptor and let callers wait for the deployment to initialize.
+ */
+public interface UimaAsServiceDeployer {
+	/** Key under which a {@link DeploymentStrategy} name is put into the deployment properties. */
+	String Deployment = "DEPLOYMENT";
+
+	/** Strategies that can be named under the {@link #Deployment} key. */
+	enum DeploymentStrategy {
+		JMS, LOCAL
+	}
+
+	/**
+	 * Deploys the service described by the given deployment descriptor.
+	 *
+	 * @param dd
+	 *            parsed deployment descriptor
+	 * @param deploymentProperties
+	 *            deployment options supplied by the caller
+	 * @return the deployed service
+	 * @throws Exception
+	 *             if the deployment fails
+	 */
+	UimaASService deploy(AnalysisEngineDeploymentDescriptionDocument dd,
+			Map<String, String> deploymentProperties) throws Exception;
+
+	/**
+	 * Blocks the caller until the deployed service has been initialized.
+	 *
+	 * @throws InterruptedException
+	 *             if the calling thread is interrupted while waiting
+	 */
+	void waitUntilInitialized() throws InterruptedException;
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/UimaAsDirectServiceDeployer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/UimaAsDirectServiceDeployer.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/UimaAsDirectServiceDeployer.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/UimaAsDirectServiceDeployer.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.deployer.direct;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.aae.service.builder.UimaAsDirectServiceBuilder;
+import org.apache.uima.as.deployer.AbstractUimaASDeployer;
+import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
+
+/**
+ * Deployer which builds a service with {@link UimaAsDirectServiceBuilder},
+ * starts it and then waits until initialization has been reported.
+ */
+public class UimaAsDirectServiceDeployer extends AbstractUimaASDeployer {
+	/** Descriptor deployed by {@link #main(String[])} when no path is given. */
+	private static final String DEFAULT_DESCRIPTOR =
+			"../uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateAnnotator.xml";
+
+	/**
+	 * Parses a deployment descriptor and deploys it with this deployer.
+	 *
+	 * @param args
+	 *            optional; {@code args[0]} is the path of the deployment
+	 *            descriptor. Without it {@link #DEFAULT_DESCRIPTOR} is used,
+	 *            which is resolved relative to the working directory.
+	 */
+	public static void main(String[] args) {
+		String descriptorPath = (args != null && args.length > 0) ? args[0] : DEFAULT_DESCRIPTOR;
+		try {
+			UimaAsDirectServiceDeployer deployer = new UimaAsDirectServiceDeployer(new CountDownLatch(1));
+
+			Map<String, String> deploymentProperties = new HashMap<String, String>();
+			deploymentProperties.put(Deployment, DeploymentStrategy.LOCAL.name());
+
+			AnalysisEngineDeploymentDescriptionDocument dd = AnalysisEngineDeploymentDescriptionDocument.Factory
+					.parse(new File(descriptorPath));
+
+			deployer.deploy(dd, deploymentProperties);
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+	}
+
+	/**
+	 * @param latch
+	 *            latch which blocks until the service is initialized; the
+	 *            blocking takes place in {@link #waitUntilInitialized()}
+	 */
+	public UimaAsDirectServiceDeployer(CountDownLatch latch) {
+		super(latch);
+		System.out.println("........ UimaAsDirectServiceDeployer() - Direct Deployment");
+	}
+
+	/**
+	 * Builds the service from the descriptor, starts it and blocks until
+	 * initialization has been reported to this deployer.
+	 *
+	 * @param dd
+	 *            parsed deployment descriptor
+	 * @param deploymentProperties
+	 *            not read by this implementation
+	 * @return the started service
+	 * @throws Exception
+	 *             if building, starting or waiting for the service fails; the
+	 *             stack trace is printed before the exception is rethrown
+	 */
+	@Override
+	public UimaASService deploy(AnalysisEngineDeploymentDescriptionDocument dd,
+			Map<String, String> deploymentProperties) throws Exception {
+		try {
+			UimaASService uimaAsService = new UimaAsDirectServiceBuilder().build(dd, this);
+			// start listeners
+			uimaAsService.start();
+			// block until an initialization callback releases the latch
+			waitUntilInitialized();
+			return uimaAsService;
+		} catch (InterruptedException e) {
+			// restore the interrupt status so callers up the stack still see it
+			Thread.currentThread().interrupt();
+			e.printStackTrace();
+			throw e;
+		} catch (Exception e) {
+			e.printStackTrace();
+			throw e;
+		}
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/UimaAsJmsServiceDeployer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/UimaAsJmsServiceDeployer.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/UimaAsJmsServiceDeployer.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/UimaAsJmsServiceDeployer.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.deployer.jms;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.adapter.jms.service.builder.UimaAsJmsServiceBuilder;
+import org.apache.uima.as.deployer.AbstractUimaASDeployer;
+import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
+
+/**
+ * Deployer which builds a service with {@link UimaAsJmsServiceBuilder}, starts
+ * it and then waits until initialization has been reported.
+ */
+public class UimaAsJmsServiceDeployer extends AbstractUimaASDeployer {
+	/**
+	 * @param latch
+	 *            latch handed to the base class, which waits on it in
+	 *            {@link #waitUntilInitialized()}
+	 */
+	public UimaAsJmsServiceDeployer(CountDownLatch latch) {
+		super(latch);
+		System.out.println("........ UimaAsJmsServiceDeployer() - JMS Deployment");
+	}
+
+	/**
+	 * Builds the service from the descriptor, starts it and blocks until
+	 * initialization has been reported to this deployer.
+	 *
+	 * @param dd
+	 *            parsed deployment descriptor
+	 * @param deploymentProperties
+	 *            not read by this implementation
+	 * @return the started service
+	 * @throws Exception
+	 *             if building, starting or waiting for the service fails; the
+	 *             stack trace is printed before the exception is rethrown
+	 */
+	@Override
+	public UimaASService deploy(AnalysisEngineDeploymentDescriptionDocument dd,
+			Map<String, String> deploymentProperties) throws Exception {
+		try {
+			UimaASService uimaAsService = new UimaAsJmsServiceBuilder().build(dd, this);
+			// start listeners. Nothing happens unless JMS listeners start
+			uimaAsService.start();
+			// block till service is ready
+			waitUntilInitialized();
+			return uimaAsService;
+		} catch (InterruptedException e) {
+			// restore the interrupt status so callers up the stack still see it
+			Thread.currentThread().interrupt();
+			e.printStackTrace();
+			throw e;
+		} catch (Exception e) {
+			e.printStackTrace();
+			throw e;
+		}
+	}
+
+	/** Does nothing; no arguments are read. */
+	public static void main(String[] args) {
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.dispatcher;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.client.UimaASProcessStatus;
+import org.apache.uima.aae.client.UimaASProcessStatusImpl;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.adapter.jms.JmsConstants;
+import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl;
+import org.apache.uima.adapter.jms.message.PendingMessage;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.util.Level;
+import org.apache.uima.util.impl.ProcessTrace_impl;
+
+/**
+ * Runnable which takes {@link PendingMessage}s off a queue and hands them to a
+ * {@link UimaASService} for as long as the client reports that it is running.
+ */
+public class LocalDispatcher implements Runnable {
+	private static final Class<LocalDispatcher> CLASS_NAME = LocalDispatcher.class;
+
+	private final BlockingQueue<PendingMessage> messageQueue;
+	private final BaseUIMAAsynchronousEngineCommon_impl client;
+	private final UimaASService service;
+
+	/**
+	 * @param client
+	 *            client whose running state controls the dispatch loop and which
+	 *            is called back around each dispatch
+	 * @param service
+	 *            service the messages are dispatched to
+	 * @param pendingMessageQueue
+	 *            queue the messages are taken from
+	 */
+	public LocalDispatcher(BaseUIMAAsynchronousEngineCommon_impl client, UimaASService service,
+			BlockingQueue<PendingMessage> pendingMessageQueue) {
+		this.service = service;
+		this.client = client;
+		this.messageQueue = pendingMessageQueue;
+	}
+
+	/** Placeholder: currently no message is rejected. */
+	private boolean reject(PendingMessage pm) {
+		return false;
+	}
+
+	/**
+	 * Hands one message to the service according to its message type. Message
+	 * types other than GetMeta, Process and CollectionProcessComplete are ignored.
+	 */
+	private void dispatch(PendingMessage pm) throws Exception {
+		switch (pm.getMessageType()) {
+		case AsynchAEMessage.GetMeta:
+			service.sendGetMetaRequest();
+			System.out.println("LocalDispatcher.dispatch()-dispatched getMeta Request");
+			break;
+
+		case AsynchAEMessage.Process:
+			service.process((CAS) pm.getProperty(AsynchAEMessage.CAS), pm.getPropertyAsString(AsynchAEMessage.CasReference));
+			System.out.println("LocalDispatcher.dispatch()-dispatched Process Request");
+			notifyClientAfterSend(pm);
+			break;
+
+		case AsynchAEMessage.CollectionProcessComplete:
+			service.collectionProcessComplete();
+			System.out.println("LocalDispatcher.dispatch()-dispatched CPC Request");
+			break;
+
+		default:
+			// nothing to dispatch for any other message type
+			break;
+		}
+	}
+
+	/**
+	 * Tells the client that a Process request has been handed to the service.
+	 * Note the callback name is a misnomer: the callback is made *after* the
+	 * send. An application receiving it can consider the CAS as delivered.
+	 */
+	private void notifyClientAfterSend(PendingMessage pm) throws Exception {
+		CAS cas = (CAS) pm.getProperty(AsynchAEMessage.CAS);
+		String casReferenceId = pm.getPropertyAsString(AsynchAEMessage.CasReference);
+		UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(), cas, casReferenceId);
+		if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+			UIMAFramework.getLogger(CLASS_NAME).logrb(
+					Level.FINE,
+					CLASS_NAME.getName(),
+					"run",
+					JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+					"UIMAJMS_calling_onBeforeMessageSend__FINE",
+					new Object[] { casReferenceId, String.valueOf(cas.hashCode()) });
+		}
+		client.onBeforeMessageSend(status);
+	}
+
+	/**
+	 * Dispatches queued messages until the client stops running or the thread
+	 * is interrupted while waiting for a message. Exceptions thrown while
+	 * dispatching are logged and do not end the loop.
+	 */
+	@Override
+	public void run() {
+		while (client.isRunning()) {
+			PendingMessage pm;
+			try {
+				System.out.println("LocalDispatcher.run()- waiting for new message ...");
+				pm = messageQueue.take();
+				System.out.println("LocalDispatcher.run()-got new message to dispatch");
+			} catch (InterruptedException e) {
+				// restore the interrupt status so the owner of this thread still sees it
+				Thread.currentThread().interrupt();
+				return;
+			}
+			// we may have waited in the take() above, so check if the client is still running
+			if (!client.isRunning()) {
+				break;
+			}
+			if (reject(pm)) {
+				continue;
+			}
+			if (client.getServiceDelegate().isAwaitingPingReply()
+					&& pm.getMessageType() == AsynchAEMessage.GetMeta) {
+				if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "run",
+							JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_dispatching_getmeta_ping__INFO",
+							new Object[] {});
+				}
+			}
+			try {
+				client.beforeDispatch(pm);
+				dispatch(pm);
+			} catch (Exception e) {
+				if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "run",
+							UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+				}
+			}
+		}
+	}
+}

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java Mon Feb 26 18:54:11 2018
@@ -58,6 +58,7 @@ import org.apache.activemq.command.Activ
 import org.apache.log4j.Logger;
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.UIMA_IllegalStateException;
+import org.apache.uima.aae.InputChannel.ChannelType;
 import org.apache.uima.aae.UimaClassFactory;
 import org.apache.uima.aae.client.UimaASProcessStatus;
 import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
@@ -659,7 +660,7 @@ public class TestUimaASExtended extends
   	    c.setDestinationName("TestQ");
   	    c.setConcurrentConsumers(2);
   	    c.setBeanName("testServiceWithHttpListeners() - JUnit Test Listener");
-  	    c.setMessageListener(new JmsInputChannel());
+  	    c.setMessageListener(new JmsInputChannel(ChannelType.REQUEST_REPLY));
   	    //c.initialize();
   	    //c.afterPropertiesSet();
   	    c.start();