You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/10/18 14:12:01 UTC

svn commit: r1844241 [2/4] - in /uima/uima-as/branches/uima-as-3: aggregate-uima-as/ uima-as-parent/ uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/ uimaj-as-activ...

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/dd/DeploymentDescriptorProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/dd/DeploymentDescriptorProcessor.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/dd/DeploymentDescriptorProcessor.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/dd/DeploymentDescriptorProcessor.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,267 @@
+package org.apache.uima.aae.component.dd;
+
+import java.io.File;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.uima.aae.UimaASUtils;
+import org.apache.uima.aae.UimaClassFactory;
+import org.apache.uima.aae.component.AnalysisEngineComponent;
+import org.apache.uima.aae.component.CasMultiplierComponent;
+import org.apache.uima.aae.component.CasMultiplierNature;
+import org.apache.uima.aae.component.RemoteAnalysisEngineComponent;
+import org.apache.uima.aae.component.TopLevelServiceComponent;
+import org.apache.uima.aae.component.factory.AnalysisEngineComponentFactory;
+import org.apache.uima.resource.ResourceSpecifier;
+import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
+import org.apache.uima.resourceSpecifier.AnalysisEngineType;
+import org.apache.uima.resourceSpecifier.DelegateAnalysisEngineType;
+import org.apache.uima.resourceSpecifier.RemoteAnalysisEngineType;
+import org.apache.uima.resourceSpecifier.ServiceType;
+import org.apache.xmlbeans.XmlDocumentProperties;
+
+public class DeploymentDescriptorProcessor {
+	
+	// Parsed deployment descriptor this processor works on. Stays null until it
+	// is supplied through the one-arg constructor or newComponent(String).
+	private AnalysisEngineDeploymentDescriptionDocument dd = null;
+	/**
+	 * Creates a processor without a deployment descriptor.
+	 */
+	public DeploymentDescriptorProcessor() {
+		
+	}
+	/**
+	 * Creates a processor for an already parsed deployment descriptor.
+	 *
+	 * @param dd the parsed deployment descriptor document
+	 */
+	public DeploymentDescriptorProcessor(AnalysisEngineDeploymentDescriptionDocument dd) {
+		this.dd = dd;
+	}
+	public AnalysisEngineComponent newComponent(String descriptorPath) throws Exception {
+		this.dd = parseDD(descriptorPath);
+		return newComponent();
+	}
+	
+	/**
+	 * Builds the top level service component described by the deployment
+	 * descriptor held by this processor.
+	 * <p>
+	 * The AE resource specifier referenced by the descriptor is loaded and
+	 * turned into a tree of {@link AnalysisEngineComponent} instances, which is
+	 * then wrapped in a {@link TopLevelServiceComponent}. When the wrapped
+	 * component reports itself as an aggregate, the colocated and remote
+	 * delegate settings from the deployment descriptor are applied to the
+	 * children of that tree.
+	 *
+	 * @return the top level component wrapping the component tree
+	 * @throws IllegalStateException if no deployment descriptor has been set
+	 * @throws Exception if the resource specifier cannot be produced or processed
+	 */
+	public TopLevelServiceComponent newComponent() throws Exception {
+		// Fail with a clear message instead of a bare NullPointerException when
+		// the no-arg constructor was used and no descriptor was ever supplied.
+		if ( Objects.isNull(dd) ) {
+			throw new IllegalStateException(
+					"Deployment descriptor is not set; use newComponent(String) or the one-arg constructor");
+		}
+		ServiceType service =
+				dd.getAnalysisEngineDeploymentDescription().getDeployment().getService();
+		XmlDocumentProperties dp = dd.documentProperties();
+
+		// get absolute path to resource specifier
+		String aeDescriptor =
+				UimaASUtils.fixPath(dp.getSourceName(), getDescriptor(service));
+
+		// Get top level uima resource specifier
+		ResourceSpecifier resourceSpecifier =
+				UimaClassFactory.produceResourceSpecifier(aeDescriptor);
+
+		AnalysisEngineComponentFactory componentFactory =
+				new AnalysisEngineComponentFactory();
+		// Process top level AE resource specifier and all its delegates.
+		// For aggregates, recursively walk through a delegate tree, producing
+		// a tree of AnalysisEngineComponent instances, one for every delegate.
+		AnalysisEngineComponent aeComponent =
+				componentFactory.produce(resourceSpecifier, null);
+
+		// Decorate above with top level component functionality
+		TopLevelServiceComponent topLevelComponent =
+				new TopLevelServiceComponent(aeComponent, dd);
+
+		if ( topLevelComponent.isAggregate() ) {
+			// The dd may omit the analysisEngine element or its delegates; in
+			// that case both arrays stay null and the handlers fall back to
+			// their defaults (colocated) or do nothing (remote).
+			boolean ddHasDelegates =
+					Objects.nonNull(service.getAnalysisEngine()) &&
+					Objects.nonNull(service.getAnalysisEngine().getDelegates());
+
+			// All delegates are colocated unless the dd declares them remote.
+			DelegateAnalysisEngineType[] colocatedDelegates = null;
+			if ( ddHasDelegates ) {
+				colocatedDelegates = service.getAnalysisEngine().
+										getDelegates().
+										getAnalysisEngineArray();
+			}
+			handleColocatedDelegates(colocatedDelegates, aeComponent.getChildren());
+
+			RemoteAnalysisEngineType[] remoteDelegates = null;
+			if ( ddHasDelegates ) {
+				remoteDelegates = service.getAnalysisEngine().
+									getDelegates().
+									getRemoteAnalysisEngineArray();
+			}
+			handleRemoteDelegates(remoteDelegates, aeComponent.getChildren());
+		}
+
+		return topLevelComponent;
+	}
+	public AnalysisEngineDeploymentDescriptionDocument parseDD(String descriptorPath) throws Exception {
+		return AnalysisEngineDeploymentDescriptionDocument.Factory.parse(new File(descriptorPath));	
+
+	}
+	/**
+	 * Tells whether a dd analysis engine entry is to be treated as an
+	 * aggregate: its async value is "true", or its async attribute is set,
+	 * or it declares delegates.
+	 *
+	 * @param aet the analysis engine entry from the deployment descriptor
+	 * @return true if any of the three conditions holds
+	 */
+	private boolean isAggregate(AnalysisEngineType aet) {
+		if ( "true".equals(aet.getAsync()) ) {
+			return true;
+		}
+		// NOTE(review): presumably isSetAsync() is true whenever the attribute
+		// is present, whatever its value, so async="false" would also count
+		// as an aggregate here - confirm that this is intended.
+		if ( aet.isSetAsync() ) {
+			return true;
+		}
+		return aet.isSetDelegates();
+	}
+	/**
+	 * Returns the reference to the top level AE descriptor declared by the
+	 * service: the import's location when present, otherwise the import's name.
+	 *
+	 * @param service the service element of the deployment descriptor
+	 * @return the import location, or the import name if no location is given
+	 */
+	private String getDescriptor(ServiceType service) {
+		String location = service.getTopDescriptor().getImport().getLocation();
+		if ( location != null ) {
+			return location;
+		}
+		// no location given, fall back to the import by name
+		return service.getTopDescriptor().getImport().getName();
+	}
+	
+	/**
+	 * Enables async on every given delegate. For a non-primitive delegate its
+	 * children are first processed through
+	 * {@code handleColocatedDelegates(null, children)}.
+	 *
+	 * @param resourceSpecifierDelegates delegates taken from the AE resource specifier
+	 */
+	private void markAllDelegatesAsAsync(List<AnalysisEngineComponent> resourceSpecifierDelegates) {
+		for ( AnalysisEngineComponent delegate : resourceSpecifierDelegates ) {
+			boolean hasChildren = !delegate.isPrimitive();
+			if ( hasChildren ) {
+				// no dd entries for the nested delegates, so pass null
+				handleColocatedDelegates(null, delegate.getChildren());
+			}
+			delegate.enableAsync();
+		}
+	}
+	/**
+	 * Applies deployment descriptor (dd) settings to colocated delegates.
+	 * <p>
+	 * When {@code ddDelegates} is null the defaults are applied through
+	 * {@link #handleDefaultColocatedDelegates(List)}. Otherwise every dd delegate
+	 * is matched by key against the resource specifier delegates; the first
+	 * match gets its cas multiplier settings (if applicable), async flag,
+	 * scaleout and thread pool sizes from the dd. If the dd entry counts as an
+	 * aggregate, the children of the match are marked async and any nested dd
+	 * delegates are processed recursively. A dd delegate whose key matches
+	 * nothing is ignored.
+	 *
+	 * @param ddDelegates colocated delegates from the dd, may be null
+	 * @param resourceSpecifierDelegates delegates taken from the AE resource specifier
+	 */
+	private void handleColocatedDelegates(DelegateAnalysisEngineType[] ddDelegates, List<AnalysisEngineComponent> resourceSpecifierDelegates ) {
+		if ( Objects.isNull(ddDelegates)) {
+			// the dd does not include delegates but is configured as an asynch service
+			// so process resource specifiers recursively marking each part of a pipeline
+			// as asynch so that it is deployed as a collocated asynch service.
+			handleDefaultColocatedDelegates(resourceSpecifierDelegates);
+			//markAllDelegatesAsAsync(resourceSpecifierDelegates);
+			return; 
+		}
+		// go through all delegates defined in the deployment descriptor (dd)
+		for( DelegateAnalysisEngineType ddDelegate : ddDelegates ) {
+			// find a matching delegate in the AE resource specifier
+			for( AnalysisEngineComponent resourceSpecifierDelegate : resourceSpecifierDelegates ) {
+				if ( ddDelegate.getKey().equals(resourceSpecifierDelegate.getKey())) {
+					if (  resourceSpecifierDelegate.isCasMultiplier() && Objects.nonNull(ddDelegate.getCasMultiplier())) {
+						// plugin cas multiplier settings from dd
+						// (heap size falls back to 1000 and processParentLast to true
+						// when the dd values cannot be converted)
+						CasMultiplierNature casMultiplier =
+								new CasMultiplierComponent(ddDelegate.getCasMultiplier().getDisableJCasCache(), 
+										                   TypeConverter.convertStringToLong(ddDelegate.getCasMultiplier().getInitialFsHeapSize(), 1000), 
+										                   ddDelegate.getCasMultiplier().getPoolSize(),
+										                   TypeConverter.convertStringToBoolean(ddDelegate.getCasMultiplier().getProcessParentLast(),true) );
+						resourceSpecifierDelegate.enableCasMultiplierNatureWith(casMultiplier);
+						
+					}
+					resourceSpecifierDelegate.enableAsync();   // delegate is async
+					
+					// scaleout defaults to 1 instance, thread pool sizes default to 1
+					resourceSpecifierDelegate.
+					      withScaleout(Objects.isNull(ddDelegate.getScaleout()) ? 1 :ddDelegate.getScaleout().getNumberOfInstances()).
+					      withRequestThreadPoolSize( TypeConverter.convertStringToInt(ddDelegate.getInputQueueScaleout(), 1)).
+					      withReplyThreadPoolSize( TypeConverter.convertStringToInt(ddDelegate.getInternalReplyQueueScaleout(),1));
+										
+					if ( isAggregate(ddDelegate) ) {
+						
+						// NOTE(review): enableAsync() was already called on this
+						// delegate above; this second call looks redundant.
+						resourceSpecifierDelegate.enableAsync();
+						// mark the direct children async as well
+						for ( AnalysisEngineComponent aec : resourceSpecifierDelegate.getChildren() ) {
+							aec.enableAsync();
+						}
+						if ( ddDelegate.getDelegates() != null ) {
+							// recursively process collocated delegates
+							handleColocatedDelegates(ddDelegate.getDelegates().getAnalysisEngineArray() , resourceSpecifierDelegate.getChildren());
+							handleRemoteDelegates(ddDelegate.getDelegates().getRemoteAnalysisEngineArray(), resourceSpecifierDelegate.getChildren());
+						}
+						
+					} 
+					break;  // found a match and completed processing it. We are done with it.
+				}
+			}
+		}
+	}
+
+	/**
+	 * Applies default colocated settings to every given delegate and,
+	 * recursively, to the children of each non-primitive delegate: scaleout 1,
+	 * request and reply thread pool size 1, async enabled. A cas multiplier
+	 * delegate additionally gets a {@code CasMultiplierComponent(false, 1000, 1, true)}.
+	 *
+	 * @param resourceSpecifierDelegates delegates taken from the AE resource specifier
+	 */
+	private void handleDefaultColocatedDelegates(List<AnalysisEngineComponent> resourceSpecifierDelegates) {
+		// no dd entries to match against: every delegate gets the defaults
+		for (AnalysisEngineComponent resourceSpecifierDelegate : resourceSpecifierDelegates) {
+			if (resourceSpecifierDelegate.isCasMultiplier()) {
+				// plugin default cas multiplier settings (nothing comes from the dd here)
+				CasMultiplierNature casMultiplier = new CasMultiplierComponent(false, 1000, 1, true);
+				resourceSpecifierDelegate.enableCasMultiplierNatureWith(casMultiplier);
+			}
+			resourceSpecifierDelegate.withScaleout(1).
+				withRequestThreadPoolSize(1).
+				withReplyThreadPoolSize(1).
+				enableAsync();
+			
+			// descend into aggregates so the whole subtree is configured
+			if (!resourceSpecifierDelegate.isPrimitive()) {
+				handleDefaultColocatedDelegates(resourceSpecifierDelegate.getChildren());
+			}
+		}
+	}
+	/**
+	 * Replaces, in place, every delegate that the deployment descriptor
+	 * declares as remote with a {@link RemoteAnalysisEngineComponent} that
+	 * decorates the original component.
+	 * <p>
+	 * Delegates are matched by key. The list is walked by index so the slot to
+	 * replace is known directly, without re-scanning the list through
+	 * {@code indexOf} (which depends on {@code equals}). The search stops at
+	 * the first match, as the colocated handling does.
+	 *
+	 * @param remoteDelegates remote delegates from the dd, may be null (no-op)
+	 * @param resourceSpecifierDelegates delegates taken from the AE resource
+	 *        specifier; matching entries are replaced in this list
+	 */
+	private void handleRemoteDelegates(RemoteAnalysisEngineType[] remoteDelegates, List<AnalysisEngineComponent> resourceSpecifierDelegates ) {
+		if ( Objects.isNull(remoteDelegates) ) {
+			return;
+		}
+		for( RemoteAnalysisEngineType remoteDelegateType : remoteDelegates ) {
+			// find a matching delegate in the AE resource specifier
+			for( int index = 0; index < resourceSpecifierDelegates.size(); index++ ) {
+				AnalysisEngineComponent resourceSpecifierDelegate =
+						resourceSpecifierDelegates.get(index);
+				if ( remoteDelegateType.getKey().equals(resourceSpecifierDelegate.getKey())) {
+					// Decorate existing component with remote flavor
+					RemoteAnalysisEngineComponent remoteDelegate =
+							new RemoteAnalysisEngineComponent(resourceSpecifierDelegate, remoteDelegateType);
+
+					// replace component with decorated remote
+					resourceSpecifierDelegates.set(index, remoteDelegate);
+					break;  // found a match and replaced it. We are done with it.
+				}
+			}
+		}
+	}
+/*	
+	public void parse(DelegateAnalysisEngineType colocatedDelegate, AnalysisEngineComponent component) {
+		if ( isAggregate(colocatedDelegate) ) {
+			DelegateAnalysisEngineType[] colocatedDelegates = 
+					colocatedDelegate.getDelegates().getAnalysisEngineArray();
+			handleColocatedDelegates(colocatedDelegates, component.getChildren());
+		} else {
+			
+		}
+	}
+*/
+	/**
+	 * Command line entry point: builds the component tree for the deployment
+	 * descriptor given as the first argument. Failures are reported by
+	 * printing the stack trace.
+	 *
+	 * @param args args[0] is the path to the deployment descriptor
+	 */
+	public static void main(String[] args) {
+		// report a missing argument instead of an ArrayIndexOutOfBoundsException trace
+		if ( args == null || args.length == 0 ) {
+			System.err.println("Usage: DeploymentDescriptorProcessor <deployment-descriptor-path>");
+			return;
+		}
+		try {
+			DeploymentDescriptorProcessor ddp = 
+					new DeploymentDescriptorProcessor();
+			ddp.newComponent(args[0]);
+		} catch( Exception e) {
+			e.printStackTrace();
+		}
+	}
+	/**
+	 * Lenient string conversions for optional deployment descriptor values.
+	 * Each method returns the supplied default when the value is missing
+	 * (null) or, for the numeric conversions, cannot be parsed.
+	 */
+	private static class TypeConverter {
+		/**
+		 * @param value text to convert, may be null
+		 * @param defaultValue result when value is null or not a valid int
+		 * @return the parsed int or the default
+		 */
+		private static int convertStringToInt(String value, int defaultValue) {
+			if ( value == null ) {
+				return defaultValue;
+			}
+			try {
+				return Integer.parseInt(value);
+			} catch( NumberFormatException e) {
+				// value is not numeric, use the default
+				return defaultValue;
+			}
+		}
+		/**
+		 * @param value text to convert, may be null
+		 * @param defaultValue result when value is null
+		 * @return the default for a null value, otherwise
+		 *         {@link Boolean#parseBoolean(String)} of the value
+		 */
+		private static boolean convertStringToBoolean(String value, boolean defaultValue) {
+			// Boolean.parseBoolean(null) returns false and never throws, so a
+			// missing value must be mapped to the default explicitly.
+			if ( value == null ) {
+				return defaultValue;
+			}
+			return Boolean.parseBoolean(value);
+		}
+		/**
+		 * @param value text to convert, may be null
+		 * @param defaultValue result when value is null or not a valid long
+		 * @return the parsed long or the default
+		 */
+		private static long convertStringToLong(String value, long defaultValue) {
+			if ( value == null ) {
+				return defaultValue;
+			}
+			try {
+				return Long.parseLong(value);
+			} catch( NumberFormatException e) {
+				// value is not numeric, use the default
+				return defaultValue;
+			}
+		}
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/factory/AnalysisEngineComponentFactory.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/factory/AnalysisEngineComponentFactory.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/factory/AnalysisEngineComponentFactory.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/factory/AnalysisEngineComponentFactory.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,55 @@
+package org.apache.uima.aae.component.factory;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.uima.aae.UimaClassFactory;
+import org.apache.uima.aae.component.AggregateAnalysisEngineComponent;
+import org.apache.uima.aae.component.AnalysisEngineComponent;
+import org.apache.uima.aae.component.PrimitiveAnalysisEngineComponent;
+import org.apache.uima.analysis_engine.AnalysisEngineDescription;
+import org.apache.uima.resource.ResourceSpecifier;
+
+public class AnalysisEngineComponentFactory {
+	
+    /**
+     * Views the given resource specifier as an analysis engine description.
+     *
+     * @param rs the resource specifier to cast
+     * @return the same object typed as {@link AnalysisEngineDescription}
+     * @throws ClassCastException if rs is not an analysis engine description
+     */
+    private AnalysisEngineDescription getAeDescription(ResourceSpecifier rs) {
+		AnalysisEngineDescription description = (AnalysisEngineDescription) rs;
+		return description;
+    }
+    
+	/**
+	 * Produces the component for a resource specifier. A primitive description
+	 * yields a {@link PrimitiveAnalysisEngineComponent}; anything else yields an
+	 * {@link AggregateAnalysisEngineComponent} whose children are produced
+	 * recursively, one per delegate specifier, keyed by the delegate key.
+	 * Scaleout and cas multiplier support are enabled on the component
+	 * according to the operational properties of the description.
+	 *
+	 * @param rs resource specifier; must be an analysis engine description
+	 * @param key key assigned to the produced component
+	 * @return the produced component
+	 * @throws Exception if the delegate specifiers cannot be obtained or processed
+	 */
+	public AnalysisEngineComponent produce(ResourceSpecifier rs, String key) throws Exception {
+		AnalysisEngineDescription description = getAeDescription(rs);
+		AnalysisEngineComponent produced;
+
+		if ( description.isPrimitive() ) {
+			produced = new PrimitiveAnalysisEngineComponent(key, rs);
+		} else {
+			produced = new AggregateAnalysisEngineComponent(key, rs);
+			// one child component per delegate of the aggregate
+			for ( Entry<String, ResourceSpecifier> delegate :
+					description.getDelegateAnalysisEngineSpecifiers().entrySet() ) {
+				produced.add(produce(delegate.getValue(), delegate.getKey()));
+			}
+		}
+
+		if ( description.getAnalysisEngineMetaData().getOperationalProperties().isMultipleDeploymentAllowed() ) {
+			produced.enableScaleout();
+		}
+		if ( description.getAnalysisEngineMetaData().getOperationalProperties().getOutputsNewCASes() ) {
+			produced.enableCasMultipler();
+		}
+
+		return produced;
+	}
+	/**
+	 * Command line entry point: produces the component tree for the AE
+	 * descriptor given as the first argument, using "TopLevel" as the key of
+	 * the root component. Failures are reported by printing the stack trace.
+	 *
+	 * @param args args[0] is the path to the AE descriptor
+	 */
+	public static void main(String[] args ) {
+		// report a missing argument instead of an ArrayIndexOutOfBoundsException trace
+		if ( args == null || args.length == 0 ) {
+			System.err.println("Usage: AnalysisEngineComponentFactory <ae-descriptor-path>");
+			return;
+		}
+		try {
+			AnalysisEngineComponentFactory factory = 
+					new AnalysisEngineComponentFactory();
+			ResourceSpecifier resourceSpecifier = 
+					UimaClassFactory.produceResourceSpecifier(args[0]);
+			factory.produce(resourceSpecifier, "TopLevel");
+		} catch( Exception e) {
+			e.printStackTrace();
+		}
+	}
+}

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Thu Oct 18 14:12:00 2018
@@ -2028,25 +2028,54 @@ implements
     return retValue;
   }
 
-  private boolean forceToDropTheCas(CasStateEntry parent, CacheEntry cacheEntry, FinalStep aStep) {
-   
-	  // Get the key of the Cas Producer
-    String casProducer = cacheEntry.getCasProducerAggregateName();
-    // CAS is considered new from the point of view of this service IF it was produced by it
-    boolean isNewCas = (cacheEntry.isNewCas() && casProducer != null && getComponentName().equals(
-            casProducer));
-    if (parent != null && parent.isFailed() && isNewCas) {
-      return true; // no point to continue if the CAS was produced in this aggregate and its parent
-                   // failed here
-    }
-    // If the CAS was generated by this component but the Flow Controller wants to drop the CAS OR
-    // this component
-    // is not a Cas Multiplier
-    if (isNewCas && parent.getSubordinateCasInPlayCount() == 0 && (aStep.getForceCasToBeDropped() || !isCasMultiplier())) {
-      return true;
-    }
-    return false;
-  }
+	/**
+	 * Decides whether a CAS must be dropped. Only a CAS produced by this
+	 * component (a "new" CAS whose producer name equals this component's name)
+	 * can be dropped. Such a CAS is dropped when this component is not a Cas
+	 * Multiplier, when its parent has failed, or when the parent has no
+	 * subordinate CASes in play and the final step asks for the CAS to be
+	 * dropped.
+	 *
+	 * @param parent state entry of the parent CAS, may be null
+	 * @param cacheEntry cache entry of the CAS being examined
+	 * @param aStep the final step returned for the CAS
+	 * @return true if the CAS should be dropped
+	 */
+	private boolean forceToDropTheCas(CasStateEntry parent, CacheEntry cacheEntry, FinalStep aStep) {
+
+		// Get the key of the Cas Producer
+		String casProducer = cacheEntry.getCasProducerAggregateName();
+		// CAS is considered new from the point of view of this service IF it was
+		// produced by it
+		boolean producedHere = (cacheEntry.isNewCas() && casProducer != null && getComponentName().equals(casProducer));
+		if ( !producedHere ) {
+			return false;
+		}
+		// force to drop a child CAS if this service is not a Cas Multiplier
+		if ( !isCasMultiplier()) {
+			// TODO(review): debug output on stdout; consider a logger instead
+			System.out.println(">>>>>>>>>>>>>>>>>>> FORCE TO DROP THE CAS");
+			return true;
+		}
+		// the remaining checks need the parent's state
+		if ( parent == null ) {
+			return false;
+		}
+		if ( parent.isFailed() ) {
+			return true; // no point to continue if the CAS was produced in this aggregate and its parent
+							// failed
+		}
+		// This component is a Cas Multiplier at this point, so only the Flow
+		// Controller's request can still force the drop.
+		return parent.getSubordinateCasInPlayCount() == 0 && aStep.getForceCasToBeDropped();
+	}
 
   private boolean casHasExceptions(CasStateEntry casStateEntry) {
     return (casStateEntry.getErrors().size() > 0) ? true : false;

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java Thu Oct 18 14:12:00 2018
@@ -40,6 +40,7 @@ import org.apache.uima.aae.jmx.JmxManage
 import org.apache.uima.aae.jmx.ServiceErrors;
 import org.apache.uima.aae.jmx.ServiceInfo;
 import org.apache.uima.aae.jmx.ServicePerformance;
+import org.apache.uima.aae.message.Origin;
 import org.apache.uima.aae.monitor.Monitor;
 import org.apache.uima.aae.spi.transport.UimaMessageListener;
 import org.apache.uima.aae.spi.transport.UimaTransport;
@@ -55,6 +56,8 @@ public interface AnalysisEngineControlle
 
   public static final String AEInstanceCount = "AEInstanceCount";
 
+  /**
+   * Returns the {@link Origin} of this controller.
+   * NOTE(review): what an Origin identifies is not visible from this
+   * interface - confirm against the Origin type.
+   */
+  public Origin getOrigin();
+  
   public void sendMetadata(Endpoint anEndpoint) throws AsynchAEException;
 
   public ControllerLatch getControllerLatch();

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Thu Oct 18 14:12:00 2018
@@ -78,6 +78,8 @@ import org.apache.uima.aae.jmx.ServiceEr
 import org.apache.uima.aae.jmx.ServiceInfo;
 import org.apache.uima.aae.jmx.ServicePerformance;
 import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.Origin;
+import org.apache.uima.aae.message.UimaAsOrigin;
 import org.apache.uima.aae.monitor.Monitor;
 import org.apache.uima.aae.monitor.MonitorBaseImpl;
 import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
@@ -111,6 +113,7 @@ public abstract class BaseAnalysisEngine
 	JMS,
 	DIRECT
   };
+  private final Origin origin;
   private static final Class<?> CLASS_NAME = BaseAnalysisEngineController.class;
   private static final String JMS_PROVIDER_HOME = "ACTIVEMQ_HOME";
   public enum ServiceState { INITIALIZING, RUNNING, DISABLED, STOPPING, FAILED };
@@ -284,7 +287,7 @@ public abstract class BaseAnalysisEngine
   protected abstract void doWarmUp(CAS cas, String casReferenceId) throws Exception;
 
   public BaseAnalysisEngineController() {
-
+	  origin = new UimaAsOrigin("");
   }
  
   public BaseAnalysisEngineController(AnalysisEngineController aParentController,
@@ -316,7 +319,7 @@ public abstract class BaseAnalysisEngine
           Map aDestinationMap, JmxManagement aJmxManagement,boolean disableJCasCache) throws Exception {
     
 	System.out.println("C'tor Called Descriptor:"+aDescriptor);
-
+    origin = new UimaAsOrigin(anEndpointName);
 	casManager = aCasManager;
     inProcessCache = anInProcessCache;
     localCache = new LocalCache(this);
@@ -529,7 +532,9 @@ public abstract class BaseAnalysisEngine
 	  return uimaContext;
   }
 
-
+  /**
+   * Returns the {@link Origin} held in this controller's final
+   * {@code origin} field.
+   */
+  public Origin getOrigin() {
+	  return origin;
+  }
   public void setThreadFactory(ThreadPoolTaskExecutor factory) {
 	  threadFactory = factory;
   }

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java Thu Oct 18 14:12:00 2018
@@ -21,6 +21,7 @@ package org.apache.uima.aae.controller;
 
 import org.apache.uima.aae.error.AsynchAEException;
 import org.apache.uima.aae.jmx.ServiceInfo;
+import org.apache.uima.aae.message.Origin;
 import org.apache.uima.cas.SerialFormat;
 import org.apache.uima.cas.impl.TypeSystemImpl;
 
@@ -31,6 +32,12 @@ public interface Endpoint {
 
   public static final int DISABLED = 3;
 
+  /**
+   * Sets the message origin of this endpoint.
+   *
+   * @param origin the origin to store
+   */
+  public void setMessageOrigin(Origin origin);
+  
+  /**
+   * Returns the message origin of this endpoint.
+   */
+  public Origin getMessageOrigin();
+  
+  /**
+   * Returns the unique id of this endpoint.
+   * NOTE(review): the scope of uniqueness is defined by the implementation -
+   * confirm there.
+   */
+  public String getUniqueId();
+  
   public boolean isJavaRemote();
   
   public void setJavaRemote();

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java Thu Oct 18 14:12:00 2018
@@ -20,11 +20,13 @@
 package org.apache.uima.aae.controller;
 
 import java.util.Timer;
+import java.util.UUID;
 
 import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
 import org.apache.uima.aae.error.AsynchAEException;
 import org.apache.uima.aae.jmx.ServiceInfo;
 import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.Origin;
 import org.apache.uima.cas.SerialFormat;
 import org.apache.uima.cas.impl.TypeSystemImpl;
 import org.apache.uima.resource.ResourceSpecifier;
@@ -32,6 +34,8 @@ import org.apache.uima.resource.Resource
 public class Endpoint_impl implements Endpoint, Cloneable {
   private static final Class<?> CLASS_NAME = Endpoint_impl.class;
 
+  private String uniqueId = UUID.randomUUID().toString();
+  
   private volatile boolean javaRemote=false;
   
   private volatile Object destination = null;
@@ -130,6 +134,18 @@ public class Endpoint_impl implements En
   
  private ResourceSpecifier resourceSpecifier;
   
+ private Origin messageOrigin;
+ 
+ /**
+  * Stores the given origin as this endpoint's message origin.
+  *
+  * @param origin the origin to store
+  */
+ public void setMessageOrigin(Origin origin) {
+	 this.messageOrigin = origin;
+ }
+ 
+ /**
+  * Returns the message origin last stored with setMessageOrigin, or null if
+  * none was stored.
+  */
+ public Origin getMessageOrigin() {
+	 return messageOrigin;
+ }
+  /**
+   * Returns this endpoint's id, a random UUID string assigned when the
+   * field is initialized.
+   */
+  public String getUniqueId() {
+	  return uniqueId;
+  }
   public void setJavaRemote() {
 	 javaRemote = true;
   }

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/AbstractUimaAsConsumer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/AbstractUimaAsConsumer.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/AbstractUimaAsConsumer.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/AbstractUimaAsConsumer.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,9 @@
+package org.apache.uima.aae.definition.connectors;
+
+import org.apache.uima.aae.message.MessageProcessor;
+
+/**
+ * Base class for {@link UimaAsConsumer} implementations that take a
+ * {@link MessageProcessor}.
+ */
+public abstract class AbstractUimaAsConsumer implements UimaAsConsumer{
+	
+	/**
+	 * Sets the message processor for this consumer.
+	 * NOTE(review): presumably the processor handles the messages passed to
+	 * consume() - confirm in the subclasses.
+	 *
+	 * @param processor the processor to use
+	 */
+	protected abstract void setMessageProcessor(MessageProcessor processor);
+
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ComponentConnector.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ComponentConnector.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ComponentConnector.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ComponentConnector.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,5 @@
+package org.apache.uima.aae.definition.connectors;
+
+/**
+ * A connector that can describe how to connect to a component.
+ */
+public interface ComponentConnector {
+	/**
+	 * Returns the connection information of this connector. The concrete type
+	 * is implementation specific (declared as Object).
+	 */
+	public Object getConnectionInfo();
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ConnectorFactory.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ConnectorFactory.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ConnectorFactory.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ConnectorFactory.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,31 @@
+package org.apache.uima.aae.definition.connectors;
+
+import org.apache.uima.aae.definition.connectors.basic.BasicConnector;
+import org.apache.uima.aae.definition.connectors.basic.DirectConnector;
+
+/**
+ * Creates {@link ComponentConnector} instances for a given protocol.
+ */
+public class ConnectorFactory {
+
+	/**
+	 * Returns a connector for the given protocol. The protocol name is
+	 * matched case-insensitively: "jms" delegates to the vendor specific JMS
+	 * lookup, "direct" yields a {@link DirectConnector}, anything else yields
+	 * a {@link BasicConnector}.
+	 *
+	 * @param protocol protocol name; must not be null
+	 * @param vendor JMS vendor, only used for the "jms" protocol
+	 * @return the connector; currently null for "jms" because the JMS lookup
+	 *         is not implemented yet
+	 * @throws NullPointerException if protocol is null
+	 */
+	public static ComponentConnector newConnector(String protocol, String vendor) {
+		ComponentConnector connector=null;
+		// Lower-case with Locale.ROOT so that matching does not depend on the
+		// JVM default locale (e.g. "DIRECT" lower-cases to a dotless i under a
+		// Turkish locale and would miss the "direct" case).
+		switch(protocol.toLowerCase(java.util.Locale.ROOT)) {
+		case "jms":
+			connector = getJmsConnector(vendor);
+			break;
+		case "direct":
+			connector = new DirectConnector();
+			break;
+			
+		default:
+			connector = new BasicConnector();
+		}
+		return connector;
+	}
+	/**
+	 * Placeholder for the vendor specific JMS connector lookup.
+	 *
+	 * @param vendor JMS vendor name (currently unused)
+	 * @return always null for now
+	 */
+	private static ComponentConnector getJmsConnector(String vendor) {
+		return null;
+	}
+	public static void main(String[] args) {
+		// TODO Auto-generated method stub
+
+	}
+
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ListenerCallback.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ListenerCallback.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ListenerCallback.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ListenerCallback.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,7 @@
+package org.apache.uima.aae.definition.connectors;
+
+/**
+ * Callback used to report and query a listener's initialization failure.
+ */
+public interface ListenerCallback {
+	/**
+	 * Reports an initialization error.
+	 *
+	 * @param e the error that occurred
+	 */
+	public void onInitializationError(Exception e);
+	/**
+	 * Tells whether initialization failed.
+	 * NOTE(review): presumably true once onInitializationError was called -
+	 * confirm in the implementations.
+	 */
+	public boolean failedInitialization();
+	/**
+	 * Returns the exception held by this callback.
+	 * NOTE(review): presumably the one passed to onInitializationError -
+	 * confirm in the implementations.
+	 */
+	public Exception getException();
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConnector.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConnector.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConnector.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConnector.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,11 @@
+package org.apache.uima.aae.definition.connectors;
+
+import java.util.Map;
+
+/**
+ * Factory for {@link UimaAsEndpoint} instances.
+ */
+public interface UimaAsConnector {
+
+	/**
+	 * Creates an endpoint for the given uri.
+	 *
+	 * @param uri the endpoint uri
+	 * @param params additional, implementation specific parameters
+	 * @return the created endpoint
+	 * @throws Exception if the endpoint cannot be created
+	 */
+	public UimaAsEndpoint createEndpoint(String uri, Map<String, Object> params) 
+	throws Exception;
+	
+	
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConsumer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConsumer.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConsumer.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConsumer.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,17 @@
+package org.apache.uima.aae.definition.connectors;
+
+import org.apache.uima.aae.Lifecycle;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.as.client.DirectMessage;
+
+public interface UimaAsConsumer extends Lifecycle {
+	/** Kinds of consumer; a consumer reports its own kind through {@link #getType()}. */
+	enum ConsumerType { GetMeta, ProcessCAS, Cpc, FreeCAS, Reply, Info }
+	/** Initializes this consumer, either without arguments or with the given controller. */
+	void initialize() throws Exception;
+	void initialize(AnalysisEngineController controller) throws Exception;
+	/** Consumes the given message. */
+	void consume(DirectMessage message) throws Exception;
+	/** Returns the {@link ConsumerType} of this consumer. */
+	ConsumerType getType();
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsEndpoint.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsEndpoint.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsEndpoint.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsEndpoint.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,15 @@
+package org.apache.uima.aae.definition.connectors;
+
+import org.apache.uima.aae.Lifecycle;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.definition.connectors.UimaAsConsumer.ConsumerType;
+import org.apache.uima.aae.message.MessageContext;
+
+public interface UimaAsEndpoint extends Lifecycle { // creates producers, consumers and messages, and dispatches messages
+	UimaAsProducer createProducer(String targetUri) throws Exception; // producer addressed by a target uri
+	UimaAsProducer createProducer(UimaAsConsumer consumer, String delegateKey)  throws Exception; // producer for a consumer and a delegate key
+	UimaAsConsumer createConsumer(String targetUri, ConsumerType type, int consumerThreadCount) throws Exception; // consumer of the given type
+	UimaAsConsumer getConsumer(String targetUri, ConsumerType type); // looks up a consumer by target uri and type
+	MessageContext createMessage(int command, int messageType, Endpoint endpoint); // builds a message context
+	void dispatch(MessageContext messageContext) throws Exception; // dispatches the given message context
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsProducer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsProducer.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsProducer.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsProducer.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,13 @@
+package org.apache.uima.aae.definition.connectors;
+
+import org.apache.uima.aae.Lifecycle;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.UimaAsMessage;
+import org.apache.uima.as.client.DirectMessage;
+
+public interface UimaAsProducer extends Lifecycle {
+	/** Dispatches the given message. */
+	void dispatch(DirectMessage message) throws Exception;
+	/** Dispatches the given message; the caller names the target consumer explicitly. */
+	void dispatch(DirectMessage message, UimaAsConsumer target) throws Exception;
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/BasicConnector.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/BasicConnector.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/BasicConnector.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/BasicConnector.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,12 @@
+package org.apache.uima.aae.definition.connectors.basic;
+
+import org.apache.uima.aae.definition.connectors.ComponentConnector;
+
+public class BasicConnector implements ComponentConnector {
+	/** Returns the connection info of this connector, which is always the empty string. */
+	@Override
+	public Object getConnectionInfo() {
+		final String noConnectionInfo = "";
+		return noConnectionInfo;
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/DirectConnector.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/DirectConnector.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/DirectConnector.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/DirectConnector.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,13 @@
+package org.apache.uima.aae.definition.connectors.basic;
+
+import org.apache.uima.aae.definition.connectors.ComponentConnector;
+
+public class DirectConnector implements ComponentConnector {
+	/**
+	 * Placeholder implementation: no connection info is provided yet, {@code null} is returned.
+	 */
+	@Override
+	public Object getConnectionInfo() {
+		return null; // TODO Auto-generated method stub
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/jms/ActiveMqConnector.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/jms/ActiveMqConnector.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/jms/ActiveMqConnector.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/jms/ActiveMqConnector.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,36 @@
+package org.apache.uima.aae.definition.connectors.jms;
+
+import org.apache.uima.aae.definition.connectors.ComponentConnector;
+
+public class ActiveMqConnector implements ComponentConnector {
+	// Settings holder created once in the constructor and handed out by getConnectionInfo().
+	private final Object connection;
+	ActiveMqConnector(String queue, String broker, int prefetch) {
+		this.connection = new ActiveMqConnection(queue, broker, prefetch);
+	}
+	/** Returns the {@link ActiveMqConnection} built by the constructor. */
+	@Override
+	public Object getConnectionInfo() {
+		return this.connection;
+	}
+	/** Immutable queue name, broker URL and prefetch value. NOTE(review): does not use the outer instance, could be static. */
+	public class ActiveMqConnection {
+		private final int prefetch;
+		private final String brokerUrl;
+		private final String queueName;
+		ActiveMqConnection(String queue, String broker, int prefetch) {
+			this.prefetch = prefetch;
+			this.brokerUrl = broker;
+			this.queueName = queue;
+		}
+		public String getQueueName() {
+			return this.queueName;
+		}
+		public String getBrokerUrl() {
+			return this.brokerUrl;
+		}
+		public int getPrefetch() {
+			return this.prefetch;
+		}
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/MessageProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/MessageProcessor.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/MessageProcessor.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/MessageProcessor.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,9 @@
+package org.apache.uima.aae.message;
+
+import org.apache.uima.aae.controller.AnalysisEngineController;
+
+public interface MessageProcessor {
+	/** Processes a single message. */
+	void process(MessageContext message) throws Exception;
+	AnalysisEngineController getController(); // the controller exposed by this processor
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/Origin.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/Origin.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/Origin.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/Origin.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,6 @@
+package org.apache.uima.aae.message;
+
+public interface Origin { // an origin described by a unique id and a name
+	String getName(); // name of this origin
+	String getUniqueId(); // unique id of this origin
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsMessage.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsMessage.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsMessage.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsMessage.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,36 @@
+package org.apache.uima.aae.message;
+
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.error.AsynchAEException;
+
+public interface UimaAsMessage {
+	enum Command { GetMetaRequest, GetMetaResponse, CpcRegeuest, CpcResponse, ProcessRequest, ProcessResponse } // NOTE(review): CpcRegeuest looks like a typo of CpcRequest; kept since it is public API
+	/** Returns the String message property stored under the given name. */
+	String getMessageStringProperty(String aMessagePropertyName) throws AsynchAEException;
+	/** Returns the int message property stored under the given name. */
+	int getMessageIntProperty(String aMessagePropertyName) throws AsynchAEException;
+	/** Returns the long message property stored under the given name. */
+	long getMessageLongProperty(String aMessagePropertyName) throws AsynchAEException;
+	/** Returns the message property stored under the given name as an Object. */
+	Object getMessageObjectProperty(String aMessagePropertyName) throws AsynchAEException;
+	/** Returns the boolean message property stored under the given name. */
+	boolean getMessageBooleanProperty(String aMessagePropertyName) throws AsynchAEException;
+	/** Returns the {@link Endpoint} of this message. */
+	Endpoint getEndpoint();
+	/** Returns the message content as a String. */
+	String getStringMessage() throws AsynchAEException;
+	/** Returns the message content as an Object. */
+	Object getObjectMessage() throws AsynchAEException;
+	/** Returns the message content as a byte array. */
+	byte[] getByteMessage() throws AsynchAEException;
+	/** Returns the raw message object; its concrete type is up to the implementation. */
+	Object getRawMessage();
+	/** Tells whether a property exists under the given key. */
+	boolean propertyExists(String aKey) throws AsynchAEException;
+	/** Records the arrival time of this message. */
+	void setMessageArrivalTime(long anArrivalTime);
+	/** Returns the arrival time of this message. */
+	long getMessageArrivalTime();
+	/** Returns the endpoint name of this message. */
+	String getEndpointName();
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsOrigin.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsOrigin.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsOrigin.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsOrigin.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,55 @@
+package org.apache.uima.aae.message;
+
+import java.util.UUID;
+
+public class UimaAsOrigin implements Origin {
+	/** Random UUID string generated once when the instance is created. */
+	private final String uniqueId = UUID.randomUUID().toString();
+	/** Name supplied to the constructor; may be null. */
+	private final String name;
+	/**
+	 * Creates an origin with the given name and a freshly generated unique id.
+	 *
+	 * @param name name of this origin
+	 */
+	public UimaAsOrigin(String name) {
+		this.name = name;
+	}
+	/** @return the unique id generated for this instance */
+	@Override
+	public String getUniqueId() {
+		return uniqueId;
+	}
+	/** @return the name given at construction time */
+	@Override
+	public String getName() {
+		return name;
+	}
+	/** Combines the hash codes of name and unique id, in that order (a null contributes 0). */
+	@Override
+	public int hashCode() {
+		return java.util.Objects.hash(name, uniqueId);
+	}
+	/**
+	 * Two origins are equal only when they are of the same class and both
+	 * their names and their unique ids are equal.
+	 */
+	@Override
+	public boolean equals(Object obj) {
+		if (this == obj) {
+			return true;
+		}
+		if (obj == null || getClass() != obj.getClass()) {
+			return false;
+		}
+		UimaAsOrigin that = (UimaAsOrigin) obj;
+		return java.util.Objects.equals(name, that.name)
+				&& java.util.Objects.equals(uniqueId, that.uniqueId);
+	}
+	/** Renders the name and the unique id of this origin. */
+	@Override
+	public String toString() {
+		return "Origin[name: " + name + "] [id:"+uniqueId+"]";
+	}
+}

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java Thu Oct 18 14:12:00 2018
@@ -20,6 +20,7 @@ package org.apache.uima.aae.service;
 
 import java.util.concurrent.BlockingQueue;
 
+import org.apache.uima.aae.InProcessCache;
 import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
 import org.apache.uima.as.client.DirectMessage;
 import org.apache.uima.cas.CAS;
@@ -34,6 +35,7 @@ public interface UimaASService {
 	public static final int STOP_NOW = 1001;
 	  
 	public String getEndpoint();
+	public int getScaleout();
 	public String getId();
 	public void start() throws Exception;
 	public void stop() throws Exception;
@@ -46,5 +48,5 @@ public interface UimaASService {
 	public void releaseCAS(String casReferenceId, BlockingQueue<DirectMessage> releaseCASQueue ) throws Exception;
 	public AnalysisEngineMetaData getMetaData() throws Exception; 
 	public void removeFromCache(String casReferenceId);
-
+	public UimaASService withInProcessCache(InProcessCache cache);
 }

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java Thu Oct 18 14:12:00 2018
@@ -24,7 +24,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -37,9 +38,15 @@ import org.apache.uima.aae.OutputChannel
 import org.apache.uima.aae.UimaASUtils;
 import org.apache.uima.aae.UimaClassFactory;
 import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
+import org.apache.uima.aae.component.AggregateAnalysisEngineComponent;
+import org.apache.uima.aae.component.AnalysisEngineComponent;
+import org.apache.uima.aae.component.ComponentCasPool;
+import org.apache.uima.aae.component.RemoteAnalysisEngineComponent;
+import org.apache.uima.aae.component.TopLevelServiceComponent;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
 import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.ControllerCallbackListener;
 import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
 import org.apache.uima.aae.controller.DelegateEndpoint;
 import org.apache.uima.aae.controller.Endpoint;
@@ -51,7 +58,6 @@ import org.apache.uima.aae.error.ErrorHa
 import org.apache.uima.aae.error.Threshold;
 import org.apache.uima.aae.error.Thresholds;
 import org.apache.uima.aae.error.Thresholds.Action;
-import org.apache.uima.aae.error.UimaAsDelegateException;
 import org.apache.uima.aae.error.handler.CpcErrorHandler;
 import org.apache.uima.aae.error.handler.GetMetaErrorHandler;
 import org.apache.uima.aae.error.handler.ProcessCasErrorHandler;
@@ -78,7 +84,6 @@ import org.apache.uima.resource.Resource
 import org.apache.uima.resource.ResourceManager;
 import org.apache.uima.resource.ResourceSpecifier;
 import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
-import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionType;
 import org.apache.uima.resourceSpecifier.AnalysisEngineType;
 import org.apache.uima.resourceSpecifier.AsyncAggregateErrorConfigurationType;
 import org.apache.uima.resourceSpecifier.AsyncPrimitiveErrorConfigurationType;
@@ -109,6 +114,246 @@ public abstract class AbstractUimaAsServ
 	}
     
     protected abstract void addListenerForReplyHandling( AggregateAnalysisEngineController controller, Endpoint_impl endpoint, RemoteAnalysisEngineDelegate remoteDelegate) throws Exception;
+
+//    public AnalysisEngineController createController( AnalysisEngineComponent component, int howManyInstances) throws Exception {
+    public AnalysisEngineController createController( AnalysisEngineComponent component, ControllerCallbackListener aListener, String serviceId) throws Exception {
+    	// A controller created through this entry point is given no parent controller.
+    	AnalysisEngineController topLevel = createController(component, null);
+    	topLevel.setServiceId(serviceId);
+    	topLevel.addControllerCallbackListener(aListener);
+    	return topLevel;
+    }
+
+    /**
+     * Recursively walks the component tree creating instances of AnalysisEngineController
+     * and linking them in parent-child tree.
+     * A primitive component gets a primitive controller. Any other component is handled as an
+     * aggregate: an endpoint is collected for each child, a flow controller is added and the
+     * delegate controllers are created. A controller that is not top level is also wrapped in
+     * a UimaASService which is started before this method returns.
+     *
+     * @param component - component for which the controller is created
+     * @param parentController - reference to a parent controller. TopLevel has no parent
+     * @return the controller created for the component
+     * @throws Exception
+     */
+    public AnalysisEngineController createController( AnalysisEngineComponent component, AnalysisEngineController parentController) throws Exception {
+     	System.out.println("---------Controller:"+
+     			component.getKey()+
+     			" resourceSpecifier:"+
+     			component.getResourceSpecifier().getClass().getName()+
+     			" ResourceCreationSpecifier:"+(component.getResourceSpecifier() instanceof ResourceCreationSpecifier) );
+
+    	final AnalysisEngineController controller;
+     	if ( !component.isPrimitive()) {
+    		// The aggregate is either the component itself or the aggregate view of a top level service.
+    		final AggregateAnalysisEngineComponent aggregate;
+    		if ( component instanceof AggregateAnalysisEngineComponent) {
+    			aggregate = (AggregateAnalysisEngineComponent)component;
+    		} else if ( component instanceof TopLevelServiceComponent) {
+    			aggregate = ((TopLevelServiceComponent)component).aggregateComponent();
+    		} else {
+    			throw new RuntimeException("Expected instance of AggregateAnalysisEngineComponent, instead is instanceof "+component.getClass().getName());
+    		}
+    		// add an endpoint for each delegate in this aggregate. The endpoint Map is required
+    		// during initialization of an aggregate controller.
+    		Map<String, Endpoint> delegateEndpoints = new HashMap<>();
+    		for( AnalysisEngineComponent child : aggregate.getChildren() ) {
+    			delegateEndpoints.put(child.getKey(), child.getEndpoint());
+    		}
+    		controller = new AggregateAnalysisEngineController_impl(parentController, component.getKey(), component.getResourceSpecifier().getSourceUrlString(), casManager, cache, delegateEndpoints);
+    		addFlowController((AggregateAnalysisEngineController)controller, (AnalysisEngineDescription)component.getResourceSpecifier());
+    		// recursively create delegate controllers for all async delegates
+    		createDelegateControllers(aggregate, controller);
+     	} else {
+    		// primitive component: a single controller, no delegates to wire
+       		controller = new PrimitiveAnalysisEngineController_impl(parentController, component.getKey(), component.getResourceSpecifier().getSourceUrlString(),casManager, cache, 10, component.getScaleout());
+     	}
+   	    if ( controller.isTopLevelComponent() ) {
+   	    	return controller;
+   	    }
+   	    // Only controllers that are not top level are wrapped in a service and started here.
+   	    UimaASService service = createUimaASServiceWrapper(controller, component);
+   	    service.start();
+    	return controller;
+    }
+
+	
+
+	/**
+	 * Prepares the given aggregate controller for each of its children. Default error handling is
+	 * added for every child. A remote child whose endpoint server URI is "java" is flagged as a
+	 * java remote. For any other child the DIRECT output and input channels of the aggregate are
+	 * created if missing and a controller for the child is created recursively.
+	 */
+	private void createDelegateControllers(AggregateAnalysisEngineComponent aggregateComponent, AnalysisEngineController controller) throws Exception {
+		for (AnalysisEngineComponent child : aggregateComponent.getChildren()) {
+			// if error handling threshold has not been defined for the delegate, add default thresholds.
+			addDelegateDefaultErrorHandling(controller, child.getKey());
+			if (child.isRemote()) {
+				Endpoint remoteEndpoint = child.getEndpoint();
+				if ("java".equals(remoteEndpoint.getServerURI()) ) {
+					remoteEndpoint.setJavaRemote();
+				}
+				continue;
+			}
+			if (controller.getOutputChannel(ENDPOINT_TYPE.DIRECT) == null) {
+				OutputChannel directOut = new DirectOutputChannel().withController(controller);
+				directOut.initialize();
+				controller.addOutputChannel(directOut);
+			}
+			if (controller.getInputChannel(ENDPOINT_TYPE.DIRECT) == null) {
+				// 10/11/18 For Direct messaging the message handlers are not needed. Its using command factory
+				DirectInputChannel directIn = new DirectInputChannel(ChannelType.REQUEST_REPLY).withController(controller);
+				controller.addInputChannel(directIn);
+			}
+			// build the controller for this non-remote delegate, with the aggregate as its parent
+			createController(child, controller);
+		}
+	}
+	
+	
+    /**
+     * Wraps the given controller in an AsynchronousUimaASService. DIRECT output and input channels
+     * are looked up on the controller and obtained from getOutputChannel/getInputChannel when absent.
+     * For a controller that is not top level a reply listener is added to the parent and the delegate
+     * destinations are set. ProcessCAS and GetMeta listeners are registered on the input channel, plus
+     * a FreeCAS listener when the controller is a CAS multiplier.
+     */
+    private UimaASService createUimaASServiceWrapper(AnalysisEngineController controller, AnalysisEngineComponent component) throws Exception {
+    	AsynchronousUimaASService service = new AsynchronousUimaASService(controller.getComponentName()).withController(controller);
+    	// Need an OutputChannel to dispatch messages from this service
+    	OutputChannel outputChannel = controller.getOutputChannel(ENDPOINT_TYPE.DIRECT);
+		if ( outputChannel == null) {
+			outputChannel = getOutputChannel(controller);
+		}
+    	// NOTE(review): unlike the input channel below, a newly obtained output channel is not added to the controller here - confirm getOutputChannel does that
+    	// Need an InputChannel to handle incoming messages
+    	InputChannel inputChannel = controller.getInputChannel(ENDPOINT_TYPE.DIRECT);
+    	if (inputChannel == null) {
+    		inputChannel = getInputChannel(controller);
+    		inputChannel.setMessageHandler(getMessageHandler(controller));
+			controller.addInputChannel(inputChannel);
+    	}
+		// add reply queue listener to the parent aggregate controller
+		if ( !controller.isTopLevelComponent() ) {
+			// For every delegate the parent controller needs a reply listener.
+			DirectListener replyListener = addDelegateReplyListener(controller, component);
+			// add process, getMeta, reply queues to an endpoint
+			setDelegateDestinations(controller, service, replyListener);
+		}
+		// listener serving the process request queue
+		inputChannel.registerListener(
+				createDirectListener(controller,component.getScaleout(),(DirectInputChannel)inputChannel,service.getProcessRequestQueue(),Type.ProcessCAS));
+		// listener serving the getMeta request queue
+		inputChannel.registerListener(
+				createDirectListener(controller,component.getScaleout(),(DirectInputChannel)inputChannel,service.getMetaRequestQueue(),Type.GetMeta));
+		if (controller.isCasMultiplier()) {
+			// a CAS multiplier additionally gets a listener on the free CAS queue
+			inputChannel.registerListener(
+					createDirectListener(controller,component.getScaleout(),(DirectInputChannel)inputChannel,service.getFreeCasQueue(),Type.FreeCAS));
+			((DirectOutputChannel)outputChannel).setFreeCASQueue(service.getFreeCasQueue());
+		}
+		/*
+		DirectListener processListener = new DirectListener(Type.ProcessCAS).
+				withController(controller).
+				withConsumerThreads(component.getScaleout()).
+				withInputChannel((DirectInputChannel)inputChannel).
+				withQueue(service.getProcessRequestQueue()).
+				initialize();
+		inputChannel.registerListener(processListener);
+		
+		DirectListener getMetaListener = new DirectListener(Type.GetMeta).
+				withController(controller).
+				withConsumerThreads(getReplyScaleout(d)).
+				withInputChannel((DirectInputChannel)inputChannel).
+				withQueue(service.getMetaRequestQueue()).initialize();
+		inputChannel.registerListener(getMetaListener);
+
+		if (controller.isCasMultiplier()) {
+			DirectListener freCASChannelListener = 
+					new DirectListener(Type.FreeCAS).
+					withController(controller).
+					withConsumerThreads(component.getScaleout()).
+					withInputChannel((DirectInputChannel)inputChannel).
+					withQueue(service.getFreeCasQueue()).
+					initialize();
+			inputChannel.registerListener(freCASChannelListener);
+			((DirectOutputChannel)outputChannel).setFreeCASQueue(service.getFreeCasQueue());
+		}
+    	*/
+    	return service;
+    }
+	/** Creates and initializes a DirectListener of the given type; registering it is left to the caller. */
+	private DirectListener createDirectListener(AnalysisEngineController controller, int scaleout, DirectInputChannel inputChannel, BlockingQueue<DirectMessage> q, Type type) throws Exception{
+		DirectListener listener = new DirectListener(type)
+				.withController(controller)
+				.withConsumerThreads(scaleout)
+				.withInputChannel(inputChannel)
+				.withQueue(q)
+				.initialize();
+		return listener;
+	}
+    /**
+     * Registers a reply listener on the parent controller's DIRECT input channel for the given
+     * delegate controller, creating that input channel first if the parent has none.
+     * @param controller delegate controller whose parent receives the reply listener
+     * @param component a RemoteAnalysisEngineComponent supplies the reply consumer count, otherwise 1 is used
+     * @return the initialized and registered reply listener
+     */
+    private DirectListener addDelegateReplyListener(AnalysisEngineController controller, AnalysisEngineComponent component) throws Exception {
+		AnalysisEngineController parent = controller.getParentController();
+		DirectInputChannel parentInputChannel = (DirectInputChannel) parent.getInputChannel(ENDPOINT_TYPE.DIRECT);
+		if (parentInputChannel == null) {
+			// create parent controller's input channel if necessary
+			parentInputChannel = new DirectInputChannel(ChannelType.REQUEST_REPLY).withController(parent);
+			parentInputChannel.setMessageHandler(getMessageHandler(parent));
+			parent.addInputChannel(parentInputChannel);
+		}
+		int replyScaleout = 1;
+		if ( component instanceof RemoteAnalysisEngineComponent) {
+			// the configured count must be assigned; a bare call would leave the scaleout at 1
+			replyScaleout = ((RemoteAnalysisEngineComponent)component).getReplyConsumerCount();
+		}
+		// USE FACTORY HERE. CHANGE DirectListener to interface
+		DirectListener replyListener = new DirectListener(Type.Reply).
+				withController(parent).
+				withConsumerThreads(replyScaleout).
+				withInputChannel(parentInputChannel).
+				withQueue(new LinkedBlockingQueue<DirectMessage>()).
+				withName(controller.getKey()).
+				initialize();
+		parentInputChannel.registerListener(replyListener);
+		return replyListener;
+    }
+	/**
+	 * Creates the resource manager and the CAS manager (configured from cp) and picks the cache: a new
+	 * one for JMS, the one stored in System properties for Java. The service argument is not used here.
+	 */
+	protected void initialize(UimaASService service, ComponentCasPool cp, Transport transport) {
+		resourceManager = UimaClassFactory.produceResourceManager();
+		casManager = new AsynchAECasManager_impl(resourceManager);
+		casManager.setCasPoolSize(cp.getPoolSize());
+		casManager.setDisableJCasCache(cp.isDisableJCasCache());
+		casManager.setInitialFsHeapSize(cp.getInitialHeapSize());
+		if ( transport.equals(Transport.JMS)) {
+			cache = new InProcessCache();
+			return;
+		}
+		if ( transport.equals(Transport.Java)) {
+			// ?????????????????????????? is this test necessary?
+			cache = (InProcessCache)System.getProperties().get("InProcessCache");
+			if ( cache == null) {
+				cache = new InProcessCache();
+				System.getProperties().put("InProcessCache", cache);
+			}
+		}
+	}
+	
+	
+    /* 
+     * OLD CODE *****************************
+     */
     
     public AsyncPrimitiveErrorConfigurationType addDefaultErrorHandling(ServiceType s) {
     	AsyncPrimitiveErrorConfigurationType pec;
@@ -196,13 +441,16 @@ public abstract class AbstractUimaAsServ
 		return null;
     }
     private void addDelegateDefaultErrorHandling(AnalysisEngineController controller, String delegatKey) {
-    	ErrorHandlerChain erc = controller.getErrorHandlerChain();
-    	for( ErrorHandler eh : erc ) {
-    		if ( !eh.getEndpointThresholdMap().containsKey(delegatKey) ) {
-    			// add default error handling
-    			eh.getEndpointThresholdMap().put(delegatKey, Thresholds.newThreshold());
-    		}
-    	}	
+    	if ( Objects.nonNull(controller.getErrorHandlerChain()) ) { // nothing to default when the controller has no error handler chain
+    	   	ErrorHandlerChain erc = controller.getErrorHandlerChain();
+        	for( ErrorHandler eh : erc ) {
+        		if ( !eh.getEndpointThresholdMap().containsKey(delegatKey) ) { // only handlers without an entry for this delegate
+        			// add default error handling
+        			eh.getEndpointThresholdMap().put(delegatKey, Thresholds.newThreshold());
+        		}
+        	}	
+    	}
+ 
     }
     private OutputChannel getOutputChannel(AnalysisEngineController controller ) throws Exception {
     	OutputChannel outputChannel = null;
@@ -1244,7 +1492,7 @@ public abstract class AbstractUimaAsServ
 	private boolean isAggregate(AnalysisEngineType aet) {
 		// Is this an aggregate? An aggregate has a property async=true or has delegates.
 		System.out.println("......"+aet.getKey()+" aet.getAsync()="+aet.getAsync()+" aet.isSetAsync()="+aet.isSetAsync()+" aet.isSetDelegates()="+aet.isSetDelegates() );
-
+		Objects.requireNonNull(aet, "AnalysisEngineType must be non-null");
 		if ( "true".equals(aet.getAsync()) || aet.isSetDelegates() ) {
 			return true;
 		}

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java Thu Oct 18 14:12:00 2018
@@ -24,13 +24,12 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import javax.management.ServiceNotFoundException;
-
 import org.apache.uima.aae.InputChannel;
 import org.apache.uima.aae.InputChannel.ChannelType;
 import org.apache.uima.aae.OutputChannel;
 import org.apache.uima.aae.UimaClassFactory;
 import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
+import org.apache.uima.aae.component.TopLevelServiceComponent;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
 import org.apache.uima.aae.controller.AnalysisEngineController;
@@ -46,7 +45,6 @@ import org.apache.uima.aae.error.handler
 import org.apache.uima.aae.error.handler.ProcessCasErrorHandler;
 import org.apache.uima.aae.handler.Handler;
 import org.apache.uima.aae.service.AsynchronousUimaASService;
-import org.apache.uima.aae.service.ServiceRegistry;
 import org.apache.uima.aae.service.UimaASService;
 import org.apache.uima.aae.service.UimaAsServiceRegistry;
 import org.apache.uima.aae.service.delegate.AnalysisEngineDelegate;
@@ -61,8 +59,6 @@ import org.apache.uima.resource.Resource
 import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
 import org.apache.uima.resourceSpecifier.AsyncPrimitiveErrorConfigurationType;
 import org.apache.uima.resourceSpecifier.CasPoolType;
-import org.apache.uima.resourceSpecifier.CollectionProcessCompleteErrorsType;
-import org.apache.uima.resourceSpecifier.ProcessCasErrorsType;
 import org.apache.uima.resourceSpecifier.ServiceType;
 
 public class UimaAsDirectServiceBuilder extends AbstractUimaAsServiceBuilder  {
@@ -92,6 +88,144 @@ public class UimaAsDirectServiceBuilder
 		}
 	}
 
+
+	/**
+	 * Creates the service wrapper for a top level component and deploys it through
+	 * buildAndDeploy(). Returns null if the component's resource specifier is not
+	 * an AnalysisEngineDescription.
+	 */
+	public UimaASService build(TopLevelServiceComponent topLevelComponent, ControllerCallbackListener callback)
+			throws Exception {
+		// Guard: only AnalysisEngineDescription specifiers are handled by this method.
+		if (!(topLevelComponent.getResourceSpecifier() instanceof AnalysisEngineDescription)) {
+			return null;
+		}
+		AnalysisEngineDescription description =
+				(AnalysisEngineDescription) topLevelComponent.getResourceSpecifier();
+		String queueName = resolvePlaceholder(topLevelComponent.getEndpoint().getEndpoint());
+		// Top Level Service (TLS) wrapper; it may reference several TLS instances
+		// when the service is scaled up.
+		AsynchronousUimaASService tls = new AsynchronousUimaASService(queueName)
+				.withName(description.getAnalysisEngineMetaData().getName())
+				.withResourceSpecifier(topLevelComponent.getResourceSpecifier())
+				.withScaleout(topLevelComponent.getScaleout());
+		this.buildAndDeploy(topLevelComponent, tls, callback);
+		return tls;
+	}
+	/**
+	 * Initializes the builder for the given service, creates the top level controller
+	 * and wires both together. Sets the "BrokerURI" system property to "Direct".
+	 *
+	 * @return the service instance that was passed in
+	 */
+	public UimaASService buildAndDeploy(TopLevelServiceComponent topLevelComponent, AsynchronousUimaASService service, ControllerCallbackListener callback) throws Exception {
+		// create ResourceManager, CasManager, and InProcessCache
+		initialize(service, topLevelComponent.getComponentCasPool(), Transport.Java);
+
+		String serviceId = service.getId();
+		AnalysisEngineController tlsController = createController(topLevelComponent, callback, serviceId);
+		service.withInProcessCache(super.cache);
+		System.setProperty("BrokerURI", "Direct");
+		configureTopLevelService(tlsController, service);
+		return service;
+	}
+	
+	/** Returns the controller's Direct output channel, creating and registering one if absent. */
+	private DirectOutputChannel outputChannel(AnalysisEngineController topLevelController) throws Exception {
+		if (topLevelController.getOutputChannel(ENDPOINT_TYPE.DIRECT) != null) {
+			// Reuse the channel already attached to this controller.
+			return (DirectOutputChannel) topLevelController.getOutputChannel(ENDPOINT_TYPE.DIRECT);
+		}
+		// Nothing attached yet: create the channel and register it with the controller.
+		DirectOutputChannel created = new DirectOutputChannel().withController(topLevelController);
+		topLevelController.addOutputChannel(created);
+		return created;
+	}
+	/**
+	 * Returns the controller's Direct input channel. If none is attached yet, a
+	 * REQUEST_REPLY channel is created, given a message handler, and registered.
+	 */
+	private DirectInputChannel inputChannel(AnalysisEngineController topLevelController) throws Exception {
+		if (topLevelController.getInputChannel(ENDPOINT_TYPE.DIRECT) != null) {
+			return (DirectInputChannel) topLevelController.getInputChannel(ENDPOINT_TYPE.DIRECT);
+		}
+		DirectInputChannel created =
+				new DirectInputChannel(ChannelType.REQUEST_REPLY).withController(topLevelController);
+		created.setMessageHandler(getMessageHandler(topLevelController));
+		topLevelController.addInputChannel(created);
+		return created;
+	}
+/*	
+	private void configureTopLevelService(AnalysisEngineController topLevelController,
+			AsynchronousUimaASService service,  int howMany) throws Exception {
+*/
+	/**
+	 * Wires a top level controller to its service: attaches the Direct channels, picks the
+	 * request queues, registers the ProcessCAS/GetMeta listeners, then sets the controller.
+	 * @param topLevelController controller of the top level service being deployed
+	 * @param service wrapper supplying the endpoint, scaleout and fallback queues
+	 */
+	private void configureTopLevelService(AnalysisEngineController topLevelController,
+			AsynchronousUimaASService service) throws Exception {
+		// create a single instance of OutputChannel for Direct communication if necessary
+		DirectOutputChannel outputChannel = outputChannel(topLevelController);
+		DirectInputChannel inputChannel = inputChannel(topLevelController);
+		// Test for the class that the cast below requires.
+		if ( topLevelController instanceof AggregateAnalysisEngineController_impl ) {
+			((AggregateAnalysisEngineController_impl)topLevelController).
+				setServiceEndpointName(service.getEndpoint());
+		}
+
+		// Lookup queue name in service registry. If this queue exists, the new service
+		// being created here will share the same queue to balance the load.
+		BlockingQueue<DirectMessage> pQ = null;
+		BlockingQueue<DirectMessage> mQ = null;
+		boolean sharedQueues = false;
+		try {
+			UimaASService s = UimaAsServiceRegistry.getInstance().lookupByEndpoint(service.getEndpoint());
+			if ( s instanceof AsynchronousUimaASService) {
+				pQ = ((AsynchronousUimaASService) s).getProcessRequestQueue();
+				mQ = ((AsynchronousUimaASService) s).getMetaRequestQueue();
+				sharedQueues = true;
+			}
+		} catch( Exception lookupFailed) {
+			// Deliberately ignored: a failed lookup means there is nothing to share.
+		}
+		if ( !sharedQueues ) {
+			// Also covers a registry hit of another service type, which used to leave
+			// both queues null and hand null to the listeners below.
+			pQ = service.getProcessRequestQueue();
+			mQ = service.getMetaRequestQueue();
+		}
+
+		scaleout = service.getScaleout();
+		DirectListener processListener = new DirectListener(Type.ProcessCAS).withController(topLevelController)
+				.withConsumerThreads(scaleout).withInputChannel(inputChannel).withQueue(pQ).
+				initialize();
+
+		DirectListener getMetaListener = new DirectListener(Type.GetMeta).withController(topLevelController)
+				.withConsumerThreads(1).withInputChannel(inputChannel).
+				withQueue(mQ).initialize();
+		addFreeCASListener(service, topLevelController, inputChannel, outputChannel, scaleout );
+		inputChannel.registerListener(getMetaListener);
+		inputChannel.registerListener(processListener);
+		service.withController(topLevelController);
+	}
+	
+	
+	
+	
+	
+	
+	
+	
+	
+	
+	/* 
+	 * OLD CODE **********************************************************************************
+	 */
+	
+	
 	
 	public UimaASService build(AnalysisEngineDeploymentDescriptionDocument dd, ControllerCallbackListener callback)
 			throws Exception {

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java Thu Oct 18 14:12:00 2018
@@ -47,27 +47,30 @@ import org.apache.uima.cas.SerialFormat;
 import org.apache.uima.cas.impl.BinaryCasSerDes6.ReuseInfo;
 import org.apache.uima.cas.impl.Serialization;
 import org.apache.uima.cas.impl.XmiSerializationSharedData;
+import org.apache.uima.resource.metadata.ResourceMetaData;
 import org.apache.uima.util.Level;
 
 public abstract class AbstractUimaAsCommand implements UimaAsCommand {
 	protected AnalysisEngineController controller;
 	private Object mux = new Object();
-
-	protected AbstractUimaAsCommand(AnalysisEngineController controller) {
+	private final MessageContext messageContext;
+	/** Binds this command to its controller and to the message context it reads from. */
+	protected AbstractUimaAsCommand(AnalysisEngineController controller, MessageContext aMessageContext) {
 		this.controller = controller;
+		this.messageContext = aMessageContext;
 	}
 
-	protected String getCasReferenceId(Class<?> concreteClassName, MessageContext aMessageContext) throws AsynchAEException {
-		if (!aMessageContext.propertyExists(AsynchAEMessage.CasReference)) {
+	protected String getCasReferenceId(Class<?> concreteClassName/*, MessageContext aMessageContext */) throws AsynchAEException {
+		if (!messageContext.propertyExists(AsynchAEMessage.CasReference)) { // no CAS reference on the message: log at INFO and return null
 			if (UIMAFramework.getLogger(concreteClassName).isLoggable(Level.INFO)) {
 				UIMAFramework.getLogger(concreteClassName).logrb(Level.INFO, concreteClassName.getName(),
 						"getCasReferenceId", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
 						"UIMAEE_message_has_cas_refid__INFO",
-						new Object[] { aMessageContext.getEndpoint().getEndpoint() });
+						new Object[] { messageContext.getEndpoint().getEndpoint() });
 			}
 			return null;
 		}
-		return aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
+		return messageContext.getMessageStringProperty(AsynchAEMessage.CasReference); // property present: its String value is the id
 	}
 
 	protected CacheEntry getCacheEntryForCas(String casReferenceId) {
@@ -95,7 +98,7 @@ public abstract class AbstractUimaAsComm
 		return (controller.isTopLevelComponent() && controller instanceof AggregateAnalysisEngineController);
 	}
 
-	protected void handleError(Exception e, CacheEntry cacheEntry, MessageContext mc) {
+	protected void handleError(Exception e, CacheEntry cacheEntry/*, MessageContext mc */) {
 		if (UIMAFramework.getLogger(getClass()).isLoggable(Level.WARNING)) {
 			UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(), "handleError",
 					UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_service_exception_WARNING",
@@ -105,7 +108,7 @@ public abstract class AbstractUimaAsComm
 					UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
 		}
 		ErrorContext errorContext = new ErrorContext();
-		errorContext.add(AsynchAEMessage.Endpoint, mc.getEndpoint());
+		errorContext.add(AsynchAEMessage.Endpoint, messageContext.getEndpoint());
 		errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
 		errorContext.add(AsynchAEMessage.CasReference, cacheEntry.getCasReferenceId());
 		controller.dropCAS(cacheEntry.getCas());
@@ -154,31 +157,69 @@ public abstract class AbstractUimaAsComm
 
 	}
 
-	protected static ErrorContext populateErrorContext(MessageContext aMessageCtx) {
+	protected ErrorContext populateErrorContext(/*MessageContext aMessageCtx */) { // always returns a new ErrorContext, never null
 		ErrorContext errorContext = new ErrorContext();
-		if (aMessageCtx != null) {
+		if (messageContext != null) { // exceptions raised while reading are swallowed below, so the result may be partial
 			try {
-				if (aMessageCtx.propertyExists(AsynchAEMessage.Command)) {
+				if (messageContext.propertyExists(AsynchAEMessage.Command)) {
 					errorContext.add(AsynchAEMessage.Command,
-							aMessageCtx.getMessageIntProperty(AsynchAEMessage.Command));
+							messageContext.getMessageIntProperty(AsynchAEMessage.Command));
 				}
 
-				if (aMessageCtx.propertyExists(AsynchAEMessage.MessageType)) {
+				if (messageContext.propertyExists(AsynchAEMessage.MessageType)) {
 					errorContext.add(AsynchAEMessage.MessageType,
-							aMessageCtx.getMessageIntProperty(AsynchAEMessage.MessageType));
+							messageContext.getMessageIntProperty(AsynchAEMessage.MessageType));
 				}
 
-				if (aMessageCtx.propertyExists(AsynchAEMessage.CasReference)) {
+				if (messageContext.propertyExists(AsynchAEMessage.CasReference)) {
 					errorContext.add(AsynchAEMessage.CasReference,
-							aMessageCtx.getMessageStringProperty(AsynchAEMessage.CasReference));
+							messageContext.getMessageStringProperty(AsynchAEMessage.CasReference));
 				}
-				errorContext.add(UIMAMessage.RawMsg, aMessageCtx.getRawMessage());
+				errorContext.add(UIMAMessage.RawMsg, messageContext.getRawMessage()); // added without a propertyExists check, unlike the entries above
 			} catch (Exception e) { /* ignore */
 			}
 		}
 		return errorContext;
 	}
-
+	/** Returns the endpoint reported by the message context. */
+	protected Endpoint getEndpoint() { return messageContext.getEndpoint(); }
+
+	/** Reads the named message property as an int. */
+	protected int getMessageIntProperty(String propertyName) throws Exception { return messageContext.getMessageIntProperty(propertyName); }
+
+	/** Reads the named message property as a String. */
+	protected String getMessageStringProperty(String propertyName) throws Exception { return messageContext.getMessageStringProperty(propertyName); }
+
+	/** Returns the AsynchAEMessage.AEMetadata object property, cast to ResourceMetaData. */
+	protected ResourceMetaData getResourceMetaData() throws Exception { return (ResourceMetaData)messageContext.getMessageObjectProperty(AsynchAEMessage.AEMetadata); }
+
+	/** Returns the message context's string message. */
+	protected String getStringMessage() throws Exception { return messageContext.getStringMessage(); }
+
+	/** Reads the named message property as an Object. */
+	protected Object getMessageObjectProperty(String propertyName) throws Exception { return messageContext.getMessageObjectProperty(propertyName); }
+
+	/** Reads the named message property as a boolean. */
+	protected boolean getMessageBooleanProperty(String propertyName) throws Exception { return messageContext.getMessageBooleanProperty(propertyName); }
+
+	/** Reads the named message property as a long. */
+	protected long getMessageLongProperty(String propertyName) throws Exception { return messageContext.getMessageLongProperty(propertyName); }
+
+	/** Reports whether the message context has a property with the given name. */
+	protected boolean propertyExists(String propertyName) throws Exception { return messageContext.propertyExists(propertyName); }
+
+	/** Returns the endpoint name reported by the message context. */
+	protected String getEndpointName() { return messageContext.getEndpointName(); }
+
+	/** Returns the message context's object message. */
+	protected Object getObjectMessage() throws Exception { return messageContext.getObjectMessage(); }
+
+	/** Returns the message context's byte-array message. */
+	protected byte[] getByteMessage() throws Exception { return messageContext.getByteMessage(); }
+
+	/** Returns the message context supplied to the constructor. */
+	protected MessageContext getMessageContext() { return messageContext; }
+
 	protected Endpoint fetchParentCasOrigin(String parentCasId) throws AsynchAEException {
 		Endpoint endpoint = null;
 		String parentId = parentCasId;
@@ -224,8 +265,8 @@ public abstract class AbstractUimaAsComm
 		return cas;
 	}
 
-	protected SerializationResult deserializeChildCAS(String casMultiplierDelegateKey, Endpoint endpoint,
-			MessageContext mc) throws Exception {
+	protected SerializationResult deserializeChildCAS(String casMultiplierDelegateKey, Endpoint endpoint
+			/*MessageContext mc*/) throws Exception {
 		SerializationResult result = new SerializationResult();
 
 		// Aggregate time spent waiting for a CAS in the shadow cas pool
@@ -250,17 +291,17 @@ public abstract class AbstractUimaAsComm
 		// Create deserialized wrapper for XMI, BINARY, COMPRESSED formats. To add
 		// a new serialization format add a new class which implements
 		// UimaASDeserializer and modify DeserializerFactory class.
-		UimaASDeserializer deserializer = DeserializerFactory.newDeserializer(endpoint, mc);
+		UimaASDeserializer deserializer = DeserializerFactory.newDeserializer(endpoint, messageContext);
 		deserializer.deserialize(result);
 
 		return result;
 	}
 
-	protected SerializationResult deserializeInputCAS( MessageContext mc)
+	protected SerializationResult deserializeInputCAS()
 			throws Exception {
 		SerializationResult result = new SerializationResult();
-		String origin = mc.getEndpoint().getEndpoint();
-		Endpoint endpoint = mc.getEndpoint();
+		String origin = messageContext.getEndpoint().getEndpoint();
+		Endpoint endpoint = messageContext.getEndpoint();
 		
 		// Time how long we wait on Cas Pool to fetch a new CAS
 		long t1 = controller.getCpuTime();
@@ -276,20 +317,20 @@ public abstract class AbstractUimaAsComm
 			return null;
 		}
 
-		UimaASDeserializer deserializer = DeserializerFactory.newDeserializer(endpoint, mc);
+		UimaASDeserializer deserializer = DeserializerFactory.newDeserializer(endpoint, messageContext);
 		deserializer.deserialize(result);
 
 		return result;
 	}
-	protected Delegate getDelegate(MessageContext mc) throws AsynchAEException {
+	protected Delegate getDelegate(/* MessageContext mc */) throws AsynchAEException { // controller is cast to AggregateAnalysisEngineController without a check
 		String delegateKey = null;
-		if (mc.getEndpoint().getEndpoint() == null || mc.getEndpoint().getEndpoint().trim().length() == 0) {
-			String fromEndpoint = mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+		if (messageContext.getEndpoint().getEndpoint() == null || messageContext.getEndpoint().getEndpoint().trim().length() == 0) {
+			String fromEndpoint = messageContext.getMessageStringProperty(AsynchAEMessage.MessageFrom); // endpoint name null or blank: key the lookup on the MessageFrom property
 			delegateKey = ((AggregateAnalysisEngineController) controller)
 					.lookUpDelegateKey(fromEndpoint);
 		} else {
 			delegateKey = ((AggregateAnalysisEngineController) controller)
-					.lookUpDelegateKey(mc.getEndpoint().getEndpoint());
+					.lookUpDelegateKey(messageContext.getEndpoint().getEndpoint()); // otherwise key the lookup on the endpoint name
 		}
 		return ((AggregateAnalysisEngineController) controller).lookupDelegate(delegateKey);
 	}

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java Thu Oct 18 14:12:00 2018
@@ -23,14 +23,14 @@ import org.apache.uima.aae.controller.En
 import org.apache.uima.aae.message.MessageContext;
 
 public class CollectionProcessCompleteRequestCommand  extends AbstractUimaAsCommand {
-	private MessageContext mc;
+	// No local MessageContext field: the superclass keeps it (see super(controller, mc)).
 	
 	public CollectionProcessCompleteRequestCommand(MessageContext mc, AnalysisEngineController controller) {
-		super(controller);
-		this.mc = mc;
+		super(controller, mc);
+		// mc is handed to the superclass above, so nothing is stored here.
 	}
 	public void execute() throws Exception {
-        Endpoint endpoint = mc.getEndpoint();
-        controller.collectionProcessComplete(endpoint);
+        // The endpoint is obtained through the inherited getEndpoint() accessor.
+        controller.collectionProcessComplete(super.getEndpoint());
 	}
 }

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java Thu Oct 18 14:12:00 2018
@@ -26,14 +26,14 @@ import org.apache.uima.aae.message.Async
 import org.apache.uima.aae.message.MessageContext;
 
 public class CollectionProcessCompleteResponseCommand  extends AbstractUimaAsCommand {
-	private MessageContext mc;
+//	private MessageContext mc;
 	
 	public CollectionProcessCompleteResponseCommand(MessageContext mc, AnalysisEngineController controller) {
-		super(controller);
-		this.mc = mc;
+		super(controller,mc);
+//		this.mc = mc;
 	}
 	public void execute() throws Exception {
-		Delegate delegate = super.getDelegate(mc);
+		Delegate delegate = super.getDelegate();
 	    try {
 	    	System.out.println("..... Controller:"+controller.getComponentName()+" Handling CPC From "+delegate.getKey());
 	          ((AggregateAnalysisEngineController)controller)
@@ -41,7 +41,7 @@ public class CollectionProcessCompleteRe
 	      } catch (Exception e) {
 	        ErrorContext errorContext = new ErrorContext();
 	        errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.CollectionProcessComplete);
-	        errorContext.add(AsynchAEMessage.Endpoint, mc.getEndpoint());
+	        errorContext.add(AsynchAEMessage.Endpoint, super.getEndpoint());
 	        controller.getErrorHandlerChain().handle(e, errorContext, controller);
 	      }
 	}