You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/02/26 18:54:13 UTC
svn commit: r1825401 [3/11] - in /uima/uima-as/branches/uima-as-3:
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/
uimaj-as-activemq/src/main/java/org/apache/uima...
Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsMessageListenerBuilder.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsMessageListenerBuilder.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsMessageListenerBuilder.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsMessageListenerBuilder.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.adapter.jms.service.builder;
+
+import javax.jms.Destination;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.uima.aae.AsynchAECasManager_impl;
+import org.apache.uima.aae.InProcessCache;
+import org.apache.uima.aae.InputChannel;
+import org.apache.uima.aae.InputChannel.ChannelType;
+import org.apache.uima.aae.UimaClassFactory;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController_impl;
+import org.apache.uima.aae.error.ErrorHandlerChain;
+import org.apache.uima.adapter.jms.activemq.ConcurrentMessageListener;
+import org.apache.uima.adapter.jms.activemq.JmsInputChannel;
+import org.apache.uima.adapter.jms.activemq.JmsOutputChannel;
+import org.apache.uima.adapter.jms.activemq.TempDestinationResolver;
+import org.apache.uima.adapter.jms.activemq.UimaDefaultMessageListenerContainer;
+import org.apache.uima.as.client.Listener.Type;
+import org.apache.uima.resource.ResourceManager;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+public class JmsMessageListenerBuilder {
+ private AnalysisEngineController controller;
+ private ActiveMQConnectionFactory connectionFactory;
+ private int consumerCount=1;
+ private InputChannel inputChannel;
+ private Endpoint endpoint;
+ private boolean isReplyListener = false;
+ private String selector=null;
+ private Destination destination=null; // queue
+ private ThreadPoolTaskExecutor threadExecutor=null;
+ private Type type;
+ private TempDestinationResolver tempQueueDestinationResolver = null;
+
+ public static void main(String[] args) {
+ try {
+ String endpointName = "PersonTitleAnnotatorQueue";
+ String analysisEngineDescriptor = "C:/uima/releases/testing/uima/uima-as/2.9.0/target/uima-as-2.9.1-SNAPSHOT-bin/apache-uima-as-2.9.1-SNAPSHOT/examples/descriptors/analysis_engine/PersonTitleAnnotator.xml";
+ String broker = "tcp://localhost:61616";
+ String processSelector = "Command=2000 OR Command=2002";
+ String getMetaSelector = "Command=2001";
+ int workQueueSize = 1;
+ int processScaleout = 4;
+ int scaleout = 1;
+
+ System.setProperty("BrokerURI",broker);
+ ErrorHandlerChain errorHandlerChain = null;
+
+ InProcessCache inProcessCache = new InProcessCache();
+
+ ResourceManager resourceManager =
+ UimaClassFactory.produceResourceManager();
+
+ AsynchAECasManager_impl casManager =
+ new AsynchAECasManager_impl(resourceManager);
+ casManager.setCasPoolSize(processScaleout);
+
+
+ JmsInputChannel processInputChannel = new JmsInputChannel(ChannelType.REQUEST_REPLY);
+ JmsInputChannel getMetaInputChannel = new JmsInputChannel(ChannelType.REQUEST_REPLY);
+
+ JmsOutputChannel outputChannel = new JmsOutputChannel();
+ outputChannel.setServerURI(broker);
+ PrimitiveAnalysisEngineController_impl controller =
+ new PrimitiveAnalysisEngineController_impl(null, endpointName, analysisEngineDescriptor, casManager, inProcessCache, workQueueSize, scaleout);
+
+ controller.setOutputChannel(outputChannel);
+ controller.setErrorHandlerChain(errorHandlerChain);
+
+
+ ActiveMQConnectionFactory factory =
+ ActiveMQFactory.newConnectionFactory(broker, 0);
+
+ factory.setTrustAllPackages(true);
+ ActiveMQDestination destination =
+ new ActiveMQQueue(endpointName);
+ JmsMessageListenerBuilder processListenerBuilder =
+ new JmsMessageListenerBuilder();
+ ThreadPoolTaskExecutor threadExecutor1 = new ThreadPoolTaskExecutor();
+
+ threadExecutor1.setCorePoolSize(processScaleout);
+ threadExecutor1.setMaxPoolSize(processScaleout);
+
+ UimaDefaultMessageListenerContainer jmsProcessMessageListener =
+ processListenerBuilder.withController(controller)
+ .withType(Type.ProcessCAS)
+ .withConectionFactory(factory)
+ .withThreadPoolExecutor(threadExecutor1)
+ .withConsumerCount(processScaleout)
+ .withInputChannel(processInputChannel)
+ .withSelector(processSelector)
+ .withDestination(destination)
+ .build();
+
+ JmsMessageListenerBuilder getMetaListenerBuilder =
+ new JmsMessageListenerBuilder();
+ ThreadPoolTaskExecutor threadExecutor2 = new ThreadPoolTaskExecutor();
+ threadExecutor2.setCorePoolSize(scaleout);
+ threadExecutor2.setMaxPoolSize(scaleout);
+
+ UimaDefaultMessageListenerContainer jmsGetMetaMessageListener =
+ getMetaListenerBuilder.withController(controller)
+ .withType(Type.GetMeta)
+ .withConectionFactory(factory)
+ .withThreadPoolExecutor(threadExecutor2)
+ .withConsumerCount(scaleout)
+ .withInputChannel(getMetaInputChannel)
+ .withSelector(getMetaSelector)
+ .withDestination(destination)
+ .build();
+
+ ThreadPoolTaskExecutor threadExecutor3 = new ThreadPoolTaskExecutor();
+ threadExecutor3.setCorePoolSize(scaleout);
+ threadExecutor3.setMaxPoolSize(scaleout);
+ TempDestinationResolver resolver = new TempDestinationResolver(controller.getComponentName(),"");
+ resolver.setConnectionFactory(factory);
+
+ UimaDefaultMessageListenerContainer replyListener =
+ getMetaListenerBuilder.withController(controller)
+ .withType(Type.Reply)
+ .withConectionFactory(factory)
+ .withThreadPoolExecutor(threadExecutor3)
+ .withConsumerCount(scaleout)
+ .withTempDestinationResolver(resolver)
+ .build();
+
+
+ processInputChannel.setController(controller);
+ processInputChannel.addListenerContainer(jmsProcessMessageListener);
+
+ getMetaInputChannel.setController(controller);
+ getMetaInputChannel.addListenerContainer(jmsGetMetaMessageListener);
+
+ threadExecutor1.initialize();
+ threadExecutor1.getThreadPoolExecutor().prestartAllCoreThreads();
+ threadExecutor2.initialize();
+ threadExecutor2.getThreadPoolExecutor().prestartAllCoreThreads();
+ threadExecutor3.initialize();
+ threadExecutor3.getThreadPoolExecutor().prestartAllCoreThreads();
+
+ jmsProcessMessageListener.afterPropertiesSet();
+ jmsProcessMessageListener.initialize();
+ jmsProcessMessageListener.start();
+
+ jmsGetMetaMessageListener.afterPropertiesSet();
+ jmsGetMetaMessageListener.initialize();
+ jmsGetMetaMessageListener.start();
+
+
+ replyListener.afterPropertiesSet();
+ replyListener.initialize();
+ replyListener.start();
+
+/*
+ synchronized(inProcessCache ) {
+ inProcessCache.wait(5000);
+ System.out.println("Stopping Listeners ....");
+ jmsProcessMessageListener.setTerminating();
+ jmsProcessMessageListener.stop();
+ threadExecutor1.getThreadPoolExecutor().shutdownNow();
+ threadExecutor1.shutdown();
+ jmsProcessMessageListener.stop();
+ jmsProcessMessageListener.closeConnection();
+ jmsProcessMessageListener.destroy();
+ System.out.println("Stopped Process Listener ....");
+
+ jmsGetMetaMessageListener.setTerminating();
+ jmsGetMetaMessageListener.stop();
+
+ threadExecutor2.getThreadPoolExecutor().shutdownNow();
+ threadExecutor2.shutdown();
+ jmsGetMetaMessageListener.closeConnection();
+ jmsGetMetaMessageListener.destroy();
+ System.out.println("Stopped GetMeta Listener ....");
+ }
+ */
+ } catch( Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ public JmsMessageListenerBuilder withController(AnalysisEngineController controller ) {
+ this.controller = controller;
+ return this;
+ }
+
+ public JmsMessageListenerBuilder withTempDestinationResolver(TempDestinationResolver resolver ) {
+ this.tempQueueDestinationResolver = resolver;
+ return this;
+ }
+ public JmsMessageListenerBuilder withInputChannel(InputChannel inputChannel ) {
+ this.inputChannel = inputChannel;
+ return this;
+ }
+ public JmsMessageListenerBuilder withThreadPoolExecutor(ThreadPoolTaskExecutor threadExecutor) {
+ this.threadExecutor = threadExecutor;
+ return this;
+ }
+ public JmsMessageListenerBuilder withEndpoint(Endpoint endpoint ) {
+ this.endpoint = endpoint;
+ return this;
+ }
+ public JmsMessageListenerBuilder withSelector(String selector ) {
+ this.selector = selector;
+ return this;
+ }
+ public JmsMessageListenerBuilder withDestination(Destination destination ) {
+ this.destination = destination;
+ return this;
+ }
+ public JmsMessageListenerBuilder withConectionFactory(ActiveMQConnectionFactory connectionFactory ) {
+ this.connectionFactory = connectionFactory;
+ return this;
+ }
+
+ public JmsMessageListenerBuilder withConsumerCount(int howManyConsumers ) {
+ this.consumerCount = howManyConsumers;
+ return this;
+ }
+ public JmsMessageListenerBuilder asReplyListener() {
+ this.isReplyListener = true;
+ return this;
+ }
+ public JmsMessageListenerBuilder withType(Type t) {
+ this.type = t;
+ if ( Type.Reply.equals(t)) {
+ asReplyListener();
+ }
+ return this;
+ }
+ private void validate() {
+
+ }
+ private boolean isRemoteCasMultiplier(Endpoint endpoint) {
+ return (endpoint != null && endpoint.isRemote() && endpoint.isCasMultiplier() );
+ }
+ public UimaDefaultMessageListenerContainer build() throws Exception{
+ UimaDefaultMessageListenerContainer listener =
+ new UimaDefaultMessageListenerContainer();
+ /*
+ *
+ * VALIDATE REQUIRED PROPERTIES
+ *
+ */
+ // make sure all required properties are set
+ validate();
+ if ( threadExecutor != null ) {
+ threadExecutor.setThreadNamePrefix(controller.getComponentName()+"-"+type.name()+"Listener-Thread");
+ listener.setTaskExecutor(threadExecutor);
+
+ }
+
+ listener.setConcurrentConsumers(consumerCount);
+ listener.setController(controller);
+
+ if ( selector != null ) {
+ listener.setMessageSelector(selector);
+ }
+
+ if (isRemoteCasMultiplier(endpoint) ) {
+ // for remote CM's we need special handling. See description of a
+ // possible race condition in ConcurrentMessageListener class.
+ ThreadGroup tg = Thread.currentThread().getThreadGroup();
+ String prefix = endpoint.getDelegateKey()+" Reply Thread";
+ ConcurrentMessageListener concurrentListener =
+ new ConcurrentMessageListener(consumerCount, (JmsInputChannel)inputChannel, "", tg,prefix);
+ // register this listener with inputchannel so that we can stop it. The listener on a remote CM
+ // is ConcurrentMessageListener which imposes order of replies (parent last) before delegating
+ // msgs to the inputchannel. When stopping the service, all listeners must be registered with
+ // an inputchannel which is responsible for shutting down all listeners.
+ ((JmsInputChannel)inputChannel).registerListener(listener);
+ listener.setMessageListener(concurrentListener);
+ concurrentListener.setAnalysisEngineController(controller);
+ } else {
+ ((JmsInputChannel)inputChannel).registerListener(listener);
+ listener.setMessageListener(inputChannel);
+ }
+
+ listener.setTargetEndpoint(endpoint);
+ listener.setConnectionFactory(connectionFactory);
+ // is this listener processing replies from a remote service. This can
+ // only be true if the controller is an aggregate. Primitive controller
+ // can only handle requests from remote services. An aggregate can send
+ // requests and expects replies.
+ if ( isReplyListener || Type.FreeCAS.equals(type)) {
+ String e = Type.FreeCAS.equals(type) ? "FreeCASEndpoint" :endpoint.getDelegateKey();
+ TempDestinationResolver resolver = new
+ TempDestinationResolver(controller.getComponentName(), e);
+ resolver.setListener(listener);
+ resolver.setConnectionFactory(connectionFactory);
+ listener.setDestinationResolver(resolver);
+ listener.setDestinationName("");
+ if ( Type.FreeCAS.equals(type)) {
+ listener.setBeanName(controller.getComponentName()+"-"+type.name()+"Listener For FreeCas Listener");
+ } else {
+ listener.setBeanName(controller.getComponentName()+"-"+type.name()+"Listener For Delegate:"+endpoint.getDelegateKey());
+ }
+ } else if ( destination != null ) {
+ listener.setDestinationName(((ActiveMQDestination)destination).getPhysicalName());
+ listener.setDestination(destination);
+ listener.setBeanName(controller.getComponentName()+"-"+type.name()+"Listener");
+
+ }
+
+ if ( type != null ) {
+ listener.setType(type);
+ }
+ return listener;
+ }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/UimaAsJmsServiceBuilder.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/UimaAsJmsServiceBuilder.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/UimaAsJmsServiceBuilder.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/UimaAsJmsServiceBuilder.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.adapter.jms.service.builder;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.uima.aae.InputChannel;
+import org.apache.uima.aae.InputChannel.ChannelType;
+import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
+import org.apache.uima.aae.OutputChannel;
+import org.apache.uima.aae.UimaClassFactory;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
+import org.apache.uima.aae.controller.ControllerCallbackListener;
+import org.apache.uima.aae.controller.Endpoint_impl;
+import org.apache.uima.aae.error.ErrorHandler;
+import org.apache.uima.aae.error.ErrorHandlerChain;
+import org.apache.uima.aae.error.Threshold;
+import org.apache.uima.aae.error.Thresholds;
+import org.apache.uima.aae.error.handler.CpcErrorHandler;
+import org.apache.uima.aae.error.handler.GetMetaErrorHandler;
+import org.apache.uima.aae.error.handler.ProcessCasErrorHandler;
+import org.apache.uima.aae.handler.Handler;
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.aae.service.builder.AbstractUimaAsServiceBuilder;
+import org.apache.uima.aae.service.delegate.AnalysisEngineDelegate;
+import org.apache.uima.aae.service.delegate.RemoteAnalysisEngineDelegate;
+import org.apache.uima.adapter.jms.activemq.JmsInputChannel;
+import org.apache.uima.adapter.jms.activemq.JmsOutputChannel;
+import org.apache.uima.adapter.jms.activemq.TempDestinationResolver;
+import org.apache.uima.adapter.jms.activemq.UimaDefaultMessageListenerContainer;
+import org.apache.uima.adapter.jms.service.UimaASJmsService;
+import org.apache.uima.analysis_engine.AnalysisEngineDescription;
+import org.apache.uima.as.client.DirectInputChannel;
+import org.apache.uima.as.client.Listener.Type;
+import org.apache.uima.resource.ResourceSpecifier;
+import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
+import org.apache.uima.resourceSpecifier.AsyncPrimitiveErrorConfigurationType;
+import org.apache.uima.resourceSpecifier.CasPoolType;
+import org.apache.uima.resourceSpecifier.CollectionProcessCompleteErrorsType;
+import org.apache.uima.resourceSpecifier.ProcessCasErrorsType;
+import org.apache.uima.resourceSpecifier.ServiceType;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+public class UimaAsJmsServiceBuilder extends AbstractUimaAsServiceBuilder{
+// private static final String NoParent= "NoParent";
+// private static enum FlowControllerType {
+// FIXED
+// }
+// static Map<String, Object> ddAEMap = new HashMap<String, Object>();
+
+ /*
+ private InProcessCache cache;
+ private AsynchAECasManager_impl casManager;
+ private ResourceManager resourceManager;
+ */
+ private int scaleout=1;
+// private AnalysisEngineController controller;
+// private List<ControllerStatusListener> listeners = new ArrayList<ControllerStatusListener>();
+// private ServiceMode mode = ServiceMode.Asynchronous; // default
+// private AnalysisEngineDescription topLevelAEDescriptor;
+
+ public static void main(String[] args) {
+ try {
+ String tla = "C:/runtime/uima-as/2.8.1/apache-uima-as-2.8.1-SNAPSHOT/examples/descriptors/tutorial/ex4/MeetingDetectorTAEGovNameDetector.xml";
+ String ptDescriptor = "C:/runtime/uima-as/2.8.1/apache-uima-as-2.8.1-SNAPSHOT/examples/descriptors/analysis_engine/PersonTitleAnnotator.xml";
+ // "C:/runtime/uima-as/2.8.1/apache-uima-as-2.8.1-SNAPSHOT/examples/descriptors/tutorial/ex4/MeetingDetectorTAE.xml";
+
+ //String dd1 = "C:/uima/releases/builds/uima-as/2.8.1/uimaj-as-activemq/src/test/resources/deployment/Deploy_TopLevelBlueJAggregateCM.xml";
+ String dd2 = "C:/uima/releases/builds/uima-as/2.8.1/uimaj-as-activemq/src/test/resources/deployment/Deploy_TopAggregateWithInnerAggregateCM.xml";
+ String dd = "C:/runtime/uima-as/2.8.1/apache-uima-as-2.8.1-SNAPSHOT/examples/deploy/as/Deploy_MeetingDetectorTAE.xml";
+
+ String dd3 = "../uimaj-as-activemq/src/test/resources/deployment/Deploy_PersonTitleAnnotator.xml";
+ String dd4 = "../uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateAnnotator.xml";
+
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+
+ public static InputChannel createInputChannel(ChannelType type) {
+ return new JmsInputChannel(type);
+ }
+
+
+ public OutputChannel createOutputChannel() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ protected void addListenerForReplyHandling( AggregateAnalysisEngineController controller, Endpoint_impl endpoint, RemoteAnalysisEngineDelegate remoteDelegate) throws Exception {
+ String brokerURL = resolvePlaceholder(remoteDelegate.getBrokerURI());
+ int prefetch = remoteDelegate.getPrefetch();
+ endpoint.setEndpoint(resolvePlaceholder(remoteDelegate.getQueueName()));
+
+ // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+ // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+ // WITH HTTP BROKER URL THE PRFETCH MUST BE > 0
+ // OTHERWISE THE LISTENER DOES NOT GET MSGS
+ // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+ // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+ // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+ // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+
+ if (prefetch == 0 ) {
+ prefetch = 1;
+ }
+ // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+ // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+ JmsInputChannel inputChannel;
+ if ((controller.getInputChannel(ENDPOINT_TYPE.JMS)) == null) {
+ inputChannel = new JmsInputChannel(ChannelType.REQUEST_REPLY);
+ Handler messageHandlerChain = getMessageHandler(controller);
+ inputChannel.setMessageHandler(messageHandlerChain);
+ controller.addInputChannel(inputChannel);
+ inputChannel.setController(controller);
+ } else {
+ inputChannel = (JmsInputChannel) controller.getInputChannel(ENDPOINT_TYPE.JMS);
+ }
+ // make the name unique
+ String qname = "rmtRtrnQ_"+controller.getComponentName().replaceAll("\\s","_")+"_"+endpoint.getDelegateKey()+"_"+UUID.randomUUID();
+ endpoint.setReplyToEndpoint(qname);
+ // remote always replies to a JMS temp queue
+ endpoint.setTempReplyDestination(true);
+ ThreadPoolTaskExecutor threadExecutor = new ThreadPoolTaskExecutor();
+ int consumerCount = 1; // default reply queue consumer count
+ // check if the DD includes reply queue scaleout for this remote delegate
+ if ( remoteDelegate.getReplyScaleout() > 1 ) {
+
+ // in this context the scaleout just means how many consumer threads
+ // this listener will start to handle messages arriving into the
+ // temp reply queue.
+ consumerCount = remoteDelegate.getReplyScaleout();
+ endpoint.setConcurrentReplyConsumers(remoteDelegate.getReplyScaleout());
+ if ( endpoint.isCasMultiplier() ) {
+ // for remote CM, the listener will use a single thread to receive
+ // CASes. This is done to deal with a race condition described in
+ // class ConcurrentMessageListener.
+ // if the remote is a cas multiplier,
+ threadExecutor.setCorePoolSize(1);
+ threadExecutor.setMaxPoolSize(1);
+ } else {
+ threadExecutor.setCorePoolSize(consumerCount);
+ threadExecutor.setMaxPoolSize(consumerCount);
+ }
+ } else {
+ threadExecutor.setCorePoolSize(consumerCount);
+ threadExecutor.setMaxPoolSize(consumerCount);
+ }
+ JmsMessageListenerBuilder replyListenerBuilder =
+ new JmsMessageListenerBuilder();
+
+ ActiveMQConnectionFactory factory =
+ new ActiveMQConnectionFactory(brokerURL);
+ factory.setTrustAllPackages(true);
+ ActiveMQPrefetchPolicy pp = new ActiveMQPrefetchPolicy();
+ pp.setQueuePrefetch(prefetch);
+
+ factory.setPrefetchPolicy(pp);
+ // Need a resolver to create temp reply queue. It will be created automatically
+ // by Spring.
+ TempDestinationResolver resolver = new TempDestinationResolver(controller.getComponentName(),remoteDelegate.getKey());
+ resolver.setConnectionFactory(factory);
+
+ UimaDefaultMessageListenerContainer replyListener =
+ replyListenerBuilder.withController(controller)
+ .withType(Type.Reply)
+ .withInputChannel(inputChannel)
+ .withConectionFactory(factory)
+ .withThreadPoolExecutor(threadExecutor)
+ .withConsumerCount(consumerCount)
+ .withTempDestinationResolver(resolver)
+ .withEndpoint(endpoint)
+ .build();
+ //replyListener.afterPropertiesSet();
+ replyListener.start();
+
+// replyListener.setTargetEndpoint(endpoint);
+
+ // there should be one instance of OutputChannel for JMS. Create it, if one does not exist
+ if ( controller.getOutputChannel(ENDPOINT_TYPE.JMS) == null) {
+ JmsOutputChannel oc = new JmsOutputChannel();
+ oc.setController(controller);
+ oc.setServerURI(brokerURL);
+ oc.setControllerInputEndpoint("");
+ oc.setServiceInputEndpoint("");
+ oc.initialize();
+ controller.addOutputChannel(oc);
+ }
+ endpoint.setServerURI(brokerURL);
+ System.out.println("......... Service:"+controller.getComponentName()+" Reply Listener Started - Delegate:"+endpoint.getDelegateKey()+" Broker:"+endpoint.getServerURI()+" Endpoint:"+endpoint.getDestination());
+ }
+
+ public UimaASService buildAndDeploy(AnalysisEngineDeploymentDescriptionDocument doc, AnalysisEngineDelegate del,
+ UimaASJmsService service, ControllerCallbackListener callback) throws Exception {
+ // get top level CAS pool to
+ CasPoolType cp = getCasPoolConfiguration(doc);
+
+ super.addEnvironmentVariablesFromDD(doc);
+
+ System.setProperty("BrokerURI", service.getBrokerURL());
+
+ initialize(service, cp, Transport.JMS);
+ service.withInProcessCache(super.cache);
+
+ int howMany = howManyInstances(doc);
+ AnalysisEngineController topLevelController = createController(del, service.getResourceSpecifier(),
+ service.getName(), null, howMany);
+
+ // callback will be made when initialization succeeds or fails
+ topLevelController.addControllerCallbackListener(callback);
+
+ topLevelController.getServiceInfo().setBrokerURL(service.getBrokerURL());
+ topLevelController.setServiceId(service.getId());
+ // fetch service definition from DD
+ ServiceType s = getService(doc);
+
+ AsyncPrimitiveErrorConfigurationType pec;
+ if (s.getAnalysisEngine() != null && s.getAnalysisEngine().getAsyncPrimitiveErrorConfiguration() != null) {
+ pec = s.getAnalysisEngine().getAsyncPrimitiveErrorConfiguration();
+ } else {
+ pec = addDefaultErrorHandling(s);
+ }
+ service.withConttroller(topLevelController).withErrorHandlerChain(null);
+
+ configureTopLevelService(topLevelController, service, pec);
+
+ service.build(howMany);
+
+ return service;
+ }
+
+ private void configureTopLevelService(AnalysisEngineController topLevelController, UimaASJmsService service,
+ AsyncPrimitiveErrorConfigurationType pec) throws Exception {
+ // ResourceSpecifier resourceSpecifier = service.getResourceSpecifier();
+ if (!topLevelController.isPrimitive() && pec != null) {
+
+ ErrorHandlerChain chain = topLevelController.getErrorHandlerChain();
+ Iterator<ErrorHandler> handlers = chain.iterator();
+ while (handlers.hasNext()) {
+ ErrorHandler eh = handlers.next();
+ Map<String, Threshold> map = eh.getEndpointThresholdMap();
+ if (eh instanceof ProcessCasErrorHandler) {
+ if (pec.getProcessCasErrors() != null) {
+ map.put("", Thresholds.getThreshold(pec.getProcessCasErrors().getThresholdAction(),
+ pec.getProcessCasErrors().getMaxRetries()));
+ } else {
+ map.put("", Thresholds.newThreshold());
+ }
+ } else if (eh instanceof GetMetaErrorHandler) {
+ if (pec.getCollectionProcessCompleteErrors() != null) {
+ map.put("", Thresholds.getThreshold("terminate", 0));
+ }
+ } else if (eh instanceof CpcErrorHandler) {
+ map.put("", Thresholds.getThreshold("", 0));
+ }
+ }
+
+ }
+
+ }
+
+ public UimaASService build(AnalysisEngineDeploymentDescriptionDocument dd, ControllerCallbackListener callback)
+ throws Exception {
+ // get the top level AnalysisEngine descriptor
+ String aeDescriptorPath = getAEDescriptorPath(dd);
+ // parse AE descriptor
+ ResourceSpecifier resourceSpecifier = UimaClassFactory.produceResourceSpecifier(aeDescriptorPath);
+ validateDD(dd, resourceSpecifier);
+ ServiceType serviceDefinition = getService(dd);
+ AnalysisEngineDelegate topLevelService;
+ // in DD the analysisEngine specification is optional
+ if (serviceDefinition.getAnalysisEngine() == null) {
+ topLevelService = new AnalysisEngineDelegate("");
+ topLevelService.setResourceSpecifier((AnalysisEngineDescription) resourceSpecifier);
+ } else {
+ topLevelService = parse(getService(dd).getAnalysisEngine());
+ }
+ UimaASJmsService service = null;
+
+ String endpoint = resolvePlaceholder(serviceDefinition.getInputQueue().getEndpoint());
+ String brokerURL = resolvePlaceholder(serviceDefinition.getInputQueue().getBrokerURL());
+
+ if (resourceSpecifier instanceof AnalysisEngineDescription) {
+ AnalysisEngineDescription aeDescriptor = (AnalysisEngineDescription) resourceSpecifier;
+ // Create a Top Level Service (TLS) wrapper.
+ service = new UimaASJmsService().withName(aeDescriptor.getAnalysisEngineMetaData().getName())
+ .withResourceSpecifier(resourceSpecifier).withBrokerURL(brokerURL).withInputQueue(endpoint);
+
+ this.buildAndDeploy(dd, topLevelService, service, callback);
+ }
+ return service;
+ }
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/AbstractUimaASDeployer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/AbstractUimaASDeployer.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/AbstractUimaASDeployer.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/AbstractUimaASDeployer.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.deployer;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.uima.aae.UimaASApplicationEvent.EventTrigger;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.ControllerCallbackListener;
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
+
+public abstract class AbstractUimaASDeployer
+implements UimaAsServiceDeployer, ControllerCallbackListener {
+ CountDownLatch latch;
+
+ protected AbstractUimaASDeployer(CountDownLatch latch) {
+ this.latch = latch;
+ }
+ public abstract UimaASService deploy(AnalysisEngineDeploymentDescriptionDocument dd, Map<String, String> deploymentProperties) throws Exception;
+
+ public void waitUntilInitialized() throws InterruptedException {
+ latch.await();
+ }
+ @Override
+ public void notifyOnTermination(String aMessage, EventTrigger cause) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void notifyOnInitializationFailure(AnalysisEngineController aController, Exception e) {
+ // TODO Auto-generated method stub
+ System.out.println("------- Controller:"+aController.getName()+" Exception During Initialization - Error:\n");
+ e.printStackTrace();
+ }
+
+ @Override
+ public void notifyOnInitializationSuccess(AnalysisEngineController aController) {
+ System.out.println("------- Controller:"+aController.getName()+" Initialized");
+ latch.countDown();
+ }
+
+ @Override
+ public void notifyOnInitializationFailure(Exception e) {
+ // TODO Auto-generated method stub
+ latch.countDown();
+
+ }
+
+ @Override
+ public void notifyOnInitializationSuccess() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void notifyOnReconnecting(String aMessage) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void notifyOnReconnectionSuccess() {
+ // TODO Auto-generated method stub
+
+ }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/ServiceDeployers.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/ServiceDeployers.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/ServiceDeployers.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/ServiceDeployers.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.deployer;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.uima.as.deployer.direct.UimaAsDirectServiceDeployer;
+import org.apache.uima.as.deployer.jms.UimaAsJmsServiceDeployer;
+
+/*
+ * Concrete Factory class which creates instances of UimaAsServiceDeployer based
+ * type of protocol and provider. To make a new Deployer, add new protocol and
+ * provider to the enums below, and instantiate your deployer in newDeployer()
+ */
+public class ServiceDeployers {
+ public enum Protocol {
+ JAVA("java"), JMS("jms");
+ String protocol;
+
+ Protocol(String dt) {
+ protocol = dt;
+ }
+
+ public String get() {
+ return protocol;
+ }
+ }
+
+ public enum Provider {
+ JAVA("java"), ACTIVEMQ("activemq");
+ String provider;
+
+ Provider(String provider) {
+ this.provider = provider;
+ }
+
+ public String get() {
+ return provider;
+ }
+ }
+ /**
+ * Creates instance of a deployer for a given protocol and provider.
+ *
+ * @param protocol
+ * @param provider
+ * @return -
+ */
+ public static UimaAsServiceDeployer newDeployer(Protocol protocol, Provider provider) {
+
+ UimaAsServiceDeployer deployer = null;
+ if (Protocol.JAVA.equals(protocol) && Provider.JAVA.equals(provider)) {
+ deployer = new UimaAsDirectServiceDeployer(new CountDownLatch(1));
+ } else if (Protocol.JMS.equals(protocol) && Provider.ACTIVEMQ.equals(provider)) {
+ deployer = new UimaAsJmsServiceDeployer(new CountDownLatch(1));
+ }
+ return deployer;
+ }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/UimaAsServiceDeployer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/UimaAsServiceDeployer.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/UimaAsServiceDeployer.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/UimaAsServiceDeployer.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.deployer;
+
+import java.util.Map;
+
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
+
+public interface UimaAsServiceDeployer {
+ public final String Deployment = "DEPLOYMENT";
+
+ public enum DeploymentStrategy {
+ JMS,
+ LOCAL;
+ };
+ public UimaASService deploy(AnalysisEngineDeploymentDescriptionDocument dd, Map<String, String> deploymentProperties) throws Exception;
+ public void waitUntilInitialized() throws InterruptedException;
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/UimaAsDirectServiceDeployer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/UimaAsDirectServiceDeployer.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/UimaAsDirectServiceDeployer.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/UimaAsDirectServiceDeployer.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.deployer.direct;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.aae.service.builder.UimaAsDirectServiceBuilder;
+import org.apache.uima.as.deployer.AbstractUimaASDeployer;
+import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
+
+public class UimaAsDirectServiceDeployer extends AbstractUimaASDeployer {
+ public static void main(String[] args) {
+ String dd4 = "../uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateAnnotator.xml";
+ try {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ UimaAsDirectServiceDeployer deployer = new UimaAsDirectServiceDeployer(latch);
+
+ Map<String, String> deploymentProperties = new HashMap<String, String>();
+
+ deploymentProperties.put(Deployment, DeploymentStrategy.LOCAL.name());
+
+ AnalysisEngineDeploymentDescriptionDocument dd = AnalysisEngineDeploymentDescriptionDocument.Factory
+ .parse(new File(dd4));
+
+ deployer.deploy(dd, deploymentProperties);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ public UimaAsDirectServiceDeployer(CountDownLatch latch) {
+ // pass in a latch object which will block until service
+ // is initialized. The blocking will take place in super.waitUntilInitialized()
+ super(latch);
+ System.out.println("........ UimaAsDirectServiceDeployer() - Direct Deployment");
+ }
+
+ public UimaASService deploy(AnalysisEngineDeploymentDescriptionDocument dd,
+ Map<String, String> deploymentProperties) throws Exception {
+ UimaASService uimaAsService = null;
+ try {
+ uimaAsService = new UimaAsDirectServiceBuilder().build(dd, this);
+ // start listeners
+ uimaAsService.start();
+ //
+ waitUntilInitialized();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ return uimaAsService;
+ }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/UimaAsJmsServiceDeployer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/UimaAsJmsServiceDeployer.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/UimaAsJmsServiceDeployer.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/UimaAsJmsServiceDeployer.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.deployer.jms;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.adapter.jms.service.builder.UimaAsJmsServiceBuilder;
+import org.apache.uima.as.deployer.AbstractUimaASDeployer;
+import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
+
+public class UimaAsJmsServiceDeployer extends AbstractUimaASDeployer {
+ public UimaAsJmsServiceDeployer(CountDownLatch latch) {
+ super(latch);
+ System.out.println("........ UimaAsJmsServiceDeployer() - JMS Deployment");
+
+ }
+
+ public UimaASService deploy(AnalysisEngineDeploymentDescriptionDocument dd,
+ Map<String, String> deploymentProperties) throws Exception {
+
+ UimaASService uimaAsService = null;
+ try {
+ uimaAsService = new UimaAsJmsServiceBuilder().build(dd, this);
+ // start listeners. Nothing happens unless JMS listeners start
+ uimaAsService.start();
+ // block till service is ready
+ waitUntilInitialized();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ return uimaAsService;
+ }
+
+ public static void main(String[] args) {
+
+ }
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.dispatcher;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.client.UimaASProcessStatus;
+import org.apache.uima.aae.client.UimaASProcessStatusImpl;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.adapter.jms.JmsConstants;
+import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl;
+import org.apache.uima.adapter.jms.message.PendingMessage;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.util.Level;
+import org.apache.uima.util.impl.ProcessTrace_impl;
+
+public class LocalDispatcher implements Runnable {
+ private static final Class<LocalDispatcher> CLASS_NAME = LocalDispatcher.class;
+
+ private BlockingQueue<PendingMessage> messageQueue = null;
+ private BaseUIMAAsynchronousEngineCommon_impl client;
+ private UimaASService service;
+
+ public LocalDispatcher(BaseUIMAAsynchronousEngineCommon_impl client, UimaASService service,
+ BlockingQueue<PendingMessage> pendingMessageQueue) {
+ this.service = service;
+ this.client = client;
+ this.messageQueue = pendingMessageQueue;
+ }
+
+ private boolean reject(PendingMessage pm) {
+ return false;
+ }
+
+ private void dispatch(PendingMessage pm) throws Exception {
+ boolean doCallback = false;
+
+ switch (pm.getMessageType()) {
+ case AsynchAEMessage.GetMeta:
+ service.sendGetMetaRequest();
+ System.out.println("LocalDispatcher.dispatch()-dispatched getMeta Request");
+ break;
+
+ case AsynchAEMessage.Process:
+ doCallback = true;
+ service.process((CAS) pm.getProperty(AsynchAEMessage.CAS), pm.getPropertyAsString(AsynchAEMessage.CasReference));
+ System.out.println("LocalDispatcher.dispatch()-dispatched Process Request");
+ break;
+
+ case AsynchAEMessage.CollectionProcessComplete:
+ service.collectionProcessComplete();
+ System.out.println("LocalDispatcher.dispatch()-dispatched CPC Request");
+ break;
+ }
+ if ( doCallback ) {
+ UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(),(CAS)pm.getProperty(AsynchAEMessage.CAS),
+ pm.getPropertyAsString(AsynchAEMessage.CasReference));
+ // Notify engine before sending a message
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "run",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_calling_onBeforeMessageSend__FINE",
+ new Object[] {
+ pm.getPropertyAsString(AsynchAEMessage.CasReference),
+ String.valueOf( ((CAS)(pm.getProperty(AsynchAEMessage.CAS))).hashCode())
+ });
+ }
+ // Note the callback is a misnomer. The callback is made *after* the send now
+ // Application receiving this callback can consider the CAS as delivere to a queue
+ client.onBeforeMessageSend(status);
+
+
+ }
+ }
+ public void run() {
+
+ while (client.isRunning()) {
+ PendingMessage pm = null;
+ try {
+ System.out.println("LocalDispatcher.run()- waiting for new message ...");
+ pm = messageQueue.take();
+ System.out.println("LocalDispatcher.run()-got new message to dispatch");
+ } catch (InterruptedException e) {
+
+ return;
+ }
+ // we may have waited in the take() above, so check if the client is still running
+ if (!client.isRunning() ) {
+ break;
+ }
+
+ boolean rejectRequest = reject(pm);
+ if (!rejectRequest && client.isRunning()) {
+ if (client.getServiceDelegate().isAwaitingPingReply()
+ && pm.getMessageType() == AsynchAEMessage.GetMeta) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "run",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_dispatching_getmeta_ping__INFO",
+ new Object[] {});
+ }
+ }
+ try {
+ client.beforeDispatch(pm);
+
+ dispatch(pm);
+ } catch (Exception e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "run",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+ }
+ }
+ }
+ }
+ }
+
+}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java Mon Feb 26 18:54:11 2018
@@ -58,6 +58,7 @@ import org.apache.activemq.command.Activ
import org.apache.log4j.Logger;
import org.apache.uima.UIMAFramework;
import org.apache.uima.UIMA_IllegalStateException;
+import org.apache.uima.aae.InputChannel.ChannelType;
import org.apache.uima.aae.UimaClassFactory;
import org.apache.uima.aae.client.UimaASProcessStatus;
import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
@@ -659,7 +660,7 @@ public class TestUimaASExtended extends
c.setDestinationName("TestQ");
c.setConcurrentConsumers(2);
c.setBeanName("testServiceWithHttpListeners() - JUnit Test Listener");
- c.setMessageListener(new JmsInputChannel());
+ c.setMessageListener(new JmsInputChannel(ChannelType.REQUEST_REPLY));
//c.initialize();
//c.afterPropertiesSet();
c.start();