You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2020/06/10 13:15:03 UTC
[uima-async-scaleout] 19/34: uima-5501
This is an automated email from the ASF dual-hosted git repository.
cwiklik pushed a commit to branch uima-as-3
in repository https://gitbox.apache.org/repos/asf/uima-async-scaleout.git
commit 94134546e63f2c107b171f7dc72217a333823bac
Author: cwiklik <cwiklik>
AuthorDate: Thu Oct 18 14:12:00 2018 +0000
uima-5501
---
aggregate-uima-as/pom.xml | 1 +
uima-as-parent/pom.xml | 1 -
.../client/BaseUIMAAsynchronousEngine_impl.java | 51 ++--
.../uima/adapter/jms/service/UimaASJmsService.java | 5 +
.../service/builder/UimaAsJmsServiceBuilder.java | 293 ++++++++++++++++++++-
.../direct/UimaAsDirectServiceDeployer.java | 18 +-
.../as/deployer/jms/UimaAsJmsServiceDeployer.java | 31 +++
.../apache/uima/as/dispatcher/LocalDispatcher.java | 5 +-
.../apache/uima/ee/test/TestUimaASNoErrors.java | 49 +++-
.../apache/uima/ee/test/utils/BaseTestSupport.java | 1 +
.../resources/deployment/Deploy_AsyncAggregate.xml | 49 ++++
.../main/java/org/apache/uima/aae/Lifecycle.java | 6 +
.../org/apache/uima/aae/UimaAsThreadFactory.java | 7 +-
.../AggregateAnalysisEngineComponent.java | 23 ++
.../aae/component/AnalysisEngineComponent.java | 147 +++++++++++
.../uima/aae/component/CasMultiplierComponent.java | 35 +++
.../uima/aae/component/CasMultiplierNature.java | 8 +
.../uima/aae/component/ComponentCasPool.java | 22 ++
.../PrimitiveAnalysisEngineComponent.java | 42 +++
.../component/RemoteAnalysisEngineComponent.java | 74 ++++++
.../apache/uima/aae/component/TestGenerator.java | 133 ++++++++++
.../aae/component/TopLevelServiceComponent.java | 248 +++++++++++++++++
.../dd/DeploymentDescriptorProcessor.java | 267 +++++++++++++++++++
.../factory/AnalysisEngineComponentFactory.java | 55 ++++
.../AggregateAnalysisEngineController_impl.java | 67 +++--
.../aae/controller/AnalysisEngineController.java | 3 +
.../controller/BaseAnalysisEngineController.java | 11 +-
.../org/apache/uima/aae/controller/Endpoint.java | 7 +
.../apache/uima/aae/controller/Endpoint_impl.java | 16 ++
.../connectors/AbstractUimaAsConsumer.java | 9 +
.../definition/connectors/ComponentConnector.java | 5 +
.../definition/connectors/ConnectorFactory.java | 31 +++
.../definition/connectors/ListenerCallback.java | 7 +
.../aae/definition/connectors/UimaAsConnector.java | 11 +
.../aae/definition/connectors/UimaAsConsumer.java | 17 ++
.../aae/definition/connectors/UimaAsEndpoint.java | 15 ++
.../aae/definition/connectors/UimaAsProducer.java | 13 +
.../connectors/basic/BasicConnector.java | 12 +
.../connectors/basic/DirectConnector.java | 13 +
.../connectors/jms/ActiveMqConnector.java | 36 +++
.../apache/uima/aae/message/MessageProcessor.java | 9 +
.../java/org/apache/uima/aae/message/Origin.java | 6 +
.../org/apache/uima/aae/message/UimaAsMessage.java | 36 +++
.../org/apache/uima/aae/message/UimaAsOrigin.java | 55 ++++
.../org/apache/uima/aae/service/UimaASService.java | 4 +-
.../builder/AbstractUimaAsServiceBuilder.java | 270 ++++++++++++++++++-
.../builder/UimaAsDirectServiceBuilder.java | 144 +++++++++-
.../aae/service/command/AbstractUimaAsCommand.java | 99 +++++--
.../CollectionProcessCompleteRequestCommand.java | 10 +-
.../CollectionProcessCompleteResponseCommand.java | 10 +-
.../uima/aae/service/command/CommandFactory.java | 143 ++++++++++
.../aae/service/command/GetMetaRequestCommand.java | 8 +-
.../service/command/GetMetaResponseCommand.java | 30 ++-
.../uima/aae/service/command/NoOpCommand.java | 10 +-
.../aae/service/command/PingRequestCommand.java | 10 +-
.../command/ProcessChildCasRequestCommand.java | 116 ++++----
.../command/ProcessChildCasResponseCommand.java | 30 +--
.../command/ProcessInputCasRequestCommand.java | 50 ++--
.../command/ProcessInputCasResponseCommand.java | 209 ++++++++-------
.../command/ProcessServiceInfoResponseCommand.java | 12 +-
.../service/command/ReleaseCASRequestCommand.java | 8 +-
.../aae/service/command/StopRequestCommand.java | 8 +-
.../service/command/UimaAsMessageProcessor.java | 22 ++
.../org/apache/uima/as/client/DirectListener.java | 1 -
.../org/apache/uima/as/client/DirectMessage.java | 77 +++++-
.../uima/as/client/DirectMessageContext.java | 3 +
.../BaseUIMAAsynchronousEngineCommon_impl.java | 30 ++-
67 files changed, 2876 insertions(+), 378 deletions(-)
diff --git a/aggregate-uima-as/pom.xml b/aggregate-uima-as/pom.xml
index f738e26..a5c097e 100644
--- a/aggregate-uima-as/pom.xml
+++ b/aggregate-uima-as/pom.xml
@@ -66,5 +66,6 @@
<module>../uimaj-as-jms</module>
<module>../aggregate-uima-as-eclipse-plugins</module>
<module>../uima-as-docbooks</module>
+ <module>../uimaj-as-connectors</module>
</modules>
</project>
diff --git a/uima-as-parent/pom.xml b/uima-as-parent/pom.xml
index c1d55b3..b46be6f 100644
--- a/uima-as-parent/pom.xml
+++ b/uima-as-parent/pom.xml
@@ -190,7 +190,6 @@ ${uimaASNoticeText}
<scope>compile</scope>
</dependency>
-
<!-- Active MQ Stuff -->
<dependency>
<groupId>org.apache.activemq</groupId>
diff --git a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
index c41a5f1..f0623fb 100644
--- a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
+++ b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
@@ -58,23 +58,18 @@ import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.uima.UIMAFramework;
-import org.apache.uima.UIMA_IllegalArgumentException;
import org.apache.uima.UIMA_IllegalStateException;
import org.apache.uima.aae.AsynchAECasManager_impl;
-import org.apache.uima.aae.UIMAEE_Constants;
-import org.apache.uima.aae.VersionCompatibilityChecker;
import org.apache.uima.aae.UimaASApplicationEvent.EventTrigger;
import org.apache.uima.aae.UimaASApplicationExitEvent;
import org.apache.uima.aae.UimaAsVersion;
+import org.apache.uima.aae.VersionCompatibilityChecker;
import org.apache.uima.aae.client.UimaASStatusCallbackListener;
import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
import org.apache.uima.aae.client.UimaAsynchronousEngine;
import org.apache.uima.aae.controller.AnalysisEngineController;
import org.apache.uima.aae.controller.ControllerCallbackListener;
-import org.apache.uima.aae.controller.ControllerLifecycle;
import org.apache.uima.aae.controller.Endpoint;
-import org.apache.uima.aae.controller.UimacppServiceController;
-import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.error.UimaASMetaRequestTimeout;
import org.apache.uima.aae.jmx.JmxManager;
import org.apache.uima.aae.message.AsynchAEMessage;
@@ -84,9 +79,7 @@ import org.apache.uima.aae.service.UimaASService;
import org.apache.uima.aae.service.UimaAsServiceRegistry;
import org.apache.uima.aae.service.builder.UimaAsDirectServiceBuilder;
import org.apache.uima.adapter.jms.JmsConstants;
-import org.apache.uima.adapter.jms.activemq.ConnectionFactoryIniter;
import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer;
-import org.apache.uima.adapter.jms.activemq.UimaEEAdminSpringContext;
import org.apache.uima.adapter.jms.message.PendingMessage;
import org.apache.uima.adapter.jms.message.PendingMessageImpl;
import org.apache.uima.adapter.jms.service.Dd2spring;
@@ -99,7 +92,6 @@ import org.apache.uima.as.deployer.UimaAsServiceDeployer;
import org.apache.uima.as.dispatcher.LocalDispatcher;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.SerialFormat;
-import org.apache.uima.impl.UimaVersion;
import org.apache.uima.internal.util.UUIDGenerator;
import org.apache.uima.resource.Resource;
import org.apache.uima.resource.ResourceConfigurationException;
@@ -109,11 +101,8 @@ import org.apache.uima.resource.ResourceProcessException;
import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
import org.apache.uima.util.Level;
import org.apache.xmlbeans.XmlDocumentProperties;
-import org.apache.xmlbeans.XmlOptions;
-import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
-import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.xml.sax.XMLReader;
import org.xml.sax.helpers.XMLReaderFactory;
@@ -171,12 +160,6 @@ public class BaseUIMAAsynchronousEngine_impl extends BaseUIMAAsynchronousEngineC
protected volatile boolean stopped = false;
public BaseUIMAAsynchronousEngine_impl() {
this(Transport.JMS); // default
- /*
- super();
- UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO,
- "UIMA Version " + UIMAFramework.getVersionString() +
- " UIMA-AS Version " + UimaAsVersion.getVersionString());
- */
}
public BaseUIMAAsynchronousEngine_impl(Transport transport) {
super();
@@ -393,7 +376,8 @@ public class BaseUIMAAsynchronousEngine_impl extends BaseUIMAAsynchronousEngineC
}
public void stop() {
try {
- if ( brokerURI != null && !brokerURI.equals("java")) {
+ if ( isServiceRemote() ) {
+// if ( brokerURI != null && !brokerURI.equals("java")) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(
Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO,
@@ -475,8 +459,6 @@ public class BaseUIMAAsynchronousEngine_impl extends BaseUIMAAsynchronousEngineC
}
protected boolean isServiceRemote() {
return transport.equals(Transport.JMS);
-// return (service instanceof UimaASJmsService);
-// return service == null;
}
private void startLocalConsumer(Map anApplicationContext) {
@@ -519,7 +501,7 @@ public class BaseUIMAAsynchronousEngine_impl extends BaseUIMAAsynchronousEngineC
// start dispatcher in its own thread. It will fetch messages from a shared 'pendingMessageQueue'
LocalDispatcher dispatcher =
new LocalDispatcher(this, service, pendingMessageQueue);
- dispatchThread = new Thread(dispatcher);
+ dispatchThread = new Thread(dispatcher,"LocalDispatcher");
dispatchThread.start();
}
@@ -1231,9 +1213,21 @@ public class BaseUIMAAsynchronousEngine_impl extends BaseUIMAAsynchronousEngineC
throw new AsynchAEException("*** ERROR deployment descriptor validation failed");
}
*/
+ // use xmlbeans framework to parse dd and create java beans for it
XmlDocumentProperties dp = dd.documentProperties();
System.out.println(dp.getSourceName());
-
+ // based on deployment options create relevant deployer
+ UimaAsServiceDeployer deployer = newServiceDeployer(dd, anApplicationContext);
+ // deploy (instantiate) uima-as service(s)
+ service = deployer.deploy(dd, anApplicationContext);
+
+ UimaAsServiceRegistry.getInstance().register(service);
+
+ return service.getId();
+
+ }
+
+ private UimaAsServiceDeployer newServiceDeployer(AnalysisEngineDeploymentDescriptionDocument dd, Map anApplicationContext) throws Exception {
String protocolOverride = null;
if ( anApplicationContext!= null && anApplicationContext.containsKey(UimaAsynchronousEngine.Protocol) ) {
protocolOverride = (String)anApplicationContext.get(UimaAsynchronousEngine.Protocol);
@@ -1247,7 +1241,7 @@ public class BaseUIMAAsynchronousEngine_impl extends BaseUIMAAsynchronousEngineC
// the DD settings
if ( protocolOverride == null && providerOverride == null) {
// Use factory to create deployer instance for a given
- // protocol and provider defined in the DD
+ // protocol and provider
deployer =
ServiceDeployers.newDeployer(protocol(dd), provider(dd));
} else {
@@ -1272,15 +1266,8 @@ public class BaseUIMAAsynchronousEngine_impl extends BaseUIMAAsynchronousEngineC
deployer =
ServiceDeployers.newDeployer(deploymentProtocol, deploymentProvider);
}
-
- service = deployer.deploy(dd, anApplicationContext);
-
- UimaAsServiceRegistry.getInstance().register(service);
-
- return service.getId();
-
+ return deployer;
}
-
protected UimaASService getServiceReference() {
return service;
}
diff --git a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java
index b379565..8c542f2 100644
--- a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java
+++ b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java
@@ -476,5 +476,10 @@ implements UimaASService {
public String getName() {
return name;
}
+ @Override
+ public int getScaleout() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
}
diff --git a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/UimaAsJmsServiceBuilder.java b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/UimaAsJmsServiceBuilder.java
index b2c780c..4779f67 100644
--- a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/UimaAsJmsServiceBuilder.java
+++ b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/UimaAsJmsServiceBuilder.java
@@ -18,20 +18,29 @@
*/
package org.apache.uima.adapter.jms.service.builder;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
import java.util.UUID;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.InputChannel;
import org.apache.uima.aae.InputChannel.ChannelType;
+import org.apache.uima.aae.client.UimaAsynchronousEngine;
import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
+import org.apache.uima.aae.component.TopLevelServiceComponent;
import org.apache.uima.aae.OutputChannel;
+import org.apache.uima.aae.UimaAsPriorityBasedThreadFactory;
import org.apache.uima.aae.UimaClassFactory;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AnalysisEngineController;
@@ -46,12 +55,20 @@ import org.apache.uima.aae.error.handler.CpcErrorHandler;
import org.apache.uima.aae.error.handler.GetMetaErrorHandler;
import org.apache.uima.aae.error.handler.ProcessCasErrorHandler;
import org.apache.uima.aae.handler.Handler;
+import org.apache.uima.aae.handler.HandlerBase;
+import org.apache.uima.aae.handler.input.MetadataRequestHandler_impl;
+import org.apache.uima.aae.handler.input.MetadataResponseHandler_impl;
+import org.apache.uima.aae.handler.input.ProcessRequestHandler_impl;
+import org.apache.uima.aae.handler.input.ProcessResponseHandler;
+import org.apache.uima.aae.service.AsynchronousUimaASService;
import org.apache.uima.aae.service.UimaASService;
import org.apache.uima.aae.service.builder.AbstractUimaAsServiceBuilder;
import org.apache.uima.aae.service.delegate.AnalysisEngineDelegate;
import org.apache.uima.aae.service.delegate.RemoteAnalysisEngineDelegate;
+import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.activemq.JmsInputChannel;
import org.apache.uima.adapter.jms.activemq.JmsOutputChannel;
+import org.apache.uima.adapter.jms.activemq.PriorityMessageHandler;
import org.apache.uima.adapter.jms.activemq.TempDestinationResolver;
import org.apache.uima.adapter.jms.activemq.UimaDefaultMessageListenerContainer;
import org.apache.uima.adapter.jms.service.UimaASJmsService;
@@ -65,6 +82,7 @@ import org.apache.uima.resourceSpecifier.CasPoolType;
import org.apache.uima.resourceSpecifier.CollectionProcessCompleteErrorsType;
import org.apache.uima.resourceSpecifier.ProcessCasErrorsType;
import org.apache.uima.resourceSpecifier.ServiceType;
+import org.apache.uima.util.Level;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
public class UimaAsJmsServiceBuilder extends AbstractUimaAsServiceBuilder{
@@ -103,6 +121,276 @@ public class UimaAsJmsServiceBuilder extends AbstractUimaAsServiceBuilder{
}
}
+ /*** NEW CODE */
+
+ public UimaASService build(TopLevelServiceComponent topLevelComponent, ControllerCallbackListener callback)
+ throws Exception {
+ UimaASService service = null;
+
+ // is this the only one resource specifier type supported by the current uima-as?
+ if (topLevelComponent.getResourceSpecifier() instanceof AnalysisEngineDescription) {
+ AnalysisEngineDescription aeDescriptor =
+ (AnalysisEngineDescription) topLevelComponent.getResourceSpecifier();
+ String endpoint = resolvePlaceholder(topLevelComponent.getEndpoint().getEndpoint());
+ // Create a Top Level Service (TLS) wrapper. This wrapper may contain
+ // references to multiple TLS service instances if the TLS is scaled
+ // up.
+ service = new UimaASJmsService().
+ withName(aeDescriptor.
+ getAnalysisEngineMetaData().getName())
+ .withResourceSpecifier(aeDescriptor).
+ withBrokerURL(topLevelComponent.getEndpoint().getServerURI()).
+ withInputQueue(endpoint);
+
+ this.buildAndDeploy(topLevelComponent, service, callback);
+
+
+ }
+ return service;
+ }
+
+ public UimaASService buildAndDeploy(TopLevelServiceComponent topLevelComponent, UimaASService service, ControllerCallbackListener callback) throws Exception {
+ // create ResourceManager, CasManager, and InProcessCache
+ initialize(service, topLevelComponent.getComponentCasPool(), Transport.Java);
+
+ AnalysisEngineController topLevelController =
+ createController(topLevelComponent, callback, service.getId());
+
+ service.withInProcessCache(super.cache);
+ System.setProperty("BrokerURI", "Direct");
+ configureTopLevelService(topLevelController, service /*, topLevelComponent.getScaleout() */);
+ return service;
+
+ }
+
+
+ private void configureTopLevelService(AnalysisEngineController topLevelController, UimaASService service) throws Exception {
+ // First create Connection Factory. This is needed by
+ // JMS listeners.
+ createConnectionFactory();
+ // counts number of initialized threads
+ CountDownLatch latchToCountNumberOfInitedThreads =
+ new CountDownLatch(service.getScaleout());
+ // counts number of terminated threads
+ CountDownLatch latchToCountNumberOfTerminatedThreads =
+ new CountDownLatch(service.getScaleout());
+ OutputChannel outputChannel;
+ ;
+ // Add one instance of JmsOutputChannel
+ if ( topLevelController.getOutputChannel(ENDPOINT_TYPE.JMS) == null ) {
+ outputChannel = new JmsOutputChannel();
+ outputChannel.setController(topLevelController);
+ outputChannel.setServerURI(brokerURL);
+ outputChannel.setServiceInputEndpoint(service.getEndpoint());
+ topLevelController.addOutputChannel(outputChannel);
+ } else {
+ outputChannel = (JmsOutputChannel)topLevelController.getOutputChannel(ENDPOINT_TYPE.JMS);
+ outputChannel.setServiceInputEndpoint(service.getEndpoint());
+ }
+ JmsInputChannel inputChannel;
+ // Add one instance of JmsInputChannel
+ if ( topLevelController.getInputChannel(ENDPOINT_TYPE.JMS) == null ) {
+ inputChannel = new JmsInputChannel(ChannelType.REQUEST_REPLY);
+ topLevelController.setInputChannel(inputChannel);
+ } else {
+ inputChannel = (JmsInputChannel)topLevelController.getInputChannel(ENDPOINT_TYPE.JMS);
+ }
+
+ inputChannel.setController(topLevelController);
+
+ inputChannel.setMessageHandler(getMessageHandler(topLevelController));
+
+ // Create service JMS listeners to handle Process, GetMeta and optional FreeCas
+ // requests.
+
+ // listener to handle process CAS requests
+ UimaDefaultMessageListenerContainer processListener
+ = createListener(Type.ProcessCAS, service.getScaleout(), inputChannel, outputChannel);
+ inputChannel.addListenerContainer(processListener);
+
+
+
+
+
+
+ String targetStringSelector = "";
+ if ( System.getProperty(UimaAsynchronousEngine.TargetSelectorProperty) != null ) {
+ targetStringSelector = System.getProperty(UimaAsynchronousEngine.TargetSelectorProperty);
+ } else {
+ // the default selector is IP:PID
+ String ip = InetAddress.getLocalHost().getHostAddress();
+ targetStringSelector = ip+":"+topLevelController.getPID();
+ }
+ UimaDefaultMessageListenerContainer targetedListener =
+ new UimaDefaultMessageListenerContainer();
+ targetedListener.setType(Type.Target);
+ // setup jms selector
+ if ( topLevelController.isCasMultiplier()) {
+ targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+" = '"+targetStringSelector+"' AND"+UimaDefaultMessageListenerContainer.CM_PROCESS_SELECTOR_SUFFIX);//(Command=2000 OR Command=2002)");
+ } else {
+ targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+" = '"+targetStringSelector+"' AND"+UimaDefaultMessageListenerContainer.PROCESS_SELECTOR_SUFFIX);//(Command=2000 OR Command=2002)");
+ }
+
+ // use shared ConnectionFactory
+ targetedListener.setConnectionFactory(processListener.getConnectionFactory());
+ // mark the listener as a 'Targeted' listener
+ targetedListener.setTargetedListener();
+ targetedListener.setController(topLevelController);
+ // there will only be one delivery thread. Its job will be to
+ // add a targeted message to a BlockingQueue. Such thread will block
+ // in an enqueue if a dequeue is not available. This will be prevent
+ // the overwhelming the service with messages.
+ ThreadPoolTaskExecutor threadExecutor = new ThreadPoolTaskExecutor();
+ threadExecutor.setCorePoolSize(1);
+ threadExecutor.setMaxPoolSize(1);
+ targetedListener.setTaskExecutor(threadExecutor);
+ targetedListener.setConcurrentConsumers(1);
+ if ( processListener.getMessageListener() instanceof PriorityMessageHandler ) {
+ // the targeted listener will use the same message handler as the
+ // Process listener. This handler will add a message wrapper
+ // to enable prioritizing messages.
+ targetedListener.setMessageListener(processListener.getMessageListener());
+ }
+ // Same queue as the Process queue
+ targetedListener.setDestination(processListener.getDestination());
+ //registerListener(targetedListener);
+ // targetedListener.afterPropertiesSet();
+ threadExecutor.initialize();
+
+ //targetedListener.initialize();
+ //targetedListener.start();
+ if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(getClass()).logrb(Level.INFO, getClass().getName(),
+ "createListenerForTargetedMessages", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_TARGET_LISTENER__INFO",
+ new Object[] {targetedListener.getMessageSelector(), topLevelController.getComponentName() });
+ }
+
+ inputChannel.addListenerContainer(targetedListener);
+
+ //listeners.add(processListener);
+ // listener to handle GetMeta requests
+ UimaDefaultMessageListenerContainer getMetaListener
+ = createListener(Type.GetMeta, 1);
+ inputChannel.addListenerContainer(getMetaListener);
+ //listeners.add(getMetaListener);
+
+ if ( topLevelController.isCasMultiplier()) {
+ // listener to handle Free CAS requests
+ UimaDefaultMessageListenerContainer freeCasListener
+ = createListener(Type.FreeCAS, 1);
+ inputChannel.addListenerContainer(freeCasListener);
+ //listeners.add(freeCasListener);
+ }
+ }
+
+ private UimaDefaultMessageListenerContainer createListener(Type type, int consumerCount, InputChannel inputChannel, OutputChannel outputChannel) throws Exception{
+ PriorityMessageHandler h = null;
+
+ ThreadPoolTaskExecutor jmsListenerThreadExecutor =
+ new ThreadPoolTaskExecutor();
+
+ if ( Type.ProcessCAS.equals(type)) {
+ outputChannel.setServerURI(getBrokerURL());
+ if ( controller.isPrimitive() ) {
+ h = new PriorityMessageHandler(consumerCount);
+ ThreadPoolTaskExecutor threadExecutor =
+ new ThreadPoolTaskExecutor();
+ controller.setThreadFactory(threadExecutor);
+
+ CountDownLatch latchToCountNumberOfTerminatedThreads =
+ new CountDownLatch(consumerCount);
+ // Create a Custom Thread Factory. Provide it with an instance of
+ // PrimitiveController so that every thread can call it to initialize
+ // the next available instance of a AE.
+ ThreadFactory tf =
+ new UimaAsPriorityBasedThreadFactory(Thread.currentThread().
+ getThreadGroup(), controller, latchToCountNumberOfTerminatedThreads)
+ .withQueue(h.getQueue()).withChannel(controller.getInputChannel(ENDPOINT_TYPE.JMS));
+
+
+ ((UimaAsPriorityBasedThreadFactory)tf).setDaemon(true);
+ // This ThreadExecutor will use custom thread factory instead of default one
+ threadExecutor.setThreadFactory(tf);
+ threadExecutor.setCorePoolSize(consumerCount);
+ threadExecutor.setMaxPoolSize(consumerCount);
+
+ // Initialize the thread pool
+ threadExecutor.initialize();
+
+ // Make sure all threads are started. This forces each thread to call
+ // PrimitiveController to initialize the next instance of AE
+ threadExecutor.getThreadPoolExecutor().prestartAllCoreThreads();
+ // This ThreadExecutor will use custom thread factory instead of default one
+
+ }
+
+ }
+ jmsListenerThreadExecutor.setCorePoolSize(consumerCount);
+ jmsListenerThreadExecutor.setMaxPoolSize(consumerCount);
+ jmsListenerThreadExecutor.initialize();
+
+
+// threadExecutor.setCorePoolSize(consumerCount);
+// threadExecutor.setMaxPoolSize(consumerCount);
+
+ // destination can be NULL if this listener is meant for a
+ // a temp queue. Such destinations are created on demand
+ // using destination resolver which is plugged into the
+ // listener. The resolver creates a temp queue lazily on
+ // listener startup.
+ ActiveMQDestination destination = null;
+
+ if ( !isTempQueueListener(type) ) {
+ destination = new ActiveMQQueue(queueName);
+ }
+ JmsMessageListenerBuilder listenerBuilder =
+ new JmsMessageListenerBuilder();
+
+ UimaDefaultMessageListenerContainer messageListener =
+ listenerBuilder.withController(controller)
+ .withType(type)
+ .withConectionFactory(factory)
+ .withThreadPoolExecutor(jmsListenerThreadExecutor)
+ .withConsumerCount(consumerCount)
+ .withInputChannel(inputChannel)
+ .withPriorityMessageHandler(h)
+ .withSelector(getSelector(type))
+ .withDestination(destination)
+ .build();
+ messageListener.setReceiveTimeout(500);
+// messageListener.setMessageListener(h);
+ return messageListener;
+ }
+ public HandlerBase getMessageHandler(AnalysisEngineController controller) {
+ MetadataRequestHandler_impl metaHandler = new MetadataRequestHandler_impl("MetadataRequestHandler");
+ metaHandler.setController(controller);
+ ProcessRequestHandler_impl processHandler = new ProcessRequestHandler_impl("ProcessRequestHandler");
+ processHandler.setController(controller);
+ metaHandler.setDelegate(processHandler);
+ if ( !controller.isPrimitive() ) {
+ MetadataResponseHandler_impl metaResponseHandler =
+ new MetadataResponseHandler_impl("MetadataResponseHandler");
+ metaResponseHandler.setController(controller);
+ processHandler.setDelegate(metaResponseHandler);
+
+ ProcessResponseHandler processResponseHandler =
+ new ProcessResponseHandler("ProcessResponseHandler");
+ processResponseHandler.setController(controller);
+ metaResponseHandler.setDelegate(processResponseHandler);
+
+ }
+ return metaHandler;
+ }
+
+
+
+
+ /* OLD CODE */
+
+
+
+
public static InputChannel createInputChannel(ChannelType type) {
return new JmsInputChannel(type);
@@ -388,7 +676,10 @@ public class UimaAsJmsServiceBuilder extends AbstractUimaAsServiceBuilder{
}
public UimaASService build(AnalysisEngineDeploymentDescriptionDocument dd, ControllerCallbackListener callback)
- throws Exception {
+ throws Exception {
+
+
+
// get the top level AnalysisEngine descriptor
String aeDescriptorPath = getAEDescriptorPath(dd);
// parse AE descriptor
diff --git a/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/UimaAsDirectServiceDeployer.java b/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/UimaAsDirectServiceDeployer.java
index c73ba5f..3e7a7a3 100644
--- a/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/UimaAsDirectServiceDeployer.java
+++ b/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/UimaAsDirectServiceDeployer.java
@@ -23,6 +23,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import org.apache.uima.aae.component.TopLevelServiceComponent;
+import org.apache.uima.aae.component.dd.DeploymentDescriptorProcessor;
import org.apache.uima.aae.service.UimaASService;
import org.apache.uima.aae.service.builder.UimaAsDirectServiceBuilder;
import org.apache.uima.as.deployer.AbstractUimaASDeployer;
@@ -42,7 +44,21 @@ public class UimaAsDirectServiceDeployer extends AbstractUimaASDeployer {
Map<String, String> deploymentProperties) throws Exception {
UimaASService uimaAsService = null;
try {
- uimaAsService = new UimaAsDirectServiceBuilder().build(dd, this);
+ DeploymentDescriptorProcessor ddProcessor =
+ new DeploymentDescriptorProcessor(dd);
+
+ // process dd producing TopLevelServiceComponent. If the dd
+ // is an aggregate, the component object will include a tree
+ // of delegates. It basically combines information from both
+ // a dd and resource specifier for all parts of the pipeline
+ // aggregating instances of AnalysisEngineComponent created
+ // for every delegate.
+ TopLevelServiceComponent topLevelComponent =
+ ddProcessor.newComponent();
+
+ // create an instance of a service for the client to use
+ uimaAsService = new UimaAsDirectServiceBuilder().build(topLevelComponent, this);
+
// start listeners
uimaAsService.start();
// block until all internal components initialize and are ready to process
diff --git a/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/UimaAsJmsServiceDeployer.java b/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/UimaAsJmsServiceDeployer.java
index a32e8f8..ec8e966 100644
--- a/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/UimaAsJmsServiceDeployer.java
+++ b/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/UimaAsJmsServiceDeployer.java
@@ -21,7 +21,10 @@ package org.apache.uima.as.deployer.jms;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import org.apache.uima.aae.component.TopLevelServiceComponent;
+import org.apache.uima.aae.component.dd.DeploymentDescriptorProcessor;
import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.aae.service.builder.UimaAsDirectServiceBuilder;
import org.apache.uima.adapter.jms.service.builder.UimaAsJmsServiceBuilder;
import org.apache.uima.as.deployer.AbstractUimaASDeployer;
import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
@@ -37,6 +40,33 @@ public class UimaAsJmsServiceDeployer extends AbstractUimaASDeployer {
Map<String, String> deploymentProperties) throws Exception {
UimaASService uimaAsService = null;
+
+ try {
+ DeploymentDescriptorProcessor ddProcessor =
+ new DeploymentDescriptorProcessor(dd);
+
+ // process dd producing TopLevelServiceComponent. If the dd
+ // is an aggregate, the component object will include a tree
+ // of delegates. It basically combines information from both
+ // a dd and resource specifier for all parts of the pipeline
+ // aggregating instances of AnalysisEngineComponent created
+ // for every delegate.
+ TopLevelServiceComponent topLevelComponent =
+ ddProcessor.newComponent();
+
+ // create an instance of a service for the client to use
+ //uimaAsService = new UimaAsDirectServiceBuilder().build(topLevelComponent, this);
+ uimaAsService = new UimaAsJmsServiceBuilder().build(topLevelComponent, this);
+ // start listeners
+ uimaAsService.start();
+ // block until all internal components initialize and are ready to process
+ waitUntilInitialized();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ /*
try {
uimaAsService = new UimaAsJmsServiceBuilder().build(dd, this);
// start listeners. Nothing happens unless JMS listeners start
@@ -57,6 +87,7 @@ public class UimaAsJmsServiceDeployer extends AbstractUimaASDeployer {
throw e;
}
+ */
return uimaAsService;
}
diff --git a/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java b/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java
index 646cd6e..f3e03a4 100644
--- a/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java
+++ b/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java
@@ -99,7 +99,7 @@ public class LocalDispatcher implements Runnable {
while (client.isRunning()) {
PendingMessage pm = null;
try {
- System.out.println("LocalDispatcher.run()- waiting for new message ...");
+ System.out.println("LocalDispatcher.run()- waiting for new message ... queue hashcode:"+messageQueue.hashCode());
pm = messageQueue.take();
System.out.println("LocalDispatcher.run()-got new message to dispatch");
} catch (InterruptedException e) {
@@ -122,9 +122,12 @@ public class LocalDispatcher implements Runnable {
}
}
try {
+ System.out.println(".................... calling LocalDispatch.beforeDispatch()");
client.beforeDispatch(pm);
+ System.out.println(".................... calling LocalDispatch.dispatch()");
dispatch(pm);
+ System.out.println(".................... LocalDispatch.dispatch() returned");
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "run",
diff --git a/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java b/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java
index c25d29f..1976e2f 100644
--- a/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java
+++ b/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java
@@ -51,6 +51,7 @@ import org.apache.uima.aae.service.UimaASService;
import org.apache.uima.aae.service.UimaAsServiceRegistry;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.activemq.JmsInputChannel;
+import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.TypeSystem;
import org.apache.uima.collection.CollectionReader;
@@ -129,12 +130,16 @@ public class TestUimaASNoErrors extends BaseTestSupport {
@Test
public void testDeploy() throws Exception {
+
UimaAsynchronousEngine uimaAS = getClient(Transport.Java);
Map ctx = new HashMap<>();
-
+ ctx.put(UimaAsynchronousEngine.Provider,"java");
+ ctx.put(UimaAsynchronousEngine.Protocol,"java");
+/*
ctx.put(UimaAsynchronousEngine.Provider,"activemq");
ctx.put(UimaAsynchronousEngine.Protocol,"jms");
+ */
uimaAS.deploy(relativePath + "/Deploy_NoOpAnnotator.xml", ctx);
runTest2(null, uimaAS, getMasterConnectorURI(broker),
@@ -143,6 +148,21 @@ public class TestUimaASNoErrors extends BaseTestSupport {
uimaAS.stop();
}
+ @Test
+ public void testJmsServiceAdapter() throws Exception {
+ Logger.getLogger(this.getClass()).info("-------------- testJmsServiceAdapter -------------");
+ //setUp();
+ BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
+ try {
+ deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
+ deployService(eeUimaEngine, relativePath + "/Deploy_SyncAggregateWithJmsService.xml");
+ runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
+ 0, PROCESS_LATCH);
+
+ } catch( Exception e ) {
+ throw e;
+ }
+ }
/*
*
*
@@ -1419,6 +1439,27 @@ public class TestUimaASNoErrors extends BaseTestSupport {
uimaAsClient.stop();
}
+ @Test
+ public void testDeployAsyncAggregateServiceOverJava() throws Exception {
+ testDeployAsyncAggregateService(Transport.Java);
+ }
+
+ public void testDeployAsyncAggregateService(Transport transport) throws Exception {
+ System.out.println("-------------- testDeployAggregateService -------------");
+ UimaAsynchronousEngine uimaAsClient = getClient(transport);
+ System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000");
+ Map<String, Object> appCtx = defaultContext("TopLevelTaeQueue");
+ deployTopLevelService(appCtx, transport, uimaAsClient, relativePath + "/Deploy_AsyncAggregate.xml",
+ "TopLevelTaeQueue");
+
+ appCtx.put(UimaAsynchronousEngine.Timeout, 0);
+ appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0);
+
+ addExceptionToignore(org.apache.uima.aae.error.UimaEEServiceException.class);
+
+ runTest(appCtx, uimaAsClient, "tcp://localhost:61616", "TopLevelTaeQueue", 200, PROCESS_LATCH);
+ }
+
@Test
public void testDeployAggregateServiceOverJava() throws Exception {
testDeployAggregateService(Transport.Java);
@@ -1433,10 +1474,12 @@ public class TestUimaASNoErrors extends BaseTestSupport {
//BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
- // System.setProperty("BrokerURL", "tcp::/localhost:61616");
+ System.setProperty("NoOpBroker", "tcp::/localhost:61616");
System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000");
// deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
- Map<String, Object> appCtx = defaultContext("TopLevelTaeQueue");
+ deployJmsService(uimaAsClient, relativePath + "/Deploy_NoOpAnnotatorUsingPlaceholder.xml");
+
+ Map<String, Object> appCtx = defaultContext("TopLevelTaeQueue");
deployTopLevelService(appCtx, transport, uimaAsClient, relativePath + "/Deploy_AggregateAnnotator.xml","TopLevelTaeQueue");
// deployJavaService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotator.xml");
diff --git a/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java b/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
index 40ba9a6..f8c1d4b 100644
--- a/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
+++ b/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
@@ -659,6 +659,7 @@ public abstract class BaseTestSupport extends ActiveMQSupport
}
// Send CPC
+ System.out.println("............. Sending CPC");
uimaAsClient.collectionProcessingComplete();
}
}
diff --git a/uimaj-as-activemq/src/test/resources/deployment/Deploy_AsyncAggregate.xml b/uimaj-as-activemq/src/test/resources/deployment/Deploy_AsyncAggregate.xml
new file mode 100644
index 0000000..3d445eb
--- /dev/null
+++ b/uimaj-as-activemq/src/test/resources/deployment/Deploy_AsyncAggregate.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+ <!--
+ ***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ ***************************************************************
+ -->
+
+<analysisEngineDeploymentDescription xmlns="http://uima.apache.org/resourceSpecifier">
+
+ <name>Top Level TAE</name>
+ <description></description>
+
+ <deployment protocol="${Protocol}" provider="${Provider}">
+ <casPool numberOfCASes="5" initialFsHeapSize="500"/>
+ <service>
+ <inputQueue endpoint="TopLevelTaeQueue" brokerURL="${DefaultBrokerURL}" prefetch="1"/>
+ <topDescriptor>
+ <!--import location="../descriptors/analysis_engine/SimpleTestAggregate.xml"/-->
+ <import location="../descriptors/analysis_engine/ComplexNestedAggregate_TAE.xml"/>
+ </topDescriptor>
+ <analysisEngine async="true">
+ <!--delegates>
+
+ <analysisEngine key="TestMultiplier">
+ <casMultiplier poolSize="5"/>
+ </analysisEngine>
+
+ </delegates-->
+ </analysisEngine>
+ </service>
+ </deployment>
+
+</analysisEngineDeploymentDescription>
\ No newline at end of file
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/Lifecycle.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/Lifecycle.java
new file mode 100644
index 0000000..83850da
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/Lifecycle.java
@@ -0,0 +1,6 @@
+package org.apache.uima.aae;
+
+public interface Lifecycle {
+ public void start() throws Exception;
+ public void stop() throws Exception;
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java
index f53f10b..036da9c 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java
@@ -27,7 +27,7 @@ import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController_impl;
-import org.apache.uima.as.client.DirectListener.DirectListenerCallback;
+import org.apache.uima.aae.definition.connectors.ListenerCallback;
import org.apache.uima.util.Level;
/**
@@ -60,7 +60,7 @@ public class UimaAsThreadFactory implements ThreadFactory {
private CountDownLatch latchToCountNumberOfInitedThreads;
- private DirectListenerCallback callback = null;
+ private ListenerCallback callback = null;
public UimaAsThreadFactory() {
@@ -79,7 +79,7 @@ public class UimaAsThreadFactory implements ThreadFactory {
this.latchToCountNumberOfInitedThreads = latchToCountNumberOfInitedThreads;
}
- public UimaAsThreadFactory withCallback(DirectListenerCallback c) {
+ public UimaAsThreadFactory withCallback(ListenerCallback c) {
callback = c;
return this;
}
@@ -163,6 +163,7 @@ public class UimaAsThreadFactory implements ThreadFactory {
// TaskExecutor is terminated.
r.run();
} catch (Throwable e) {
+ e.printStackTrace();
if ( !(e instanceof Exception) ) {
// try to log. If this is OOM, logging may not succeed and we
// get another OOM.
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/component/AggregateAnalysisEngineComponent.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/AggregateAnalysisEngineComponent.java
new file mode 100644
index 0000000..28c59b8
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/AggregateAnalysisEngineComponent.java
@@ -0,0 +1,23 @@
+package org.apache.uima.aae.component;
+
+import org.apache.uima.aae.definition.connectors.basic.BasicConnector;
+import org.apache.uima.resource.ResourceSpecifier;
+
+public class AggregateAnalysisEngineComponent extends AnalysisEngineComponent {
+
+ public AggregateAnalysisEngineComponent(String key, ResourceSpecifier rs) {
+ super(key, rs);
+ }
+
+ @Override
+ public boolean isPrimitive() {
+ return false;
+ }
+
+ @Override
+ public Object getConnector() {
+ return new BasicConnector();
+ }
+
+
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/component/AnalysisEngineComponent.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/AnalysisEngineComponent.java
new file mode 100644
index 0000000..6049cb9
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/AnalysisEngineComponent.java
@@ -0,0 +1,147 @@
+package org.apache.uima.aae.component;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.uima.aae.controller.DelegateEndpoint;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.DelegateEndpoint.Builder;
+import org.apache.uima.resource.ResourceSpecifier;
+
+public abstract class AnalysisEngineComponent {
+
+ protected List<AnalysisEngineComponent> delegateList = new ArrayList<>();
+ private boolean isCasMultiplier = false;
+ private boolean scaleable = false;
+ private boolean async = false;
+ private String componentKey;
+ private int scaleout = 1;
+ private ResourceSpecifier resourceSpecifier;
+ private CasMultiplierNature casMultiplier;
+ private int requestThreadPoolSize=1;
+ private int responseThreadPoolSize=1;
+ private Endpoint endpoint = null;
+
+
+ public abstract Object getConnector();
+
+ public AnalysisEngineComponent() {}
+ public AnalysisEngineComponent(String key, ResourceSpecifier rs) {
+ componentKey = key;
+ resourceSpecifier = rs;
+ }
+
+
+ public CasMultiplierNature getCasMultiplierNature() {
+ return casMultiplier;
+ }
+ public ResourceSpecifier getResourceSpecifier() {
+ return resourceSpecifier;
+ }
+
+ public boolean isScaleable() {
+ return scaleable;
+ }
+
+ public int getScaleout() {
+ return scaleout;
+ }
+
+ public boolean isCasMultiplier() {
+ return isCasMultiplier;
+ }
+ public boolean isAsync() {
+ return async;
+ }
+ public boolean isCasConsumer() {
+ return false;
+ }
+
+ public boolean isPrimitive() {
+ return true;
+ }
+
+ public boolean isRemote() {
+ return false;
+ }
+ public String getKey() {
+ return componentKey;
+
+ }
+ public Endpoint getEndpoint() {
+ if ( endpoint == null ) {
+ String serviceEndpoint = getKey();
+ String server = "java";
+ if ( this instanceof RemoteAnalysisEngineComponent ) {
+ serviceEndpoint = ((RemoteAnalysisEngineComponent)this).getEndpointName();
+ server = ((RemoteAnalysisEngineComponent)this).getServer();
+ }
+ endpoint = new DelegateEndpoint(). new Builder().
+ withDelegateKey(getKey()).
+ withEndpointName(serviceEndpoint).
+ setRemote(isRemote()).
+ setServerURI(server).
+ withResourceSpecifier(getResourceSpecifier()).
+ build();
+ if ( isCasMultiplier ) {
+ endpoint.setIsCasMultiplier(true);
+ endpoint.setProcessParentLast(casMultiplier.processParentLast());
+ if ( casMultiplier.getPoolSize() > 1) {
+ endpoint.setShadowCasPoolSize(casMultiplier.getPoolSize());
+ }
+ if ( casMultiplier.getInitialFsHeapSize() > 0 ) {
+ endpoint.setInitialFsHeapSize( (int) casMultiplier.getInitialFsHeapSize());
+ }
+ endpoint.setDisableJCasCache(casMultiplier.disableJCasCache());
+ }
+ if ( isRemote()) {
+ endpoint.setMetadataRequestTimeout(((RemoteAnalysisEngineComponent)this).getMetaTimeout());
+ endpoint.setProcessRequestTimeout(((RemoteAnalysisEngineComponent)this).getProcessTimeout());
+ endpoint.setCollectionProcessCompleteTimeout(((RemoteAnalysisEngineComponent)this).getCollectionProcessCompleteTimeout());
+ }
+ }
+
+ return endpoint;
+ }
+ public void add(AnalysisEngineComponent component) {
+ delegateList.add(component);
+ }
+ public AnalysisEngineComponent getChild(int index) {
+ return delegateList.get(index);
+ }
+ public List<AnalysisEngineComponent> getChildren() {
+ return delegateList;
+ }
+ public AnalysisEngineComponent enableScaleout() {
+ scaleable = true;
+ return this;
+ }
+ public AnalysisEngineComponent withScaleout(int howManyInstances) {
+ scaleout = howManyInstances;
+ return this;
+ }
+
+ public AnalysisEngineComponent withRequestThreadPoolSize(int howManyThreads) {
+ requestThreadPoolSize = howManyThreads;
+ return this;
+ }
+
+ public AnalysisEngineComponent withReplyThreadPoolSize(int howManyThreads) {
+ responseThreadPoolSize = howManyThreads;
+ return this;
+ }
+
+ public AnalysisEngineComponent enableCasMultiplierNatureWith(CasMultiplierNature cm) {
+ this.casMultiplier = cm;
+ return this;
+ }
+ public AnalysisEngineComponent enableCasMultipler() {
+ isCasMultiplier = true;
+ return this;
+ }
+ public AnalysisEngineComponent enableAsync() {
+ async = true;
+ return this;
+ }
+
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/component/CasMultiplierComponent.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/CasMultiplierComponent.java
new file mode 100644
index 0000000..70ad9e6
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/CasMultiplierComponent.java
@@ -0,0 +1,35 @@
+package org.apache.uima.aae.component;
+
+public class CasMultiplierComponent implements CasMultiplierNature {
+ boolean disableJCasCache;
+ long initialFsHeapSize;
+ int casPoolSize;
+ boolean processParentLast;
+
+ public CasMultiplierComponent(boolean disableJCasCache,long initialFsHeapSize,int casPoolSize,boolean processParentLast ) {
+ this.disableJCasCache = disableJCasCache;
+ this.initialFsHeapSize = initialFsHeapSize;
+ this.casPoolSize = casPoolSize;
+ this.processParentLast = processParentLast;
+ }
+ @Override
+ public boolean disableJCasCache() {
+ return false;
+ }
+
+ @Override
+ public long getInitialFsHeapSize() {
+ return 0;
+ }
+
+ @Override
+ public int getPoolSize() {
+ return 0;
+ }
+
+ @Override
+ public boolean processParentLast() {
+ return false;
+ }
+
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/component/CasMultiplierNature.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/CasMultiplierNature.java
new file mode 100644
index 0000000..f562078
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/CasMultiplierNature.java
@@ -0,0 +1,8 @@
+package org.apache.uima.aae.component;
+
+public interface CasMultiplierNature {
+ boolean disableJCasCache();
+ long getInitialFsHeapSize();
+ int getPoolSize();
+ boolean processParentLast();
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/component/ComponentCasPool.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/ComponentCasPool.java
new file mode 100644
index 0000000..81b4dbe
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/ComponentCasPool.java
@@ -0,0 +1,22 @@
+package org.apache.uima.aae.component;
+
+public class ComponentCasPool {
+ boolean disableJCasCache;
+ int initialHeapSize=1000;
+ int poolSize=1;
+ public ComponentCasPool(boolean disableJCasCache, int initialHeapSize, int poolSize) {
+ this.disableJCasCache = disableJCasCache;
+ this.initialHeapSize = initialHeapSize;
+ this.poolSize = poolSize;
+ }
+ public boolean isDisableJCasCache() {
+ return disableJCasCache;
+ }
+ public int getInitialHeapSize() {
+ return initialHeapSize;
+ }
+ public int getPoolSize() {
+ return poolSize;
+ }
+
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/component/PrimitiveAnalysisEngineComponent.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/PrimitiveAnalysisEngineComponent.java
new file mode 100644
index 0000000..55e4daa
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/PrimitiveAnalysisEngineComponent.java
@@ -0,0 +1,42 @@
+package org.apache.uima.aae.component;
+
+import org.apache.uima.aae.definition.connectors.basic.BasicConnector;
+import org.apache.uima.resource.ResourceSpecifier;
+
+public class PrimitiveAnalysisEngineComponent extends AnalysisEngineComponent {
+
+ public PrimitiveAnalysisEngineComponent(String key, ResourceSpecifier rs) {
+ super(key, rs);
+ }
+ /*
+ @Override
+ public boolean isScaleable() {
+ return false;
+ }
+
+ @Override
+ public boolean isCasMultiplier() {
+ return false;
+ }
+
+ @Override
+ public boolean isCasConsumer() {
+ return false;
+ }
+*/
+ @Override
+ public boolean isPrimitive() {
+ return true;
+ }
+/*
+ @Override
+ public boolean isRemote() {
+ return false;
+ }
+ */
+ @Override
+ public Object getConnector() {
+ return new BasicConnector();
+ }
+
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/component/RemoteAnalysisEngineComponent.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/RemoteAnalysisEngineComponent.java
new file mode 100644
index 0000000..5f37d35
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/RemoteAnalysisEngineComponent.java
@@ -0,0 +1,74 @@
+package org.apache.uima.aae.component;
+
+import java.util.Objects;
+
+import org.apache.uima.resourceSpecifier.AsyncAggregateErrorConfigurationType;
+import org.apache.uima.resourceSpecifier.InputQueueType;
+import org.apache.uima.resourceSpecifier.RemoteAnalysisEngineType;
+import org.apache.uima.resourceSpecifier.SerializerType;
+
+public class RemoteAnalysisEngineComponent extends AnalysisEngineComponent {
+
+ private Object connector;
+ private AnalysisEngineComponent decoratedComponent;
+ private SerializerType serializer;
+ private int replyChannelScaleout;
+ private InputQueueType remoteEndpoint;
+ private AsyncAggregateErrorConfigurationType errorConfiguration;
+
+ public RemoteAnalysisEngineComponent(AnalysisEngineComponent component, RemoteAnalysisEngineType remoteDelegate) {
+ super(component.getKey(), null);
+ decoratedComponent = component;
+ errorConfiguration =
+ remoteDelegate.getAsyncAggregateErrorConfiguration();
+ remoteEndpoint =
+ remoteDelegate.getInputQueue();
+ replyChannelScaleout = remoteDelegate.getRemoteReplyQueueScaleout();
+ serializer =
+ remoteDelegate.getSerializer();
+ }
+
+ @Override
+ public boolean isRemote() {
+ return true;
+ }
+ public String getServer() {
+ return remoteEndpoint.getBrokerURL();
+ }
+
+ public String getEndpointName() {
+ return remoteEndpoint.getEndpoint();
+ }
+ public int getPrefetch() {
+ return remoteEndpoint.getPrefetch();
+ }
+
+ @Override
+ public Object getConnector() {
+ return connector;
+ }
+
+ public int getProcessTimeout() {
+ return errorConfiguration.getProcessCasErrors().getTimeout();
+ }
+ public int getMetaTimeout() {
+ return errorConfiguration.getGetMetadataErrors().getTimeout();
+ }
+ public int getCollectionProcessCompleteTimeout() {
+ return errorConfiguration.getCollectionProcessCompleteErrors().getTimeout();
+ }
+ public String getSupportedSerialization() {
+ if ( Objects.isNull( serializer ) ) {
+ return "xmi";
+ }
+ return serializer.getStringValue().trim();
+ }
+ public int getReplyConsumerCount() {
+ return replyChannelScaleout;
+ }
+ public RemoteAnalysisEngineComponent withConnector(Object connector) {
+ this.connector = connector;
+ return this;
+ }
+
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/component/TestGenerator.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/TestGenerator.java
new file mode 100644
index 0000000..92f77d5
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/TestGenerator.java
@@ -0,0 +1,133 @@
+package org.apache.uima.aae.component;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.uima.aae.UimaClassFactory;
+import org.apache.uima.aae.service.delegate.AggregateAnalysisEngineDelegate;
+import org.apache.uima.aae.service.delegate.AnalysisEngineDelegate;
+import org.apache.uima.analysis_engine.AnalysisEngineDescription;
+import org.apache.uima.resource.ResourceSpecifier;
+import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
+import org.apache.uima.resourceSpecifier.AnalysisEngineType;
+import org.apache.uima.resourceSpecifier.DelegateAnalysisEngineType;
+import org.apache.uima.resourceSpecifier.RemoteAnalysisEngineType;
+import org.apache.uima.resourceSpecifier.ServiceType;
+
+public class TestGenerator {
+
+ public AnalysisEngineDeploymentDescriptionDocument parseDD(String descriptorPath) throws Exception {
+ return AnalysisEngineDeploymentDescriptionDocument.Factory.parse(new File(descriptorPath));
+
+ }
+ private boolean isAggregate(AnalysisEngineType aet) {
+ return ("true".equals(aet.getAsync()) || aet.isSetAsync() || aet.isSetDelegates());
+ }
+ private boolean isAggregate( ResourceSpecifier resourceSpecifier) {
+ boolean aggregate = false;
+ if (resourceSpecifier instanceof AnalysisEngineDescription ) {
+ AnalysisEngineDescription aeDescriptor =
+ (AnalysisEngineDescription) resourceSpecifier;
+
+ if ( !aeDescriptor.isPrimitive() ) {
+ aggregate = true;
+ }
+ // if ( d != null ) {
+// if ((d instanceof AggregateAnalysisEngineDelegate) ||
+// (d.isAsync() && !d.isPrimitive()) ) {
+// aggregate = true;
+// }
+// } else if ( !aeDescriptor.isPrimitive() ) {
+// aggregate = true;
+// }
+ }
+ return aggregate;
+ }
+
+ private AnalysisEngineType findMatchInDD(String key) throws Exception {
+
+ return null;
+ }
+ private AnalysisEngineDescription getAeDescription(ResourceSpecifier rs) {
+ return (AnalysisEngineDescription) rs;
+ }
+ public AnalysisEngineComponent parse(ResourceSpecifier rs, String key) throws Exception {
+ AnalysisEngineDescription aeDescriptor = getAeDescription(rs);
+ AnalysisEngineComponent component = null;
+
+ //AnalysisEngineType aet = findMatchInDD(rs.)
+ if ( isAggregate(rs) ) {
+ component =
+ new AggregateAnalysisEngineComponent(key, rs);
+
+ Map<String, ResourceSpecifier> delegates =
+ aeDescriptor.getDelegateAnalysisEngineSpecifiers();
+
+ for(Entry<String, ResourceSpecifier> delegateEntry: delegates.entrySet() ) {
+ component.add(parse(delegateEntry.getValue(), delegateEntry.getKey() ));
+ }
+
+
+
+/*
+
+
+
+
+
+ // The DD object maintains two arrays, one for co-located delegates and the other for remotes.
+ // First handle co-located delegates.
+ if ( aet.getDelegates().getAnalysisEngineArray().length > 0 ) {
+ DelegateAnalysisEngineType[] localAnalysisEngineArray =
+ aet.getDelegates().getAnalysisEngineArray();
+
+ // Add default error handling to each co-located delegate
+ for( DelegateAnalysisEngineType delegate : localAnalysisEngineArray ) {
+ String key = delegate.getKey();
+ // recursively iterate over delegates until no more aggregates found
+ aggregate.add(walk(delegate));
+ }
+
+
+
+ addColocatedDelegates(localAnalysisEngineArray,(AggregateAnalysisEngineDelegate)delegate);
+ }
+ // Next add remote delegates of this aggregate
+ if ( hasRemoteDelegates(aet) ) {
+ RemoteAnalysisEngineType[] remoteAnalysisEngineArray =
+ aet.getDelegates().getRemoteAnalysisEngineArray();
+ addRemoteDelegates(remoteAnalysisEngineArray, (AggregateAnalysisEngineDelegate)delegate);
+ }
+*/
+ } else {
+ component = new PrimitiveAnalysisEngineComponent(key, rs);
+ }
+
+ if ( aeDescriptor.getAnalysisEngineMetaData().getOperationalProperties().isMultipleDeploymentAllowed() ) {
+ component.enableCasMultipler();
+ }
+ if ( aeDescriptor.getAnalysisEngineMetaData().getOperationalProperties().getOutputsNewCASes() ) {
+ component.enableScaleout();
+ }
+
+ return component;
+ }
+ public static void main(String[] args) {
+ try {
+
+ TestGenerator generator = new TestGenerator();
+ AnalysisEngineDeploymentDescriptionDocument dd =
+ generator.parseDD(args[0]);
+ ServiceType service =
+ dd.getAnalysisEngineDeploymentDescription().getDeployment().getService();
+ ResourceSpecifier resourceSpecifier =
+ UimaClassFactory.produceResourceSpecifier(service.getTopDescriptor().getImport().getLocation());
+ generator.walk(resourceSpecifier, null); // null= top level
+
+ } catch( Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/component/TopLevelServiceComponent.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/TopLevelServiceComponent.java
new file mode 100644
index 0000000..459bc89
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/TopLevelServiceComponent.java
@@ -0,0 +1,248 @@
+package org.apache.uima.aae.component;
+
+import java.io.InvalidObjectException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.uima.aae.controller.DelegateEndpoint;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
+import org.apache.uima.resourceSpecifier.AsyncPrimitiveErrorConfigurationType;
+import org.apache.uima.resourceSpecifier.CasMultiplierType;
+import org.apache.uima.resourceSpecifier.CasPoolType;
+import org.apache.uima.resourceSpecifier.DeploymentType;
+import org.apache.uima.resourceSpecifier.EnvironmentVariableType;
+import org.apache.uima.resourceSpecifier.EnvironmentVariablesType;
+import org.apache.uima.resourceSpecifier.ServiceType;
+import org.apache.uima.resourceSpecifier.TopLevelAnalysisEngineType;
+
+public class TopLevelServiceComponent extends AnalysisEngineComponent{
+ private String name;
+ private String description;
+ private String version;
+ private String vendor;
+
+ private String protocol;
+ private String provider;
+
+ private int poolSize=1;
+ private boolean processParentLast;
+ private boolean disableJCasCache;
+ private int initialHeapSize=500;
+
+ private Endpoint endpoint;
+
+ private List<EnvironmentVariable> envVariables = new ArrayList<>();
+ private ComponentCasPool casPool = null;
+
+ private AnalysisEngineComponent decoratedComponent;
+
+ public TopLevelServiceComponent(AnalysisEngineComponent decorated, AnalysisEngineDeploymentDescriptionDocument dd) {
+ super(decorated.getKey(),decorated.getResourceSpecifier());
+ this.decoratedComponent = decorated;
+ DeploymentType deployment =
+ dd.getAnalysisEngineDeploymentDescription().getDeployment();
+
+ ServiceType service =
+ dd.getAnalysisEngineDeploymentDescription().getDeployment().getService();
+
+ if ( Objects.nonNull(service.getEnvironmentVariables()) ) {
+ configureEnvironmentVariables(service.getEnvironmentVariables());
+ }
+
+ if ( Objects.nonNull(deployment.getCasPool()) ) {
+ configureCasPool(deployment.getCasPool());
+ }
+
+ if ( Objects.nonNull(service.getAnalysisEngine())&&
+ Objects.nonNull(service.getAnalysisEngine().getAsyncPrimitiveErrorConfiguration()) ) {
+ AsyncPrimitiveErrorConfigurationType topLevelErrorConfiguration =
+ service.getAnalysisEngine().getAsyncPrimitiveErrorConfiguration();
+ configureErrorHandling(topLevelErrorConfiguration);
+ }
+ withName(dd.getAnalysisEngineDeploymentDescription().getName()).
+ withDescription(dd.getAnalysisEngineDeploymentDescription().getDescription()).
+ withVersion(dd.getAnalysisEngineDeploymentDescription().getVersion()).
+ withVendor(dd.getAnalysisEngineDeploymentDescription().getVendor()).
+ withProtocol(deployment.getProtocol()).
+ withProvider(deployment.getProvider());
+
+ String brokerURL = service.getInputQueue().getBrokerURL();
+ String queueName = service.getInputQueue().getEndpoint();
+ int prefetch = service.getInputQueue().getPrefetch();
+ configureEndpoint( queueName, brokerURL, prefetch);
+ if ( Objects.nonNull(service.getAnalysisEngine()) ) {
+ configureAnalysisEngine(service.getAnalysisEngine());
+
+ if ( Objects.nonNull(service.getAnalysisEngine().getCasMultiplier()) ) {
+ configureCasMultiplier(service.getAnalysisEngine().getCasMultiplier());
+ }
+ }
+ }
+ @Override
+ public Endpoint getEndpoint() {
+ return endpoint;
+ }
+
+ private void configureEndpoint(String name, String server, int prefetch) {
+ endpoint = new DelegateEndpoint().new Builder().withDelegateKey(getKey()).withEndpointName(name)
+ .setRemote(isRemote()).setServerURI(server).withResourceSpecifier(getResourceSpecifier()).build();
+ if (isCasMultiplier()) {
+ endpoint.setIsCasMultiplier(true);
+ endpoint.setProcessParentLast(processParentLast);
+ if (poolSize > 1) {
+ endpoint.setShadowCasPoolSize(poolSize);
+ }
+ if (initialHeapSize > 0) {
+ endpoint.setInitialFsHeapSize(initialHeapSize);
+ }
+ endpoint.setDisableJCasCache(disableJCasCache);
+ }
+ }
+ private void configureEnvironmentVariables(EnvironmentVariablesType evt) {
+ for( EnvironmentVariableType ev : evt.getEnvironmentVariableArray() ) {
+ EnvironmentVariable envVariable = new EnvironmentVariable(ev.getName(), ev.getStringValue());
+ envVariables.add(envVariable);
+ }
+ }
+ private int convertStringToint(String value, int defaultValue) {
+ try {
+ return Integer.valueOf(value);
+ } catch( Exception e ) {
+ return defaultValue;
+ }
+ }
+ private void configureErrorHandling(AsyncPrimitiveErrorConfigurationType topLevelErrorConfiguration) {
+
+ }
+ private void configureAnalysisEngine(TopLevelAnalysisEngineType tlae) {
+ int replyQueueScaleout = convertStringToint(tlae.getInternalReplyQueueScaleout(),1);
+ int inputQueueScaleout = convertStringToint(tlae.getInputQueueScaleout(),1);
+ boolean async = false; // default
+ int scaleout = Objects.nonNull(tlae.getScaleout())? tlae.getScaleout().getNumberOfInstances() : 1;
+ if ( Objects.nonNull(tlae.getAsync()) ) {
+ async = Boolean.parseBoolean(tlae.getAsync());
+ }
+
+ decoratedComponent.withScaleout(scaleout)
+ .withReplyThreadPoolSize(replyQueueScaleout)
+ .withRequestThreadPoolSize(inputQueueScaleout);
+ // Component is async iff 'async=true' or dd has delegates
+ if ( !decoratedComponent.isPrimitive() && (async || Objects.nonNull(tlae.getDelegates())) ) {
+ decoratedComponent.enableAsync();
+ }
+
+// String async = tlae.getAsync(); // true or false
+// String replyQueueScaleout = tlae.getInternalReplyQueueScaleout();
+// String key = tlae.getKey();
+// int scaleout = tlae.getScaleout().getNumberOfInstances();
+ }
+ public AggregateAnalysisEngineComponent aggregateComponent() throws InvalidObjectException{
+ if ( !isAggregate() ) {
+ throw new InvalidObjectException("This component is not an aggregate");
+ }
+ return (AggregateAnalysisEngineComponent)decoratedComponent;
+ }
+ public boolean isAggregate() {
+ return decoratedComponent instanceof AggregateAnalysisEngineComponent;
+ }
+ @Override
+ public boolean isPrimitive() {
+ return !isAggregate();
+ }
+ private void configureCasPool(CasPoolType casPoolType) {
+ boolean disableJCasCache = casPoolType.getDisableJCasCache();
+ int initialHeapSize = casPoolType.getInitialFsHeapSize();
+ int poolSize = casPoolType.getNumberOfCASes();
+
+ casPool = new ComponentCasPool(disableJCasCache, initialHeapSize, poolSize);
+
+ }
+ private void configureCasMultiplier(CasMultiplierType casMultiplierType ) {
+ poolSize = casMultiplierType.getPoolSize();
+ disableJCasCache = casMultiplierType.getDisableJCasCache();
+ initialHeapSize = Integer.parseInt(casMultiplierType.getInitialFsHeapSize());
+ processParentLast = Boolean.parseBoolean(casMultiplierType.getProcessParentLast()); // true or false
+ }
+ public void addEnvVariable(String name, String value) {
+ envVariables.add(new EnvironmentVariable(name, value));
+ }
+ public TopLevelServiceComponent withName(String name) {
+ this.name = name;
+ return this;
+ }
+ public TopLevelServiceComponent withDescription(String description) {
+ this.description = description;
+ return this;
+ }
+ public TopLevelServiceComponent withVersion(String version) {
+ this.version = version;
+ return this;
+ }
+ public TopLevelServiceComponent withVendor(String vendor) {
+ this.vendor = vendor;
+ return this;
+ }
+ public TopLevelServiceComponent withProtocol(String protocol) {
+ this.protocol = protocol;
+ return this;
+ }
+ public TopLevelServiceComponent withProvider(String provider) {
+ this.provider = provider;
+ return this;
+ }
+
+
+ public ComponentCasPool getComponentCasPool() {
+ if ( Objects.isNull(casPool)) {
+ return new ComponentCasPool(false, 1000, 1);
+ } else {
+ return casPool;
+ }
+
+ }
+
+ public String getName() {
+ return name;
+ }
+ public String getDescription() {
+ return description;
+ }
+ public String getVersion() {
+ return version;
+ }
+ public String getVendor() {
+ return vendor;
+ }
+ public String getProtocol() {
+ return protocol;
+ }
+ public String getProvider() {
+ return provider;
+ }
+
+ @Override
+ public Object getConnector() {
+ return null;
+ }
+
+ public class EnvironmentVariable {
+ String name;
+ String value;
+
+ public EnvironmentVariable( String name, String value ) {
+ this.name = name;
+ this.value = value;
+ }
+ public String getName() {
+ return name;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+
+ }
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/component/dd/DeploymentDescriptorProcessor.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/dd/DeploymentDescriptorProcessor.java
new file mode 100644
index 0000000..7013079
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/dd/DeploymentDescriptorProcessor.java
@@ -0,0 +1,267 @@
+package org.apache.uima.aae.component.dd;
+
+import java.io.File;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.uima.aae.UimaASUtils;
+import org.apache.uima.aae.UimaClassFactory;
+import org.apache.uima.aae.component.AnalysisEngineComponent;
+import org.apache.uima.aae.component.CasMultiplierComponent;
+import org.apache.uima.aae.component.CasMultiplierNature;
+import org.apache.uima.aae.component.RemoteAnalysisEngineComponent;
+import org.apache.uima.aae.component.TopLevelServiceComponent;
+import org.apache.uima.aae.component.factory.AnalysisEngineComponentFactory;
+import org.apache.uima.resource.ResourceSpecifier;
+import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
+import org.apache.uima.resourceSpecifier.AnalysisEngineType;
+import org.apache.uima.resourceSpecifier.DelegateAnalysisEngineType;
+import org.apache.uima.resourceSpecifier.RemoteAnalysisEngineType;
+import org.apache.uima.resourceSpecifier.ServiceType;
+import org.apache.xmlbeans.XmlDocumentProperties;
+
+public class DeploymentDescriptorProcessor {
+
+ private AnalysisEngineDeploymentDescriptionDocument dd = null;
+ public DeploymentDescriptorProcessor() {
+
+ }
+ public DeploymentDescriptorProcessor(AnalysisEngineDeploymentDescriptionDocument dd) {
+ this.dd = dd;
+ }
+ public AnalysisEngineComponent newComponent(String descriptorPath) throws Exception {
+ this.dd = parseDD(descriptorPath);
+ return newComponent();
+ }
+
+ public TopLevelServiceComponent newComponent() throws Exception {
+ ServiceType service =
+ dd.getAnalysisEngineDeploymentDescription().getDeployment().getService();
+ XmlDocumentProperties dp = dd.documentProperties();
+ System.out.println(dp.getSourceName());
+
+ // get absolute path to resource specifier
+ String aeDescriptor =
+ UimaASUtils.fixPath(dp.getSourceName(), getDescriptor(service));
+
+ // Get top level uima resource specifier
+ ResourceSpecifier resourceSpecifier =
+ UimaClassFactory.produceResourceSpecifier(aeDescriptor);
+
+ AnalysisEngineComponentFactory componentFactory =
+ new AnalysisEngineComponentFactory();
+ // Process top level AE resource specifier and all its delegates.
+ // For aggregates, recursively walk through a delegate tree, producing
+ // a tree of AnalysisEngineComponent instances, one for every delegate.
+
+ AnalysisEngineComponent aeComponent =
+ componentFactory.produce(resourceSpecifier, null);
+
+ // Decorate above with top level component functionality
+ TopLevelServiceComponent topLevelComponent =
+ new TopLevelServiceComponent(aeComponent, dd);
+
+// if ( aeComponent.isPrimitive()) {
+// // The AE descriptor is for a primitive AE
+// } else {
+ // the AE descriptor is for an aggregate AE. Check DD to
+ // see if its an async aggregate. Its async=true or
+ // has delegates.
+// if ( isAggregate(service.getAnalysisEngine()) ) {
+ if ( topLevelComponent.isAggregate()) {
+ // All delegates will be colocated unless
+ // a delegate is remote. That is determined
+ // below.
+// aeComponent.enableAsync();
+ //if ( dd.getAnalysisEngineDeploymentDescription().getDeployment().getProtocol()
+ DelegateAnalysisEngineType[] colocatedDelegates = null;
+ if ( Objects.nonNull(service.getAnalysisEngine()) &&
+ Objects.nonNull(service.getAnalysisEngine().getDelegates()) ) {
+ colocatedDelegates = service.getAnalysisEngine().
+ getDelegates().
+ getAnalysisEngineArray();
+
+ }
+ handleColocatedDelegates(colocatedDelegates, aeComponent.getChildren());
+
+ RemoteAnalysisEngineType[] remoteDelegates = null;
+ if ( Objects.nonNull(service.getAnalysisEngine()) &&
+ Objects.nonNull(service.getAnalysisEngine().getDelegates())) {
+ remoteDelegates = service.getAnalysisEngine().
+ getDelegates().
+ getRemoteAnalysisEngineArray();
+ }
+ handleRemoteDelegates(remoteDelegates, aeComponent.getChildren());
+
+// service.getAnalysisEngine().
+// getDelegates().
+ //}
+ }
+
+
+ return topLevelComponent;
+ }
+ public AnalysisEngineDeploymentDescriptionDocument parseDD(String descriptorPath) throws Exception {
+ return AnalysisEngineDeploymentDescriptionDocument.Factory.parse(new File(descriptorPath));
+
+ }
+ private boolean isAggregate(AnalysisEngineType aet) {
+ return ("true".equals(aet.getAsync()) || aet.isSetAsync() || aet.isSetDelegates());
+ }
+ private String getDescriptor(ServiceType service) {
+ String aeDescriptor = service.getTopDescriptor().getImport().getLocation();
+ if ( aeDescriptor == null ) {
+ aeDescriptor = service.getTopDescriptor().getImport().getName();
+ }
+ return aeDescriptor;
+ }
+
+ private void markAllDelegatesAsAsync(List<AnalysisEngineComponent> resourceSpecifierDelegates) {
+ for ( AnalysisEngineComponent aec : resourceSpecifierDelegates ) {
+ if ( !aec.isPrimitive() ) {
+ handleColocatedDelegates(null, aec.getChildren());
+ }
+ aec.enableAsync();
+ }
+
+ }
+ private void handleColocatedDelegates(DelegateAnalysisEngineType[] ddDelegates, List<AnalysisEngineComponent> resourceSpecifierDelegates ) {
+ if ( Objects.isNull(ddDelegates)) {
+ // the dd does not include delegates but is configured as an asynch service
+ // so process resource specifiers recursively marking each part of a pipeline
+ // as asynch so that it is deployed as a collocated asynch service.
+ handleDefaultColocatedDelegates(resourceSpecifierDelegates);
+ //markAllDelegatesAsAsync(resourceSpecifierDelegates);
+ return;
+ }
+ // go through all delegates defined in the deployment descriptor (dd)
+ for( DelegateAnalysisEngineType ddDelegate : ddDelegates ) {
+ // find a matching delegate in the AE resource specifier
+ for( AnalysisEngineComponent resourceSpecifierDelegate : resourceSpecifierDelegates ) {
+ if ( ddDelegate.getKey().equals(resourceSpecifierDelegate.getKey())) {
+ if ( resourceSpecifierDelegate.isCasMultiplier() && Objects.nonNull(ddDelegate.getCasMultiplier())) {
+ // plugin cas multiplier settings from dd
+ CasMultiplierNature casMultiplier =
+ new CasMultiplierComponent(ddDelegate.getCasMultiplier().getDisableJCasCache(),
+ TypeConverter.convertStringToLong(ddDelegate.getCasMultiplier().getInitialFsHeapSize(), 1000),
+ ddDelegate.getCasMultiplier().getPoolSize(),
+ TypeConverter.convertStringToBoolean(ddDelegate.getCasMultiplier().getProcessParentLast(),true) );
+ resourceSpecifierDelegate.enableCasMultiplierNatureWith(casMultiplier);
+
+ }
+ resourceSpecifierDelegate.enableAsync(); // delegate is async
+
+ resourceSpecifierDelegate.
+ withScaleout(Objects.isNull(ddDelegate.getScaleout()) ? 1 :ddDelegate.getScaleout().getNumberOfInstances()).
+ withRequestThreadPoolSize( TypeConverter.convertStringToInt(ddDelegate.getInputQueueScaleout(), 1)).
+ withReplyThreadPoolSize( TypeConverter.convertStringToInt(ddDelegate.getInternalReplyQueueScaleout(),1));
+
+ if ( isAggregate(ddDelegate) ) {
+
+ resourceSpecifierDelegate.enableAsync();
+ for ( AnalysisEngineComponent aec : resourceSpecifierDelegate.getChildren() ) {
+ aec.enableAsync();
+ }
+ if ( ddDelegate.getDelegates() != null ) {
+ // recursively process collocated delegates
+ handleColocatedDelegates(ddDelegate.getDelegates().getAnalysisEngineArray() , resourceSpecifierDelegate.getChildren());
+ handleRemoteDelegates(ddDelegate.getDelegates().getRemoteAnalysisEngineArray(), resourceSpecifierDelegate.getChildren());
+ }
+
+ }
+ break; // found a match and completed processing it. We are done with it.
+ }
+ }
+ }
+ }
+
+ private void handleDefaultColocatedDelegates(List<AnalysisEngineComponent> resourceSpecifierDelegates) {
+ // find a matching delegate in the AE resource specifier
+ for (AnalysisEngineComponent resourceSpecifierDelegate : resourceSpecifierDelegates) {
+ if (resourceSpecifierDelegate.isCasMultiplier()) {
+ // plugin cas multiplier settings from dd
+ CasMultiplierNature casMultiplier = new CasMultiplierComponent(false, 1000, 1, true);
+ resourceSpecifierDelegate.enableCasMultiplierNatureWith(casMultiplier);
+ }
+ resourceSpecifierDelegate.withScaleout(1).
+ withRequestThreadPoolSize(1).
+ withReplyThreadPoolSize(1).
+ enableAsync();
+
+ if (!resourceSpecifierDelegate.isPrimitive()) {
+ handleDefaultColocatedDelegates(resourceSpecifierDelegate.getChildren());
+ }
+ }
+ }
+ private void handleRemoteDelegates(RemoteAnalysisEngineType[] remoteDelegates, List<AnalysisEngineComponent> resourceSpecifierDelegates ) {
+ if ( Objects.isNull(remoteDelegates) ) {
+ return;
+ }
+ for( RemoteAnalysisEngineType remoteDelegateType : remoteDelegates ) {
+ // find a matching delegate in the AE resource specifier
+ for( AnalysisEngineComponent resourceSpecifierDelegate : resourceSpecifierDelegates ) {
+ if ( remoteDelegateType.getKey().equals(resourceSpecifierDelegate.getKey())) {
+ // find an index of the current component in the list. We
+ // will decorate this component as a remote, and replace
+ // it in the list.
+ int index =
+ resourceSpecifierDelegates.indexOf(resourceSpecifierDelegate);
+ // Decorate existing component with remote flavor
+ RemoteAnalysisEngineComponent remoteDelegate =
+ new RemoteAnalysisEngineComponent(resourceSpecifierDelegate, remoteDelegateType);
+
+ //replace component with decorated remote
+ resourceSpecifierDelegates.set(index, remoteDelegate);
+ }
+
+ }
+
+ }
+ }
+/*
+ public void parse(DelegateAnalysisEngineType colocatedDelegate, AnalysisEngineComponent component) {
+ if ( isAggregate(colocatedDelegate) ) {
+ DelegateAnalysisEngineType[] colocatedDelegates =
+ colocatedDelegate.getDelegates().getAnalysisEngineArray();
+ handleColocatedDelegates(colocatedDelegates, component.getChildren());
+ } else {
+
+ }
+ }
+*/
+ public static void main(String[] args) {
+ try {
+ DeploymentDescriptorProcessor ddp =
+ new DeploymentDescriptorProcessor();
+ ddp.newComponent(args[0]);
+ } catch( Exception e) {
+ e.printStackTrace();
+ }
+ }
+ private static class TypeConverter {
+ private static int convertStringToInt(String value, int defaultValue) {
+ int returnValue = defaultValue;
+ try {
+ returnValue = Integer.parseInt(value);
+ } catch( Exception e) {
+ }
+ return returnValue;
+ }
+ private static boolean convertStringToBoolean(String value, boolean defaultValue) {
+ boolean returnValue = defaultValue;
+ try {
+ returnValue = Boolean.parseBoolean(value);
+ } catch( Exception e) {
+ }
+ return returnValue;
+ }
+ private static long convertStringToLong(String value, long defaultValue) {
+ long returnValue = defaultValue;
+ try {
+ returnValue = Long.parseLong(value);
+ } catch( Exception e) {
+ }
+ return returnValue;
+ }
+ }
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/component/factory/AnalysisEngineComponentFactory.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/factory/AnalysisEngineComponentFactory.java
new file mode 100644
index 0000000..ba2a69e
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/component/factory/AnalysisEngineComponentFactory.java
@@ -0,0 +1,55 @@
+package org.apache.uima.aae.component.factory;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.uima.aae.UimaClassFactory;
+import org.apache.uima.aae.component.AggregateAnalysisEngineComponent;
+import org.apache.uima.aae.component.AnalysisEngineComponent;
+import org.apache.uima.aae.component.PrimitiveAnalysisEngineComponent;
+import org.apache.uima.analysis_engine.AnalysisEngineDescription;
+import org.apache.uima.resource.ResourceSpecifier;
+
+public class AnalysisEngineComponentFactory {
+
+ private AnalysisEngineDescription getAeDescription(ResourceSpecifier rs) {
+ return (AnalysisEngineDescription) rs;
+ }
+
+ public AnalysisEngineComponent produce(ResourceSpecifier rs, String key) throws Exception {
+ AnalysisEngineDescription aeDescriptor = getAeDescription(rs);
+ AnalysisEngineComponent component = null;
+
+ if ( aeDescriptor.isPrimitive() ) {
+ component = new PrimitiveAnalysisEngineComponent(key, rs);
+ } else {
+ component =
+ new AggregateAnalysisEngineComponent(key, rs);
+ Map<String, ResourceSpecifier> delegates =
+ aeDescriptor.getDelegateAnalysisEngineSpecifiers();
+ for(Entry<String, ResourceSpecifier> delegateEntry: delegates.entrySet() ) {
+ component.add(produce(delegateEntry.getValue(), delegateEntry.getKey() ));
+ }
+ }
+
+ if ( aeDescriptor.getAnalysisEngineMetaData().getOperationalProperties().isMultipleDeploymentAllowed() ) {
+ component.enableScaleout();
+ }
+ if ( aeDescriptor.getAnalysisEngineMetaData().getOperationalProperties().getOutputsNewCASes() ) {
+ component.enableCasMultipler();
+ }
+
+ return component;
+ }
+ public static void main(String[] args ) {
+ try {
+ AnalysisEngineComponentFactory factory =
+ new AnalysisEngineComponentFactory();
+ ResourceSpecifier resourceSpecifier =
+ UimaClassFactory.produceResourceSpecifier(args[0]);
+ factory.produce(resourceSpecifier, "TopLevel");
+ } catch( Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
index c241308..941cef1 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
@@ -2028,25 +2028,54 @@ implements
return retValue;
}
- private boolean forceToDropTheCas(CasStateEntry parent, CacheEntry cacheEntry, FinalStep aStep) {
-
- // Get the key of the Cas Producer
- String casProducer = cacheEntry.getCasProducerAggregateName();
- // CAS is considered new from the point of view of this service IF it was produced by it
- boolean isNewCas = (cacheEntry.isNewCas() && casProducer != null && getComponentName().equals(
- casProducer));
- if (parent != null && parent.isFailed() && isNewCas) {
- return true; // no point to continue if the CAS was produced in this aggregate and its parent
- // failed here
- }
- // If the CAS was generated by this component but the Flow Controller wants to drop the CAS OR
- // this component
- // is not a Cas Multiplier
- if (isNewCas && parent.getSubordinateCasInPlayCount() == 0 && (aStep.getForceCasToBeDropped() || !isCasMultiplier())) {
- return true;
- }
- return false;
- }
+ private boolean forceToDropTheCas(CasStateEntry parent, CacheEntry cacheEntry, FinalStep aStep) {
+
+ // Get the key of the Cas Producer
+ String casProducer = cacheEntry.getCasProducerAggregateName();
+ // CAS is considered new from the point of view of this service IF it was
+ // produced by it
+ boolean isNewCas = (cacheEntry.isNewCas() && casProducer != null && getComponentName().equals(casProducer));
+ // force to drop a child CAS if this service is not a Cas Multiplier
+ if ( isNewCas ) {
+ if ( !isCasMultiplier()) {
+ System.out.println(">>>>>>>>>>>>>>>>>>> FORCE TO DROP THE CAS");
+ return true;
+ }
+ if (parent != null && parent.isFailed()) {
+ return true; // no point to continue if the CAS was produced in this aggregate and its parent
+ // failed
+ }
+ // If the CAS was generated by this component but the Flow Controller wants to
+ // drop the CAS OR this component is not a Cas Multiplier
+ if ( parent != null && parent.getSubordinateCasInPlayCount() == 0
+ && (aStep.getForceCasToBeDropped() || !isCasMultiplier())) {
+ return true;
+ }
+ }
+
+
+ /*
+ if (isNewCas && isTopLevelComponent() && !isCasMultiplier()) {
+ System.out.println(">>>>>>>>>>>>>>>>>>> FORCE TO DROP THE CAS");
+ return true;
+ }
+ if (parent != null && parent.isFailed() && isNewCas) {
+ return true; // no point to continue if the CAS was produced in this aggregate and its parent
+ // failed here
+ }
+ // If the CAS was generated by this component but the Flow Controller wants to
+ // drop the CAS OR
+ // this component
+ // is not a Cas Multiplier
+ if (isNewCas && parent.getSubordinateCasInPlayCount() == 0
+ && (aStep.getForceCasToBeDropped() || !isCasMultiplier())) {
+ return true;
+ }
+
+ */
+
+ return false;
+ }
private boolean casHasExceptions(CasStateEntry casStateEntry) {
return (casStateEntry.getErrors().size() > 0) ? true : false;
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
index b7a2e1f..555cceb 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
@@ -40,6 +40,7 @@ import org.apache.uima.aae.jmx.JmxManagement;
import org.apache.uima.aae.jmx.ServiceErrors;
import org.apache.uima.aae.jmx.ServiceInfo;
import org.apache.uima.aae.jmx.ServicePerformance;
+import org.apache.uima.aae.message.Origin;
import org.apache.uima.aae.monitor.Monitor;
import org.apache.uima.aae.spi.transport.UimaMessageListener;
import org.apache.uima.aae.spi.transport.UimaTransport;
@@ -55,6 +56,8 @@ public interface AnalysisEngineController extends ControllerLifecycle {
public static final String AEInstanceCount = "AEInstanceCount";
+ public Origin getOrigin();
+
public void sendMetadata(Endpoint anEndpoint) throws AsynchAEException;
public ControllerLatch getControllerLatch();
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
index d5562ce..5ee7f54 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
@@ -78,6 +78,8 @@ import org.apache.uima.aae.jmx.ServiceErrors;
import org.apache.uima.aae.jmx.ServiceInfo;
import org.apache.uima.aae.jmx.ServicePerformance;
import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.Origin;
+import org.apache.uima.aae.message.UimaAsOrigin;
import org.apache.uima.aae.monitor.Monitor;
import org.apache.uima.aae.monitor.MonitorBaseImpl;
import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
@@ -111,6 +113,7 @@ public abstract class BaseAnalysisEngineController extends Resource_ImplBase imp
JMS,
DIRECT
};
+ private final Origin origin;
private static final Class<?> CLASS_NAME = BaseAnalysisEngineController.class;
private static final String JMS_PROVIDER_HOME = "ACTIVEMQ_HOME";
public enum ServiceState { INITIALIZING, RUNNING, DISABLED, STOPPING, FAILED };
@@ -284,7 +287,7 @@ public abstract class BaseAnalysisEngineController extends Resource_ImplBase imp
protected abstract void doWarmUp(CAS cas, String casReferenceId) throws Exception;
public BaseAnalysisEngineController() {
-
+ origin = new UimaAsOrigin("");
}
public BaseAnalysisEngineController(AnalysisEngineController aParentController,
@@ -316,7 +319,7 @@ public abstract class BaseAnalysisEngineController extends Resource_ImplBase imp
Map aDestinationMap, JmxManagement aJmxManagement,boolean disableJCasCache) throws Exception {
System.out.println("C'tor Called Descriptor:"+aDescriptor);
-
+ origin = new UimaAsOrigin(anEndpointName);
casManager = aCasManager;
inProcessCache = anInProcessCache;
localCache = new LocalCache(this);
@@ -529,7 +532,9 @@ public abstract class BaseAnalysisEngineController extends Resource_ImplBase imp
return uimaContext;
}
-
+ public Origin getOrigin() {
+ return origin;
+ }
public void setThreadFactory(ThreadPoolTaskExecutor factory) {
threadFactory = factory;
}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java
index b32d407..655bea1 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java
@@ -21,6 +21,7 @@ package org.apache.uima.aae.controller;
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.jmx.ServiceInfo;
+import org.apache.uima.aae.message.Origin;
import org.apache.uima.cas.SerialFormat;
import org.apache.uima.cas.impl.TypeSystemImpl;
@@ -31,6 +32,12 @@ public interface Endpoint {
public static final int DISABLED = 3;
+ public void setMessageOrigin(Origin origin);
+
+ public Origin getMessageOrigin();
+
+ public String getUniqueId();
+
public boolean isJavaRemote();
public void setJavaRemote();
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java
index 853a20b..7fac0b8 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java
@@ -20,11 +20,13 @@
package org.apache.uima.aae.controller;
import java.util.Timer;
+import java.util.UUID;
import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.jmx.ServiceInfo;
import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.Origin;
import org.apache.uima.cas.SerialFormat;
import org.apache.uima.cas.impl.TypeSystemImpl;
import org.apache.uima.resource.ResourceSpecifier;
@@ -32,6 +34,8 @@ import org.apache.uima.resource.ResourceSpecifier;
public class Endpoint_impl implements Endpoint, Cloneable {
private static final Class<?> CLASS_NAME = Endpoint_impl.class;
+ private String uniqueId = UUID.randomUUID().toString();
+
private volatile boolean javaRemote=false;
private volatile Object destination = null;
@@ -130,6 +134,18 @@ public class Endpoint_impl implements Endpoint, Cloneable {
private ResourceSpecifier resourceSpecifier;
+ private Origin messageOrigin;
+
+ public void setMessageOrigin(Origin origin) {
+ this.messageOrigin = origin;
+ }
+
+ public Origin getMessageOrigin() {
+ return messageOrigin;
+ }
+ public String getUniqueId() {
+ return uniqueId;
+ }
public void setJavaRemote() {
javaRemote = true;
}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/AbstractUimaAsConsumer.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/AbstractUimaAsConsumer.java
new file mode 100644
index 0000000..c83da79
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/AbstractUimaAsConsumer.java
@@ -0,0 +1,9 @@
+package org.apache.uima.aae.definition.connectors;
+
+import org.apache.uima.aae.message.MessageProcessor;
+
+public abstract class AbstractUimaAsConsumer implements UimaAsConsumer{
+
+ protected abstract void setMessageProcessor(MessageProcessor processor);
+
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ComponentConnector.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ComponentConnector.java
new file mode 100644
index 0000000..f5c8e3a
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ComponentConnector.java
@@ -0,0 +1,5 @@
+package org.apache.uima.aae.definition.connectors;
+
+public interface ComponentConnector {
+ public Object getConnectionInfo();
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ConnectorFactory.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ConnectorFactory.java
new file mode 100644
index 0000000..c8d0a68
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ConnectorFactory.java
@@ -0,0 +1,31 @@
+package org.apache.uima.aae.definition.connectors;
+
+import org.apache.uima.aae.definition.connectors.basic.BasicConnector;
+import org.apache.uima.aae.definition.connectors.basic.DirectConnector;
+
+public class ConnectorFactory {
+
+ public static ComponentConnector newConnector(String protocol, String vendor) {
+ ComponentConnector connector=null;
+ switch(protocol.toLowerCase()) {
+ case "jms":
+ connector = getJmsConnector(vendor);
+ break;
+ case "direct":
+ connector = new DirectConnector();
+ break;
+
+ default:
+ connector = new BasicConnector();
+ }
+ return connector;
+ }
+ private static ComponentConnector getJmsConnector(String vendor) {
+ return null;
+ }
+ public static void main(String[] args) {
+ // TODO Auto-generated method stub
+
+ }
+
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ListenerCallback.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ListenerCallback.java
new file mode 100644
index 0000000..dcfa80d
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ListenerCallback.java
@@ -0,0 +1,7 @@
+package org.apache.uima.aae.definition.connectors;
+
+public interface ListenerCallback {
+ public void onInitializationError(Exception e);
+ public boolean failedInitialization();
+ public Exception getException();
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConnector.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConnector.java
new file mode 100644
index 0000000..51fb45e
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConnector.java
@@ -0,0 +1,11 @@
+package org.apache.uima.aae.definition.connectors;
+
+import java.util.Map;
+
+public interface UimaAsConnector {
+
+ public UimaAsEndpoint createEndpoint(String uri, Map<String, Object> params)
+ throws Exception;
+
+
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConsumer.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConsumer.java
new file mode 100644
index 0000000..35c725b
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConsumer.java
@@ -0,0 +1,17 @@
+package org.apache.uima.aae.definition.connectors;
+
+import org.apache.uima.aae.Lifecycle;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.as.client.DirectMessage;
+
+public interface UimaAsConsumer extends Lifecycle {
+ public enum ConsumerType {GetMeta,ProcessCAS,Cpc,FreeCAS,Reply,Info};
+
+ public void initialize() throws Exception;
+ public void initialize(AnalysisEngineController controller) throws Exception;
+
+ public void consume(DirectMessage message) throws Exception;
+
+ public ConsumerType getType();
+
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsEndpoint.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsEndpoint.java
new file mode 100644
index 0000000..3c854a8
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsEndpoint.java
@@ -0,0 +1,15 @@
+package org.apache.uima.aae.definition.connectors;
+
+import org.apache.uima.aae.Lifecycle;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.definition.connectors.UimaAsConsumer.ConsumerType;
+import org.apache.uima.aae.message.MessageContext;
+
+public interface UimaAsEndpoint extends Lifecycle {
+ public UimaAsProducer createProducer(String targetUri) throws Exception;
+ public UimaAsProducer createProducer(UimaAsConsumer consumer, String delegateKey) throws Exception;
+ public UimaAsConsumer createConsumer(String targetUri, ConsumerType type, int consumerThreadCount) throws Exception;
+ public void dispatch(MessageContext messageContext) throws Exception;
+ public UimaAsConsumer getConsumer(String targetUri, ConsumerType type);
+ public MessageContext createMessage(int command, int messageType, Endpoint endpoint);
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsProducer.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsProducer.java
new file mode 100644
index 0000000..195bb0c
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsProducer.java
@@ -0,0 +1,13 @@
+package org.apache.uima.aae.definition.connectors;
+
+import org.apache.uima.aae.Lifecycle;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.UimaAsMessage;
+import org.apache.uima.as.client.DirectMessage;
+
+public interface UimaAsProducer extends Lifecycle {
+
+ public void dispatch(DirectMessage message) throws Exception;
+ public void dispatch(DirectMessage message, UimaAsConsumer target) throws Exception;
+
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/BasicConnector.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/BasicConnector.java
new file mode 100644
index 0000000..ca9f52c
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/BasicConnector.java
@@ -0,0 +1,12 @@
+package org.apache.uima.aae.definition.connectors.basic;
+
+import org.apache.uima.aae.definition.connectors.ComponentConnector;
+
+public class BasicConnector implements ComponentConnector {
+
+ @Override
+ public Object getConnectionInfo() {
+ return "";
+ }
+
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/DirectConnector.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/DirectConnector.java
new file mode 100644
index 0000000..a52bf27
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/DirectConnector.java
@@ -0,0 +1,13 @@
+package org.apache.uima.aae.definition.connectors.basic;
+
+import org.apache.uima.aae.definition.connectors.ComponentConnector;
+
+public class DirectConnector implements ComponentConnector {
+
+ @Override
+ public Object getConnectionInfo() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/jms/ActiveMqConnector.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/jms/ActiveMqConnector.java
new file mode 100644
index 0000000..6f5cb2b
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/jms/ActiveMqConnector.java
@@ -0,0 +1,36 @@
+package org.apache.uima.aae.definition.connectors.jms;
+
+import org.apache.uima.aae.definition.connectors.ComponentConnector;
+
+public class ActiveMqConnector implements ComponentConnector {
+ private final Object connection;
+
+ ActiveMqConnector(String queue, String broker, int prefetch) {
+ connection = new ActiveMqConnection(queue, broker, prefetch);
+ }
+ @Override
+ public Object getConnectionInfo() {
+ return connection;
+ }
+
+ public class ActiveMqConnection {
+ private final String queueName;
+ private final String brokerUrl;
+ private final int prefetch;
+
+ ActiveMqConnection(String queue, String broker, int prefetch) {
+ this.queueName = queue;
+ this.brokerUrl = broker;
+ this.prefetch = prefetch;
+ }
+ public String getQueueName() {
+ return queueName;
+ }
+ public String getBrokerUrl() {
+ return brokerUrl;
+ }
+ public int getPrefetch() {
+ return prefetch;
+ }
+ }
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/message/MessageProcessor.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/message/MessageProcessor.java
new file mode 100644
index 0000000..7cbad59
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/message/MessageProcessor.java
@@ -0,0 +1,9 @@
+package org.apache.uima.aae.message;
+
+import org.apache.uima.aae.controller.AnalysisEngineController;
+
+public interface MessageProcessor {
+
+ public void process(MessageContext message) throws Exception;
+ public AnalysisEngineController getController();
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/message/Origin.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/message/Origin.java
new file mode 100644
index 0000000..e801a45
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/message/Origin.java
@@ -0,0 +1,6 @@
+package org.apache.uima.aae.message;
+
+public interface Origin {
+ public String getUniqueId();
+ public String getName();
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsMessage.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsMessage.java
new file mode 100644
index 0000000..3cba9d0
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsMessage.java
@@ -0,0 +1,36 @@
+package org.apache.uima.aae.message;
+
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.error.AsynchAEException;
+
+public interface UimaAsMessage {
+ public enum Command {GetMetaRequest, GetMetaResponse, CpcRegeuest, CpcResponse, ProcessRequest, ProcessResponse };
+
+ public String getMessageStringProperty(String aMessagePropertyName) throws AsynchAEException;
+
+ public int getMessageIntProperty(String aMessagePropertyName) throws AsynchAEException;
+
+ public long getMessageLongProperty(String aMessagePropertyName) throws AsynchAEException;
+
+ public Object getMessageObjectProperty(String aMessagePropertyName) throws AsynchAEException;
+
+ public boolean getMessageBooleanProperty(String aMessagePropertyName) throws AsynchAEException;
+
+ public Endpoint getEndpoint();
+
+ public String getStringMessage() throws AsynchAEException;
+
+ public Object getObjectMessage() throws AsynchAEException;
+
+ public byte[] getByteMessage() throws AsynchAEException;
+
+ public Object getRawMessage();
+
+ public boolean propertyExists(String aKey) throws AsynchAEException;
+
+ public void setMessageArrivalTime(long anArrivalTime);
+
+ public long getMessageArrivalTime();
+
+ public String getEndpointName();
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsOrigin.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsOrigin.java
new file mode 100644
index 0000000..fa62d86
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsOrigin.java
@@ -0,0 +1,55 @@
+package org.apache.uima.aae.message;
+
+import java.util.UUID;
+
+public class UimaAsOrigin implements Origin {
+
+ private final String uniqueId = UUID.randomUUID().toString();
+ private final String name;
+
+ public UimaAsOrigin(String name) {
+ this.name = name;
+ }
+ @Override
+ public String getUniqueId() {
+ return uniqueId;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((name == null) ? 0 : name.hashCode());
+ result = prime * result + ((uniqueId == null) ? 0 : uniqueId.hashCode());
+ return result;
+ }
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ UimaAsOrigin other = (UimaAsOrigin) obj;
+ if (name == null) {
+ if (other.name != null)
+ return false;
+ } else if (!name.equals(other.name))
+ return false;
+ if (uniqueId == null) {
+ if (other.uniqueId != null)
+ return false;
+ } else if (!uniqueId.equals(other.uniqueId))
+ return false;
+ return true;
+ }
+ @Override
+ public String getName() {
+ return name;
+ }
+ @Override
+ public String toString() {
+ return "Origin[name: " + name + "] [id:"+uniqueId+"]";
+ }
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java
index 60e50f7..e086903 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java
@@ -20,6 +20,7 @@ package org.apache.uima.aae.service;
import java.util.concurrent.BlockingQueue;
+import org.apache.uima.aae.InProcessCache;
import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
import org.apache.uima.as.client.DirectMessage;
import org.apache.uima.cas.CAS;
@@ -34,6 +35,7 @@ public interface UimaASService {
public static final int STOP_NOW = 1001;
public String getEndpoint();
+ public int getScaleout();
public String getId();
public void start() throws Exception;
public void stop() throws Exception;
@@ -46,5 +48,5 @@ public interface UimaASService {
public void releaseCAS(String casReferenceId, BlockingQueue<DirectMessage> releaseCASQueue ) throws Exception;
public AnalysisEngineMetaData getMetaData() throws Exception;
public void removeFromCache(String casReferenceId);
-
+ public UimaASService withInProcessCache(InProcessCache cache);
}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java
index 7cadf5a..f04320c 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java
@@ -24,7 +24,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Optional;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -37,9 +38,15 @@ import org.apache.uima.aae.OutputChannel;
import org.apache.uima.aae.UimaASUtils;
import org.apache.uima.aae.UimaClassFactory;
import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
+import org.apache.uima.aae.component.AggregateAnalysisEngineComponent;
+import org.apache.uima.aae.component.AnalysisEngineComponent;
+import org.apache.uima.aae.component.ComponentCasPool;
+import org.apache.uima.aae.component.RemoteAnalysisEngineComponent;
+import org.apache.uima.aae.component.TopLevelServiceComponent;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.ControllerCallbackListener;
import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
import org.apache.uima.aae.controller.DelegateEndpoint;
import org.apache.uima.aae.controller.Endpoint;
@@ -51,7 +58,6 @@ import org.apache.uima.aae.error.ErrorHandlerChain;
import org.apache.uima.aae.error.Threshold;
import org.apache.uima.aae.error.Thresholds;
import org.apache.uima.aae.error.Thresholds.Action;
-import org.apache.uima.aae.error.UimaAsDelegateException;
import org.apache.uima.aae.error.handler.CpcErrorHandler;
import org.apache.uima.aae.error.handler.GetMetaErrorHandler;
import org.apache.uima.aae.error.handler.ProcessCasErrorHandler;
@@ -78,7 +84,6 @@ import org.apache.uima.resource.ResourceCreationSpecifier;
import org.apache.uima.resource.ResourceManager;
import org.apache.uima.resource.ResourceSpecifier;
import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
-import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionType;
import org.apache.uima.resourceSpecifier.AnalysisEngineType;
import org.apache.uima.resourceSpecifier.AsyncAggregateErrorConfigurationType;
import org.apache.uima.resourceSpecifier.AsyncPrimitiveErrorConfigurationType;
@@ -109,6 +114,246 @@ public abstract class AbstractUimaAsServiceBuilder implements ServiceBuilder {
}
protected abstract void addListenerForReplyHandling( AggregateAnalysisEngineController controller, Endpoint_impl endpoint, RemoteAnalysisEngineDelegate remoteDelegate) throws Exception;
+
+// public AnalysisEngineController createController( AnalysisEngineComponent component, int howManyInstances) throws Exception {
+ public AnalysisEngineController createController( AnalysisEngineComponent component, ControllerCallbackListener aListener, String serviceId) throws Exception {
+ AnalysisEngineController controller =
+ createController(component, null /*, component.getScaleout() */);
+ controller.setServiceId(serviceId);
+ controller.addControllerCallbackListener(aListener);
+ return controller;
+ }
+
+ /**
+ * Recursively walks through the AE descriptor creating instances of AnalysisEngineController
+ * and linking them in parent-child tree.
+ *
+ * @param d - wrapper around delegate defined in DD (may be null)
+ * @param resourceSpecifier - AE descriptor specifier
+ * @param name - name of the delegate
+ * @param parentController - reference to a parent controller. TopLevel has no parent
+ * @param howManyInstances - scalout for the delegate
+ *
+ * @return
+ * @throws Exception
+ */
+ public AnalysisEngineController createController( AnalysisEngineComponent component, AnalysisEngineController parentController/*, int howManyInstances */) throws Exception {
+
+ AnalysisEngineController controller = null;
+ System.out.println("---------Controller:"+
+ component.getKey()+
+ " resourceSpecifier:"+
+ component.getResourceSpecifier().getClass().getName()+
+ " ResourceCreationSpecifier:"+(component.getResourceSpecifier() instanceof ResourceCreationSpecifier) );
+
+ if ( component.isPrimitive()) {
+ controller = new PrimitiveAnalysisEngineController_impl(parentController, component.getKey(), component.getResourceSpecifier().getSourceUrlString(),casManager, cache, 10, component.getScaleout());
+ } else {
+ // add an endpoint for each delegate in this aggregate. The endpoint Map is required
+ // during initialization of an aggregate controller.
+ Map<String, Endpoint> endpoints = new HashMap<>();
+ AggregateAnalysisEngineComponent aggregate;
+ if ( component instanceof AggregateAnalysisEngineComponent) {
+ aggregate = (AggregateAnalysisEngineComponent)component;
+ } else if ( component instanceof TopLevelServiceComponent) {
+ aggregate = ((TopLevelServiceComponent)component).aggregateComponent();
+ } else {
+ throw new RuntimeException("Expected instance of AggregateAnalysisEngineComponent, instead is instanceof "+component.getClass().getName());
+ }
+// List<AnalysisEngineComponent> delegateComponents = ((AggregateAnalysisEngineComponent)component).getChildren();
+ List<AnalysisEngineComponent> delegateComponents = aggregate.getChildren();
+ for( AnalysisEngineComponent delegateComponent : delegateComponents ) {
+ endpoints.put(delegateComponent.getKey(), delegateComponent.getEndpoint());
+ }
+ controller = new AggregateAnalysisEngineController_impl(parentController, component.getKey(), component.getResourceSpecifier().getSourceUrlString(), casManager, cache, endpoints);
+ addFlowController((AggregateAnalysisEngineController)controller, (AnalysisEngineDescription)component.getResourceSpecifier());
+ // recursively create delegate controllers for all async delegates
+ createDelegateControllers(aggregate, controller);
+ }
+ if ( !controller.isTopLevelComponent() ) {
+ UimaASService service = createUimaASServiceWrapper(controller, component);
+ service.start();
+ }
+
+ return controller;
+ }
+
+
+
+ private void createDelegateControllers(AggregateAnalysisEngineComponent aggregateComponent, AnalysisEngineController controller) throws Exception {
+ for (AnalysisEngineComponent delegateComponent : aggregateComponent.getChildren()) {
+ // if error handling threshold has not been defined for the delegate, add
+ // default thresholds.
+ addDelegateDefaultErrorHandling(controller, delegateComponent.getKey());
+ if (delegateComponent.isRemote()) {
+ Endpoint endpoint = delegateComponent.getEndpoint();
+ if ("java".equals(endpoint.getServerURI()) ) {
+ endpoint.setJavaRemote();
+ }
+
+ } else {
+ if (Objects.isNull(controller.getOutputChannel(ENDPOINT_TYPE.DIRECT))) {
+ OutputChannel oc = new DirectOutputChannel().withController(controller);
+ oc.initialize();
+ controller.addOutputChannel(oc);
+ }
+ if (Objects.isNull(controller.getInputChannel(ENDPOINT_TYPE.DIRECT))) {
+ DirectInputChannel inputChannel = new DirectInputChannel(ChannelType.REQUEST_REPLY)
+ .withController(controller);
+// 10/11/18 For Direct messaging the message handlers are not needed. Its using command factory
+// inputChannel.setMessageHandler(getMessageHandler(controller));
+ controller.addInputChannel(inputChannel);
+
+ }
+ createController(delegateComponent, controller /*, scaleout */);
+ }
+
+ }
+
+ }
+
+
+ private UimaASService createUimaASServiceWrapper(AnalysisEngineController controller, AnalysisEngineComponent component) throws Exception {
+
+ AsynchronousUimaASService service =
+ new AsynchronousUimaASService(controller.getComponentName()).withController(controller);
+ // Need an OutputChannel to dispatch messages from this service
+ OutputChannel outputChannel;
+ if ( ( outputChannel = controller.getOutputChannel(ENDPOINT_TYPE.DIRECT)) == null) {
+ outputChannel = getOutputChannel(controller);
+ }
+
+ // Need an InputChannel to handle incoming messages
+ InputChannel inputChannel;
+ if ((inputChannel = controller.getInputChannel(ENDPOINT_TYPE.DIRECT)) == null) {
+ inputChannel = getInputChannel(controller);
+ Handler messageHandlerChain = getMessageHandler(controller);
+ inputChannel.setMessageHandler(messageHandlerChain);
+ controller.addInputChannel(inputChannel);
+ }
+
+ // add reply queue listener to the parent aggregate controller
+ if ( !controller.isTopLevelComponent() ) {
+ // For every delegate the parent controller needs a reply listener.
+ DirectListener replyListener =
+ addDelegateReplyListener(controller, component);
+ // add process, getMeta, reply queues to an endpoint
+ setDelegateDestinations(controller, service, replyListener);
+ }
+ DirectListener processListener =
+ createDirectListener(controller,component.getScaleout(),(DirectInputChannel)inputChannel,service.getProcessRequestQueue(),Type.ProcessCAS);
+ inputChannel.registerListener(processListener);
+
+ DirectListener getMetaListener =
+ createDirectListener(controller,component.getScaleout(),(DirectInputChannel)inputChannel,service.getMetaRequestQueue(),Type.GetMeta);
+ inputChannel.registerListener(getMetaListener);
+ if (controller.isCasMultiplier()) {
+ DirectListener freCASChannelListener =
+ createDirectListener(controller,component.getScaleout(),(DirectInputChannel)inputChannel,service.getFreeCasQueue(),Type.FreeCAS);
+ inputChannel.registerListener(freCASChannelListener);
+ ((DirectOutputChannel)outputChannel).setFreeCASQueue(service.getFreeCasQueue());
+ }
+
+ /*
+ DirectListener processListener = new DirectListener(Type.ProcessCAS).
+ withController(controller).
+ withConsumerThreads(component.getScaleout()).
+ withInputChannel((DirectInputChannel)inputChannel).
+ withQueue(service.getProcessRequestQueue()).
+ initialize();
+ inputChannel.registerListener(processListener);
+
+ DirectListener getMetaListener = new DirectListener(Type.GetMeta).
+ withController(controller).
+ withConsumerThreads(getReplyScaleout(d)).
+ withInputChannel((DirectInputChannel)inputChannel).
+ withQueue(service.getMetaRequestQueue()).initialize();
+ inputChannel.registerListener(getMetaListener);
+
+ if (controller.isCasMultiplier()) {
+ DirectListener freCASChannelListener =
+ new DirectListener(Type.FreeCAS).
+ withController(controller).
+ withConsumerThreads(component.getScaleout()).
+ withInputChannel((DirectInputChannel)inputChannel).
+ withQueue(service.getFreeCasQueue()).
+ initialize();
+ inputChannel.registerListener(freCASChannelListener);
+ ((DirectOutputChannel)outputChannel).setFreeCASQueue(service.getFreeCasQueue());
+ }
+ */
+ return service;
+ }
+ private DirectListener createDirectListener(AnalysisEngineController controller, int scaleout, DirectInputChannel inputChannel, BlockingQueue<DirectMessage> q, Type type) throws Exception{
+// DirectListener listener = new DirectListener(type).
+ return new DirectListener(type).
+ withController(controller).
+ withConsumerThreads(scaleout).
+ withInputChannel(inputChannel).
+ withQueue(q).initialize();
+// inputChannel.registerListener(listener);
+// return listener;
+ }
+ private DirectListener addDelegateReplyListener(AnalysisEngineController controller, AnalysisEngineComponent component) throws Exception {
+ DirectInputChannel parentInputChannel;
+ // create parent controller's input channel if necessary
+ if ((controller.getParentController().getInputChannel(ENDPOINT_TYPE.DIRECT)) == null) {
+ // create delegate
+ parentInputChannel = new DirectInputChannel(ChannelType.REQUEST_REPLY).
+ withController(controller.getParentController());
+ Handler messageHandlerChain = getMessageHandler(controller.getParentController());
+ parentInputChannel.setMessageHandler(messageHandlerChain);
+ controller.getParentController().addInputChannel(parentInputChannel);
+ } else {
+ parentInputChannel = (DirectInputChannel) controller.
+ getParentController().getInputChannel(ENDPOINT_TYPE.DIRECT);
+ }
+ int replyScaleout = 1;
+ if ( component instanceof RemoteAnalysisEngineComponent) {
+ ((RemoteAnalysisEngineComponent)component).getReplyConsumerCount();
+ }
+
+ // USE FACTORY HERE. CHANGE DirectListener to interface
+ // DirectListner replyListener = DirectListenerFactory.newReplyListener();
+ DirectListener replyListener = new DirectListener(Type.Reply).
+ withController(controller.getParentController()).
+ withConsumerThreads(replyScaleout).
+ withInputChannel(parentInputChannel).
+ withQueue(new LinkedBlockingQueue<DirectMessage>()).
+ withName(controller.getKey()).
+ initialize();
+ parentInputChannel.registerListener(replyListener);
+
+ return replyListener;
+ }
+ protected void initialize(UimaASService service, ComponentCasPool cp, Transport transport) {
+
+ resourceManager = UimaClassFactory.produceResourceManager();
+ casManager = new AsynchAECasManager_impl(resourceManager);
+ casManager.setCasPoolSize(cp.getPoolSize());
+ casManager.setDisableJCasCache(cp.isDisableJCasCache());
+ casManager.setInitialFsHeapSize(cp.getInitialHeapSize());
+
+ if ( transport.equals(Transport.JMS)) {
+ cache = new InProcessCache();
+ } else if ( transport.equals(Transport.Java)) {
+
+ // ?????????????????????????? is this test necessary?
+ if ( (cache = (InProcessCache)System.getProperties().get("InProcessCache")) == null) {
+ cache = new InProcessCache();
+ System.getProperties().put("InProcessCache", cache);
+ }
+
+ }
+// if ( cache == null ) {
+// cache = new InProcessCache();
+// }
+ }
+
+
+ /*
+ * OLD CODE *****************************
+ */
public AsyncPrimitiveErrorConfigurationType addDefaultErrorHandling(ServiceType s) {
AsyncPrimitiveErrorConfigurationType pec;
@@ -196,13 +441,16 @@ public abstract class AbstractUimaAsServiceBuilder implements ServiceBuilder {
return null;
}
private void addDelegateDefaultErrorHandling(AnalysisEngineController controller, String delegatKey) {
- ErrorHandlerChain erc = controller.getErrorHandlerChain();
- for( ErrorHandler eh : erc ) {
- if ( !eh.getEndpointThresholdMap().containsKey(delegatKey) ) {
- // add default error handling
- eh.getEndpointThresholdMap().put(delegatKey, Thresholds.newThreshold());
- }
- }
+ if ( Objects.nonNull(controller.getErrorHandlerChain()) ) {
+ ErrorHandlerChain erc = controller.getErrorHandlerChain();
+ for( ErrorHandler eh : erc ) {
+ if ( !eh.getEndpointThresholdMap().containsKey(delegatKey) ) {
+ // add default error handling
+ eh.getEndpointThresholdMap().put(delegatKey, Thresholds.newThreshold());
+ }
+ }
+ }
+
}
private OutputChannel getOutputChannel(AnalysisEngineController controller ) throws Exception {
OutputChannel outputChannel = null;
@@ -1244,7 +1492,7 @@ public abstract class AbstractUimaAsServiceBuilder implements ServiceBuilder {
private boolean isAggregate(AnalysisEngineType aet) {
// Is this an aggregate? An aggregate has a property async=true or has delegates.
System.out.println("......"+aet.getKey()+" aet.getAsync()="+aet.getAsync()+" aet.isSetAsync()="+aet.isSetAsync()+" aet.isSetDelegates()="+aet.isSetDelegates() );
-
+ Objects.requireNonNull(aet, "AnalysisEngineType must be non-null");
if ( "true".equals(aet.getAsync()) || aet.isSetDelegates() ) {
return true;
}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java
index 1d0ebce..c4e576c 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java
@@ -24,13 +24,12 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import javax.management.ServiceNotFoundException;
-
import org.apache.uima.aae.InputChannel;
import org.apache.uima.aae.InputChannel.ChannelType;
import org.apache.uima.aae.OutputChannel;
import org.apache.uima.aae.UimaClassFactory;
import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
+import org.apache.uima.aae.component.TopLevelServiceComponent;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
import org.apache.uima.aae.controller.AnalysisEngineController;
@@ -46,7 +45,6 @@ import org.apache.uima.aae.error.handler.GetMetaErrorHandler;
import org.apache.uima.aae.error.handler.ProcessCasErrorHandler;
import org.apache.uima.aae.handler.Handler;
import org.apache.uima.aae.service.AsynchronousUimaASService;
-import org.apache.uima.aae.service.ServiceRegistry;
import org.apache.uima.aae.service.UimaASService;
import org.apache.uima.aae.service.UimaAsServiceRegistry;
import org.apache.uima.aae.service.delegate.AnalysisEngineDelegate;
@@ -61,8 +59,6 @@ import org.apache.uima.resource.ResourceSpecifier;
import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
import org.apache.uima.resourceSpecifier.AsyncPrimitiveErrorConfigurationType;
import org.apache.uima.resourceSpecifier.CasPoolType;
-import org.apache.uima.resourceSpecifier.CollectionProcessCompleteErrorsType;
-import org.apache.uima.resourceSpecifier.ProcessCasErrorsType;
import org.apache.uima.resourceSpecifier.ServiceType;
public class UimaAsDirectServiceBuilder extends AbstractUimaAsServiceBuilder {
@@ -92,6 +88,144 @@ public class UimaAsDirectServiceBuilder extends AbstractUimaAsServiceBuilder {
}
}
+
+ public UimaASService build(TopLevelServiceComponent topLevelComponent, ControllerCallbackListener callback)
+ throws Exception {
+ AsynchronousUimaASService service = null;
+
+ // is this the only one resource specifier type supported by the current uima-as?
+ if (topLevelComponent.getResourceSpecifier() instanceof AnalysisEngineDescription) {
+ AnalysisEngineDescription aeDescriptor =
+ (AnalysisEngineDescription) topLevelComponent.getResourceSpecifier();
+ String endpoint = resolvePlaceholder(topLevelComponent.getEndpoint().getEndpoint());
+ // Create a Top Level Service (TLS) wrapper. This wrapper may contain
+ // references to multiple TLS service instances if the TLS is scaled
+ // up.
+ service = new AsynchronousUimaASService(endpoint)
+ .withName(aeDescriptor.getAnalysisEngineMetaData().getName())
+ .withResourceSpecifier(topLevelComponent.getResourceSpecifier())
+ .withScaleout(topLevelComponent.getScaleout());
+
+ this.buildAndDeploy(topLevelComponent, service, callback);
+
+
+ }
+ return service;
+ }
+ public UimaASService buildAndDeploy(TopLevelServiceComponent topLevelComponent, AsynchronousUimaASService service, ControllerCallbackListener callback) throws Exception {
+ // create ResourceManager, CasManager, and InProcessCache
+ initialize(service, topLevelComponent.getComponentCasPool(), Transport.Java);
+
+ AnalysisEngineController topLevelController =
+ createController(topLevelComponent, callback, service.getId());
+
+ //topLevelController.addControllerCallbackListener(callback);
+
+ //topLevelController.setServiceId(service.getId());
+
+ service.withInProcessCache(super.cache);
+ System.setProperty("BrokerURI", "Direct");
+ configureTopLevelService(topLevelController, service);//, topLevelComponent.getScaleout());
+ return service;
+
+ }
+
+ private DirectOutputChannel outputChannel(AnalysisEngineController topLevelController) throws Exception {
+ DirectOutputChannel outputChannel = null;
+ if (topLevelController.getOutputChannel(ENDPOINT_TYPE.DIRECT) == null) {
+ outputChannel = new DirectOutputChannel().withController(topLevelController);
+ topLevelController.addOutputChannel(outputChannel);
+ } else {
+ outputChannel = (DirectOutputChannel) topLevelController.
+ getOutputChannel(ENDPOINT_TYPE.DIRECT);
+ }
+ return outputChannel;
+ }
+ private DirectInputChannel inputChannel(AnalysisEngineController topLevelController) throws Exception {
+ DirectInputChannel inputChannel;
+ if ((topLevelController.getInputChannel(ENDPOINT_TYPE.DIRECT)) == null) {
+ inputChannel = new DirectInputChannel(ChannelType.REQUEST_REPLY).
+ withController(topLevelController);
+ Handler messageHandlerChain = getMessageHandler(topLevelController);
+ inputChannel.setMessageHandler(messageHandlerChain);
+ topLevelController.addInputChannel(inputChannel);
+ } else {
+ inputChannel = (DirectInputChannel) topLevelController.
+ getInputChannel(ENDPOINT_TYPE.DIRECT);
+ }
+ return inputChannel;
+ }
+/*
+ private void configureTopLevelService(AnalysisEngineController topLevelController,
+ AsynchronousUimaASService service, int howMany) throws Exception {
+*/
+ private void configureTopLevelService(AnalysisEngineController topLevelController,
+ AsynchronousUimaASService service) throws Exception {
+
+ //addErrorHandling(topLevelController, pec);
+
+
+ // create a single instance of OutputChannel for Direct communication if
+ // necessary
+ DirectOutputChannel outputChannel = outputChannel(topLevelController);
+
+ DirectInputChannel inputChannel = inputChannel(topLevelController);
+
+ if ( topLevelController instanceof AggregateAnalysisEngineController ) {
+ ((AggregateAnalysisEngineController_impl)topLevelController).
+ setServiceEndpointName(service.getEndpoint());
+ }
+ BlockingQueue<DirectMessage> pQ = null;
+ BlockingQueue<DirectMessage> mQ = null;
+
+ // Lookup queue name in service registry. If this queue exists, the new service
+ // being
+ // created here will share the same queue to balance the load.
+ UimaASService s;
+ try {
+ s = UimaAsServiceRegistry.getInstance().lookupByEndpoint(service.getEndpoint());
+ if ( s instanceof AsynchronousUimaASService) {
+ pQ = ((AsynchronousUimaASService) s).getProcessRequestQueue();
+ mQ = ((AsynchronousUimaASService) s).getMetaRequestQueue();
+ }
+
+ } catch( Exception ee) {
+ pQ = service.getProcessRequestQueue();
+ mQ = service.getMetaRequestQueue();
+ }
+
+ scaleout = service.getScaleout();
+ DirectListener processListener = new DirectListener(Type.ProcessCAS).withController(topLevelController)
+ .withConsumerThreads(scaleout).withInputChannel(inputChannel).withQueue(pQ).
+ initialize();
+
+ DirectListener getMetaListener = new DirectListener(Type.GetMeta).withController(topLevelController)
+ .withConsumerThreads(1).withInputChannel(inputChannel).
+ withQueue(mQ).initialize();
+
+ addFreeCASListener(service, topLevelController, inputChannel, outputChannel, scaleout );
+
+ inputChannel.registerListener(getMetaListener);
+ inputChannel.registerListener(processListener);
+
+ service.withController(topLevelController);
+
+ }
+
+
+
+
+
+
+
+
+
+
+ /*
+ * OLD CODE **********************************************************************************
+ */
+
+
public UimaASService build(AnalysisEngineDeploymentDescriptionDocument dd, ControllerCallbackListener callback)
throws Exception {
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java
index abbdfd7..c7be254 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java
@@ -47,27 +47,30 @@ import org.apache.uima.cas.SerialFormat;
import org.apache.uima.cas.impl.BinaryCasSerDes6.ReuseInfo;
import org.apache.uima.cas.impl.Serialization;
import org.apache.uima.cas.impl.XmiSerializationSharedData;
+import org.apache.uima.resource.metadata.ResourceMetaData;
import org.apache.uima.util.Level;
public abstract class AbstractUimaAsCommand implements UimaAsCommand {
protected AnalysisEngineController controller;
private Object mux = new Object();
-
- protected AbstractUimaAsCommand(AnalysisEngineController controller) {
+ private final MessageContext messageContext;
+
+ protected AbstractUimaAsCommand(AnalysisEngineController controller, MessageContext aMessageContext) {
this.controller = controller;
+ this.messageContext = aMessageContext;
}
- protected String getCasReferenceId(Class<?> concreteClassName, MessageContext aMessageContext) throws AsynchAEException {
- if (!aMessageContext.propertyExists(AsynchAEMessage.CasReference)) {
+ protected String getCasReferenceId(Class<?> concreteClassName/*, MessageContext aMessageContext */) throws AsynchAEException {
+ if (!messageContext.propertyExists(AsynchAEMessage.CasReference)) {
if (UIMAFramework.getLogger(concreteClassName).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(concreteClassName).logrb(Level.INFO, concreteClassName.getName(),
"getCasReferenceId", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_message_has_cas_refid__INFO",
- new Object[] { aMessageContext.getEndpoint().getEndpoint() });
+ new Object[] { messageContext.getEndpoint().getEndpoint() });
}
return null;
}
- return aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
+ return messageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
}
protected CacheEntry getCacheEntryForCas(String casReferenceId) {
@@ -95,7 +98,7 @@ public abstract class AbstractUimaAsCommand implements UimaAsCommand {
return (controller.isTopLevelComponent() && controller instanceof AggregateAnalysisEngineController);
}
- protected void handleError(Exception e, CacheEntry cacheEntry, MessageContext mc) {
+ protected void handleError(Exception e, CacheEntry cacheEntry/*, MessageContext mc */) {
if (UIMAFramework.getLogger(getClass()).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(), "handleError",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_service_exception_WARNING",
@@ -105,7 +108,7 @@ public abstract class AbstractUimaAsCommand implements UimaAsCommand {
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
}
ErrorContext errorContext = new ErrorContext();
- errorContext.add(AsynchAEMessage.Endpoint, mc.getEndpoint());
+ errorContext.add(AsynchAEMessage.Endpoint, messageContext.getEndpoint());
errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
errorContext.add(AsynchAEMessage.CasReference, cacheEntry.getCasReferenceId());
controller.dropCAS(cacheEntry.getCas());
@@ -154,31 +157,69 @@ public abstract class AbstractUimaAsCommand implements UimaAsCommand {
}
- protected static ErrorContext populateErrorContext(MessageContext aMessageCtx) {
+ protected ErrorContext populateErrorContext(/*MessageContext aMessageCtx */) {
ErrorContext errorContext = new ErrorContext();
- if (aMessageCtx != null) {
+ if (messageContext != null) {
try {
- if (aMessageCtx.propertyExists(AsynchAEMessage.Command)) {
+ if (messageContext.propertyExists(AsynchAEMessage.Command)) {
errorContext.add(AsynchAEMessage.Command,
- aMessageCtx.getMessageIntProperty(AsynchAEMessage.Command));
+ messageContext.getMessageIntProperty(AsynchAEMessage.Command));
}
- if (aMessageCtx.propertyExists(AsynchAEMessage.MessageType)) {
+ if (messageContext.propertyExists(AsynchAEMessage.MessageType)) {
errorContext.add(AsynchAEMessage.MessageType,
- aMessageCtx.getMessageIntProperty(AsynchAEMessage.MessageType));
+ messageContext.getMessageIntProperty(AsynchAEMessage.MessageType));
}
- if (aMessageCtx.propertyExists(AsynchAEMessage.CasReference)) {
+ if (messageContext.propertyExists(AsynchAEMessage.CasReference)) {
errorContext.add(AsynchAEMessage.CasReference,
- aMessageCtx.getMessageStringProperty(AsynchAEMessage.CasReference));
+ messageContext.getMessageStringProperty(AsynchAEMessage.CasReference));
}
- errorContext.add(UIMAMessage.RawMsg, aMessageCtx.getRawMessage());
+ errorContext.add(UIMAMessage.RawMsg, messageContext.getRawMessage());
} catch (Exception e) { /* ignore */
}
}
return errorContext;
}
-
+ protected Endpoint getEndpoint() {
+ return messageContext.getEndpoint();
+ }
+ protected int getMessageIntProperty(String propertyName) throws Exception {
+ return messageContext.getMessageIntProperty(propertyName);
+ }
+ protected String getMessageStringProperty(String propertyName) throws Exception {
+ return messageContext.getMessageStringProperty(propertyName);
+ }
+ protected ResourceMetaData getResourceMetaData() throws Exception {
+ return (ResourceMetaData)messageContext.getMessageObjectProperty(AsynchAEMessage.AEMetadata);
+ }
+ protected String getStringMessage() throws Exception {
+ return messageContext.getStringMessage();
+ }
+ protected Object getMessageObjectProperty(String propertyName) throws Exception {
+ return messageContext.getMessageObjectProperty(propertyName);
+ }
+ protected boolean getMessageBooleanProperty(String propertyName) throws Exception {
+ return messageContext.getMessageBooleanProperty(propertyName);
+ }
+ protected long getMessageLongProperty( String propertyName) throws Exception {
+ return messageContext.getMessageLongProperty(propertyName);
+ }
+ protected boolean propertyExists(String propertyName) throws Exception {
+ return messageContext.propertyExists(propertyName);
+ }
+ protected String getEndpointName() {
+ return messageContext.getEndpointName();
+ }
+ protected Object getObjectMessage() throws Exception {
+ return messageContext.getObjectMessage();
+ }
+ protected byte[] getByteMessage() throws Exception {
+ return messageContext.getByteMessage();
+ }
+ protected MessageContext getMessageContext() {
+ return messageContext;
+ }
protected Endpoint fetchParentCasOrigin(String parentCasId) throws AsynchAEException {
Endpoint endpoint = null;
String parentId = parentCasId;
@@ -224,8 +265,8 @@ public abstract class AbstractUimaAsCommand implements UimaAsCommand {
return cas;
}
- protected SerializationResult deserializeChildCAS(String casMultiplierDelegateKey, Endpoint endpoint,
- MessageContext mc) throws Exception {
+ protected SerializationResult deserializeChildCAS(String casMultiplierDelegateKey, Endpoint endpoint
+ /*MessageContext mc*/) throws Exception {
SerializationResult result = new SerializationResult();
// Aggregate time spent waiting for a CAS in the shadow cas pool
@@ -250,17 +291,17 @@ public abstract class AbstractUimaAsCommand implements UimaAsCommand {
// Create deserialized wrapper for XMI, BINARY, COMPRESSED formats. To add
// a new serialization format add a new class which implements
// UimaASDeserializer and modify DeserializerFactory class.
- UimaASDeserializer deserializer = DeserializerFactory.newDeserializer(endpoint, mc);
+ UimaASDeserializer deserializer = DeserializerFactory.newDeserializer(endpoint, messageContext);
deserializer.deserialize(result);
return result;
}
- protected SerializationResult deserializeInputCAS( MessageContext mc)
+ protected SerializationResult deserializeInputCAS()
throws Exception {
SerializationResult result = new SerializationResult();
- String origin = mc.getEndpoint().getEndpoint();
- Endpoint endpoint = mc.getEndpoint();
+ String origin = messageContext.getEndpoint().getEndpoint();
+ Endpoint endpoint = messageContext.getEndpoint();
// Time how long we wait on Cas Pool to fetch a new CAS
long t1 = controller.getCpuTime();
@@ -276,20 +317,20 @@ public abstract class AbstractUimaAsCommand implements UimaAsCommand {
return null;
}
- UimaASDeserializer deserializer = DeserializerFactory.newDeserializer(endpoint, mc);
+ UimaASDeserializer deserializer = DeserializerFactory.newDeserializer(endpoint, messageContext);
deserializer.deserialize(result);
return result;
}
- protected Delegate getDelegate(MessageContext mc) throws AsynchAEException {
+ protected Delegate getDelegate(/* MessageContext mc */) throws AsynchAEException {
String delegateKey = null;
- if (mc.getEndpoint().getEndpoint() == null || mc.getEndpoint().getEndpoint().trim().length() == 0) {
- String fromEndpoint = mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+ if (messageContext.getEndpoint().getEndpoint() == null || messageContext.getEndpoint().getEndpoint().trim().length() == 0) {
+ String fromEndpoint = messageContext.getMessageStringProperty(AsynchAEMessage.MessageFrom);
delegateKey = ((AggregateAnalysisEngineController) controller)
.lookUpDelegateKey(fromEndpoint);
} else {
delegateKey = ((AggregateAnalysisEngineController) controller)
- .lookUpDelegateKey(mc.getEndpoint().getEndpoint());
+ .lookUpDelegateKey(messageContext.getEndpoint().getEndpoint());
}
return ((AggregateAnalysisEngineController) controller).lookupDelegate(delegateKey);
}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java
index efb06fe..667f081 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java
@@ -23,14 +23,14 @@ import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.message.MessageContext;
public class CollectionProcessCompleteRequestCommand extends AbstractUimaAsCommand {
- private MessageContext mc;
+// private MessageContext mc;
public CollectionProcessCompleteRequestCommand(MessageContext mc, AnalysisEngineController controller) {
- super(controller);
- this.mc = mc;
+ super(controller, mc);
+// this.mc = mc;
}
public void execute() throws Exception {
- Endpoint endpoint = mc.getEndpoint();
- controller.collectionProcessComplete(endpoint);
+// Endpoint endpoint = mc.getEndpoint();
+ controller.collectionProcessComplete(super.getEndpoint());
}
}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java
index fff8fc9..ffe450c 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java
@@ -26,14 +26,14 @@ import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.MessageContext;
public class CollectionProcessCompleteResponseCommand extends AbstractUimaAsCommand {
- private MessageContext mc;
+// private MessageContext mc;
public CollectionProcessCompleteResponseCommand(MessageContext mc, AnalysisEngineController controller) {
- super(controller);
- this.mc = mc;
+ super(controller,mc);
+// this.mc = mc;
}
public void execute() throws Exception {
- Delegate delegate = super.getDelegate(mc);
+ Delegate delegate = super.getDelegate();
try {
System.out.println("..... Controller:"+controller.getComponentName()+" Handling CPC From "+delegate.getKey());
((AggregateAnalysisEngineController)controller)
@@ -41,7 +41,7 @@ public class CollectionProcessCompleteResponseCommand extends AbstractUimaAsCom
} catch (Exception e) {
ErrorContext errorContext = new ErrorContext();
errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.CollectionProcessComplete);
- errorContext.add(AsynchAEMessage.Endpoint, mc.getEndpoint());
+ errorContext.add(AsynchAEMessage.Endpoint, super.getEndpoint());
controller.getErrorHandlerChain().handle(e, errorContext, controller);
}
}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CommandFactory.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CommandFactory.java
index eb3d067..e72a287 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CommandFactory.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CommandFactory.java
@@ -22,6 +22,7 @@ import org.apache.uima.aae.controller.AnalysisEngineController;
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.UimaAsMessage;
public class CommandFactory {
// Can't instantiate this factory. Use static methods only
@@ -41,6 +42,7 @@ public class CommandFactory {
return CommandBuilder.createNoOpCommand(mc, controller);
}
+
private static UimaAsCommand newRequestCommand(MessageContext mc, AnalysisEngineController controller)
throws AsynchAEException {
int command = mc.getMessageIntProperty(AsynchAEMessage.Command);
@@ -157,5 +159,146 @@ public class CommandFactory {
}
}
+
+ /* ------------------------------------------------------------------ */
+ /* ------------------------------------------------------------------ */
+ /* ------------------------------------------------------------------ */
+ /* ------------------------------------------------------------------ */
+ /* ------------------------------------------------------------------ */
+ /* ------------------------------------------------------------------ */
+
+
+
+
+ /*
+ public static UimaAsCommand newCommand(UimaAsMessage mc, AnalysisEngineController controller)
+ throws AsynchAEException {
+ // Message type is either Request or Response
+ int messageType = mc.getMessageIntProperty(AsynchAEMessage.MessageType);
+
+ if (messageType == AsynchAEMessage.Request) {
+ return newRequestCommand(mc, controller);
+ } else if (messageType == AsynchAEMessage.Response) {
+ return newResponseCommand(mc, controller);
+ }
+
+ return CommandBuilder.createNoOpCommand(mc, controller);
+ }
+ private static UimaAsCommand newRequestCommand(MessageContext mc, AnalysisEngineController controller)
+ throws AsynchAEException {
+ int command = mc.getMessageIntProperty(AsynchAEMessage.Command);
+ UimaAsCommand command2Run;
+ switch (command) {
+ case AsynchAEMessage.Process:
+ if (mc.propertyExists(AsynchAEMessage.CasSequence)) {
+ command2Run = CommandBuilder.createProcessChildCasRequestCommand(mc, controller);
+ } else {
+ command2Run = CommandBuilder.createProcessInputCasRequestCommand(mc, controller);
+ }
+ break;
+ case AsynchAEMessage.GetMeta:
+ command2Run = CommandBuilder.createGetMetaRequestCommand(mc, controller);
+ break;
+ case AsynchAEMessage.CollectionProcessComplete:
+ command2Run = CommandBuilder.createCollectionProcessCompleteRequestCommand(mc, controller);
+ break;
+
+ case AsynchAEMessage.ReleaseCAS:
+ command2Run = CommandBuilder.createReleaseCASRequestCommand(mc, controller);
+ break;
+ default:
+ command2Run = CommandBuilder.createNoOpCommand(mc, controller);
+ break;
+ }
+ return command2Run;
+ }
+
+ private static UimaAsCommand newResponseCommand(MessageContext mc, AnalysisEngineController controller)
+ throws AsynchAEException {
+ int command = mc.getMessageIntProperty(AsynchAEMessage.Command);
+ UimaAsCommand command2Run;
+ switch (command) {
+ case AsynchAEMessage.Process:
+ if (mc.propertyExists(AsynchAEMessage.CasSequence)) {
+ command2Run = CommandBuilder.createProcessChildCasResponseCommand(mc, controller);
+ } else {
+ command2Run = CommandBuilder.createProcessInputCasResponseCommand(mc, controller);
+ }
+ break;
+ case AsynchAEMessage.GetMeta:
+ command2Run = CommandBuilder.createGetMetaResponseCommand(mc, controller);
+ break;
+ case AsynchAEMessage.CollectionProcessComplete:
+ command2Run = CommandBuilder.createCollectionProcessCompleteResponseCommand(mc, controller);
+ break;
+ case AsynchAEMessage.ServiceInfo:
+ command2Run = CommandBuilder.createServiceInfoResponseCommand(mc, controller);
+ break;
+ default:
+ command2Run = CommandBuilder.createNoOpCommand(mc, controller);
+ break;
+ }
+ return command2Run;
+ }
+
+
+
+ private static class CommandBuilder {
+ // Can't instantiate the builder. Use static calls only
+ private CommandBuilder() {
+
+ }
+ static UimaAsCommand createProcessChildCasRequestCommand(MessageContext mc,
+ AnalysisEngineController controller) {
+ return new ProcessChildCasRequestCommand(mc, controller);
+ }
+
+ static UimaAsCommand createProcessInputCasRequestCommand(MessageContext mc,
+ AnalysisEngineController controller) {
+ return new ProcessInputCasRequestCommand(mc, controller);
+ }
+
+ static UimaAsCommand createProcessChildCasResponseCommand(MessageContext mc,
+ AnalysisEngineController controller) {
+ return new ProcessChildCasResponseCommand(mc, controller);
+ }
+
+ static UimaAsCommand createProcessInputCasResponseCommand(MessageContext mc,
+ AnalysisEngineController controller) {
+ return new ProcessInputCasResponseCommand(mc, controller);
+ }
+
+ static UimaAsCommand createGetMetaResponseCommand(MessageContext mc, AnalysisEngineController controller) {
+ return new GetMetaResponseCommand(mc, controller);
+ }
+
+ static UimaAsCommand createGetMetaRequestCommand(MessageContext mc, AnalysisEngineController controller) {
+ return new GetMetaRequestCommand(mc, controller);
+ }
+
+ static UimaAsCommand createReleaseCASRequestCommand(MessageContext mc, AnalysisEngineController controller) {
+ return new ReleaseCASRequestCommand(mc, controller);
+ }
+
+ static UimaAsCommand createNoOpCommand(MessageContext mc, AnalysisEngineController controller) {
+ return new NoOpCommand(mc, controller);
+ }
+
+ static UimaAsCommand createCollectionProcessCompleteRequestCommand(MessageContext mc,
+ AnalysisEngineController controller) {
+ return new CollectionProcessCompleteRequestCommand(mc, controller);
+ }
+
+ static UimaAsCommand createServiceInfoResponseCommand(MessageContext mc,
+ AnalysisEngineController controller) {
+ return new ProcessServiceInfoResponseCommand(mc, controller);
+ }
+ static UimaAsCommand createCollectionProcessCompleteResponseCommand(MessageContext mc,
+ AnalysisEngineController controller) {
+ return new CollectionProcessCompleteResponseCommand(mc, controller);
+ }
+
+ }
+*/
}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaRequestCommand.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaRequestCommand.java
index 566f924..a706e9c 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaRequestCommand.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaRequestCommand.java
@@ -27,14 +27,14 @@ import org.apache.uima.aae.message.MessageContext;
import org.apache.uima.util.Level;
public class GetMetaRequestCommand extends AbstractUimaAsCommand {
- private MessageContext mc;
+ //private MessageContext mc;
public GetMetaRequestCommand(MessageContext mc, AnalysisEngineController controller) {
- super(controller);
- this.mc = mc;
+ super(controller, mc);
+ // this.mc = mc;
}
public void execute() throws Exception {
- Endpoint endpoint = mc.getEndpoint();
+ Endpoint endpoint = super.getEndpoint();
if (controller.isTopLevelComponent()) {
endpoint.setCommand(AsynchAEMessage.GetMeta);
controller.cacheClientEndpoint(endpoint);
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaResponseCommand.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaResponseCommand.java
index 80f127e..6320696 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaResponseCommand.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaResponseCommand.java
@@ -31,25 +31,25 @@ import org.apache.uima.resource.metadata.ResourceMetaData;
import org.apache.uima.util.XMLInputSource;
public class GetMetaResponseCommand extends AbstractUimaAsCommand {
- private MessageContext mc;
+// private MessageContext mc;
public GetMetaResponseCommand(MessageContext mc, AnalysisEngineController controller) {
- super(controller);
- this.mc = mc;
+ super(controller, mc);
+ // this.mc = mc;
}
public void execute() throws Exception {
System.out.println(".......... GetMetaResponseCommand.execute()- handling GetMeta response - Controller:"+controller.getName());
// Endpoint endpoint = mc.getEndpoint();
- int payload = mc
- .getMessageIntProperty(AsynchAEMessage.Payload);
+ int payload = //mc
+ super.getMessageIntProperty(AsynchAEMessage.Payload);
if (AsynchAEMessage.Exception == payload) {
return;
}
- String fromEndpoint = mc
- .getMessageStringProperty(AsynchAEMessage.MessageFrom);
+ String fromEndpoint =// mc
+ super.getMessageStringProperty(AsynchAEMessage.MessageFrom);
String delegateKey = ((AggregateAnalysisEngineController) controller)
.lookUpDelegateKey(fromEndpoint);
@@ -58,18 +58,22 @@ public class GetMetaResponseCommand extends AbstractUimaAsCommand {
// ((MessageContext) anObjectToHandle).getMessageIntProperty(AsynchAEMessage.SERIALIZATION);
if ( serializationSupportedByRemote == AsynchAEMessage.None ) {
- resource = (ResourceMetaData)
- ((MessageContext)mc).getMessageObjectProperty(AsynchAEMessage.AEMetadata);
+ resource = super.getResourceMetaData();
+ //(ResourceMetaData)
+ //((MessageContext)mc).getMessageObjectProperty(AsynchAEMessage.AEMetadata);
} else {
- String analysisEngineMetadata = ((MessageContext) mc).getStringMessage();
+ String analysisEngineMetadata = super.getStringMessage();
+ //((MessageContext) mc).getStringMessage();
ByteArrayInputStream bis = new ByteArrayInputStream(analysisEngineMetadata.getBytes());
XMLInputSource in1 = new XMLInputSource(bis, null);
resource = UIMAFramework.getXMLParser().parseResourceMetaData(in1);
}
String fromServer = null;
- if (((MessageContext) mc).propertyExists(AsynchAEMessage.EndpointServer)) {
- fromServer = ((MessageContext) mc)
- .getMessageStringProperty(AsynchAEMessage.EndpointServer);
+// if (((MessageContext) mc).propertyExists(AsynchAEMessage.EndpointServer)) {
+ if ( super.propertyExists(AsynchAEMessage.EndpointServer)) {
+ fromServer = super.getMessageStringProperty(AsynchAEMessage.EndpointServer);
+ //((MessageContext) mc)
+ //.getMessageStringProperty(AsynchAEMessage.EndpointServer);
}
((AggregateAnalysisEngineController)controller).changeCollocatedDelegateState(delegateKey, ServiceState.RUNNING);
// If old service does not echo back the external broker name then the queue name must
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/NoOpCommand.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/NoOpCommand.java
index f0eacbe..c6f0ec4 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/NoOpCommand.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/NoOpCommand.java
@@ -23,16 +23,16 @@ import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.MessageContext;
public class NoOpCommand extends AbstractUimaAsCommand {
- MessageContext mc;
+// MessageContext mc;
public NoOpCommand(MessageContext mc, AnalysisEngineController controller) {
- super(controller);
- this.mc = mc;
+ super(controller, mc);
+// this.mc = mc;
}
public void execute() throws Exception {
System.out.println("*******************************************************"
+ "\nNoOpCommand.execute() - Either wrong command or message type - Command:"+
- mc.getMessageIntProperty(AsynchAEMessage.Command) + " MessageType:"+
- mc.getMessageIntProperty(AsynchAEMessage.MessageType) + " Service:"+controller.getComponentName() +
+ super.getMessageIntProperty(AsynchAEMessage.Command) + " MessageType:"+
+ super.getMessageIntProperty(AsynchAEMessage.MessageType) + " Service:"+controller.getComponentName() +
"\n*******************************************************");
}
}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/PingRequestCommand.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/PingRequestCommand.java
index 9df8421..832fb9c 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/PingRequestCommand.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/PingRequestCommand.java
@@ -27,21 +27,21 @@ import org.apache.uima.aae.message.MessageContext;
import org.apache.uima.util.Level;
public class PingRequestCommand extends AbstractUimaAsCommand {
- private MessageContext mc;
+// private MessageContext mc;
public PingRequestCommand(MessageContext mc, AnalysisEngineController controller) {
- super(controller);
- this.mc = mc;
+ super(controller, mc);
+// this.mc = mc;
}
public void execute() throws Exception {
try {
ENDPOINT_TYPE et = ENDPOINT_TYPE.DIRECT; // default
- if ( mc.getEndpoint().isRemote() ) {
+ if ( super.getEndpoint().isRemote() ) {
et = ENDPOINT_TYPE.JMS;
}
- controller.getOutputChannel(et).sendReply(AsynchAEMessage.Ping, mc.getEndpoint(), null, false);
+ controller.getOutputChannel(et).sendReply(AsynchAEMessage.Ping, super.getEndpoint(), null, false);
} catch (Exception e) {
if (UIMAFramework.getLogger(this.getClass()).isLoggable(Level.WARNING)) {
if (controller != null) {
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasRequestCommand.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasRequestCommand.java
index a8b01ac..760a81d 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasRequestCommand.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasRequestCommand.java
@@ -37,21 +37,21 @@ import org.apache.uima.cas.Marker;
import org.apache.uima.util.Level;
public class ProcessChildCasRequestCommand extends AbstractUimaAsCommand {
- private MessageContext mc;
+// private MessageContext mc;
public ProcessChildCasRequestCommand(MessageContext mc, AnalysisEngineController controller) {
- super(controller);
- this.mc = mc;
+ super(controller, mc);
+// this.mc = mc;
}
public void execute() throws Exception {
- int payload = mc.getMessageIntProperty(AsynchAEMessage.Payload);
- String casReferenceId = super.getCasReferenceId(this.getClass(), mc);
- String parentCasReferenceId = mc.getMessageStringProperty(AsynchAEMessage.InputCasReference);
+ int payload = super.getMessageIntProperty(AsynchAEMessage.Payload);
+ String casReferenceId = super.getCasReferenceId(this.getClass());
+ String parentCasReferenceId = super.getMessageStringProperty(AsynchAEMessage.InputCasReference);
System.out.println(">>>>>>>>>>>>>>> Controller:"+controller.getComponentName()+
" in ProcessChildCasRequestCommand.execute() - Child CAS:"+casReferenceId+
" Parent CAS:"+parentCasReferenceId+
- " from "+mc
+ " from "+super
.getMessageStringProperty(AsynchAEMessage.MessageFrom)
);
@@ -63,7 +63,7 @@ public class ProcessChildCasRequestCommand extends AbstractUimaAsCommand {
"execute",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_input_cas_invalid__INFO",
- new Object[] { controller.getComponentName(), mc.getEndpointName(),
+ new Object[] { controller.getComponentName(), super.getEndpointName(),
parentCasReferenceId });
}
@@ -90,7 +90,7 @@ public class ProcessChildCasRequestCommand extends AbstractUimaAsCommand {
System.out.println(
"ProcessChildCasRequestCommand.execute() - Child CasReferenceId:"+casReferenceId+" From Co-located CM");
- if (mc.getEndpoint() == null) {
+ if (super.getEndpoint() == null) {
if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(getClass()).logrb(Level.INFO, getClass().getName(), "executeDirectRequest",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_endpoint_for_reply__INFO",
@@ -102,8 +102,8 @@ public class ProcessChildCasRequestCommand extends AbstractUimaAsCommand {
executeDirectRequest(casReferenceId, parentCasReferenceId);
} else {
// Check if there is an XMI cargo in the message
- if (mc.getMessageIntProperty(AsynchAEMessage.Payload) == AsynchAEMessage.XMIPayload
- && mc.getStringMessage() == null) {
+ if (super.getMessageIntProperty(AsynchAEMessage.Payload) == AsynchAEMessage.XMIPayload
+ && super.getStringMessage() == null) {
return; // No XMI just return
}
System.out.println(
@@ -112,10 +112,10 @@ public class ProcessChildCasRequestCommand extends AbstractUimaAsCommand {
executeRemoteRequest(casReferenceId, parentCasReferenceId);
}
}
- private void saveFreeCasDestination(CasStateEntry childCasStateEntry) throws AsynchAEException {
- if ( mc.getMessageObjectProperty(AsynchAEMessage.FreeCASQueue) != null ) {
+ private void saveFreeCasDestination(CasStateEntry childCasStateEntry) throws Exception {
+ if ( super.getMessageObjectProperty(AsynchAEMessage.FreeCASQueue) != null ) {
Object freeCASQueue =
- mc.getMessageObjectProperty(AsynchAEMessage.FreeCASQueue);
+ super.getMessageObjectProperty(AsynchAEMessage.FreeCASQueue);
Endpoint freeCasNotificationEndpoint =
new Endpoint_impl();
freeCasNotificationEndpoint.setServerURI("java");
@@ -205,7 +205,7 @@ public class ProcessChildCasRequestCommand extends AbstractUimaAsCommand {
// CAS is a top ancestor from which child CASes are produced.
childCasStateEntry.setInputCasReferenceId(parentCasStateEntry.getInputCasReferenceId());
- mc.getEndpoint().setIsCasMultiplier(true);
+ super.getEndpoint().setIsCasMultiplier(true);
associateInputCASOriginWithChildCAS(childCasReferenceId, parentCasReferenceId, delegateCasMultiplier );
Endpoint e = ((AggregateAnalysisEngineController_impl)controller).
@@ -266,7 +266,7 @@ public class ProcessChildCasRequestCommand extends AbstractUimaAsCommand {
if ( !(e instanceof AsynchAEException) ) {
e = new AsynchAEException(e);
}
- controller.getErrorHandlerChain().handle(e, super.populateErrorContext(mc), controller);
+ controller.getErrorHandlerChain().handle(e, super.populateErrorContext(), controller);
e.printStackTrace();
@@ -280,9 +280,9 @@ public class ProcessChildCasRequestCommand extends AbstractUimaAsCommand {
try {
CasStateEntry parentCasStateEntry = saveFreeCasEndpointInParentCasStateEntry(parentCasReferenceId);
- computeStats(mc, parentCasReferenceId);
+ computeStats( parentCasReferenceId);
- mc.getEndpoint().setDestination(null);
+ super.getEndpoint().setDestination(null);
// This CAS came in from a CAS Multiplier. Treat it differently than the
// input CAS. In case the Aggregate needs to send this CAS to the
@@ -296,8 +296,8 @@ public class ProcessChildCasRequestCommand extends AbstractUimaAsCommand {
// the client of
// this service. Client endpoint is attached to an input Cas cache entry.
if (replyToEndpoint != null) {
- mc.getEndpoint().setEndpoint(replyToEndpoint.getEndpoint());
- mc.getEndpoint().setServerURI(replyToEndpoint.getServerURI());
+ super.getEndpoint().setEndpoint(replyToEndpoint.getEndpoint());
+ super.getEndpoint().setServerURI(replyToEndpoint.getServerURI());
}
// create local cache entry for the child CAS
CasStateEntry childCasStateEntry = super.getCasStateEntry(childCasReferenceId);
@@ -321,14 +321,14 @@ public class ProcessChildCasRequestCommand extends AbstractUimaAsCommand {
long t1 = controller.getCpuTime();
Exception cachedException = null;
try {
- result = deserializeChildCAS(delegate.getKey(), mc.getEndpoint(), mc);
+ result = deserializeChildCAS(delegate.getKey(), super.getEndpoint());
} catch (Exception e) {
failed = true;
cachedException = e;
} finally {
// create child CAS cache entry
- childCacheEntry = controller.getInProcessCache().register(result.getCas(), mc,
+ childCacheEntry = controller.getInProcessCache().register(result.getCas(), super.getMessageContext(),
result.getDeserSharedData(), result.getReuseInfo(), childCasReferenceId, result.getMarker(),
result.acceptsDeltaCas());
childCacheEntry.setInputCasReferenceId(parentCasReferenceId);
@@ -348,8 +348,8 @@ public class ProcessChildCasRequestCommand extends AbstractUimaAsCommand {
// *************************************************************************
boolean acceptsDeltaCas = false;
Marker marker = null;
- if (mc.propertyExists(AsynchAEMessage.AcceptsDeltaCas)) {
- acceptsDeltaCas = mc.getMessageBooleanProperty(AsynchAEMessage.AcceptsDeltaCas);
+ if (super.propertyExists(AsynchAEMessage.AcceptsDeltaCas)) {
+ acceptsDeltaCas = super.getMessageBooleanProperty(AsynchAEMessage.AcceptsDeltaCas);
if (acceptsDeltaCas) {
marker = result.getCas().createMarker();
result.setAcceptsDeltaCas(acceptsDeltaCas);
@@ -365,7 +365,7 @@ public class ProcessChildCasRequestCommand extends AbstractUimaAsCommand {
if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "executeRemoteRequest",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_deserialized_cas_ready_to_process_FINE",
- new Object[] { mc.getEndpoint().getEndpoint() });
+ new Object[] { super.getEndpoint().getEndpoint() });
}
((AggregateAnalysisEngineController) controller).process(result.getCas(), parentCasReferenceId,
@@ -373,7 +373,7 @@ public class ProcessChildCasRequestCommand extends AbstractUimaAsCommand {
}
} catch (Exception e) {
- super.handleError(e, childCacheEntry, mc);
+ super.handleError(e, childCacheEntry);
}
}
@@ -383,8 +383,8 @@ public class ProcessChildCasRequestCommand extends AbstractUimaAsCommand {
controller.getInProcessCache().setCasProducer(childCasStateEntry.getCasReferenceId(), delegate.getKey());
}
- private Delegate getLastDelegate(CasStateEntry childCasStateEntry) throws AsynchAEException {
- String cmEndpointName = mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+ private Delegate getLastDelegate(CasStateEntry childCasStateEntry) throws Exception {
+ String cmEndpointName = super.getMessageStringProperty(AsynchAEMessage.MessageFrom);
String newCASProducedBy = ((AggregateAnalysisEngineController) controller).lookUpDelegateKey(cmEndpointName);
Delegate delegate = ((AggregateAnalysisEngineController) controller).lookupDelegate(newCASProducedBy);
@@ -392,14 +392,14 @@ public class ProcessChildCasRequestCommand extends AbstractUimaAsCommand {
}
private CasStateEntry saveFreeCasEndpointInParentCasStateEntry(String parentCasReferenceId)
- throws AsynchAEException {
+ throws Exception {
// Fetch the name of the Cas Multiplier's input queue
// String cmEndpointName = aMessageContext.getEndpoint().getEndpoint();
- String cmEndpointName = mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+ String cmEndpointName = super.getMessageStringProperty(AsynchAEMessage.MessageFrom);
String newCASProducedBy = ((AggregateAnalysisEngineController) controller).lookUpDelegateKey(cmEndpointName);
Endpoint casMultiplierEndpoint = ((AggregateAnalysisEngineController) controller)
.lookUpEndpoint(newCASProducedBy, false);
- Endpoint freeCasEndpoint = mc.getEndpoint();
+ Endpoint freeCasEndpoint = super.getEndpoint();
// Clone an endpoint where Free Cas Request will be sent
freeCasEndpoint = (Endpoint) ((Endpoint_impl) freeCasEndpoint).clone();
@@ -416,42 +416,42 @@ public class ProcessChildCasRequestCommand extends AbstractUimaAsCommand {
return parentCasStateEntry;
}
- protected void computeStats(MessageContext aMessageContext, String aCasReferenceId) throws AsynchAEException {
- if (aMessageContext.propertyExists(AsynchAEMessage.TimeInService)) {
- long departureTime = controller.getTime(aCasReferenceId, aMessageContext.getEndpoint().getEndpoint());
+ protected void computeStats(String aCasReferenceId) throws Exception {
+ if (super.propertyExists(AsynchAEMessage.TimeInService)) {
+ long departureTime = controller.getTime(aCasReferenceId, super.getEndpoint().getEndpoint());
long currentTime = System.nanoTime();
long roundTrip = currentTime - departureTime;
- long timeInService = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeInService);
+ long timeInService = super.getMessageLongProperty(AsynchAEMessage.TimeInService);
long totalTimeInComms = currentTime - (departureTime - timeInService);
if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "computeStats",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_roundtrip_time__FINE",
- new Object[] { aCasReferenceId, aMessageContext.getEndpoint(),
+ new Object[] { aCasReferenceId, super.getEndpoint(),
(double) roundTrip / (double) 1000000 });
UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "computeStats",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_time_spent_in_delegate__FINE",
new Object[] { aCasReferenceId, (double) timeInService / (double) 1000000,
- aMessageContext.getEndpoint() });
+ super.getEndpoint() });
UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "computeStats",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_time_spent_in_comms__FINE",
new Object[] { aCasReferenceId, (double) totalTimeInComms / (double) 1000000,
- aMessageContext.getEndpoint() });
+ super.getEndpoint() });
}
}
- aggregateDelegateStats(aMessageContext, aCasReferenceId);
+ aggregateDelegateStats( aCasReferenceId);
}
- protected synchronized void aggregateDelegateStats(MessageContext aMessageContext, String aCasReferenceId)
+ protected synchronized void aggregateDelegateStats( String aCasReferenceId)
throws AsynchAEException {
String delegateKey = "";
try {
delegateKey = ((AggregateAnalysisEngineController) controller)
- .lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
+ .lookUpDelegateKey(super.getEndpoint().getEndpoint());
CacheEntry entry = controller.getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
if (entry == null) {
throw new AsynchAEException("CasReferenceId:" + aCasReferenceId + " Not Found in the Cache.");
@@ -472,18 +472,18 @@ public class ProcessChildCasRequestCommand extends AbstractUimaAsCommand {
ServicePerformance delegateServicePerformance = ((AggregateAnalysisEngineController) controller)
.getServicePerformance(delegateKey);
- if (aMessageContext.propertyExists(AsynchAEMessage.TimeToSerializeCAS)) {
- long timeToSerializeCAS = ((Long) aMessageContext
- .getMessageLongProperty(AsynchAEMessage.TimeToSerializeCAS)).longValue();
+ if (super.propertyExists(AsynchAEMessage.TimeToSerializeCAS)) {
+ long timeToSerializeCAS = super
+ .getMessageLongProperty(AsynchAEMessage.TimeToSerializeCAS);
if (timeToSerializeCAS > 0) {
if (delegateServicePerformance != null) {
delegateServicePerformance.incrementCasSerializationTime(timeToSerializeCAS);
}
}
}
- if (aMessageContext.propertyExists(AsynchAEMessage.TimeToDeserializeCAS)) {
- long timeToDeserializeCAS = ((Long) aMessageContext
- .getMessageLongProperty(AsynchAEMessage.TimeToDeserializeCAS)).longValue();
+ if (super.propertyExists(AsynchAEMessage.TimeToDeserializeCAS)) {
+ long timeToDeserializeCAS = super
+ .getMessageLongProperty(AsynchAEMessage.TimeToDeserializeCAS);
if (timeToDeserializeCAS > 0) {
if (delegateServicePerformance != null) {
delegateServicePerformance.incrementCasDeserializationTime(timeToDeserializeCAS);
@@ -491,20 +491,20 @@ public class ProcessChildCasRequestCommand extends AbstractUimaAsCommand {
}
}
- if (aMessageContext.propertyExists(AsynchAEMessage.IdleTime)) {
- long idleTime = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.IdleTime)).longValue();
+ if (super.propertyExists(AsynchAEMessage.IdleTime)) {
+ long idleTime = super.getMessageLongProperty(AsynchAEMessage.IdleTime);
if (idleTime > 0 && delegateServicePerformance != null) {
- Endpoint endp = aMessageContext.getEndpoint();
+ Endpoint endp = super.getEndpoint();
if (endp != null && endp.isRemote()) {
delegateServicePerformance.incrementIdleTime(idleTime);
}
}
}
- if (aMessageContext.propertyExists(AsynchAEMessage.TimeWaitingForCAS)) {
- long timeWaitingForCAS = ((Long) aMessageContext
- .getMessageLongProperty(AsynchAEMessage.TimeWaitingForCAS)).longValue();
- if (timeWaitingForCAS > 0 && aMessageContext.getEndpoint().isRemote()) {
+ if (super.propertyExists(AsynchAEMessage.TimeWaitingForCAS)) {
+ long timeWaitingForCAS = super
+ .getMessageLongProperty(AsynchAEMessage.TimeWaitingForCAS);
+ if (timeWaitingForCAS > 0 && super.getEndpoint().isRemote()) {
entry.incrementTimeWaitingForCAS(timeWaitingForCAS);
delegateServicePerformance.incrementCasPoolWaitTime(
timeWaitingForCAS - delegateServicePerformance.getRawCasPoolWaitTime());
@@ -513,10 +513,10 @@ public class ProcessChildCasRequestCommand extends AbstractUimaAsCommand {
}
}
}
- if (aMessageContext.propertyExists(AsynchAEMessage.TimeInProcessCAS)) {
- long timeInProcessCAS = ((Long) aMessageContext
- .getMessageLongProperty(AsynchAEMessage.TimeInProcessCAS)).longValue();
- Endpoint endp = aMessageContext.getEndpoint();
+ if (super.propertyExists(AsynchAEMessage.TimeInProcessCAS)) {
+ long timeInProcessCAS = super
+ .getMessageLongProperty(AsynchAEMessage.TimeInProcessCAS);
+ Endpoint endp = super.getEndpoint();
if (endp != null && endp.isRemote()) {
if (delegateServicePerformance != null) {
// calculate the time spent in analysis. The remote service returns total time
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasResponseCommand.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasResponseCommand.java
index 14b64c8..06892b3 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasResponseCommand.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasResponseCommand.java
@@ -37,11 +37,11 @@ import org.apache.uima.cas.CAS;
import org.apache.uima.util.Level;
public class ProcessChildCasResponseCommand extends AbstractUimaAsCommand {
- private MessageContext mc;
+// private MessageContext mc;
public ProcessChildCasResponseCommand(MessageContext mc, AnalysisEngineController controller) {
- super(controller);
- this.mc = mc;
+ super(controller, mc);
+// this.mc = mc;
}
public void execute() throws Exception {
// System.out.println(">>>>>>>>>>>>>>> in ProcessChildCasResponseCommand.execute(");
@@ -76,21 +76,21 @@ public class ProcessChildCasResponseCommand extends AbstractUimaAsCommand {
CacheEntry cacheEntry = null;
try {
- casReferenceId = mc.getMessageStringProperty(AsynchAEMessage.CasReference);
+ casReferenceId = super.getMessageStringProperty(AsynchAEMessage.CasReference);
cacheEntry = controller.getInProcessCache().getCacheEntryForCAS(casReferenceId);
CasStateEntry casStateEntry = ((AggregateAnalysisEngineController)controller)
.getLocalCache().lookupEntry(casReferenceId);
CAS cas = cacheEntry.getCas();
String delegateKey = null;
- if ( mc.getEndpoint().getEndpoint() == null || mc.getEndpoint().getEndpoint().trim().length()==0) {
- String fromEndpoint = mc
+ if ( super.getEndpoint().getEndpoint() == null || super.getEndpoint().getEndpoint().trim().length()==0) {
+ String fromEndpoint = super
.getMessageStringProperty(AsynchAEMessage.MessageFrom);
delegateKey = ((AggregateAnalysisEngineController) controller)
.lookUpDelegateKey(fromEndpoint);
} else {
delegateKey = ((AggregateAnalysisEngineController) controller)
- .lookUpDelegateKey(mc.getEndpoint().getEndpoint());
+ .lookUpDelegateKey(super.getEndpoint().getEndpoint());
}
Delegate delegate = ((AggregateAnalysisEngineController) controller)
.lookupDelegate(delegateKey);
@@ -101,7 +101,7 @@ public class ProcessChildCasResponseCommand extends AbstractUimaAsCommand {
delegate.removeCasFromOutstandingList(casReferenceId);
if (cas != null) {
- cancelTimerAndProcess(mc, casReferenceId, cas);
+ cancelTimerAndProcess(casReferenceId, cas);
} else {
if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(getClass()).logrb(
@@ -111,7 +111,7 @@ public class ProcessChildCasResponseCommand extends AbstractUimaAsCommand {
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_cas_not_in_cache__INFO",
new Object[] { controller.getName(), casReferenceId,
- mc.getEndpoint().getEndpoint() });
+ super.getEndpoint().getEndpoint() });
}
throw new AsynchAEException("CAS with Reference Id:" + casReferenceId
+ " Not Found in CasManager's CAS Cache");
@@ -121,13 +121,13 @@ public class ProcessChildCasResponseCommand extends AbstractUimaAsCommand {
ErrorContext errorContext = new ErrorContext();
errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
errorContext.add(AsynchAEMessage.CasReference, casReferenceId);
- errorContext.add(AsynchAEMessage.Endpoint, mc.getEndpoint());
+ errorContext.add(AsynchAEMessage.Endpoint, super.getEndpoint());
controller.getErrorHandlerChain().handle(e, errorContext, controller);
} finally {
- incrementDelegateProcessCount(mc);
+ incrementDelegateProcessCount();
if (controller instanceof AggregateAnalysisEngineController) {
try {
- String endpointName = mc.getEndpoint().getEndpoint();
+ String endpointName = super.getEndpoint().getEndpoint();
String delegateKey = ((AggregateAnalysisEngineController) controller)
.lookUpDelegateKey(endpointName);
if (delegateKey != null) {
@@ -164,8 +164,8 @@ public class ProcessChildCasResponseCommand extends AbstractUimaAsCommand {
}
}
- private void incrementDelegateProcessCount(MessageContext aMessageContext) {
- Endpoint endpoint = aMessageContext.getEndpoint();
+ private void incrementDelegateProcessCount() {
+ Endpoint endpoint = super.getEndpoint();
if (endpoint != null && controller instanceof AggregateAnalysisEngineController) {
try {
String delegateKey = ((AggregateAnalysisEngineController) controller)
@@ -186,7 +186,7 @@ public class ProcessChildCasResponseCommand extends AbstractUimaAsCommand {
private void executeRemoteRequest(String casReferenceId) throws AsynchAEException {
}
- private void cancelTimerAndProcess(MessageContext aMessageContext, String aCasReferenceId,
+ private void cancelTimerAndProcess( String aCasReferenceId,
CAS aCAS) throws AsynchAEException {
// computeStats(aMessageContext, aCasReferenceId);
// ((AggregateAnalysisEngineController) controller).process(aCAS, anInputCasReferenceId,
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasRequestCommand.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasRequestCommand.java
index 1eb2359..51eefdb 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasRequestCommand.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasRequestCommand.java
@@ -44,20 +44,20 @@ import org.apache.uima.util.Level;
public class ProcessInputCasRequestCommand extends AbstractUimaAsCommand {
private static final Class<?> CLASS_NAME = ProcessInputCasRequestCommand.class;
- private MessageContext mc;
+// private MessageContext mc;
// controls access to Aggregates semaphore which
// throttles processing of CASes from a service input queue
private Object lock = new Object();
public ProcessInputCasRequestCommand(MessageContext mc, AnalysisEngineController controller) {
- super(controller);
- this.mc = mc;
+ super(controller, mc);
+// this.mc = mc;
}
public void execute() throws Exception {
- int payload = mc.getMessageIntProperty(AsynchAEMessage.Payload);
- String inputCasReferenceId = super.getCasReferenceId(this.getClass(), mc);
+ int payload = super.getMessageIntProperty(AsynchAEMessage.Payload);
+ String inputCasReferenceId = super.getCasReferenceId(this.getClass());
if (inputCasReferenceId == null) {
// LOG THIS
System.out.println(
@@ -87,33 +87,33 @@ public class ProcessInputCasRequestCommand extends AbstractUimaAsCommand {
}
- private void saveReplyTo() throws AsynchAEException {
+ private void saveReplyTo() throws Exception {
// !!!!!!!!!!!!!!!!! HACK !!!!!!!!!!!!!!!!!!!
// Save true replyTo endpoint to the service sending the request
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
- Object replyTo = mc.getMessageObjectProperty(AsynchAEMessage.ReplyToEndpoint);
- mc.getEndpoint().setReplyDestination(replyTo);
+ Object replyTo = super.getMessageObjectProperty(AsynchAEMessage.ReplyToEndpoint);
+ super.getEndpoint().setReplyDestination(replyTo);
}
- private void saveDelegateKey() throws AsynchAEException{
- String delegateKey = mc.getMessageStringProperty(AsynchAEMessage.DelegateKey);
+ private void saveDelegateKey() throws Exception{
+ String delegateKey = super.getMessageStringProperty(AsynchAEMessage.DelegateKey);
if ( delegateKey == null ) {
- delegateKey = mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+ delegateKey = super.getMessageStringProperty(AsynchAEMessage.MessageFrom);
}
- mc.getEndpoint().setDelegateKey(delegateKey);
+ super.getEndpoint().setDelegateKey(delegateKey);
}
- private void saveEndpointName() throws AsynchAEException {
- String endpointName = mc.getMessageStringProperty(AsynchAEMessage.EndpointName);
+ private void saveEndpointName() throws Exception {
+ String endpointName = super.getMessageStringProperty(AsynchAEMessage.EndpointName);
if (endpointName == null ) {
- endpointName = mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+ endpointName = super.getMessageStringProperty(AsynchAEMessage.MessageFrom);
}
- mc.getEndpoint().setEndpoint(endpointName);
+ super.getEndpoint().setEndpoint(endpointName);
}
private void addMessageOrigin(CacheEntry inputCasCacheEntry) {
if (!controller.isPrimitive()) {
- ((AggregateAnalysisEngineController) controller).addMessageOrigin(inputCasCacheEntry.getCasReferenceId(), mc.getEndpoint());
+ ((AggregateAnalysisEngineController) controller).addMessageOrigin(inputCasCacheEntry.getCasReferenceId(), super.getEndpoint());
}
}
@@ -148,7 +148,7 @@ public class ProcessInputCasRequestCommand extends AbstractUimaAsCommand {
// Create a CasStateEntry in a local cache
CasStateEntry localStateEntry = getCasStateEntry(inputCasCacheEntry.getCasReferenceId());
// associate client endpoint with the input CAS. We need to reply to this client
- localStateEntry.setClientEndpoint(mc.getEndpoint());
+ localStateEntry.setClientEndpoint(super.getEndpoint());
localStateEntry.setInputCasReferenceId(inputCasCacheEntry.getCasReferenceId());
@@ -163,7 +163,7 @@ public class ProcessInputCasRequestCommand extends AbstractUimaAsCommand {
process(inputCasCacheEntry, false);
} catch (AsynchAEException e) {
- controller.getErrorHandlerChain().handle(e, super.populateErrorContext(mc), controller);
+ controller.getErrorHandlerChain().handle(e, super.populateErrorContext(), controller);
e.printStackTrace();
} catch (Exception e) {
// if (UIMAFramework.getLogger(getClass()).isLoggable(Level.WARNING)) {
@@ -178,7 +178,7 @@ public class ProcessInputCasRequestCommand extends AbstractUimaAsCommand {
// e = new AsynchAEException(e);
// }
e = new AsynchAEException(e);
- controller.getErrorHandlerChain().handle(e, super.populateErrorContext(mc), controller);
+ controller.getErrorHandlerChain().handle(e, super.populateErrorContext(), controller);
e.printStackTrace();
} finally {
@@ -219,12 +219,12 @@ public class ProcessInputCasRequestCommand extends AbstractUimaAsCommand {
}
long inTime = System.nanoTime();
- SerializationResult result = super.deserializeInputCAS(mc);
+ SerializationResult result = super.deserializeInputCAS();
// Time how long we wait on Cas Pool to fetch a new CAS
long t1 = controller.getCpuTime();
- inputCasCacheEntry = controller.getInProcessCache().register(result.getCas(), mc, result.getDeserSharedData(),
+ inputCasCacheEntry = controller.getInProcessCache().register(result.getCas(), super.getMessageContext(), result.getDeserSharedData(),
result.getReuseInfo(), inputCasReferenceId, result.getMarker(), result.acceptsDeltaCas());
saveStats(inputCasCacheEntry, inTime, t1, result.getTimeWaitingForCAS());
@@ -250,12 +250,12 @@ public class ProcessInputCasRequestCommand extends AbstractUimaAsCommand {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "executeRemoteRequest",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_deserialized_cas_ready_to_process_FINE",
- new Object[] { mc.getEndpoint().getEndpoint() });
+ new Object[] { super.getEndpoint().getEndpoint() });
}
process(inputCasCacheEntry, waitForCompletion);
} catch (Exception e) {
- super.handleError(e, inputCasCacheEntry, mc);
+ super.handleError(e, inputCasCacheEntry);
}
}
@@ -267,7 +267,7 @@ public class ProcessInputCasRequestCommand extends AbstractUimaAsCommand {
// Process the CAS
// *****************************************************************
if (controller.isPrimitive()) {
- controller.process(entry.getCas(), entry.getCasReferenceId(), mc.getEndpoint());
+ controller.process(entry.getCas(), entry.getCasReferenceId(), super.getEndpoint());
} else {
controller.process(entry.getCas(), entry.getCasReferenceId());
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasResponseCommand.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasResponseCommand.java
index 992f295..2353a54 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasResponseCommand.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasResponseCommand.java
@@ -60,22 +60,22 @@ import org.apache.uima.util.Level;
public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
- private MessageContext mc;
+ //private MessageContext mc;
public ProcessInputCasResponseCommand(MessageContext mc, AnalysisEngineController controller) {
- super(controller);
- this.mc = mc;
+ super(controller, mc);
+ //this.mc = mc;
}
public void execute() throws Exception {
- int payload = mc.getMessageIntProperty(AsynchAEMessage.Payload);
- String casReferenceId = super.getCasReferenceId(this.getClass(), mc);
- String msgFrom = mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+ int payload = super.getMessageIntProperty(AsynchAEMessage.Payload);
+ String casReferenceId = super.getCasReferenceId(this.getClass());
+ String msgFrom = super.getMessageStringProperty(AsynchAEMessage.MessageFrom);
System.out.println(">>>>>>>>>>>>>>> Controller:" + controller.getComponentName()
+ " in ProcessInputCasResponseCommand.execute() - Input CAS:" + casReferenceId + " from "
- + mc.getMessageStringProperty(AsynchAEMessage.MessageFrom)+" Payload:"+payload);
+ + super.getMessageStringProperty(AsynchAEMessage.MessageFrom)+" Payload:"+payload);
if (casReferenceId == null) {
// LOG THIS
@@ -98,7 +98,7 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
executeRemoteRequest(casReferenceId);
} else if (failedProcessReply(payload)) {
if (key == null) {
- key = ((Endpoint) mc.getEndpoint()).getEndpoint();
+ key = ((Endpoint) super.getEndpoint()).getEndpoint();
}
handleProcessResponseWithException(key);
} else {
@@ -117,16 +117,16 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
private boolean remoteProcessReply(int payload) {
return (AsynchAEMessage.XMIPayload == payload || AsynchAEMessage.BinaryPayload == payload);
}
- private Object getCause() throws AsynchAEException {
+ private Object getCause() throws Exception {
Object object = null;
- if ( ( object = mc.getObjectMessage() ) == null ) {
+ if ( ( object = super.getObjectMessage() ) == null ) {
// Could be a C++ exception. In this case the exception is just a String in the message
// cargo
- if (mc.getStringMessage() != null) {
- object = new UimaEEServiceException(mc.getStringMessage());
+ if (super.getStringMessage() != null) {
+ object = new UimaEEServiceException(super.getStringMessage());
} else {
- object = mc.getMessageObjectProperty(AsynchAEMessage.ErrorCause);
+ object = super.getMessageObjectProperty(AsynchAEMessage.ErrorCause);
}
}
@@ -142,17 +142,17 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_handling_exception_from_delegate_FINE",
new Object[] { controller.getName(),
- mc.getEndpoint().getEndpoint() });
+ super.getEndpoint().getEndpoint() });
}
boolean isCpCError = false;
String casReferenceId = null;
try {
// If a Process Request, increment number of docs processed
- if (mc.getMessageIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Response
- && mc.getMessageIntProperty(AsynchAEMessage.Command) == AsynchAEMessage.Process) {
+ if (super.getMessageIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Response
+ && super.getMessageIntProperty(AsynchAEMessage.Command) == AsynchAEMessage.Process) {
// Increment number of CASes processed by a delegate
- incrementDelegateProcessCount(mc);
+ incrementDelegateProcessCount( );
}
Object object = getCause();
if (ignoreException(object)) {
@@ -160,13 +160,13 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
}
if (controller instanceof AggregateAnalysisEngineController
- && mc.propertyExists(AsynchAEMessage.Command)
- && mc.getMessageIntProperty(AsynchAEMessage.Command) == AsynchAEMessage.CollectionProcessComplete) {
+ && super.propertyExists(AsynchAEMessage.Command)
+ && super.getMessageIntProperty(AsynchAEMessage.Command) == AsynchAEMessage.CollectionProcessComplete) {
isCpCError = true;
((AggregateAnalysisEngineController) controller)
.processCollectionCompleteReplyFromDelegate(delegateKey, false);
} else {
- casReferenceId = mc.getMessageStringProperty(AsynchAEMessage.CasReference);
+ casReferenceId = super.getMessageStringProperty(AsynchAEMessage.CasReference);
}
if (object != null && (object instanceof Exception || object instanceof Throwable)) {
@@ -180,14 +180,14 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
+ controllerName + " Received Exception " + casid_msg + " From Delegate:"
+ delegateKey, (Exception) object);
ErrorContext errorContext = new ErrorContext();
- errorContext.add(AsynchAEMessage.Command, mc
+ errorContext.add(AsynchAEMessage.Command, super
.getMessageIntProperty(AsynchAEMessage.Command));
- errorContext.add(AsynchAEMessage.MessageType, mc
+ errorContext.add(AsynchAEMessage.MessageType, super
.getMessageIntProperty(AsynchAEMessage.MessageType));
if (!isCpCError) {
errorContext.add(AsynchAEMessage.CasReference, casReferenceId);
}
- errorContext.add(AsynchAEMessage.Endpoint, mc.getEndpoint());
+ errorContext.add(AsynchAEMessage.Endpoint, super.getEndpoint());
controller.getErrorHandlerChain().handle(remoteException, errorContext,
controller);
}
@@ -195,7 +195,7 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
ErrorContext errorContext = new ErrorContext();
errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
errorContext.add(AsynchAEMessage.CasReference, casReferenceId);
- errorContext.add(AsynchAEMessage.Endpoint, mc.getEndpoint());
+ errorContext.add(AsynchAEMessage.Endpoint, super.getEndpoint());
controller.getErrorHandlerChain().handle(e, errorContext, controller);
}
@@ -230,7 +230,7 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
CacheEntry cacheEntry = null;
Delegate delegate = null;
try {
- casReferenceId = mc.getMessageStringProperty(AsynchAEMessage.CasReference);
+ casReferenceId = super.getMessageStringProperty(AsynchAEMessage.CasReference);
// find an entry for a given cas id in a global cache
cacheEntry = super.getCacheEntryForCas(casReferenceId);
if (cacheEntry == null) {
@@ -240,7 +240,7 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
// find an entry for a given cas id in this aggregate's local cache
CasStateEntry casStateEntry = controller.getLocalCache().lookupEntry(casReferenceId);
// find delegate which sent the reply
- delegate = super.getDelegate(mc);
+ delegate = super.getDelegate();
if (casStateEntry != null) {
casStateEntry.setReplyReceived();
casStateEntry.setLastDelegate(delegate);
@@ -251,22 +251,22 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
delegate.removeCasFromOutstandingList(casReferenceId);
if (cacheEntry.getCas() != null) {
- computeStats(mc, cacheEntry);
+ computeStats(cacheEntry);
((AggregateAnalysisEngineController) controller).process(cacheEntry.getCas(), casReferenceId);
} else {
if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(getClass()).logrb(Level.INFO, getClass().getName(), "executeDirectRequest",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cas_not_in_cache__INFO",
- new Object[] { controller.getName(), casReferenceId, mc.getEndpoint().getEndpoint() });
+ new Object[] { controller.getName(), casReferenceId, super.getEndpoint().getEndpoint() });
}
throw new AsynchAEException(
"CAS with Reference Id:" + casReferenceId + " Not Found in CasManager's CAS Cache");
}
} catch (Exception e) {
- super.handleError(e, cacheEntry, mc);
+ super.handleError(e, cacheEntry);
} finally {
- incrementDelegateProcessCount(mc);
+ incrementDelegateProcessCount();
if (delegate != null) {
handleAbortedCasMultiplier(delegate, cacheEntry);
}
@@ -290,9 +290,9 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
//String casReferenceId = null;
Endpoint endpointWithTimer = null;
try {
- final int payload = mc.getMessageIntProperty(AsynchAEMessage.Payload);
+ final int payload = super.getMessageIntProperty(AsynchAEMessage.Payload);
//casReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
- endpointWithTimer = lookupEndpoint(mc.getEndpoint().getEndpoint(),
+ endpointWithTimer = lookupEndpoint(super.getEndpoint().getEndpoint(),
casReferenceId);
if (endpointWithTimer == null) {
@@ -300,12 +300,12 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(),
"handleProcessResponseFromRemote", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_invalid_endpoint__WARNING",
- new Object[] { mc.getEndpoint().getEndpoint(), casReferenceId });
+ new Object[] { super.getEndpoint().getEndpoint(), casReferenceId });
}
return;
}
String delegateKey = ((AggregateAnalysisEngineController) controller)
- .lookUpDelegateKey(mc.getEndpoint().getEndpoint());
+ .lookUpDelegateKey(super.getEndpoint().getEndpoint());
Delegate delegate = ((AggregateAnalysisEngineController) controller)
.lookupDelegate(delegateKey);
boolean casRemovedFromOutstandingList = delegate.removeCasFromOutstandingList(casReferenceId);
@@ -327,14 +327,14 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
}
}
- String xmi = mc.getStringMessage();
+ String xmi = super.getStringMessage();
// Fetch entry from the cache for a given Cas Id. The entry contains a CAS that will be used
// during deserialization
CacheEntry cacheEntry = controller.getInProcessCache().getCacheEntryForCAS(
casReferenceId);
// check if the client requested Performance Metrics for the CAS
- if ( mc.propertyExists(AsynchAEMessage.CASPerComponentMetrics) ) {
+ if ( super.propertyExists(AsynchAEMessage.CASPerComponentMetrics) ) {
try {
// find top ancestor of this CAS. All metrics are accumulated there since
// this is what will be returned to the client
@@ -345,7 +345,7 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
if ( ancestor != null ) {
// fetch Performance Metrics from remote delegate reply
List<AnalysisEnginePerformanceMetrics> metrics =
- UimaSerializer.deserializePerformanceMetrics(mc.getMessageStringProperty(AsynchAEMessage.CASPerComponentMetrics));
+ UimaSerializer.deserializePerformanceMetrics(super.getMessageStringProperty(AsynchAEMessage.CASPerComponentMetrics));
List<AnalysisEnginePerformanceMetrics> adjustedMetrics =
new ArrayList<AnalysisEnginePerformanceMetrics>();
for(AnalysisEnginePerformanceMetrics delegateMetric : metrics ) {
@@ -422,14 +422,14 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
UIMAFramework.getLogger(getClass()).logrb(Level.FINEST, getClass().getName(),
"handleProcessResponseFromRemote", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_rcvd_reply_FINEST",
- new Object[] { mc.getEndpoint().getEndpoint(), casReferenceId, xmi });
+ new Object[] { super.getEndpoint().getEndpoint(), casReferenceId, xmi });
}
long t1 = controller.getCpuTime();
/* --------------------- */
/** DESERIALIZE THE CAS. */
/* --------------------- */
//all subsequent serialization must be complete CAS.
- if ( !mc.getMessageBooleanProperty(AsynchAEMessage.SentDeltaCas)) {
+ if ( !super.getMessageBooleanProperty(AsynchAEMessage.SentDeltaCas)) {
cacheEntry.setAcceptsDeltaCas(false);
}
@@ -445,14 +445,14 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
new Object[] { casStateEntry.howManyDelegatesResponded(), casReferenceId });
}
// If a delta CAS, merge it while checking that no pre-existing FSs are modified.
- if (mc.getMessageBooleanProperty(AsynchAEMessage.SentDeltaCas)) {
+ if (super.getMessageBooleanProperty(AsynchAEMessage.SentDeltaCas)) {
switch (serialFormat) {
case XMI:
int highWaterMark = cacheEntry.getHighWaterMark();
deserialize(xmi, cas, casReferenceId, highWaterMark, AllowPreexistingFS.disallow);
break;
case COMPRESSED_FILTERED:
- deserialize(mc.getByteMessage(), cas, cacheEntry, endpointWithTimer.getTypeSystemImpl(), AllowPreexistingFS.disallow);
+ deserialize(super.getByteMessage(), cas, cacheEntry, endpointWithTimer.getTypeSystemImpl(), AllowPreexistingFS.disallow);
break;
default:
throw new UIMARuntimeException(new Exception("Internal error"));
@@ -471,7 +471,7 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
casStateEntry.incrementHowManyDelegatesResponded();
}
} else { // Processing a reply from a non-parallel delegate (binary or delta xmi or xmi)
- byte[] binaryData = mc.getByteMessage();
+ byte[] binaryData = super.getByteMessage();
ByteArrayInputStream istream = new ByteArrayInputStream(binaryData);
switch (serialFormat) {
case BINARY:
@@ -482,7 +482,7 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
bcs.deserialize(istream, AllowPreexistingFS.allow);
break;
case XMI:
- if (mc.getMessageBooleanProperty(AsynchAEMessage.SentDeltaCas)) {
+ if (super.getMessageBooleanProperty(AsynchAEMessage.SentDeltaCas)) {
int highWaterMark = cacheEntry.getHighWaterMark();
deserialize(xmi, cas, casReferenceId, highWaterMark, AllowPreexistingFS.allow);
} else {
@@ -505,7 +505,7 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
statistic.increment(timeToDeserializeCAS);
}
- computeStats(mc, casReferenceId);
+ computeStats( casReferenceId);
// Send CAS for processing when all delegates reply
// totalNumberOfParallelDelegatesProcessingCas indicates how many delegates are processing CAS
@@ -517,7 +517,7 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
if (totalNumberOfParallelDelegatesProcessingCas == 1
|| receivedAllResponsesFromParallelDelegates(casStateEntry,
totalNumberOfParallelDelegatesProcessingCas)) {
- invokeProcess(cas, casReferenceId, null, mc, null);
+ invokeProcess(cas, casReferenceId, null, null);
}
} catch (Exception e) {
@@ -550,10 +550,10 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
ErrorContext errorContext = new ErrorContext();
errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
errorContext.add(AsynchAEMessage.CasReference, casReferenceId);
- errorContext.add(AsynchAEMessage.Endpoint, mc.getEndpoint());
+ errorContext.add(AsynchAEMessage.Endpoint, super.getEndpoint());
controller.getErrorHandlerChain().handle(e, errorContext, controller);
} finally {
- incrementDelegateProcessCount(mc);
+ incrementDelegateProcessCount();
}
@@ -562,13 +562,13 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
}
public void invokeProcess(CAS aCAS, String anInputCasReferenceId, String aNewCasReferenceId,
- MessageContext aMessageContext, String aNewCasProducedBy) throws AsynchAEException {
+ String aNewCasProducedBy) throws AsynchAEException {
try {
// Use empty string as key. Top level component stats are stored under this key.
controller.getMonitor().incrementCount("", Monitor.ProcessCount);
if (controller instanceof AggregateAnalysisEngineController) {
- boolean isNewCAS = aMessageContext.propertyExists(AsynchAEMessage.CasSequence);
+ boolean isNewCAS = super.propertyExists(AsynchAEMessage.CasSequence);
if (isNewCAS) {
((AggregateAnalysisEngineController) controller).process(aCAS, anInputCasReferenceId,
aNewCasReferenceId, aNewCasProducedBy);
@@ -577,7 +577,7 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
}
} else if (controller instanceof PrimitiveAnalysisEngineController) {
((PrimitiveAnalysisEngineController) controller).process(aCAS, anInputCasReferenceId,
- aMessageContext.getEndpoint());
+ super.getEndpoint());
} else {
throw new AsynchAEException(
"Invalid Controller. Expected AggregateController or PrimitiveController. Got:"
@@ -590,21 +590,21 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
}
}
- protected void computeStats(MessageContext aMessageContext, String aCasReferenceId)
- throws AsynchAEException {
- if (aMessageContext.propertyExists(AsynchAEMessage.TimeInService)) {
+ protected void computeStats(String aCasReferenceId)
+ throws Exception {
+ if (super.propertyExists(AsynchAEMessage.TimeInService)) {
long departureTime = controller.getTime(aCasReferenceId,
- aMessageContext.getEndpoint().getEndpoint());
+ super.getEndpoint().getEndpoint());
long currentTime = System.nanoTime();
long roundTrip = currentTime - departureTime;
- long timeInService = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeInService);
+ long timeInService = super.getMessageLongProperty(AsynchAEMessage.TimeInService);
long totalTimeInComms = currentTime - (departureTime - timeInService);
if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(getClass()).logrb( Level.FINE, getClass().getName(),
"computeStats", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_show_roundtrip_time__FINE",
- new Object[] { aCasReferenceId, aMessageContext.getEndpoint(),
+ new Object[] { aCasReferenceId, super.getEndpoint(),
(double) roundTrip / (double) 1000000 });
UIMAFramework.getLogger(getClass()).logrb(
@@ -614,7 +614,7 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_show_time_spent_in_delegate__FINE",
new Object[] { aCasReferenceId, (double) timeInService / (double) 1000000,
- aMessageContext.getEndpoint() });
+ super.getEndpoint() });
UIMAFramework.getLogger(getClass()).logrb(
Level.FINE,
@@ -623,28 +623,27 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_show_time_spent_in_comms__FINE",
new Object[] { aCasReferenceId, (double) totalTimeInComms / (double) 1000000,
- aMessageContext.getEndpoint() });
+ super.getEndpoint() });
}
}
if (controller instanceof AggregateAnalysisEngineController) {
- aggregateDelegateStats(aMessageContext, aCasReferenceId);
+ aggregateDelegateStats(aCasReferenceId);
}
}
- protected synchronized void aggregateDelegateStats(MessageContext aMessageContext,
- String aCasReferenceId) throws AsynchAEException {
+ protected synchronized void aggregateDelegateStats(String aCasReferenceId) throws AsynchAEException {
String delegateKey = "";
try {
- if ( aMessageContext.getEndpoint().getEndpoint() == null || aMessageContext.getEndpoint().getEndpoint().trim().length()==0) {
- String fromEndpoint = aMessageContext
+ if ( super.getEndpoint().getEndpoint() == null || super.getEndpoint().getEndpoint().trim().length()==0) {
+ String fromEndpoint = super
.getMessageStringProperty(AsynchAEMessage.MessageFrom);
delegateKey = ((AggregateAnalysisEngineController) controller)
.lookUpDelegateKey(fromEndpoint);
} else {
delegateKey = ((AggregateAnalysisEngineController) controller)
- .lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
+ .lookUpDelegateKey(super.getEndpoint().getEndpoint());
}
// delegateKey = ((AggregateAnalysisEngineController) getController())
// .lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
@@ -672,8 +671,8 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
ServicePerformance delegateServicePerformance = ((AggregateAnalysisEngineController) controller)
.getServicePerformance(delegateKey);
- if (aMessageContext.propertyExists(AsynchAEMessage.TimeToSerializeCAS)) {
- long timeToSerializeCAS = ((Long) aMessageContext
+ if (super.propertyExists(AsynchAEMessage.TimeToSerializeCAS)) {
+ long timeToSerializeCAS = ((Long) super
.getMessageLongProperty(AsynchAEMessage.TimeToSerializeCAS)).longValue();
if (timeToSerializeCAS > 0) {
if (delegateServicePerformance != null) {
@@ -681,8 +680,8 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
}
}
}
- if (aMessageContext.propertyExists(AsynchAEMessage.TimeToDeserializeCAS)) {
- long timeToDeserializeCAS = ((Long) aMessageContext
+ if (super.propertyExists(AsynchAEMessage.TimeToDeserializeCAS)) {
+ long timeToDeserializeCAS = ((Long) super
.getMessageLongProperty(AsynchAEMessage.TimeToDeserializeCAS)).longValue();
if (timeToDeserializeCAS > 0) {
if (delegateServicePerformance != null) {
@@ -691,21 +690,21 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
}
}
- if (aMessageContext.propertyExists(AsynchAEMessage.IdleTime)) {
- long idleTime = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.IdleTime))
+ if (super.propertyExists(AsynchAEMessage.IdleTime)) {
+ long idleTime = ((Long) super.getMessageLongProperty(AsynchAEMessage.IdleTime))
.longValue();
if (idleTime > 0 && delegateServicePerformance != null) {
- Endpoint endp = aMessageContext.getEndpoint();
+ Endpoint endp = super.getEndpoint();
if (endp != null && endp.isRemote()) {
delegateServicePerformance.incrementIdleTime(idleTime);
}
}
}
- if (aMessageContext.propertyExists(AsynchAEMessage.TimeWaitingForCAS)) {
- long timeWaitingForCAS = ((Long) aMessageContext
+ if (super.propertyExists(AsynchAEMessage.TimeWaitingForCAS)) {
+ long timeWaitingForCAS = ((Long) super
.getMessageLongProperty(AsynchAEMessage.TimeWaitingForCAS)).longValue();
- if (timeWaitingForCAS > 0 && aMessageContext.getEndpoint().isRemote()) {
+ if (timeWaitingForCAS > 0 && super.getEndpoint().isRemote()) {
entry.incrementTimeWaitingForCAS(timeWaitingForCAS);
delegateServicePerformance.incrementCasPoolWaitTime(timeWaitingForCAS
- delegateServicePerformance.getRawCasPoolWaitTime());
@@ -714,10 +713,10 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
}
}
}
- if (aMessageContext.propertyExists(AsynchAEMessage.TimeInProcessCAS)) {
- long timeInProcessCAS = ((Long) aMessageContext
+ if (super.propertyExists(AsynchAEMessage.TimeInProcessCAS)) {
+ long timeInProcessCAS = ((Long) super
.getMessageLongProperty(AsynchAEMessage.TimeInProcessCAS)).longValue();
- Endpoint endp = aMessageContext.getEndpoint();
+ Endpoint endp = super.getEndpoint();
if (endp != null && endp.isRemote()) {
if (delegateServicePerformance != null) {
// calculate the time spent in analysis. The remote service returns total time
@@ -815,8 +814,8 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
}
}
- private void incrementDelegateProcessCount(MessageContext aMessageContext) {
- Endpoint endpoint = aMessageContext.getEndpoint();
+ private void incrementDelegateProcessCount() {
+ Endpoint endpoint = super.getEndpoint();
if (endpoint != null && controller instanceof AggregateAnalysisEngineController) {
try {
String delegateKey = ((AggregateAnalysisEngineController) controller)
@@ -836,50 +835,50 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
}
- protected void computeStats(MessageContext aMessageContext, CacheEntry cacheEntry) throws AsynchAEException {
- if (aMessageContext.propertyExists(AsynchAEMessage.TimeInService)) {
+ protected void computeStats(CacheEntry cacheEntry) throws Exception {
+ if (super.propertyExists(AsynchAEMessage.TimeInService)) {
long departureTime = controller.getTime(cacheEntry.getCasReferenceId(),
- aMessageContext.getEndpoint().getEndpoint());
+ super.getEndpoint().getEndpoint());
long currentTime = System.nanoTime();
long roundTrip = currentTime - departureTime;
- long timeInService = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeInService);
+ long timeInService = super.getMessageLongProperty(AsynchAEMessage.TimeInService);
long totalTimeInComms = currentTime - (departureTime - timeInService);
if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "computeStats",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_roundtrip_time__FINE",
- new Object[] { cacheEntry.getCasReferenceId(), aMessageContext.getEndpoint(),
+ new Object[] { cacheEntry.getCasReferenceId(), super.getEndpoint(),
(double) roundTrip / (double) 1000000 });
UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "computeStats",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_time_spent_in_delegate__FINE",
new Object[] { cacheEntry.getCasReferenceId(), (double) timeInService / (double) 1000000,
- aMessageContext.getEndpoint() });
+ super.getEndpoint() });
UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "computeStats",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_time_spent_in_comms__FINE",
new Object[] { cacheEntry.getCasReferenceId(), (double) totalTimeInComms / (double) 1000000,
- aMessageContext.getEndpoint() });
+ super.getEndpoint() });
}
}
if (controller instanceof AggregateAnalysisEngineController) {
- aggregateDelegateStats(aMessageContext, cacheEntry);
+ aggregateDelegateStats( cacheEntry);
}
}
- protected synchronized void aggregateDelegateStats(MessageContext aMessageContext, CacheEntry cacheEntry)
+ protected synchronized void aggregateDelegateStats(CacheEntry cacheEntry)
throws AsynchAEException {
String delegateKey = "";
try {
- if (aMessageContext.getEndpoint().getEndpoint() == null
- || aMessageContext.getEndpoint().getEndpoint().trim().length() == 0) {
- String fromEndpoint = aMessageContext.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+ if (super.getEndpoint().getEndpoint() == null
+ || super.getEndpoint().getEndpoint().trim().length() == 0) {
+ String fromEndpoint = super.getMessageStringProperty(AsynchAEMessage.MessageFrom);
delegateKey = ((AggregateAnalysisEngineController) controller).lookUpDelegateKey(fromEndpoint);
} else {
delegateKey = ((AggregateAnalysisEngineController) controller)
- .lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
+ .lookUpDelegateKey(super.getEndpoint().getEndpoint());
}
CacheEntry parentCasEntry = null;
String parentCasReferenceId = cacheEntry.getInputCasReferenceId();
@@ -897,8 +896,8 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
ServicePerformance delegateServicePerformance = ((AggregateAnalysisEngineController) controller)
.getServicePerformance(delegateKey);
- if (aMessageContext.propertyExists(AsynchAEMessage.TimeToSerializeCAS)) {
- long timeToSerializeCAS = ((Long) aMessageContext
+ if (super.propertyExists(AsynchAEMessage.TimeToSerializeCAS)) {
+ long timeToSerializeCAS = ((Long) super
.getMessageLongProperty(AsynchAEMessage.TimeToSerializeCAS)).longValue();
if (timeToSerializeCAS > 0) {
if (delegateServicePerformance != null) {
@@ -906,8 +905,8 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
}
}
}
- if (aMessageContext.propertyExists(AsynchAEMessage.TimeToDeserializeCAS)) {
- long timeToDeserializeCAS = ((Long) aMessageContext
+ if (super.propertyExists(AsynchAEMessage.TimeToDeserializeCAS)) {
+ long timeToDeserializeCAS = ((Long) super
.getMessageLongProperty(AsynchAEMessage.TimeToDeserializeCAS)).longValue();
if (timeToDeserializeCAS > 0) {
if (delegateServicePerformance != null) {
@@ -916,20 +915,20 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
}
}
- if (aMessageContext.propertyExists(AsynchAEMessage.IdleTime)) {
- long idleTime = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.IdleTime)).longValue();
+ if (super.propertyExists(AsynchAEMessage.IdleTime)) {
+ long idleTime = ((Long) super.getMessageLongProperty(AsynchAEMessage.IdleTime)).longValue();
if (idleTime > 0 && delegateServicePerformance != null) {
- Endpoint endp = aMessageContext.getEndpoint();
+ Endpoint endp = super.getEndpoint();
if (endp != null && endp.isRemote()) {
delegateServicePerformance.incrementIdleTime(idleTime);
}
}
}
- if (aMessageContext.propertyExists(AsynchAEMessage.TimeWaitingForCAS)) {
- long timeWaitingForCAS = ((Long) aMessageContext
+ if (super.propertyExists(AsynchAEMessage.TimeWaitingForCAS)) {
+ long timeWaitingForCAS = ((Long) super
.getMessageLongProperty(AsynchAEMessage.TimeWaitingForCAS)).longValue();
- if (timeWaitingForCAS > 0 && aMessageContext.getEndpoint().isRemote()) {
+ if (timeWaitingForCAS > 0 && super.getEndpoint().isRemote()) {
cacheEntry.incrementTimeWaitingForCAS(timeWaitingForCAS);
delegateServicePerformance.incrementCasPoolWaitTime(
timeWaitingForCAS - delegateServicePerformance.getRawCasPoolWaitTime());
@@ -938,10 +937,10 @@ public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
}
}
}
- if (aMessageContext.propertyExists(AsynchAEMessage.TimeInProcessCAS)) {
- long timeInProcessCAS = ((Long) aMessageContext
+ if (super.propertyExists(AsynchAEMessage.TimeInProcessCAS)) {
+ long timeInProcessCAS = ((Long) super
.getMessageLongProperty(AsynchAEMessage.TimeInProcessCAS)).longValue();
- Endpoint endp = aMessageContext.getEndpoint();
+ Endpoint endp = super.getEndpoint();
if (endp != null && endp.isRemote()) {
if (delegateServicePerformance != null) {
// calculate the time spent in analysis. The remote service returns total time
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessServiceInfoResponseCommand.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessServiceInfoResponseCommand.java
index b98cef4..3571d99 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessServiceInfoResponseCommand.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessServiceInfoResponseCommand.java
@@ -29,22 +29,22 @@ import org.apache.uima.aae.message.MessageContext;
import org.apache.uima.util.Level;
public class ProcessServiceInfoResponseCommand extends AbstractUimaAsCommand {
- private MessageContext mc;
+// private MessageContext mc;
public ProcessServiceInfoResponseCommand(MessageContext mc, AnalysisEngineController controller) {
- super(controller);
- this.mc = mc;
+ super(controller, mc);
+ // this.mc = mc;
}
public void execute() throws Exception {
String casReferenceId = null;
try {
- casReferenceId = mc.getMessageStringProperty(AsynchAEMessage.CasReference);
+ casReferenceId = super.getMessageStringProperty(AsynchAEMessage.CasReference);
if ( casReferenceId == null ) {
return; // nothing to do
}
- Endpoint freeCasEndpoint = mc.getEndpoint();
+ Endpoint freeCasEndpoint = super.getEndpoint();
CasStateEntry casStateEntry = ((AggregateAnalysisEngineController) controller)
.getLocalCache().lookupEntry(casReferenceId);
if (casStateEntry != null) {
@@ -52,7 +52,7 @@ public class ProcessServiceInfoResponseCommand extends AbstractUimaAsCommand {
// Fetch host IP where the CAS is being processed. When the UIMA AS service
// receives a CAS it immediately sends ServiceInfo Reply message containing
// IP of the host where the service is running.
- String serviceHostIp = mc.getMessageStringProperty(AsynchAEMessage.ServerIP);
+ String serviceHostIp = super.getMessageStringProperty(AsynchAEMessage.ServerIP);
if ( serviceHostIp != null ) {
casStateEntry.setHostIpProcessingCAS(serviceHostIp);
}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ReleaseCASRequestCommand.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ReleaseCASRequestCommand.java
index 22010d0..594005c 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ReleaseCASRequestCommand.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ReleaseCASRequestCommand.java
@@ -26,15 +26,15 @@ import org.apache.uima.aae.message.MessageContext;
import org.apache.uima.util.Level;
public class ReleaseCASRequestCommand extends AbstractUimaAsCommand {
- private MessageContext mc;
+ //private MessageContext mc;
public ReleaseCASRequestCommand(MessageContext mc, AnalysisEngineController controller) {
- super(controller);
- this.mc = mc;
+ super(controller, mc);
+ //this.mc = mc;
}
public void execute() throws Exception {
- String casReferenceId = mc.getMessageStringProperty(AsynchAEMessage.CasReference);
+ String casReferenceId = super.getMessageStringProperty(AsynchAEMessage.CasReference);
if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(),
"execute", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/StopRequestCommand.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/StopRequestCommand.java
index d20fcdf..c29227e 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/StopRequestCommand.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/StopRequestCommand.java
@@ -28,17 +28,17 @@ import org.apache.uima.aae.message.MessageContext;
import org.apache.uima.util.Level;
public class StopRequestCommand extends AbstractUimaAsCommand {
- private MessageContext mc;
+ //private MessageContext mc;
public StopRequestCommand(MessageContext mc, AnalysisEngineController controller) {
- super(controller);
- this.mc = mc;
+ super(controller, mc);
+ //this.mc = mc;
}
public void execute() throws Exception {
try {
- String casReferenceId = mc.getMessageStringProperty(AsynchAEMessage.CasReference);
+ String casReferenceId = super.getMessageStringProperty(AsynchAEMessage.CasReference);
if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(getClass()).logrb(Level.INFO, getClass().getName(), "execute",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_received_stop_request__INFO",
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/UimaAsMessageProcessor.java b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/UimaAsMessageProcessor.java
new file mode 100644
index 0000000..bf61089
--- /dev/null
+++ b/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/UimaAsMessageProcessor.java
@@ -0,0 +1,22 @@
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.MessageProcessor;
+import org.apache.uima.aae.message.UimaAsMessage;
+
+public class UimaAsMessageProcessor implements MessageProcessor {
+
+ private AnalysisEngineController controller;
+
+ public UimaAsMessageProcessor(AnalysisEngineController ctlr) {
+ controller = ctlr;
+ }
+ @Override
+ public void process(MessageContext message ) throws Exception {
+ UimaAsCommand command =
+ CommandFactory.newCommand(message, controller);
+ command.execute();
+ }
+
+}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectListener.java b/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectListener.java
index 67d03c9..a3e6454 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectListener.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectListener.java
@@ -25,7 +25,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.uima.aae.InputChannel;
import org.apache.uima.aae.UimaAsThreadFactory;
import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
import org.apache.uima.aae.controller.AnalysisEngineController;
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessage.java b/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessage.java
index 4ad9c9e..747105b 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessage.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessage.java
@@ -22,10 +22,15 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.Origin;
+import org.apache.uima.aae.message.UimaAsMessage;
+import org.apache.uima.aae.message.UimaAsOrigin;
import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
-public class DirectMessage {
+public class DirectMessage implements UimaAsMessage {
private static final long serialVersionUID = 1L;
private Map<String, Object> stateMap =
@@ -78,7 +83,7 @@ public class DirectMessage {
store(AsynchAEMessage.MessageType, messageType);
return this;
}
- public DirectMessage withOrigin(String origin) {
+ public DirectMessage withOrigin(Origin origin) {
store(AsynchAEMessage.MessageFrom, origin);
return this;
}
@@ -103,6 +108,9 @@ public class DirectMessage {
store(AsynchAEMessage.ReplyToEndpoint, replyDestination);
return this;
}
+ public Object getReplyDestination() {
+ return stateMap.get(AsynchAEMessage.ReplyToEndpoint);
+ }
public DirectMessage withDelegateKey(Object delegateKey) {
store(AsynchAEMessage.DelegateKey, delegateKey);
return this;
@@ -122,4 +130,69 @@ public class DirectMessage {
public boolean propertyExists(String key) {
return stateMap.containsKey(key);
}
+ @Override
+ public String getMessageStringProperty(String aMessagePropertyName) throws AsynchAEException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ @Override
+ public int getMessageIntProperty(String aMessagePropertyName) throws AsynchAEException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+ @Override
+ public long getMessageLongProperty(String aMessagePropertyName) throws AsynchAEException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+ @Override
+ public Object getMessageObjectProperty(String aMessagePropertyName) throws AsynchAEException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ @Override
+ public boolean getMessageBooleanProperty(String aMessagePropertyName) throws AsynchAEException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+ @Override
+ public Endpoint getEndpoint() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ @Override
+ public String getStringMessage() throws AsynchAEException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ @Override
+ public Object getObjectMessage() throws AsynchAEException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ @Override
+ public byte[] getByteMessage() throws AsynchAEException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ @Override
+ public Object getRawMessage() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ @Override
+ public void setMessageArrivalTime(long anArrivalTime) {
+ // TODO Auto-generated method stub
+
+ }
+ @Override
+ public long getMessageArrivalTime() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+ @Override
+ public String getEndpointName() {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
diff --git a/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessageContext.java b/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessageContext.java
index 522a1fb..0b527a1 100644
--- a/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessageContext.java
+++ b/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessageContext.java
@@ -23,6 +23,7 @@ import org.apache.uima.aae.controller.Endpoint_impl;
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.Origin;
import org.apache.uima.cas.SerialFormat;
public class DirectMessageContext implements MessageContext {
@@ -47,6 +48,8 @@ public class DirectMessageContext implements MessageContext {
endpoint.setEndpoint(anEndpointName);
endpoint.setReplyDestination(message.get(AsynchAEMessage.ReplyToEndpoint));
endpoint.setDelegateKey(message.getAsString(AsynchAEMessage.DelegateKey));
+ endpoint.setMessageOrigin((Origin)message.get(AsynchAEMessage.MessageFrom));
+
StringBuilder sb = new StringBuilder();
if ( controllerName != null && !controllerName.trim().isEmpty()) {
sb.append("Service:"+controllerName+" ");
diff --git a/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java b/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
index 33b02fe..ea9609f 100644
--- a/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
+++ b/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
@@ -55,6 +55,7 @@ import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.DestinationDoesNotExistException;
+import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.uima.UIMAFramework;
import org.apache.uima.UIMARuntimeException;
@@ -407,9 +408,8 @@ public abstract class BaseUIMAAsynchronousEngineCommon_impl implements UimaAsync
}
private void addMessage(PendingMessage msg) {
- System.out.println("Client addMessage() - adding message to pendingMessageQueue");
-
- pendingMessageQueue.add(msg);
+ System.out.println("Client addMessage() - adding message to pendingMessageQueue - queue hashcode:"+pendingMessageQueue.hashCode());
+ pendingMessageQueue.add(msg);
}
protected void acquireCpcReadySemaphore() {
@@ -1044,15 +1044,19 @@ public abstract class BaseUIMAAsynchronousEngineCommon_impl implements UimaAsync
}
return requestToCache.getCasReferenceId();
}
- SharedConnection sharedConnection = lookupConnection(getBrokerURI());
-
- if ( sharedConnection != null && !sharedConnection.isOpen() ) {
- if (requestToCache != null && !requestToCache.isSynchronousInvocation() && aCAS != null ) {
- aCAS.release();
- }
- throw new ResourceProcessException(new BrokerConnectionException("Unable To Deliver Message To Destination. Connection To Broker "+sharedConnection.getBroker()+" Has Been Lost"));
+ if ( isServiceRemote() ) {
+// if ( !Transport.Java.toString().toLowerCase().equals(getBrokerURI().toLowerCase())) {
+ SharedConnection sharedConnection = lookupConnection(getBrokerURI());
+
+ if ( sharedConnection != null && !sharedConnection.isOpen() ) {
+ if (requestToCache != null && !requestToCache.isSynchronousInvocation() && aCAS != null ) {
+ aCAS.release();
+ }
+ throw new ResourceProcessException(new BrokerConnectionException("Unable To Deliver Message To Destination. Connection To Broker "+sharedConnection.getBroker()+" Has Been Lost"));
+ }
}
+
// Incremented number of outstanding CASes sent to a service. When a reply comes
// this counter is decremented
outstandingCasRequests.incrementAndGet();
@@ -1721,7 +1725,7 @@ public abstract class BaseUIMAAsynchronousEngineCommon_impl implements UimaAsync
}
int payload = -1;
String casReferenceId = message.asString(AsynchAEMessage.CasReference);
-
+ System.out.println("Client processing reply with CAS Id:"+casReferenceId);
// beforeProcessReply(casReferenceId);
// Determine the type of payload in the message (XMI,Cas Reference,Exception,etc)
@@ -1777,6 +1781,7 @@ public abstract class BaseUIMAAsynchronousEngineCommon_impl implements UimaAsync
// cachedRequest is only null if we are receiving child CASes from a
// Cas Multiplier. Otherwise, we drop the message as it is out of band
if ( cachedRequest == null && !casMultiplierDelegate ) {
+ System.out.println("............... cachedRequest is null - not processing ");
// most likely a reply came in after the thread was interrupted
return;
}
@@ -2414,6 +2419,8 @@ public abstract class BaseUIMAAsynchronousEngineCommon_impl implements UimaAsync
System.out.println("1 /////////////////////////////////// Client.hashCode()-"+this.hashCode()+" calling removeFromCache()- CAS:"+message.asString(AsynchAEMessage.CasReference));
removeFromCache(message.asString(AsynchAEMessage.CasReference));
+
+ System.out.println("1 /////////////////////////////////// removeFromCache() returned");
}
}
}
@@ -2526,6 +2533,7 @@ public abstract class BaseUIMAAsynchronousEngineCommon_impl implements UimaAsync
break;
case AsynchAEMessage.Process: // received Process reply from a service
System.out.println("onMessage() - recv'd Process reply");
+
handleProcessReply(message, true, null);
break;