You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2020/06/10 13:15:11 UTC

[uima-async-scaleout] 27/34: UIMA-5501 refactored to use pluggable endpoints

This is an automated email from the ASF dual-hosted git repository.

cwiklik pushed a commit to branch uima-as-3
in repository https://gitbox.apache.org/repos/asf/uima-async-scaleout.git

commit 2a1f37fe4ed6afad66cf1e5582998a271b1b6207
Author: cwiklik <cwiklik>
AuthorDate: Thu Nov 29 17:31:09 2018 +0000

    UIMA-5501 refactored to use pluggable endpoints
---
 uimaj-as-connectors/pom.xml                        |  66 ++-
 .../as/connectors/direct/DirectUimaAsConsumer.java | 142 +++---
 .../as/connectors/direct/DirectUimaAsEndpoint.java | 550 ++++++++++++++++-----
 .../as/connectors/direct/DirectUimaAsProducer.java |  60 ++-
 .../mockup/MockUpAnalysisEngineController.java     |  55 ++-
 .../as/connectors/mockup/TestMessageProcessor.java |   4 +-
 .../connectors/direct/DirectUimaAsConnector.class  | Bin 1276 -> 0 bytes
 .../direct/DirectUimaAsConsumer$1$1.class          | Bin 2023 -> 0 bytes
 .../connectors/direct/DirectUimaAsConsumer$1.class | Bin 3352 -> 0 bytes
 ...rectUimaAsConsumer$DirectListenerCallback.class | Bin 1358 -> 0 bytes
 .../connectors/direct/DirectUimaAsConsumer.class   | Bin 8575 -> 0 bytes
 ...int$MockupService$ServiceMessageProcessor.class | Bin 2619 -> 0 bytes
 .../DirectUimaAsEndpoint$MockupService.class       | Bin 3518 -> 0 bytes
 .../connectors/direct/DirectUimaAsEndpoint.class   | Bin 8402 -> 0 bytes
 .../connectors/direct/DirectUimaAsProducer.class   | Bin 1534 -> 0 bytes
 .../mockup/MockUpAnalysisEngineController.class    | Bin 16652 -> 0 bytes
 .../connectors/mockup/TestMessageProcessor.class   | Bin 1378 -> 0 bytes
 17 files changed, 663 insertions(+), 214 deletions(-)

diff --git a/uimaj-as-connectors/pom.xml b/uimaj-as-connectors/pom.xml
index da30e8c..2bb5f29 100644
--- a/uimaj-as-connectors/pom.xml
+++ b/uimaj-as-connectors/pom.xml
@@ -8,8 +8,31 @@
 	</parent>
 
   <artifactId>uimaj-as-connectors</artifactId>
-  
-  
+	<name>Apache UIMA-AS: ${project.artifactId}</name>
+	<description>UIMA-AS Connectors</description>
+	<url>${uimaWebsiteUrl}</url>
+
+  	<!-- Special inheritance note: even though the <scm> element that follows 
+		is exactly the same as those in super poms, it cannot be inherited because 
+		there is some special code that computes the connection elements from the 
+		chain of parent poms, if this is omitted. Keeping this a bit factored allows 
+		cutting/pasting the <scm> element, and just changing the following two properties -->
+	<scm>
+		<connection>
+      scm:svn:http://svn.apache.org/repos/asf/uima/uima-as/tags/uima-as-2.10.2/uimaj-as-connectors
+    </connection>
+		<developerConnection>
+      scm:svn:https://svn.apache.org/repos/asf/uima/uima-as/tags/uima-as-2.10.2/uimaj-as-connectors
+    </developerConnection>
+		<url>
+      http://svn.apache.org/viewvc/uima/uima-as/tags/uima-as-2.10.2/uimaj-as-connectors
+    </url>
+	</scm>
+	
+	<properties>
+		<uimaScmProject>${project.artifactId}</uimaScmProject>
+	</properties>
+  	
   	<dependencies>
 		<dependency>
 			<groupId>org.apache.uima</groupId>
@@ -17,9 +40,44 @@
 			<version>${project.version}</version>
 			<scope>compile</scope>
 		</dependency>
- 
-		
+ 		
 	</dependencies>
 
+	<build>
+		<finalName>${project.artifactId}</finalName>
+		<plugins>
+			<!-- This plugin configuration lets the tests run by (1) giving them more 
+				memory, and (2) eliminating the very slow integration test -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<argLine>-Xmx300M</argLine>
+					<includes>
+						<include>**/TestUimaASBasic.java</include>
+					</includes>
+				</configuration>
+			</plugin>
+		</plugins>
+		<pluginManagement>
+			<plugins>
+				<plugin>
+					<groupId>org.apache.rat</groupId>
+					<artifactId>apache-rat-plugin</artifactId>
+					<executions>
+						<execution>
+							<id>default-cli</id>
+							<configuration>
+								<excludes>
+									<exclude>release.properties</exclude> <!-- release generated artifact -->
+									<exclude>src/test/resources/data/DoubleByteText.txt</exclude> <!-- test data -->
+								</excludes>
+							</configuration>
+						</execution>
+					</executions>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
 
 </project>
\ No newline at end of file
diff --git a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer.java b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer.java
index a52b1b1..b87308d 100644
--- a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer.java
+++ b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer.java
@@ -2,28 +2,26 @@ package org.apache.uima.as.connectors.direct;
 
 import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.uima.aae.UimaAsThreadFactory;
 import org.apache.uima.aae.controller.AnalysisEngineController;
-import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
 import org.apache.uima.aae.definition.connectors.AbstractUimaAsConsumer;
+import org.apache.uima.aae.definition.connectors.Initializer;
 import org.apache.uima.aae.definition.connectors.ListenerCallback;
 import org.apache.uima.aae.definition.connectors.UimaAsConsumer;
+import org.apache.uima.aae.definition.connectors.UimaAsEndpoint.EndpointType;
 import org.apache.uima.aae.message.AsynchAEMessage;
 import org.apache.uima.aae.message.MessageContext;
 import org.apache.uima.aae.message.MessageProcessor;
