You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2020/06/10 13:15:11 UTC
[uima-async-scaleout] 27/34: UIMA-5501 refactored to use pluggagble
endpoints
This is an automated email from the ASF dual-hosted git repository.
cwiklik pushed a commit to branch uima-as-3
in repository https://gitbox.apache.org/repos/asf/uima-async-scaleout.git
commit 2a1f37fe4ed6afad66cf1e5582998a271b1b6207
Author: cwiklik <cwiklik>
AuthorDate: Thu Nov 29 17:31:09 2018 +0000
UIMA-5501 refactored to use pluggagble endpoints
---
uimaj-as-connectors/pom.xml | 66 ++-
.../as/connectors/direct/DirectUimaAsConsumer.java | 142 +++---
.../as/connectors/direct/DirectUimaAsEndpoint.java | 550 ++++++++++++++++-----
.../as/connectors/direct/DirectUimaAsProducer.java | 60 ++-
.../mockup/MockUpAnalysisEngineController.java | 55 ++-
.../as/connectors/mockup/TestMessageProcessor.java | 4 +-
.../connectors/direct/DirectUimaAsConnector.class | Bin 1276 -> 0 bytes
.../direct/DirectUimaAsConsumer$1$1.class | Bin 2023 -> 0 bytes
.../connectors/direct/DirectUimaAsConsumer$1.class | Bin 3352 -> 0 bytes
...rectUimaAsConsumer$DirectListenerCallback.class | Bin 1358 -> 0 bytes
.../connectors/direct/DirectUimaAsConsumer.class | Bin 8575 -> 0 bytes
...int$MockupService$ServiceMessageProcessor.class | Bin 2619 -> 0 bytes
.../DirectUimaAsEndpoint$MockupService.class | Bin 3518 -> 0 bytes
.../connectors/direct/DirectUimaAsEndpoint.class | Bin 8402 -> 0 bytes
.../connectors/direct/DirectUimaAsProducer.class | Bin 1534 -> 0 bytes
.../mockup/MockUpAnalysisEngineController.class | Bin 16652 -> 0 bytes
.../connectors/mockup/TestMessageProcessor.class | Bin 1378 -> 0 bytes
17 files changed, 663 insertions(+), 214 deletions(-)
diff --git a/uimaj-as-connectors/pom.xml b/uimaj-as-connectors/pom.xml
index da30e8c..2bb5f29 100644
--- a/uimaj-as-connectors/pom.xml
+++ b/uimaj-as-connectors/pom.xml
@@ -8,8 +8,31 @@
</parent>
<artifactId>uimaj-as-connectors</artifactId>
-
-
+ <name>Apache UIMA-AS: ${project.artifactId}</name>
+ <description>UIMA-AS Connectors</description>
+ <url>${uimaWebsiteUrl}</url>
+
+ <!-- Special inheritance note even though the <scm> element that follows
+ is exactly the same as those in super poms, it cannot be inherited because
+ there is some special code that computes the connection elements from the
+ chain of parent poms, if this is omitted. Keeping this a bit factored allows
+ cutting/pasting the <scm> element, and just changing the following two properties -->
+ <scm>
+ <connection>
+ scm:svn:http://svn.apache.org/repos/asf/uima/uima-as/tags/uima-as-2.10.2/uimaj-as-connectors
+ </connection>
+ <developerConnection>
+ scm:svn:https://svn.apache.org/repos/asf/uima/uima-as/tags/uima-as-2.10.2/uimaj-as-connectors
+ </developerConnection>
+ <url>
+ http://svn.apache.org/viewvc/uima/uima-as/tags/uima-as-2.10.2/uimaj-as-connectors
+ </url>
+ </scm>
+
+ <properties>
+ <uimaScmProject>${project.artifactId}</uimaScmProject>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.uima</groupId>
@@ -17,9 +40,44 @@
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
-
-
+
</dependencies>
+ <build>
+ <finalName>${project.artifactId}</finalName>
+ <plugins>
+ <!-- This plugin makes the tests run by - giving more memory to them,
+ and - eliminating the very slow integration test -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <argLine>-Xmx300M</argLine>
+ <includes>
+ <include>**/TestUimaASBasic.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ </plugins>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>default-cli</id>
+ <configuration>
+ <excludes>
+ <exclude>release.properties</exclude> <!-- release generated artifact -->
+ <exclude>src/test/resources/data/DoubleByteText.txt</exclude> <!-- test data -->
+ </excludes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
</project>
\ No newline at end of file
diff --git a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer.java b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer.java
index a52b1b1..b87308d 100644
--- a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer.java
+++ b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer.java
@@ -2,28 +2,26 @@ package org.apache.uima.as.connectors.direct;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.uima.aae.UimaAsThreadFactory;
import org.apache.uima.aae.controller.AnalysisEngineController;
-import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
import org.apache.uima.aae.definition.connectors.AbstractUimaAsConsumer;
+import org.apache.uima.aae.definition.connectors.Initializer;
import org.apache.uima.aae.definition.connectors.ListenerCallback;
import org.apache.uima.aae.definition.connectors.UimaAsConsumer;
+import org.apache.uima.aae.definition.connectors.UimaAsEndpoint.EndpointType;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.MessageContext;
import org.apache.uima.aae.message.MessageProcessor;
-import org.apache.uima.aae.spi.transport.vm.UimaVmQueue;
+import org.apache.uima.aae.message.Target;
+import org.apache.uima.aae.message.UimaAsTarget;
import org.apache.uima.as.client.DirectMessage;
import org.apache.uima.as.client.DirectMessageContext;
public class DirectUimaAsConsumer extends AbstractUimaAsConsumer {
-
+ //private static final String DIRECT = "direct:";
private BlockingQueue<DirectMessage> inQueue= new LinkedBlockingQueue<>();;
private MessageProcessor processor;
private boolean started = false;
@@ -31,38 +29,60 @@ public class DirectUimaAsConsumer extends AbstractUimaAsConsumer {
private final ConsumerType consumerType;
private int consumerThreadCount = 1;
private boolean doStop = false;
- private final CountDownLatch latchToCountNumberOfInitedThreads;
- private final CountDownLatch latchToCountNumberOfTerminatedThreads;
+// private final CountDownLatch latchToCountNumberOfInitedThreads;
+// private final CountDownLatch latchToCountNumberOfTerminatedThreads;
private AnalysisEngineController controller;
private final String name;
+ private Initializer initializer;
+ private DirectListenerCallback callback = new DirectListenerCallback(this);
+ private Target target;
+ private UimaAsConsumer delegate;
+
public DirectUimaAsConsumer(String name, BlockingQueue<DirectMessage> inQueue, ConsumerType type, int consumerThreadCount) {
this(name, type,consumerThreadCount);
this.inQueue = inQueue;
}
- public DirectUimaAsConsumer(String name,String targetUri, ConsumerType type, int consumerThreadCount) {
- this(name, type,consumerThreadCount);
-
- }
+// public DirectUimaAsConsumer(String name, ConsumerType type, int consumerThreadCount) {
+// this(name, type, consumerThreadCount);
+//
+// }
- private DirectUimaAsConsumer( String name, ConsumerType type, int consumerThreadCount) {
- this.name = name;
+ public DirectUimaAsConsumer( String name, ConsumerType type, int consumerThreadCount) {
+ if ( name.indexOf(EndpointType.Direct.getName()) > -1 ) {
+ this.name = name;
+ } else {
+ this.name = EndpointType.Direct.getName()+name;
+ }
+
this.consumerType = type;
this.consumerThreadCount = consumerThreadCount;
- latchToCountNumberOfInitedThreads = new CountDownLatch(consumerThreadCount);
- latchToCountNumberOfTerminatedThreads = new CountDownLatch(consumerThreadCount);
+// latchToCountNumberOfInitedThreads = new CountDownLatch(consumerThreadCount);
+// latchToCountNumberOfTerminatedThreads = new CountDownLatch(consumerThreadCount);
+ target = new UimaAsTarget(name, EndpointType.Direct);
+ }
+ public void setInitializer(Initializer initializer) {
+ this.initializer = initializer;
+ }
+ public Target getTarget() {
+ return target;
+ }
+ public int getConsumerCount() {
+ return consumerThreadCount;
}
-
public ConsumerType getType() {
return consumerType;
}
-
+ public void delegateTo(UimaAsConsumer delegateConsumer) {
+ delegate = delegateConsumer;
+ }
protected void setMessageProcessor(MessageProcessor processor) {
this.processor = processor;
}
public void initialize() throws Exception {
-
+ initializer = new DefaultInitializer(consumerThreadCount);
+ executor = initializer.initialize(callback);
}
/**
@@ -71,50 +91,24 @@ public class DirectUimaAsConsumer extends AbstractUimaAsConsumer {
*/
@Override
public void consume(DirectMessage message) throws Exception {
- inQueue.add(message);
- }
-
- private void initializeUimaPipeline() throws Exception {
-// workQueue = new UimaVmQueue();
- if ( controller.isPrimitive() ) {
- ThreadGroup threadGroup = new ThreadGroup("VmThreadGroup" + 1 + "_" + controller.getComponentName());
- executor = new ThreadPoolExecutor(consumerThreadCount, consumerThreadCount, Long.MAX_VALUE, TimeUnit.DAYS, new UimaVmQueue());
- UimaAsThreadFactory tf = null;
-
- DirectListenerCallback callback = new DirectListenerCallback(this);
-
- tf = new UimaAsThreadFactory().
- withCallback(callback).
- withThreadGroup(threadGroup).
- withPrimitiveController((PrimitiveAnalysisEngineController)processor.getController()).
- withTerminatedThreadsLatch(latchToCountNumberOfTerminatedThreads).
- withInitedThreadsLatch(latchToCountNumberOfInitedThreads);
- tf.setDaemon(true);
- ((ThreadPoolExecutor)executor).setThreadFactory(tf);
- ((ThreadPoolExecutor)executor).prestartAllCoreThreads();
- latchToCountNumberOfInitedThreads.await();
- if ( callback.failedInitialization() ) {
- throw callback.getException();
- }
- System.out.println("Executor Started - All Process Threads Initialized");
+ // if this consumer has a delegate it does not handle
+ // messages itself. Instead messages are passed to the
+ // delegate consumer. An example of this is CPC Consumer
+ // which must delegate CPC requests to ProcessCASConsumer
+ // since CPC requires access to AE instance which is
+ // only associated with Process Cas Consumers.
+ if ( Objects.nonNull(delegate)) {
+ delegate.consume(message);
} else {
- executor = Executors.newFixedThreadPool(consumerThreadCount);
+ inQueue.add(message);
}
}
+
public void initialize(AnalysisEngineController controller) throws Exception {
this.controller = controller;
// Consumer handling ProcessCAS must first initialize each
// UIMA pipeline.
- if (ConsumerType.ProcessCAS.equals(consumerType)) {
- if ( Objects.isNull(controller)) {
- executor = Executors.newFixedThreadPool(consumerThreadCount);
- } else {
- initializeUimaPipeline();
- }
-
- } else {
- executor = Executors.newFixedThreadPool(consumerThreadCount);
- }
+ executor = initializer.initialize(callback);
}
private boolean stopConsumingMessages(DirectMessage message ) throws Exception{
@@ -128,6 +122,15 @@ public class DirectUimaAsConsumer extends AbstractUimaAsConsumer {
if ( started ) {
return;
}
+ if ( Objects.isNull(executor)) {
+ try {
+ initialize();
+ } catch( Exception e) {
+ e.printStackTrace();
+ return;
+ }
+
+ }
System.out.println(">>> "+name+" DirectConsumer.start() - Consumer Type:"+getType());
new Thread() {
@Override
@@ -139,8 +142,10 @@ public class DirectUimaAsConsumer extends AbstractUimaAsConsumer {
try {
final DirectMessage message = inQueue.take(); //blocks if empty
- System.out.println(">>> "+name+" DirectConsumer.run() - Consumer Type:"+getType()+" Got new message");
-
+ //System.out.println(">>> "+name+" DirectConsumer.run() - Consumer Type:"+getType()+" Got new message");
+ System.out.println(getType()+" Consumer Received Message From:"+
+ message.getOrigin());
+
if ( stopConsumingMessages(message)) { // special type of msg indicating end of processing
System.out.println(">>> "+name+" Got END message - Stopping Queue Consumer");
doStop = true;
@@ -149,13 +154,9 @@ public class DirectUimaAsConsumer extends AbstractUimaAsConsumer {
public void run() {
try {
- //System.out.println(">>> "+controller.getComponentName()+" Got new message - processing on thread "+Thread.currentThread().getName()+" channel:"+getType());
- //ic.onMessage(message);
-
// every message is wrapped in the MessageContext
MessageContext messageContext =
- new DirectMessageContext(message, "", controller.getComponentName());
-
+ new DirectMessageContext(message, "", name);
processor.process(messageContext);
} catch( Exception e) {
e.printStackTrace();
@@ -202,4 +203,17 @@ public class DirectUimaAsConsumer extends AbstractUimaAsConsumer {
}
}
+ private class DefaultInitializer implements Initializer {
+ private final int consumerThreadCount;
+
+ public DefaultInitializer(int consumerThreadCount) {
+ this.consumerThreadCount = consumerThreadCount;
+ }
+
+ @Override
+ public ExecutorService initialize(ListenerCallback callback) throws Exception {
+ return Executors.newFixedThreadPool(consumerThreadCount);
+ }
+
+ }
}
diff --git a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint.java b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint.java
index 012c89a..f22b107 100644
--- a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint.java
+++ b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint.java
@@ -1,139 +1,455 @@
package org.apache.uima.as.connectors.direct;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.uima.UimaContext;
-import org.apache.uima.aae.AsynchAECasManager;
-import org.apache.uima.aae.InProcessCache;
-import org.apache.uima.aae.InputChannel;
-import org.apache.uima.aae.OutputChannel;
-import org.apache.uima.aae.UimaEEAdminContext;
+
import org.apache.uima.aae.controller.AnalysisEngineController;
-import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
-import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
-import org.apache.uima.aae.controller.ControllerCallbackListener;
-import org.apache.uima.aae.controller.ControllerLatch;
-import org.apache.uima.aae.controller.Endpoint;
-import org.apache.uima.aae.controller.EventSubscriber;
-import org.apache.uima.aae.controller.LocalCache;
+import org.apache.uima.aae.controller.PrimitiveAeInitializer;
+import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
+import org.apache.uima.aae.definition.connectors.Initializer;
import org.apache.uima.aae.definition.connectors.UimaAsConsumer;
+import org.apache.uima.aae.definition.connectors.UimaAsConsumer.ConsumerType;
import org.apache.uima.aae.definition.connectors.UimaAsEndpoint;
import org.apache.uima.aae.definition.connectors.UimaAsProducer;
-import org.apache.uima.aae.error.AsynchAEException;
-import org.apache.uima.aae.error.ErrorContext;
-import org.apache.uima.aae.error.ErrorHandlerChain;
-import org.apache.uima.aae.jmx.JmxManagement;
-import org.apache.uima.aae.jmx.ServiceErrors;
-import org.apache.uima.aae.jmx.ServiceInfo;
-import org.apache.uima.aae.jmx.ServicePerformance;
import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageBuilder;
import org.apache.uima.aae.message.MessageContext;
import org.apache.uima.aae.message.MessageProcessor;
+import org.apache.uima.aae.message.Origin;
+import org.apache.uima.aae.message.ServiceMessageProcessor;
+import org.apache.uima.aae.message.Target;
import org.apache.uima.aae.message.UimaAsOrigin;
-import org.apache.uima.aae.monitor.Monitor;
-import org.apache.uima.aae.definition.connectors.UimaAsConsumer.ConsumerType;
-import org.apache.uima.aae.service.command.UimaAsMessageProcessor;
-import org.apache.uima.aae.spi.transport.UimaMessageListener;
-import org.apache.uima.as.client.DirectInputChannel;
+import org.apache.uima.aae.message.UimaAsTarget;
import org.apache.uima.as.client.DirectMessage;
-import org.apache.uima.as.client.DirectMessageContext;
-import org.apache.uima.as.client.Listener;
import org.apache.uima.as.connectors.mockup.MockUpAnalysisEngineController;
+import org.apache.uima.as.connectors.mockup.TestClientMessageProcessor;
import org.apache.uima.as.connectors.mockup.TestMessageProcessor;
-import org.apache.uima.cas.CAS;
-import org.apache.uima.resource.ResourceSpecifier;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
public class DirectUimaAsEndpoint implements UimaAsEndpoint {
- private Map<String,UimaAsConsumer> consumers = new ConcurrentHashMap<>();
- private Map<String,UimaAsProducer> producers = new ConcurrentHashMap<>();
- private final MessageProcessor processor;
- private final String name;
+ private Map<Target,UimaAsConsumer> consumers = new ConcurrentHashMap<>();
+ private Map<Origin,UimaAsProducer> producers = new ConcurrentHashMap<>();
+
+ private List<UimaAsProducer> getMetaRequestProducers = new ArrayList<>();
+ private List<UimaAsProducer> processCASRequestProducers = new ArrayList<>();
+ private List<UimaAsProducer> cpcRequestProducers = new ArrayList<>();
+ private List<UimaAsProducer> freeCASRequestProducers = new ArrayList<>();
+
+ private List<UimaAsProducer> getMetaResponseProducers = new ArrayList<>();
+ private List<UimaAsProducer> processCASResponseProducers = new ArrayList<>();
+ private List<UimaAsProducer> cpcResponseProducers = new ArrayList<>();
+
+ private List<UimaAsConsumer> getMetaRequestConsumers = new ArrayList<>();
+ private List<UimaAsConsumer> processCASRequestConsumers = new ArrayList<>();
+ private List<UimaAsConsumer> cpcRequestConsumers = new ArrayList<>();
+ private List<UimaAsConsumer> freeCASRequestConsumers = new ArrayList<>();
+
+ private List<UimaAsConsumer> getMetaResponseConsumers = new ArrayList<>();
+ private List<UimaAsConsumer> processCASResponseConsumers = new ArrayList<>();
+ private List<UimaAsConsumer> cpcResponseConsumers = new ArrayList<>();
+
+ private Map<String, Target> targets = new HashMap<>();
- public DirectUimaAsEndpoint(MessageProcessor processor, String name) {
+ private MessageProcessor processor;
+ //private final String name;
+ private final EndpointType type = EndpointType.Direct;
+
+ private final Origin origin;
+ // this ctor is used for Producer type endpoints.
+ public DirectUimaAsEndpoint(final String name) {
+
+ origin = new UimaAsOrigin(name, EndpointType.Direct);
+/*
+ if ( name.indexOf(EndpointType.Direct.getName()) > -1 ) {
+ origin = new UimaAsOrigin(name);
+ } else {
+ origin = new UimaAsOrigin(EndpointType.Direct.getName()+name);
+ }
+ */
+// this.name = name;
+ }
+ // this ctor is used for Consumer type endpoints
+ public DirectUimaAsEndpoint(MessageProcessor processor,String name) {
+ this(name);
this.processor = processor;
- this.name = name;
+ if ( this.processor == null ) {
+ System.out.println("...........DirectUimaAsEndpoint(MessageProcessor processor,String name).......processor is null............");
+ Thread.currentThread().dumpStack();
+ }
}
- public MessageContext createMessage(int command, int messageType, Endpoint endpoint) {
- DirectMessage message =
- new DirectMessage().
- withCommand(command).
- withMessageType(messageType).
- withOrigin(processor.getController().getOrigin());
-
- MessageContext messageContext =
- new DirectMessageContext(message,name,name);
+ public Origin getOrigin() {
+ return origin;
+ }
+ @Override
+ public Map<Target, UimaAsConsumer> getConsumers() {
+ return consumers;
+ }
+
+ public String getName() {
+ return origin.getName();
+ }
+
+ @Override
+ public EndpointType getType() {
+ return type;
+ }
+
+ @Override
+ public MessageBuilder newMessageBuilder() {
+ return new DirectMessageBuilder(this);
+ }
+
+ private ConsumerType consumerTypeForRequestCommand(int command) {
+ ConsumerType type = null;
+ switch(command) {
+ case AsynchAEMessage.GetMeta:
+ type = ConsumerType.GetMetaRequest;
+ break;
+ case AsynchAEMessage.Process:
+ type = ConsumerType.ProcessCASRequest;
+ break;
+ case AsynchAEMessage.CollectionProcessComplete:
+ type = ConsumerType.CpcRequest;
+ break;
+ case AsynchAEMessage.ReleaseCAS:
+ type = ConsumerType.FreeCASRequest;
+ break;
+ }
+ return type;
+ }
+ private ConsumerType consumerTypeForResponseCommand(int command) {
+ ConsumerType type = null;
+ switch(command) {
+ case AsynchAEMessage.GetMeta:
+ type = ConsumerType.GetMetaResponse;
+ break;
+ case AsynchAEMessage.Process:
+ type = ConsumerType.ProcessCASResponse;
+ break;
+ case AsynchAEMessage.CollectionProcessComplete:
+ type = ConsumerType.CpcResponse;
+ }
+ return type;
+ }
+
+ public ConsumerType type(MessageContext messageContext) throws Exception {
+ int command =
+ messageContext.getMessageIntProperty(AsynchAEMessage.Command);
+ int msgType =
+ messageContext.getMessageIntProperty(AsynchAEMessage.MessageType);
- message.withCommand(command).
- withMessageType(messageType);
+ ConsumerType type = null;
- if ( command == AsynchAEMessage.GetMeta && messageType == AsynchAEMessage.Response ) {
- message.withMetadata(null);
-// message.withMetadata(processor.getController().getResourceSpecifier().);
+ if ( AsynchAEMessage.Request == msgType ) {
+ type = consumerTypeForRequestCommand(command);
+ } else if ( AsynchAEMessage.Response == msgType ) {
+ type = consumerTypeForResponseCommand(command);
+ } else {
+
}
-
- return messageContext;
+
+ return type;
+
}
+ /*
public void dispatch(MessageContext messageContext) throws Exception {
+
UimaAsProducer producer;
- if ( !producers.containsKey(messageContext.getEndpoint().getDelegateKey())) {
+ if ( !producers.containsKey( ((DirectMessage)messageContext.getRawMessage()).getOrigin())) {
producer =
- createProducer((UimaAsConsumer)messageContext.getEndpoint().getDestination(), messageContext.getEndpoint().getDelegateKey());
+ createProducer((UimaAsConsumer)messageContext.getEndpoint().getReplyDestination(), ((DirectMessage)messageContext.getRawMessage()).getOrigin());
+ System.out.println(".............. Creating new Producer for endpoint:"+(UimaAsConsumer)messageContext.getEndpoint().getReplyDestination());
} else {
- producer = producers.get(messageContext.getEndpoint().getDelegateKey());
+ producer = producers.get(((DirectMessage)messageContext.getRawMessage()).getOrigin());
+ System.out.println(".............. Reusing existing producer with Origin:"+((DirectMessage)messageContext.getRawMessage()).getOrigin());
+ }
+ producer.dispatch((DirectMessage)messageContext.getRawMessage());
+ }
+ */
+ public void dispatch(MessageContext messageContext, String serviceUri) throws Exception {
+ if ( !serviceUri.startsWith(EndpointType.Direct.getName())) {
+ serviceUri = EndpointType.Direct.getName()+serviceUri;
+ }
+ Target target = targets.get(serviceUri);
+ UimaAsProducer producer = producerFor(type(messageContext), target);
+ if ( Objects.isNull(producer)) {
+ producer = createProducer((UimaAsConsumer)messageContext.getMessageObjectProperty(AsynchAEMessage.ReplyToEndpoint), serviceUri);
}
producer.dispatch((DirectMessage)messageContext.getRawMessage());
}
- public UimaAsProducer createProducer(UimaAsConsumer consumer, String delegateKey) throws Exception {
+ public void dispatch(MessageContext messageContext, Origin origin) throws Exception {
+
+ Target target = targets.get(origin.getName());
+ UimaAsProducer producer = producerFor(type(messageContext), target);
+/*
+ UimaAsProducer producer;
+ if ( !producers.containsKey( target ) ) {
+ producer =
+ createProducer((UimaAsConsumer)messageContext.getEndpoint().getReplyDestination(), target);
+ } else {
+ producer = producers.get(target);
+ }
+ */
+ producer.dispatch((DirectMessage)messageContext.getRawMessage());
+ }
+
+ private UimaAsProducer matchProducer(Target target, List<UimaAsProducer> producerList) {
+ UimaAsProducer producer = null;
+ for( UimaAsProducer p : producerList) {
+ if ( p.getTarget().equals(target)) {
+ producer = p;
+ }
+ }
- UimaAsProducer producer = new DirectUimaAsProducer(consumer);
- producers.put(delegateKey,producer);
return producer;
}
-
- public UimaAsProducer createProducer(String targetUri) throws Exception {
+ private UimaAsConsumer matchConsumer(Target target, List<UimaAsConsumer> consumerList) {
+ UimaAsConsumer consumer = null;
+ for( UimaAsConsumer c : consumerList) {
+ if ( c.getTarget().equals(target)) {
+ consumer = c;
+ }
+ }
- UimaAsProducer producer = new DirectUimaAsProducer(targetUri);
- producers.put(targetUri, producer);
+ return consumer;
+ }
+ private UimaAsProducer producerFor(ConsumerType type, Target target) {
+ UimaAsProducer producer = null;
+
+ switch(type) {
+ case GetMetaRequest:
+ producer = matchProducer( target, getMetaRequestProducers);
+ break;
+ case GetMetaResponse:
+ producer = matchProducer( target,getMetaResponseProducers);
+ break;
+
+ case ProcessCASRequest:
+ producer = matchProducer( target,processCASRequestProducers);
+ break;
+
+ case ProcessCASResponse:
+ producer = matchProducer( target,processCASResponseProducers);
+ break;
+
+ case CpcRequest:
+ producer = matchProducer( target,cpcRequestProducers);
+ break;
+
+ case CpcResponse:
+ producer = matchProducer( target,cpcResponseProducers);
+ break;
+
+ case FreeCASRequest:
+ producer = matchProducer( target,freeCASRequestProducers);
+ break;
+
+ default:
+ }
return producer;
}
+ private void categorizeProducer(UimaAsProducer producer) {
+
+ switch(producer.getType()) {
+ case GetMetaRequest:
+ getMetaRequestProducers.add(producer);
+ break;
+
+ case GetMetaResponse:
+ getMetaResponseProducers.add(producer);
+ break;
+
+ case ProcessCASRequest:
+ processCASRequestProducers.add(producer);
+ break;
+
+ case ProcessCASResponse:
+ processCASResponseProducers.add(producer);
+ break;
+
+ case CpcRequest:
+ cpcRequestProducers.add(producer);
+ break;
+
+ case CpcResponse:
+ cpcResponseProducers.add(producer);
+ break;
+
+ case FreeCASRequest:
+ freeCASRequestProducers.add(producer);
+ break;
+
+ default:
+ }
+ }
+ private void categorizeConsumer(UimaAsConsumer consumer) {
+
+ switch(consumer.getType()) {
+ case GetMetaRequest:
+ getMetaRequestConsumers.add(consumer);
+ break;
+
+ case GetMetaResponse:
+ getMetaResponseConsumers.add(consumer);
+ break;
+
+ case ProcessCASRequest:
+ processCASRequestConsumers.add(consumer);
+ break;
+
+ case ProcessCASResponse:
+ processCASResponseConsumers.add(consumer);
+ break;
+
+ case CpcRequest:
+ cpcRequestConsumers.add(consumer);
+ break;
+
+ case CpcResponse:
+ cpcResponseConsumers.add(consumer);
+ break;
+
+ case FreeCASRequest:
+ freeCASRequestConsumers.add(consumer);
+ break;
+ default:
+ }
+ }
+ protected UimaAsConsumer consumerFor(ConsumerType type, Target target) {
+ UimaAsConsumer consumer = null;
+
+ switch(type) {
+ case GetMetaRequest:
+ consumer = matchConsumer( target, getMetaRequestConsumers);
+ break;
+ case GetMetaResponse:
+ consumer = matchConsumer( target,getMetaResponseConsumers);
+ break;
+
+ case ProcessCASRequest:
+ consumer = matchConsumer( target,processCASRequestConsumers);
+ break;
+
+ case ProcessCASResponse:
+ consumer = matchConsumer( target,processCASResponseConsumers);
+ break;
+
+ case CpcRequest:
+ consumer = matchConsumer( target,cpcRequestConsumers);
+ break;
+
+ case CpcResponse:
+ consumer = matchConsumer( target,cpcResponseConsumers);
+ break;
+ case FreeCASRequest:
+ consumer = matchConsumer( target,freeCASRequestConsumers);
+ break;
- public UimaAsConsumer createConsumer(String targetUri, ConsumerType type, int consumerThreadCount) throws Exception {
+ default:
+ }
+ return consumer;
+ }
+ /*
+ public UimaAsProducer createProducer(UimaAsConsumer consumer, Origin origin) throws Exception {
- DirectUimaAsConsumer consumer = new DirectUimaAsConsumer(name, targetUri, type, consumerThreadCount);
- consumer.setMessageProcessor(processor);
- consumers.put(targetUri+type.name(), consumer);
+ UimaAsProducer producer = new DirectUimaAsProducer(consumer);
+ targets.put(origin.getName(), producer.getTarget());
+
+ producers.put(origin,producer);
+ categorizeProducer(producer);
+ return producer;
+ }
+ */
+ public UimaAsProducer createProducer(UimaAsConsumer consumer, String serviceUri) throws Exception {
+
+ UimaAsProducer producer = new DirectUimaAsProducer(serviceUri, consumer);
+ StringBuilder sb = new StringBuilder();
+ if ( serviceUri != null && serviceUri.startsWith(EndpointType.Direct.getName())) {
+ sb.append(serviceUri).append(":").append(consumer.getType().toString());
+
+ } else {
+ sb.append(EndpointType.Direct.getName()).append(serviceUri).append(":").append(consumer.getType().toString());
+
+ }
+ targets.put(sb.toString(), producer.getTarget());
+
+ categorizeProducer(producer);
+ return producer;
+ }
+
+ public UimaAsProducer createProducer(Origin origin) throws Exception {
+ // THIS SHOULD NOT BE CALLED
+ UimaAsProducer producer = new DirectUimaAsProducer(origin.getName());
+ targets.put(origin.getName(), producer.getTarget());
+ producers.put(origin, producer);
+ return producer;
+ }
+
+ public UimaAsConsumer createConsumer( ConsumerType type, int consumerThreadCount) throws Exception {
+ String cid = new StringBuilder(getName()).append(":").append(type.name()).toString();
+ UimaAsConsumer consumer;
+ Target target;
+ if ( (target = targets.get(cid)) == null ) {
+ consumer =
+ new DirectUimaAsConsumer(getName(), type, consumerThreadCount);
+ ((DirectUimaAsConsumer)consumer).setMessageProcessor(processor);
+ categorizeConsumer(consumer);
+ targets.put(cid, consumer.getTarget());
+ consumers.put(consumer.getTarget(), consumer);
+ } else {
+ consumer = consumers.get(target);
+ }
+
+ System.out.println(".......... Added new consumer - key:"+consumer.getTarget().getName()+":"+type.name()+" Consumer Count:"+consumers.size());
return consumer;
}
public UimaAsConsumer getConsumer(String targetUri, ConsumerType type) {
- return consumers.get(targetUri+type.name());
+// return consumers.get(targetUri+type.name());
+ Target target = targets.get(new StringBuilder(targetUri).append(":").append(type.name()).toString());
+ return consumerFor(type, target);
}
+
+
@Override
public void start() throws Exception {
- for(Entry<String, UimaAsConsumer> entry : consumers.entrySet()) {
- entry.getValue().initialize(processor.getController());
+ System.out.println("Consumer Count:"+consumers.size());
+ for(Entry<Target, UimaAsConsumer> entry : consumers.entrySet()) {
+ if ( ConsumerType.ProcessCASRequest.equals(entry.getValue().getType())) {
+ if ( processor == null ) {
+ System.out.println(".... Processor is null");
+ }
+
+ AnalysisEngineController controller = ((ServiceMessageProcessor)processor).getController();
+ if ( controller.isPrimitive() ) {
+ Initializer initializer =
+ new PrimitiveAeInitializer((PrimitiveAnalysisEngineController)controller, entry.getValue().getConsumerCount());
+ entry.getValue().setInitializer(initializer);
+ entry.getValue().initialize(controller);
+ }
+ }
entry.getValue().start();
}
- for(Entry<String, UimaAsProducer> entry : producers.entrySet()) {
+ for(Entry<Origin, UimaAsProducer> entry : producers.entrySet()) {
entry.getValue().start();
}
}
@Override
public void stop() throws Exception {
- for(Entry<String, UimaAsConsumer> entry : consumers.entrySet()) {
+ for(Entry<Target, UimaAsConsumer> entry : consumers.entrySet()) {
entry.getValue().stop();
}
- for(Entry<String, UimaAsProducer> entry : producers.entrySet()) {
+ for(Entry<Origin, UimaAsProducer> entry : producers.entrySet()) {
entry.getValue().stop();
}
@@ -142,12 +458,7 @@ public class DirectUimaAsEndpoint implements UimaAsEndpoint {
public static void main(String[] args) {
try {
MessageProcessor dummyProcessor =
- new TestMessageProcessor(null);
- //new TestMessageProcessor(new MockUpAnalysisEngineController("MockupClient", 4));
-
-// MessageProcessor processor =
- // new UimaAsMessageProcessor(null);
- //TestMessageProcessor processor = new TestMessageProcessor(null);
+ new TestClientMessageProcessor();
DirectUimaAsEndpoint endpoint =
new DirectUimaAsEndpoint(dummyProcessor, "Client");
@@ -158,26 +469,26 @@ public class DirectUimaAsEndpoint implements UimaAsEndpoint {
service.initialize();
service.start();
- endpoint.createConsumer("direct:", ConsumerType.GetMeta, 1);
- endpoint.createConsumer("direct:", ConsumerType.ProcessCAS, 4);
- endpoint.createConsumer("direct:", ConsumerType.Cpc, 1);
+// endpoint.createConsumer("direct:", ConsumerType.GetMetaResponse, 1);
+// endpoint.createConsumer("direct:", ConsumerType.ProcessCASResponse, 4);
+// endpoint.createConsumer("direct:", ConsumerType.CpcResponse, 1);
UimaAsProducer producer =
- endpoint.createProducer( "direct:serviceA");
+ endpoint.createProducer( new UimaAsOrigin(EndpointType.Direct.getName()+"serviceA", EndpointType.Direct));
endpoint.start();
- DirectMessage getMetaRequestMessage =
- new DirectMessage().
- withCommand(AsynchAEMessage.GetMeta).
- withMessageType(AsynchAEMessage.Request).
- withOrigin(new UimaAsOrigin("Client")).
- withReplyDestination(endpoint.getConsumer("direct:", ConsumerType.GetMeta)).
- withPayload(AsynchAEMessage.None);
+ MessageContext getMetaRequestMessage =
+ endpoint.newMessageBuilder()
+ .newGetMetaRequestMessage(new UimaAsOrigin("Client",EndpointType.Direct))
+ .withPayload(AsynchAEMessage.None)
+ .withReplyDestination(endpoint.getConsumer("direct:", ConsumerType.GetMetaResponse))
+ .build();
+
UimaAsConsumer target =
- service.getEndpoint().getConsumer("direct:", ConsumerType.GetMeta);
+ service.getEndpoint().getConsumer("direct:", ConsumerType.GetMetaRequest);
- producer.dispatch(getMetaRequestMessage, target);
+ producer.dispatch((DirectMessage)getMetaRequestMessage.getRawMessage(), target);
} catch( Exception e) {
e.printStackTrace();
@@ -185,25 +496,23 @@ public class DirectUimaAsEndpoint implements UimaAsEndpoint {
}
-
+
private class MockupService {
UimaAsEndpoint endpoint;
- UimaAsProducer producer;
public void initialize() throws Exception {
MockUpAnalysisEngineController controller =
new MockUpAnalysisEngineController("MockupService",4);
- MessageProcessor dummyProcessor =
+ TestMessageProcessor dummyProcessor =
new TestMessageProcessor(controller);
endpoint = new DirectUimaAsEndpoint(dummyProcessor, "Service");
- UimaAsConsumer getMetaReplyConsumer = endpoint.createConsumer("direct:", ConsumerType.GetMeta, 1);
- UimaAsConsumer processCasReplyConsumer = endpoint.createConsumer("direct:", ConsumerType.ProcessCAS, 4);
- UimaAsConsumer cpcReplyConsumer = endpoint.createConsumer("direct:", ConsumerType.Cpc, 1);
+// UimaAsConsumer getMetaRequestConsumer = endpoint.createConsumer("direct:", ConsumerType.GetMetaRequest, 1);
+// UimaAsConsumer processCasRequestConsumer = endpoint.createConsumer("direct:", ConsumerType.ProcessCASRequest, 4);
+// UimaAsConsumer cpcRequestConsumer = endpoint.createConsumer("direct:", ConsumerType.CpcRequest, 1);
- producer = endpoint.createProducer( "direct:serviceA");
- controller.addEndpoint(new UimaAsOrigin("Service"), endpoint);
+ controller.addEndpoint(EndpointType.Direct, endpoint);
}
public void process(MessageContext mc) throws Exception {
@@ -217,35 +526,16 @@ public class DirectUimaAsEndpoint implements UimaAsEndpoint {
public UimaAsEndpoint getEndpoint() {
return endpoint;
}
-
-
- private class ServiceMessageProcessor implements MessageProcessor {
- MockupService service;
- public ServiceMessageProcessor(MockupService service) {
- this.service = service;
- }
- @Override
- public void process(MessageContext message) throws Exception {
- DirectMessage request =
- (DirectMessage)message.getRawMessage();
- DirectMessage getMetaReply =
- new DirectMessage().
- withCommand(AsynchAEMessage.GetMeta).
- withMessageType(AsynchAEMessage.Response).
- withOrigin(message.getEndpoint().getMessageOrigin()).
- withReplyDestination(request.getReplyDestination()).
- withPayload(AsynchAEMessage.Metadata);
- MessageContext reply = new DirectMessageContext(getMetaReply, "", "");
- service.getEndpoint().dispatch(reply);
-
- }
- @Override
- public AnalysisEngineController getController() {
- // TODO Auto-generated method stub
- return null;
- }
-
- }
+
+
}
+
+
+
+
+
+
+
+
}
diff --git a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsProducer.java b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsProducer.java
index be7c9e3..5285da7 100644
--- a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsProducer.java
+++ b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsProducer.java
@@ -1,31 +1,71 @@
package org.apache.uima.as.connectors.direct;
+import java.util.EnumSet;
+
import org.apache.uima.aae.definition.connectors.UimaAsConsumer;
+import org.apache.uima.aae.definition.connectors.UimaAsConsumer.ConsumerType;
+import org.apache.uima.aae.definition.connectors.UimaAsEndpoint.EndpointType;
+import org.apache.uima.aae.message.Target;
+import org.apache.uima.aae.message.UimaAsTarget;
import org.apache.uima.aae.definition.connectors.UimaAsProducer;
-import org.apache.uima.aae.message.MessageContext;
-import org.apache.uima.aae.message.UimaAsMessage;
import org.apache.uima.as.client.DirectMessage;
public class DirectUimaAsProducer implements UimaAsProducer{
-
+ private Target target;
private UimaAsConsumer consumer;
+ EnumSet<ConsumerType> requestSet = EnumSet.of(ConsumerType.GetMetaRequest,ConsumerType.ProcessCASRequest,ConsumerType.CpcRequest);
+ EnumSet<ConsumerType> responseSet = EnumSet.of(ConsumerType.GetMetaResponse,ConsumerType.ProcessCASResponse,ConsumerType.CpcResponse);
public DirectUimaAsProducer(String targetUri) {
}
- public DirectUimaAsProducer(UimaAsConsumer consumer) {
+
+ public DirectUimaAsProducer(String name, UimaAsConsumer consumer) {
this.consumer = consumer;
+ this.target = new UimaAsTarget(name, EndpointType.Direct);
+ switch(consumer.getType()) {
+ case GetMetaRequest:
+
+ break;
+ case GetMetaResponse:
+
+ break;
+
+ case ProcessCASRequest:
+
+ break;
+
+ case ProcessCASResponse:
+
+ break;
+
+ case CpcRequest:
+
+ break;
+
+ case CpcResponse:
+
+ break;
+
+ default:
+ }
+ }
+ public Target getTarget() {
+ return target;
+ }
+ public boolean requestProducer() {
+ return requestSet.contains(consumer.getType());
+ }
+ public boolean responseProducer() {
+ return responseSet.contains(consumer.getType());
}
-
@Override
public void start() throws Exception {
- // TODO Auto-generated method stub
}
@Override
public void stop() throws Exception {
- // TODO Auto-generated method stub
}
@@ -42,7 +82,11 @@ public class DirectUimaAsProducer implements UimaAsProducer{
}
public static void main(String[] args) {
- // TODO Auto-generated method stub
}
+
+ @Override
+ public ConsumerType getType() {
+ return consumer.getType();
+ }
}
diff --git a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/MockUpAnalysisEngineController.java b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/MockUpAnalysisEngineController.java
index c8fa5e5..0ef4562 100644
--- a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/MockUpAnalysisEngineController.java
+++ b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/MockUpAnalysisEngineController.java
@@ -23,6 +23,7 @@ import org.apache.uima.aae.controller.EventSubscriber;
import org.apache.uima.aae.controller.LocalCache;
import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
import org.apache.uima.aae.definition.connectors.UimaAsEndpoint;
+import org.apache.uima.aae.definition.connectors.UimaAsEndpoint.EndpointType;
import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
import org.apache.uima.aae.error.AsynchAEException;
@@ -41,6 +42,7 @@ import org.apache.uima.aae.monitor.Monitor;
import org.apache.uima.aae.spi.transport.UimaMessageListener;
import org.apache.uima.as.client.DirectInputChannel;
import org.apache.uima.as.client.Listener;
+import org.apache.uima.as.connectors.direct.DirectUimaAsEndpoint;
import org.apache.uima.cas.CAS;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.resource.ResourceSpecifier;
@@ -52,20 +54,20 @@ public class MockUpAnalysisEngineController implements PrimitiveAnalysisEngineCo
private ThreadLocal<Long> threadLocalValue = new ThreadLocal<>();
private volatile ControllerLatch latch = new ControllerLatch(this);
private CyclicBarrier barrier;
- private Map<Origin, UimaAsEndpoint> endpoints =
+ private Map<EndpointType, UimaAsEndpoint> endpoints =
new HashMap<>();
private final Origin serviceOrigin;
public MockUpAnalysisEngineController(String name, int scaleout) {
this.name = name;
- serviceOrigin = new UimaAsOrigin(name);
+ serviceOrigin = new UimaAsOrigin(name, EndpointType.Direct);
barrier = new CyclicBarrier(scaleout);
}
public Origin getOrigin() {
return serviceOrigin;
}
- public void addEndpoint(Origin origin, UimaAsEndpoint endpoint) {
- endpoints.put(origin, endpoint);
+ public void addEndpoint(EndpointType type, UimaAsEndpoint endpoint) {
+ endpoints.put(type, endpoint);
}
@Override
public void terminate() {
@@ -87,17 +89,40 @@ public class MockUpAnalysisEngineController implements PrimitiveAnalysisEngineCo
@Override
public void sendMetadata(Endpoint anEndpoint) throws AsynchAEException {
+ // There is one instance of endpoint per type [Direct, JMS,..]
UimaAsEndpoint endpoint =
- endpoints.get(anEndpoint.getMessageOrigin());//anEndpoint.getDelegateKey());
- MessageContext message = endpoint.createMessage(AsynchAEMessage.GetMeta,AsynchAEMessage.Response,anEndpoint);
+ endpoints.get(anEndpoint.getMessageOrigin().getType());//anEndpoint.getDelegateKey());
+ // endpoints are locally cached for future access. If endpoint
+ // of a given type is not availble, lazily create it and cache it
try {
+ if ( Objects.isNull(endpoint)) {
+ endpoint = createEndpoint(anEndpoint);
+ }
+
+ MessageContext message =
+ endpoint.newMessageBuilder()
+ .newGetMetaReplyMessage(getOrigin())
+ .withSenderKey(anEndpoint.getDelegateKey())
+ .build();
+
+ message.getEndpoint().
+ setReplyDestination(anEndpoint.getReplyDestination());
+
+ // send the reply to target consumer.
endpoint.dispatch(message);
} catch( Exception e) {
throw new AsynchAEException(e);
}
}
-
+ private UimaAsEndpoint createEndpoint(Endpoint e) throws Exception {
+
+ // FIX THE NAMING HERE: Two things are Endpoint now
+ UimaAsEndpoint endpoint =
+ new DirectUimaAsEndpoint( e.getMessageOrigin().getName());
+ addEndpoint(e.getMessageOrigin().getType(), endpoint);
+ return endpoint;
+ }
@Override
public ControllerLatch getControllerLatch() {
return latch;
@@ -260,6 +285,7 @@ public class MockUpAnalysisEngineController implements PrimitiveAnalysisEngineCo
@Override
public void initialize() throws AsynchAEException {
+
System.out.println(".....Thread["+Thread.currentThread().getId()+"] "+ getComponentName()+" - Initializing AE");
}
@@ -655,5 +681,20 @@ public class MockUpAnalysisEngineController implements PrimitiveAnalysisEngineCo
public boolean threadAssignedToAE() {
return Objects.nonNull(threadLocalValue.get());
}
+ @Override
+ public void addEndpoint(UimaAsEndpoint endpoint) {
+ // TODO Auto-generated method stub
+
+ }
+ @Override
+ public UimaAsEndpoint getEndpoint(EndpointType type) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ @Override
+ public void start() throws Exception {
+ // TODO Auto-generated method stub
+
+ }
}
\ No newline at end of file
diff --git a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/TestMessageProcessor.java b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/TestMessageProcessor.java
index e700b00..b74ae35 100644
--- a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/TestMessageProcessor.java
+++ b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/TestMessageProcessor.java
@@ -3,15 +3,17 @@ package org.apache.uima.as.connectors.mockup;
import org.apache.uima.aae.controller.AnalysisEngineController;
import org.apache.uima.aae.message.MessageContext;
import org.apache.uima.aae.message.MessageProcessor;
+import org.apache.uima.aae.message.ServiceMessageProcessor;
import org.apache.uima.aae.service.command.CommandFactory;
import org.apache.uima.aae.service.command.UimaAsCommand;
-public class TestMessageProcessor implements MessageProcessor {
+public class TestMessageProcessor implements ServiceMessageProcessor {
AnalysisEngineController controller;
public TestMessageProcessor(AnalysisEngineController ctlr) {
this.controller = ctlr;
}
+
@Override
public void process(MessageContext message) throws Exception {
UimaAsCommand command =
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConnector.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConnector.class
deleted file mode 100644
index e0b179d..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConnector.class and /dev/null differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$1$1.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$1$1.class
deleted file mode 100644
index cad42cc..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$1$1.class and /dev/null differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$1.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$1.class
deleted file mode 100644
index 61330b1..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$1.class and /dev/null differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$DirectListenerCallback.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$DirectListenerCallback.class
deleted file mode 100644
index 77468d9..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$DirectListenerCallback.class and /dev/null differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer.class
deleted file mode 100644
index 1c96817..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer.class and /dev/null differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint$MockupService$ServiceMessageProcessor.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint$MockupService$ServiceMessageProcessor.class
deleted file mode 100644
index 3c89bb3..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint$MockupService$ServiceMessageProcessor.class and /dev/null differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint$MockupService.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint$MockupService.class
deleted file mode 100644
index 60cb1e6..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint$MockupService.class and /dev/null differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint.class
deleted file mode 100644
index 3754d56..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint.class and /dev/null differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsProducer.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsProducer.class
deleted file mode 100644
index 158bd7c..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsProducer.class and /dev/null differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/mockup/MockUpAnalysisEngineController.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/mockup/MockUpAnalysisEngineController.class
deleted file mode 100644
index 4ca6681..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/mockup/MockUpAnalysisEngineController.class and /dev/null differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/mockup/TestMessageProcessor.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/mockup/TestMessageProcessor.class
deleted file mode 100644
index 93bd910..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/mockup/TestMessageProcessor.class and /dev/null differ