-import org.apache.uima.aae.spi.transport.vm.UimaVmQueue;
+import org.apache.uima.aae.message.Target;
+import org.apache.uima.aae.message.UimaAsTarget;
 import org.apache.uima.as.client.DirectMessage;
 import org.apache.uima.as.client.DirectMessageContext;
 
 public class DirectUimaAsConsumer extends AbstractUimaAsConsumer {
-
+	//private static final String DIRECT = "direct:";
 	private BlockingQueue<DirectMessage> inQueue= new LinkedBlockingQueue<>();;
 	private MessageProcessor processor;
 	private boolean started = false;
@@ -31,38 +29,60 @@ public class DirectUimaAsConsumer extends AbstractUimaAsConsumer {
 	private final ConsumerType consumerType;
 	private int consumerThreadCount = 1;
 	private boolean doStop = false;
-	private final CountDownLatch latchToCountNumberOfInitedThreads;
-	private final CountDownLatch latchToCountNumberOfTerminatedThreads;
+//	private final CountDownLatch latchToCountNumberOfInitedThreads;
+//	private final CountDownLatch latchToCountNumberOfTerminatedThreads;
 	private AnalysisEngineController controller;
 	private final String name;
+	private Initializer initializer;
+	private DirectListenerCallback callback = new DirectListenerCallback(this);
+	private Target target;
+	private UimaAsConsumer delegate;
+	
     public DirectUimaAsConsumer(String name, BlockingQueue<DirectMessage> inQueue, ConsumerType type, int consumerThreadCount) {
 		this(name, type,consumerThreadCount);
 		this.inQueue = inQueue;
 	}
 
-	public DirectUimaAsConsumer(String name,String targetUri, ConsumerType type, int consumerThreadCount) {
-		this(name, type,consumerThreadCount);
-		
-	}
+//	public DirectUimaAsConsumer(String name, ConsumerType type, int consumerThreadCount) {
+//		this(name, type, consumerThreadCount);
+//		
+//	}
 
-	private DirectUimaAsConsumer( String name, ConsumerType type, int consumerThreadCount) {
-		this.name = name;
+	public DirectUimaAsConsumer( String name, ConsumerType type, int consumerThreadCount) {
+		if ( name.indexOf(EndpointType.Direct.getName()) > -1 ) {
+			this.name = name;
+		} else {
+			this.name = EndpointType.Direct.getName()+name;
+		}
+		
 		this.consumerType = type;
 		this.consumerThreadCount = consumerThreadCount;
-		latchToCountNumberOfInitedThreads = new CountDownLatch(consumerThreadCount);
-		latchToCountNumberOfTerminatedThreads = new CountDownLatch(consumerThreadCount);
+//		latchToCountNumberOfInitedThreads = new CountDownLatch(consumerThreadCount);
+//		latchToCountNumberOfTerminatedThreads = new CountDownLatch(consumerThreadCount);
+		target = new UimaAsTarget(name, EndpointType.Direct);
+	}
+	public void setInitializer(Initializer initializer) {
+		this.initializer = initializer;
+	}
+	public Target getTarget() {
+		return target;
+	}
+	public int getConsumerCount() {
+		return consumerThreadCount;
 	}
-	
 	public ConsumerType getType() {
 		return consumerType;
 	}
-	
+	public void delegateTo(UimaAsConsumer delegateConsumer) {
+		delegate = delegateConsumer;
+	}
 	protected void setMessageProcessor(MessageProcessor processor) {
 		this.processor = processor;
 	}
 	
 	public void initialize() throws Exception {
-		
+		initializer = new DefaultInitializer(consumerThreadCount);
+		executor = initializer.initialize(callback);
 	}
 
 	/**
@@ -71,50 +91,24 @@ public class DirectUimaAsConsumer extends AbstractUimaAsConsumer {
 	 */
 	@Override
 	public void consume(DirectMessage message) throws Exception {
-		inQueue.add(message);
-	}
-	
-	private void initializeUimaPipeline() throws Exception {
-//		workQueue = new UimaVmQueue();
-		if ( controller.isPrimitive() ) {
-			ThreadGroup threadGroup = new ThreadGroup("VmThreadGroup" + 1 + "_" + controller.getComponentName());
-			executor = new ThreadPoolExecutor(consumerThreadCount, consumerThreadCount, Long.MAX_VALUE, TimeUnit.DAYS, new UimaVmQueue());
-			UimaAsThreadFactory tf = null;
-			
-			DirectListenerCallback callback = new DirectListenerCallback(this);
-			
-			tf = new UimaAsThreadFactory().
-					withCallback(callback).
-					withThreadGroup(threadGroup).
-					withPrimitiveController((PrimitiveAnalysisEngineController)processor.getController()).
-					withTerminatedThreadsLatch(latchToCountNumberOfTerminatedThreads).
-					withInitedThreadsLatch(latchToCountNumberOfInitedThreads);
-			tf.setDaemon(true);
-			((ThreadPoolExecutor)executor).setThreadFactory(tf);
-			((ThreadPoolExecutor)executor).prestartAllCoreThreads();
-			latchToCountNumberOfInitedThreads.await();
-			if ( callback.failedInitialization() ) {
-				throw callback.getException();
-			}
-			System.out.println("Executor Started - All Process Threads Initialized");
+		// if this consumer has a delegate it does not handle
+		// messages itself. Instead messages are passed to the
+		// delegate consumer. An example of this is CPC Consumer
+		// which must delegate CPC requests to ProcessCASConsumer
+		// since CPC requires access to AE instance which is 
+		// only associated with Process Cas Consumers.
+		if ( Objects.nonNull(delegate)) {
+			delegate.consume(message);
 		} else {
-			 executor = Executors.newFixedThreadPool(consumerThreadCount);
+			inQueue.add(message);
 		}
 	}
+
 	public void initialize(AnalysisEngineController controller) throws Exception {
 		this.controller = controller;
 		// Consumer handling ProcessCAS must first initialize each
 		// UIMA pipeline.
-		if (ConsumerType.ProcessCAS.equals(consumerType)) {
-			if ( Objects.isNull(controller)) {
-				 executor = Executors.newFixedThreadPool(consumerThreadCount);
-			} else {
-				initializeUimaPipeline();
-			}
-
-		} else {
-			 executor = Executors.newFixedThreadPool(consumerThreadCount);
-		}
+		executor = initializer.initialize(callback);
 	}
 	
 	private boolean stopConsumingMessages(DirectMessage message ) throws Exception{
@@ -128,6 +122,15 @@ public class DirectUimaAsConsumer extends AbstractUimaAsConsumer {
 		if ( started ) {
 			return;
 		}
+		if ( Objects.isNull(executor)) {
+			try {
+				initialize();
+			} catch( Exception e) {
+				e.printStackTrace();
+				return;
+			}
+			
+		}
 		System.out.println(">>> "+name+" DirectConsumer.start() - Consumer Type:"+getType());
 		new Thread() {
 			@Override
@@ -139,8 +142,10 @@ public class DirectUimaAsConsumer extends AbstractUimaAsConsumer {
 					try {
 						
 						final DirectMessage message = inQueue.take(); //blocks if empty
-						System.out.println(">>> "+name+" DirectConsumer.run() - Consumer Type:"+getType()+" Got new message");
-						
+						//System.out.println(">>> "+name+" DirectConsumer.run() - Consumer Type:"+getType()+" Got new message");
+						System.out.println(getType()+" Consumer Received Message From:"+
+						        message.getOrigin());
+
 			            if ( stopConsumingMessages(message)) {  // special type of msg indicating end of processing
 						    System.out.println(">>> "+name+" Got END message - Stopping Queue Consumer");
 			            	doStop = true;
@@ -149,13 +154,9 @@ public class DirectUimaAsConsumer extends AbstractUimaAsConsumer {
 						        public void run() {
 						        	
 						            try {
-						            	//System.out.println(">>> "+controller.getComponentName()+" Got new message - processing on thread "+Thread.currentThread().getName()+" channel:"+getType());
-										//ic.onMessage(message);
-						            	
 						    			// every message is wrapped in the MessageContext
 						    			MessageContext messageContext = 
-						    					new DirectMessageContext(message, "", controller.getComponentName());
-
+						    					new DirectMessageContext(message, "", name);
 										processor.process(messageContext);
 						            } catch( Exception e) {
 						            	e.printStackTrace();
@@ -202,4 +203,17 @@ public class DirectUimaAsConsumer extends AbstractUimaAsConsumer {
 		}
 	}
 
+	private class DefaultInitializer implements Initializer {
+		private final int consumerThreadCount;
+		
+		public DefaultInitializer(int consumerThreadCount) {
+			this.consumerThreadCount = consumerThreadCount;
+		}
+		
+		@Override
+		public ExecutorService initialize(ListenerCallback callback) throws Exception {
+			return Executors.newFixedThreadPool(consumerThreadCount);
+		}
+		
+	}
 }
diff --git a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint.java b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint.java
index 012c89a..f22b107 100644
--- a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint.java
+++ b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint.java
@@ -1,139 +1,455 @@
 package org.apache.uima.as.connectors.direct;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.uima.UimaContext;
-import org.apache.uima.aae.AsynchAECasManager;
-import org.apache.uima.aae.InProcessCache;
-import org.apache.uima.aae.InputChannel;
-import org.apache.uima.aae.OutputChannel;
-import org.apache.uima.aae.UimaEEAdminContext;
+
 import org.apache.uima.aae.controller.AnalysisEngineController;
-import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
-import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
-import org.apache.uima.aae.controller.ControllerCallbackListener;
-import org.apache.uima.aae.controller.ControllerLatch;
-import org.apache.uima.aae.controller.Endpoint;
-import org.apache.uima.aae.controller.EventSubscriber;
-import org.apache.uima.aae.controller.LocalCache;
+import org.apache.uima.aae.controller.PrimitiveAeInitializer;
+import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
+import org.apache.uima.aae.definition.connectors.Initializer;
 import org.apache.uima.aae.definition.connectors.UimaAsConsumer;
+import org.apache.uima.aae.definition.connectors.UimaAsConsumer.ConsumerType;
 import org.apache.uima.aae.definition.connectors.UimaAsEndpoint;
 import org.apache.uima.aae.definition.connectors.UimaAsProducer;
-import org.apache.uima.aae.error.AsynchAEException;
-import org.apache.uima.aae.error.ErrorContext;
-import org.apache.uima.aae.error.ErrorHandlerChain;
-import org.apache.uima.aae.jmx.JmxManagement;
-import org.apache.uima.aae.jmx.ServiceErrors;
-import org.apache.uima.aae.jmx.ServiceInfo;
-import org.apache.uima.aae.jmx.ServicePerformance;
 import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageBuilder;
 import org.apache.uima.aae.message.MessageContext;
 import org.apache.uima.aae.message.MessageProcessor;
+import org.apache.uima.aae.message.Origin;
+import org.apache.uima.aae.message.ServiceMessageProcessor;
+import org.apache.uima.aae.message.Target;
 import org.apache.uima.aae.message.UimaAsOrigin;
-import org.apache.uima.aae.monitor.Monitor;
-import org.apache.uima.aae.definition.connectors.UimaAsConsumer.ConsumerType;
-import org.apache.uima.aae.service.command.UimaAsMessageProcessor;
-import org.apache.uima.aae.spi.transport.UimaMessageListener;
-import org.apache.uima.as.client.DirectInputChannel;
+import org.apache.uima.aae.message.UimaAsTarget;
 import org.apache.uima.as.client.DirectMessage;
-import org.apache.uima.as.client.DirectMessageContext;
-import org.apache.uima.as.client.Listener;
 import org.apache.uima.as.connectors.mockup.MockUpAnalysisEngineController;
+import org.apache.uima.as.connectors.mockup.TestClientMessageProcessor;
 import org.apache.uima.as.connectors.mockup.TestMessageProcessor;
-import org.apache.uima.cas.CAS;
-import org.apache.uima.resource.ResourceSpecifier;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
 
 public class DirectUimaAsEndpoint implements UimaAsEndpoint {
 
-	private Map<String,UimaAsConsumer> consumers = new ConcurrentHashMap<>();
-	private Map<String,UimaAsProducer> producers = new ConcurrentHashMap<>();
-	private final MessageProcessor processor;
-	private final String name;
+	private Map<Target,UimaAsConsumer> consumers = new ConcurrentHashMap<>();
+	private Map<Origin,UimaAsProducer> producers = new ConcurrentHashMap<>();
+	
+	private List<UimaAsProducer> getMetaRequestProducers = new ArrayList<>();
+	private List<UimaAsProducer> processCASRequestProducers = new ArrayList<>();
+	private List<UimaAsProducer> cpcRequestProducers = new ArrayList<>();
+	private List<UimaAsProducer> freeCASRequestProducers = new ArrayList<>();
+
+	private List<UimaAsProducer> getMetaResponseProducers = new ArrayList<>();
+	private List<UimaAsProducer> processCASResponseProducers = new ArrayList<>();
+	private List<UimaAsProducer> cpcResponseProducers = new ArrayList<>();
+
+	private List<UimaAsConsumer> getMetaRequestConsumers = new ArrayList<>();
+	private List<UimaAsConsumer> processCASRequestConsumers = new ArrayList<>();
+	private List<UimaAsConsumer> cpcRequestConsumers = new ArrayList<>();
+	private List<UimaAsConsumer> freeCASRequestConsumers = new ArrayList<>();
+
+	private List<UimaAsConsumer> getMetaResponseConsumers = new ArrayList<>();
+	private List<UimaAsConsumer> processCASResponseConsumers = new ArrayList<>();
+	private List<UimaAsConsumer> cpcResponseConsumers = new ArrayList<>();
+	
+	private Map<String, Target> targets = new HashMap<>();
 	
-	public DirectUimaAsEndpoint(MessageProcessor processor, String name) {
+	private MessageProcessor processor;
+	//private final String name;
+	private final EndpointType type = EndpointType.Direct;
+	
+	private final Origin origin;
+	// this ctor is used for Producer type endpoints. 
+	public DirectUimaAsEndpoint(final String name) {
+		
+		origin = new UimaAsOrigin(name, EndpointType.Direct);
+/*		
+		if ( name.indexOf(EndpointType.Direct.getName()) > -1 ) {
+			origin = new UimaAsOrigin(name);
+		} else {
+			origin = new UimaAsOrigin(EndpointType.Direct.getName()+name);
+		}
+		*/
+//		this.name = name;
+	}
+	// this ctor is used for Consumer type endpoints
+	public DirectUimaAsEndpoint(MessageProcessor processor,String name) {
+		this(name);
 		this.processor = processor;
-		this.name = name;
+		if ( this.processor == null ) {
+			System.out.println("...........DirectUimaAsEndpoint(MessageProcessor processor,String name).......processor is null............");
+			Thread.currentThread().dumpStack();
+		}
 	}
-	public MessageContext createMessage(int command, int messageType, Endpoint endpoint) {
-		DirectMessage message = 
-				new DirectMessage().
-				withCommand(command).
-				withMessageType(messageType).
-				withOrigin(processor.getController().getOrigin());
-		
-		MessageContext messageContext = 
-				new DirectMessageContext(message,name,name);
+	public Origin getOrigin() {
+		return origin;
+	}
+	@Override
+	public Map<Target, UimaAsConsumer> getConsumers() {
+		return consumers;
+	}
+	
+	public String getName() {
+		return origin.getName();
+	}
+	
+	@Override
+	public EndpointType getType() {
+		return type;
+	}
+	
+	@Override
+	public MessageBuilder newMessageBuilder() {
+		return new DirectMessageBuilder(this);
+	}
+
+	private ConsumerType consumerTypeForRequestCommand(int command) {
+		ConsumerType type = null;
+		switch(command) {
+		case AsynchAEMessage.GetMeta:
+			type = ConsumerType.GetMetaRequest;
+			break;
+		case AsynchAEMessage.Process:
+			type = ConsumerType.ProcessCASRequest;
+			break;
+		case AsynchAEMessage.CollectionProcessComplete:
+			type = ConsumerType.CpcRequest;
+			break;
+		case AsynchAEMessage.ReleaseCAS:
+			type = ConsumerType.FreeCASRequest;
+			break;
+		}
+		return type;
+	}
+	private ConsumerType consumerTypeForResponseCommand(int command) {
+		ConsumerType type = null;
+		switch(command) {
+		case AsynchAEMessage.GetMeta:
+			type = ConsumerType.GetMetaResponse;
+			break;
+		case AsynchAEMessage.Process:
+			type = ConsumerType.ProcessCASResponse;
+			break;
+		case AsynchAEMessage.CollectionProcessComplete:
+			type = ConsumerType.CpcResponse;
+		}
+		return type;
+	}
+
+	public ConsumerType type(MessageContext messageContext) throws Exception {
+		int command =
+				messageContext.getMessageIntProperty(AsynchAEMessage.Command);
+		int msgType =
+				messageContext.getMessageIntProperty(AsynchAEMessage.MessageType);
 		
-		message.withCommand(command).
-			withMessageType(messageType);
+		ConsumerType type = null;
 		
-		if ( command == AsynchAEMessage.GetMeta && messageType == AsynchAEMessage.Response ) {
-			message.withMetadata(null);
-//			message.withMetadata(processor.getController().getResourceSpecifier().);
+		if ( AsynchAEMessage.Request == msgType ) {
+			type = consumerTypeForRequestCommand(command);
+		} else if ( AsynchAEMessage.Response == msgType )  {
+			type = consumerTypeForResponseCommand(command);
+		} else {
+			
 		}
 		
-			
-		return messageContext;
+		
+		return type;
+		
 	}
+	/*
 	public void dispatch(MessageContext messageContext) throws Exception {
+		
 		UimaAsProducer producer;
-		if ( !producers.containsKey(messageContext.getEndpoint().getDelegateKey())) {
+		if ( !producers.containsKey( ((DirectMessage)messageContext.getRawMessage()).getOrigin())) {
 			producer = 
-					createProducer((UimaAsConsumer)messageContext.getEndpoint().getDestination(), messageContext.getEndpoint().getDelegateKey());
+					createProducer((UimaAsConsumer)messageContext.getEndpoint().getReplyDestination(), ((DirectMessage)messageContext.getRawMessage()).getOrigin());
+			System.out.println(".............. Creating new Producer for endpoint:"+(UimaAsConsumer)messageContext.getEndpoint().getReplyDestination());
 		} else {
-			producer = producers.get(messageContext.getEndpoint().getDelegateKey());
+			producer = producers.get(((DirectMessage)messageContext.getRawMessage()).getOrigin());
+			System.out.println(".............. Reusing existing producer with Origin:"+((DirectMessage)messageContext.getRawMessage()).getOrigin());
+		}
+		producer.dispatch((DirectMessage)messageContext.getRawMessage());
+	}
+	*/
+	public void dispatch(MessageContext messageContext, String serviceUri) throws Exception {
+		if ( !serviceUri.startsWith(EndpointType.Direct.getName())) {
+			serviceUri = EndpointType.Direct.getName()+serviceUri;
+		}
+		Target target = targets.get(serviceUri);
+		UimaAsProducer producer = producerFor(type(messageContext), target);
+		if ( Objects.isNull(producer)) {
+			producer = createProducer((UimaAsConsumer)messageContext.getMessageObjectProperty(AsynchAEMessage.ReplyToEndpoint), serviceUri);
 		}
 		producer.dispatch((DirectMessage)messageContext.getRawMessage());
 	}
 	
-	public UimaAsProducer createProducer(UimaAsConsumer consumer, String delegateKey) throws Exception {
+	public void dispatch(MessageContext messageContext, Origin origin) throws Exception {
+
+		Target target = targets.get(origin.getName());
+		UimaAsProducer producer = producerFor(type(messageContext), target);
+/*
+		UimaAsProducer producer;
+		if ( !producers.containsKey( target ) ) {
+			producer = 
+					createProducer((UimaAsConsumer)messageContext.getEndpoint().getReplyDestination(), target);
+		} else {
+			producer = producers.get(target);
+		}
+		*/
+		producer.dispatch((DirectMessage)messageContext.getRawMessage());
+	}
+
+	private UimaAsProducer matchProducer(Target target, List<UimaAsProducer> producerList) {
+		UimaAsProducer producer = null;
+		for( UimaAsProducer p : producerList) {
+			if ( p.getTarget().equals(target)) {
+				producer = p;
+			}
+		}
 		
-		UimaAsProducer producer = new DirectUimaAsProducer(consumer);
-		producers.put(delegateKey,producer);
 		return producer;
 	}
-	
-	public UimaAsProducer createProducer(String targetUri) throws Exception {
+	private UimaAsConsumer matchConsumer(Target target, List<UimaAsConsumer> consumerList) {
+		UimaAsConsumer consumer = null;
+		for( UimaAsConsumer c : consumerList) {
+			if ( c.getTarget().equals(target)) {
+				consumer = c;
+			}
+		}
 		
-		UimaAsProducer producer = new DirectUimaAsProducer(targetUri);
-		producers.put(targetUri, producer);
+		return consumer;
+	}
+	private UimaAsProducer producerFor(ConsumerType type, Target target) {
+		UimaAsProducer producer = null;
+		
+		switch(type) {
+		case GetMetaRequest:
+			producer = matchProducer( target, getMetaRequestProducers);
+			break;
+		case GetMetaResponse:
+			producer =  matchProducer( target,getMetaResponseProducers);
+			break;
+			
+		case ProcessCASRequest:
+			producer =  matchProducer( target,processCASRequestProducers);
+			break;
+			
+		case ProcessCASResponse:
+			producer =  matchProducer( target,processCASResponseProducers);
+			break;
+			
+		case CpcRequest:
+			producer =  matchProducer( target,cpcRequestProducers);
+			break;
+			
+		case CpcResponse:
+			producer =  matchProducer( target,cpcResponseProducers);
+			break;
+			
+		case FreeCASRequest:
+			producer =  matchProducer( target,freeCASRequestProducers);
+			break;
+
+		default:
+		}
 		return producer;
 	}
+	private void categorizeProducer(UimaAsProducer producer) {
+		
+		switch(producer.getType()) {
+		case GetMetaRequest:
+			getMetaRequestProducers.add(producer);
+			break;
+			
+		case GetMetaResponse:
+			getMetaResponseProducers.add(producer);
+			break;
+			
+		case ProcessCASRequest:
+			processCASRequestProducers.add(producer);
+			break;
+			
+		case ProcessCASResponse:
+			processCASResponseProducers.add(producer);
+			break;
+			
+		case CpcRequest:
+			cpcRequestProducers.add(producer);
+			break;
+			
+		case CpcResponse:
+			cpcResponseProducers.add(producer);
+			break;
+			
+		case FreeCASRequest:
+			freeCASRequestProducers.add(producer);
+			break;
+	
+		default:
+		}
+	}
+	private void categorizeConsumer(UimaAsConsumer consumer) {
+		
+		switch(consumer.getType()) {
+		case GetMetaRequest:
+			getMetaRequestConsumers.add(consumer);
+			break;
+			
+		case GetMetaResponse:
+			getMetaResponseConsumers.add(consumer);
+			break;
+			
+		case ProcessCASRequest:
+			processCASRequestConsumers.add(consumer);
+			break;
+			
+		case ProcessCASResponse:
+			processCASResponseConsumers.add(consumer);
+			break;
+			
+		case CpcRequest:
+			cpcRequestConsumers.add(consumer);
+			break;
+			
+		case CpcResponse:
+			cpcResponseConsumers.add(consumer);
+			break;
+			
+		case FreeCASRequest:
+			freeCASRequestConsumers.add(consumer);
+			break;
+		default:
+		}
+	}
+	protected UimaAsConsumer consumerFor(ConsumerType type, Target target) {
+		UimaAsConsumer consumer = null;
+		
+		switch(type) {
+		case GetMetaRequest:
+			consumer = matchConsumer( target, getMetaRequestConsumers);
+			break;
+		case GetMetaResponse:
+			consumer =  matchConsumer( target,getMetaResponseConsumers);
+			break;
+			
+		case ProcessCASRequest:
+			consumer =  matchConsumer( target,processCASRequestConsumers);
+			break;
+			
+		case ProcessCASResponse:
+			consumer =  matchConsumer( target,processCASResponseConsumers);
+			break;
+			
+		case CpcRequest:
+			consumer =  matchConsumer( target,cpcRequestConsumers);
+			break;
+			
+		case CpcResponse:
+			consumer =  matchConsumer( target,cpcResponseConsumers);
+			break;
+		case FreeCASRequest:
+			consumer =  matchConsumer( target,freeCASRequestConsumers);
+			break;
 
-	public UimaAsConsumer createConsumer(String targetUri, ConsumerType type, int consumerThreadCount) throws Exception {
+		default:
+		}
+		return consumer;
+	}
+	/*
+	public UimaAsProducer createProducer(UimaAsConsumer consumer, Origin origin) throws Exception {
 		
-		DirectUimaAsConsumer consumer = new DirectUimaAsConsumer(name, targetUri, type, consumerThreadCount);
-		consumer.setMessageProcessor(processor);
-		consumers.put(targetUri+type.name(), consumer);
+		UimaAsProducer producer = new DirectUimaAsProducer(consumer);
+		targets.put(origin.getName(), producer.getTarget());
+
+		producers.put(origin,producer);
+		categorizeProducer(producer);
+		return producer;
+	}
+	*/
+	public UimaAsProducer createProducer(UimaAsConsumer consumer, String serviceUri) throws Exception {
+		
+		UimaAsProducer producer = new DirectUimaAsProducer(serviceUri, consumer);
+		StringBuilder sb = new StringBuilder();
+		if ( serviceUri != null && serviceUri.startsWith(EndpointType.Direct.getName())) {
+			sb.append(serviceUri).append(":").append(consumer.getType().toString());
+			
+		} else {
+			sb.append(EndpointType.Direct.getName()).append(serviceUri).append(":").append(consumer.getType().toString());
+			
+		}
+		targets.put(sb.toString(), producer.getTarget());
+
+		categorizeProducer(producer);
+		return producer;
+	}
+	
+	public UimaAsProducer createProducer(Origin origin) throws Exception {
+		// THIS SHOULD NOT BE CALLED
+		UimaAsProducer producer = new DirectUimaAsProducer(origin.getName());
+		targets.put(origin.getName(), producer.getTarget());
+		producers.put(origin, producer);
+		return producer;
+	}
+
+	public UimaAsConsumer createConsumer( ConsumerType type, int consumerThreadCount) throws Exception {
+		String cid = new StringBuilder(getName()).append(":").append(type.name()).toString();
+		UimaAsConsumer consumer;
+		Target target;
+		if ( (target = targets.get(cid)) == null ) {
+			consumer =
+					new DirectUimaAsConsumer(getName(), type, consumerThreadCount);
+			((DirectUimaAsConsumer)consumer).setMessageProcessor(processor);
+			categorizeConsumer(consumer);
+			targets.put(cid, consumer.getTarget());
+			consumers.put(consumer.getTarget(), consumer);
+		} else {
+			consumer = consumers.get(target);
+		}
+
+		System.out.println(".......... Added new consumer - key:"+consumer.getTarget().getName()+":"+type.name()+" Consumer Count:"+consumers.size());
 		return consumer;
 	}
 	
 	public UimaAsConsumer getConsumer(String targetUri, ConsumerType type) {
-		return consumers.get(targetUri+type.name());
+//		return consumers.get(targetUri+type.name());
+		Target target = targets.get(new StringBuilder(targetUri).append(":").append(type.name()).toString());
+		return consumerFor(type, target);
 	}
 
+	
+
 	@Override
 	public void start() throws Exception {
-		for(Entry<String, UimaAsConsumer> entry : consumers.entrySet()) {
-			entry.getValue().initialize(processor.getController());
+		System.out.println("Consumer Count:"+consumers.size());
+		for(Entry<Target, UimaAsConsumer> entry : consumers.entrySet()) {
+			if ( ConsumerType.ProcessCASRequest.equals(entry.getValue().getType())) {
+				if ( processor == null ) {
+					System.out.println(".... Processor is null");
+				}
+			
+				AnalysisEngineController controller = ((ServiceMessageProcessor)processor).getController();
+				if ( controller.isPrimitive() ) {
+					Initializer initializer = 
+							new PrimitiveAeInitializer((PrimitiveAnalysisEngineController)controller, entry.getValue().getConsumerCount());
+					entry.getValue().setInitializer(initializer);
+					entry.getValue().initialize(controller);
+				}
+			}
 			entry.getValue().start();
 		}
-		for(Entry<String, UimaAsProducer> entry : producers.entrySet()) {
+		for(Entry<Origin, UimaAsProducer> entry : producers.entrySet()) {
 			entry.getValue().start();
 		}
 
 	}
 	@Override
 	public void stop() throws Exception {
-		for(Entry<String, UimaAsConsumer> entry : consumers.entrySet()) {
+		for(Entry<Target, UimaAsConsumer> entry : consumers.entrySet()) {
 			entry.getValue().stop();
 		}
-		for(Entry<String, UimaAsProducer> entry : producers.entrySet()) {
+		for(Entry<Origin, UimaAsProducer> entry : producers.entrySet()) {
 			entry.getValue().stop();
 		}
 		
@@ -142,12 +458,7 @@ public class DirectUimaAsEndpoint implements UimaAsEndpoint {
 	public static void main(String[] args) {
 		try {
 			MessageProcessor dummyProcessor = 
-					new TestMessageProcessor(null);
-					//new TestMessageProcessor(new MockUpAnalysisEngineController("MockupClient", 4));
-
-//			MessageProcessor processor = 
-	//				new UimaAsMessageProcessor(null);
-			//TestMessageProcessor processor = new TestMessageProcessor(null);
+					new TestClientMessageProcessor();
 			
 			DirectUimaAsEndpoint endpoint = 
 					new DirectUimaAsEndpoint(dummyProcessor, "Client");
@@ -158,26 +469,26 @@ public class DirectUimaAsEndpoint implements UimaAsEndpoint {
 			service.initialize();
 			service.start();
 			
-			endpoint.createConsumer("direct:", ConsumerType.GetMeta, 1);
-			endpoint.createConsumer("direct:", ConsumerType.ProcessCAS, 4);
-			endpoint.createConsumer("direct:", ConsumerType.Cpc, 1);
+//			endpoint.createConsumer("direct:", ConsumerType.GetMetaResponse, 1);
+//			endpoint.createConsumer("direct:", ConsumerType.ProcessCASResponse, 4);
+//			endpoint.createConsumer("direct:", ConsumerType.CpcResponse, 1);
 			
 			UimaAsProducer producer = 
-					endpoint.createProducer( "direct:serviceA");
+					endpoint.createProducer( new UimaAsOrigin(EndpointType.Direct.getName()+"serviceA", EndpointType.Direct));
 		
 			endpoint.start();
 			
-			DirectMessage getMetaRequestMessage = 
-					new DirectMessage().
-					    withCommand(AsynchAEMessage.GetMeta).
-					    withMessageType(AsynchAEMessage.Request).
-					    withOrigin(new UimaAsOrigin("Client")).
-					    withReplyDestination(endpoint.getConsumer("direct:", ConsumerType.GetMeta)).
-					    withPayload(AsynchAEMessage.None);
+			MessageContext getMetaRequestMessage = 
+				endpoint.newMessageBuilder()
+				   .newGetMetaRequestMessage(new UimaAsOrigin("Client",EndpointType.Direct))
+				   .withPayload(AsynchAEMessage.None)
+				   .withReplyDestination(endpoint.getConsumer("direct:", ConsumerType.GetMetaResponse))
+				   .build();
+
 			UimaAsConsumer target = 
-					service.getEndpoint().getConsumer("direct:", ConsumerType.GetMeta);
+					service.getEndpoint().getConsumer("direct:", ConsumerType.GetMetaRequest);
 			
-			producer.dispatch(getMetaRequestMessage, target);
+			producer.dispatch((DirectMessage)getMetaRequestMessage.getRawMessage(), target);
 			
 		} catch( Exception e) {
 			e.printStackTrace();
@@ -185,25 +496,23 @@ public class DirectUimaAsEndpoint implements UimaAsEndpoint {
 
 	}
 
-	
+
 	private class MockupService {
 		UimaAsEndpoint endpoint;
-		UimaAsProducer producer;
 		
 		public void initialize() throws Exception {
 			
 			MockUpAnalysisEngineController controller =
 					new MockUpAnalysisEngineController("MockupService",4);
-			MessageProcessor dummyProcessor = 
+			TestMessageProcessor dummyProcessor = 
 					new TestMessageProcessor(controller);
 			
 			endpoint = new DirectUimaAsEndpoint(dummyProcessor, "Service");
-			UimaAsConsumer getMetaReplyConsumer = endpoint.createConsumer("direct:", ConsumerType.GetMeta, 1);
-			UimaAsConsumer processCasReplyConsumer = endpoint.createConsumer("direct:", ConsumerType.ProcessCAS, 4);
-			UimaAsConsumer cpcReplyConsumer = endpoint.createConsumer("direct:", ConsumerType.Cpc, 1);
+//			UimaAsConsumer getMetaRequestConsumer = endpoint.createConsumer("direct:", ConsumerType.GetMetaRequest, 1);
+//			UimaAsConsumer processCasRequestConsumer = endpoint.createConsumer("direct:", ConsumerType.ProcessCASRequest, 4);
+//			UimaAsConsumer cpcRequestConsumer = endpoint.createConsumer("direct:", ConsumerType.CpcRequest, 1);
 			
-			producer = endpoint.createProducer( "direct:serviceA");
-			controller.addEndpoint(new UimaAsOrigin("Service"), endpoint);
+			controller.addEndpoint(EndpointType.Direct, endpoint);
 		}
 		public void process(MessageContext mc) throws Exception {
 			
@@ -217,35 +526,16 @@ public class DirectUimaAsEndpoint implements UimaAsEndpoint {
 		public UimaAsEndpoint getEndpoint() {
 			return endpoint;
 		}
-		
-		
-		private class ServiceMessageProcessor implements MessageProcessor {
-			MockupService service;
-			public ServiceMessageProcessor(MockupService service) {
-				this.service = service;
-			}
-			@Override
-			public void process(MessageContext message) throws Exception {
-				DirectMessage request = 
-						(DirectMessage)message.getRawMessage();
-				DirectMessage getMetaReply = 
-						new DirectMessage().
-						    withCommand(AsynchAEMessage.GetMeta).
-						    withMessageType(AsynchAEMessage.Response).
-						    withOrigin(message.getEndpoint().getMessageOrigin()).
-						    withReplyDestination(request.getReplyDestination()).
-						    withPayload(AsynchAEMessage.Metadata);
-				MessageContext reply = new DirectMessageContext(getMetaReply, "", "");
-				service.getEndpoint().dispatch(reply);
-				
-			}
-			@Override
-			public AnalysisEngineController getController() {
-				// TODO Auto-generated method stub
-				return null;
-			}
-			
-		}
+
+
 	}
 
+
+
+
+
+
+
+
+
 }
diff --git a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsProducer.java b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsProducer.java
index be7c9e3..5285da7 100644
--- a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsProducer.java
+++ b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/direct/DirectUimaAsProducer.java
@@ -1,31 +1,71 @@
 package org.apache.uima.as.connectors.direct;
 
 
+import java.util.EnumSet;
+
 import org.apache.uima.aae.definition.connectors.UimaAsConsumer;
+import org.apache.uima.aae.definition.connectors.UimaAsConsumer.ConsumerType;
+import org.apache.uima.aae.definition.connectors.UimaAsEndpoint.EndpointType;
+import org.apache.uima.aae.message.Target;
+import org.apache.uima.aae.message.UimaAsTarget;
 import org.apache.uima.aae.definition.connectors.UimaAsProducer;
-import org.apache.uima.aae.message.MessageContext;
-import org.apache.uima.aae.message.UimaAsMessage;
 import org.apache.uima.as.client.DirectMessage;
 
 public class DirectUimaAsProducer implements UimaAsProducer{
-
+	private Target target;
 	private UimaAsConsumer consumer;
+	EnumSet<ConsumerType> requestSet = EnumSet.of(ConsumerType.GetMetaRequest,ConsumerType.ProcessCASRequest,ConsumerType.CpcRequest);
+	EnumSet<ConsumerType> responseSet = EnumSet.of(ConsumerType.GetMetaResponse,ConsumerType.ProcessCASResponse,ConsumerType.CpcResponse);
 	
 	public DirectUimaAsProducer(String targetUri) {
 		
 	}
-	public DirectUimaAsProducer(UimaAsConsumer consumer) {
+
+	public DirectUimaAsProducer(String name, UimaAsConsumer consumer) {
 		this.consumer = consumer;
+		this.target = new UimaAsTarget(name, EndpointType.Direct);
+		switch(consumer.getType()) {
+		case GetMetaRequest:
+			
+			break;
+		case GetMetaResponse:
+			
+			break;
+			
+		case ProcessCASRequest:
+			
+			break;
+			
+		case ProcessCASResponse:
+			
+			break;
+			
+		case CpcRequest:
+			
+			break;
+			
+		case CpcResponse:
+			
+			break;
+			
+		default:
+		}
+	}
+	public Target getTarget() {
+		return target;
+	}
+	public boolean requestProducer() {
+		return requestSet.contains(consumer.getType());
+	}
+	public boolean responseProducer() {
+		return responseSet.contains(consumer.getType());
 	}
-
 	@Override
 	public void start() throws Exception {
-		// TODO Auto-generated method stub
 		
 	}
 	@Override
 	public void stop() throws Exception {
-		// TODO Auto-generated method stub
 		
 	}
 
@@ -42,7 +82,11 @@ public class DirectUimaAsProducer implements UimaAsProducer{
 		
 	}
 	public static void main(String[] args) {
-		// TODO Auto-generated method stub
 
 	}
+
+	@Override
+	public ConsumerType getType() {
+		return consumer.getType();
+	}
 }
diff --git a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/MockUpAnalysisEngineController.java b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/MockUpAnalysisEngineController.java
index c8fa5e5..0ef4562 100644
--- a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/MockUpAnalysisEngineController.java
+++ b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/MockUpAnalysisEngineController.java
@@ -23,6 +23,7 @@ import org.apache.uima.aae.controller.EventSubscriber;
 import org.apache.uima.aae.controller.LocalCache;
 import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
 import org.apache.uima.aae.definition.connectors.UimaAsEndpoint;
+import org.apache.uima.aae.definition.connectors.UimaAsEndpoint.EndpointType;
 import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
 import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
 import org.apache.uima.aae.error.AsynchAEException;
@@ -41,6 +42,7 @@ import org.apache.uima.aae.monitor.Monitor;
 import org.apache.uima.aae.spi.transport.UimaMessageListener;
 import org.apache.uima.as.client.DirectInputChannel;
 import org.apache.uima.as.client.Listener;
+import org.apache.uima.as.connectors.direct.DirectUimaAsEndpoint;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.resource.ResourceInitializationException;
 import org.apache.uima.resource.ResourceSpecifier;
@@ -52,20 +54,20 @@ public class MockUpAnalysisEngineController implements PrimitiveAnalysisEngineCo
 	private ThreadLocal<Long> threadLocalValue = new ThreadLocal<>();
 	private volatile ControllerLatch latch = new ControllerLatch(this);
 	private CyclicBarrier barrier;
-	private Map<Origin, UimaAsEndpoint> endpoints = 
+	private Map<EndpointType, UimaAsEndpoint> endpoints = 
 			new HashMap<>();
 	private final Origin serviceOrigin;
 	
 	public MockUpAnalysisEngineController(String name, int scaleout) {
 		this.name = name;
-		serviceOrigin = new UimaAsOrigin(name);
+		serviceOrigin = new UimaAsOrigin(name, EndpointType.Direct);
 		barrier = new CyclicBarrier(scaleout);
 	}
 	public Origin getOrigin() {
 		return serviceOrigin;
 	}
-	public void addEndpoint(Origin origin, UimaAsEndpoint endpoint) {
-		endpoints.put(origin, endpoint);
+	public void addEndpoint(EndpointType type, UimaAsEndpoint endpoint) {
+		endpoints.put(type, endpoint);
 	}
 	@Override
 	public void terminate() {
@@ -87,17 +89,40 @@ public class MockUpAnalysisEngineController implements PrimitiveAnalysisEngineCo
 
 	@Override
 	public void sendMetadata(Endpoint anEndpoint) throws AsynchAEException {
+		// There is one instance of endpoint per type [Direct, JMS,..]
 		UimaAsEndpoint endpoint = 
-				endpoints.get(anEndpoint.getMessageOrigin());//anEndpoint.getDelegateKey());
-		MessageContext message = endpoint.createMessage(AsynchAEMessage.GetMeta,AsynchAEMessage.Response,anEndpoint);
+				endpoints.get(anEndpoint.getMessageOrigin().getType());//anEndpoint.getDelegateKey());
+		// endpoints are locally cached for future access. If endpoint
+		// of a given type is not available, lazily create it and cache it
 		try {
+			if ( Objects.isNull(endpoint)) {
+				endpoint = createEndpoint(anEndpoint);
+			}
+			
+			MessageContext message = 
+				endpoint.newMessageBuilder()
+					.newGetMetaReplyMessage(getOrigin())
+					.withSenderKey(anEndpoint.getDelegateKey())
+					.build();
+					
+			message.getEndpoint().
+			   setReplyDestination(anEndpoint.getReplyDestination());
+			
+			// send the reply to target consumer.
 			endpoint.dispatch(message);
 		} catch( Exception e) {
 			throw new AsynchAEException(e);
 		}
 
 	}
-
+	private UimaAsEndpoint createEndpoint(Endpoint e) throws Exception {
+		
+		// FIX THE NAMING HERE: Two things are Endpoint now
+		UimaAsEndpoint endpoint =
+					new DirectUimaAsEndpoint( e.getMessageOrigin().getName());
+		addEndpoint(e.getMessageOrigin().getType(), endpoint);
+		return endpoint;
+	}
 	@Override
 	public ControllerLatch getControllerLatch() {
 		return latch;
@@ -260,6 +285,7 @@ public class MockUpAnalysisEngineController implements PrimitiveAnalysisEngineCo
 
 	@Override
 	public void initialize() throws AsynchAEException {
+
 		System.out.println(".....Thread["+Thread.currentThread().getId()+"] "+ getComponentName()+" - Initializing AE");
 	}
 
@@ -655,5 +681,20 @@ public class MockUpAnalysisEngineController implements PrimitiveAnalysisEngineCo
 	public boolean threadAssignedToAE() {
 		return Objects.nonNull(threadLocalValue.get());
 	}
+	@Override
+	public void addEndpoint(UimaAsEndpoint endpoint) {
+		// TODO Auto-generated method stub
+		
+	}
+	@Override
+	public UimaAsEndpoint getEndpoint(EndpointType type) {
+		// TODO Auto-generated method stub
+		return null;
+	}
+	@Override
+	public void start() throws Exception {
+		// TODO Auto-generated method stub
+		
+	}
 	
 }
\ No newline at end of file
diff --git a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/TestMessageProcessor.java b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/TestMessageProcessor.java
index e700b00..b74ae35 100644
--- a/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/TestMessageProcessor.java
+++ b/uimaj-as-connectors/src/main/java/org/apache/uima/as/connectors/mockup/TestMessageProcessor.java
@@ -3,15 +3,17 @@ package org.apache.uima.as.connectors.mockup;
 import org.apache.uima.aae.controller.AnalysisEngineController;
 import org.apache.uima.aae.message.MessageContext;
 import org.apache.uima.aae.message.MessageProcessor;
+import org.apache.uima.aae.message.ServiceMessageProcessor;
 import org.apache.uima.aae.service.command.CommandFactory;
 import org.apache.uima.aae.service.command.UimaAsCommand;
 
-public class TestMessageProcessor implements MessageProcessor {
+public class TestMessageProcessor implements ServiceMessageProcessor {
 	AnalysisEngineController controller;
 	
 	public TestMessageProcessor(AnalysisEngineController ctlr) {
 		this.controller = ctlr;
 	}
+	
 	@Override
 	public void process(MessageContext message) throws Exception {
 		UimaAsCommand command = 
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConnector.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConnector.class
deleted file mode 100644
index e0b179d..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConnector.class and /dev/null differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$1$1.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$1$1.class
deleted file mode 100644
index cad42cc..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$1$1.class and /dev/null differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$1.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$1.class
deleted file mode 100644
index 61330b1..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$1.class and /dev/null differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$DirectListenerCallback.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$DirectListenerCallback.class
deleted file mode 100644
index 77468d9..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer$DirectListenerCallback.class and /dev/null differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer.class
deleted file mode 100644
index 1c96817..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsConsumer.class and /dev/null differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint$MockupService$ServiceMessageProcessor.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint$MockupService$ServiceMessageProcessor.class
deleted file mode 100644
index 3c89bb3..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint$MockupService$ServiceMessageProcessor.class and /dev/null differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint$MockupService.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint$MockupService.class
deleted file mode 100644
index 60cb1e6..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint$MockupService.class and /dev/null differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint.class
deleted file mode 100644
index 3754d56..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsEndpoint.class and /dev/null differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsProducer.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsProducer.class
deleted file mode 100644
index 158bd7c..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/direct/DirectUimaAsProducer.class and /dev/null differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/mockup/MockUpAnalysisEngineController.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/mockup/MockUpAnalysisEngineController.class
deleted file mode 100644
index 4ca6681..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/mockup/MockUpAnalysisEngineController.class and /dev/null differ
diff --git a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/mockup/TestMessageProcessor.class b/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/mockup/TestMessageProcessor.class
deleted file mode 100644
index 93bd910..0000000
Binary files a/uimaj-as-connectors/target/classes/org/apache/uima/as/connectors/mockup/TestMessageProcessor.class and /dev/null differ