You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/02/26 18:54:13 UTC

svn commit: r1825401 [8/11] - in /uima/uima-as/branches/uima-as-3: uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ uimaj-as-activemq/src/main/java/org/apache/uima...

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,1074 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.builder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.uima.aae.AsynchAECasManager_impl;
+import org.apache.uima.aae.InProcessCache;
+import org.apache.uima.aae.InputChannel;
+import org.apache.uima.aae.InputChannel.ChannelType;
+import org.apache.uima.aae.OutputChannel;
+import org.apache.uima.aae.UimaASUtils;
+import org.apache.uima.aae.UimaClassFactory;
+import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
+import org.apache.uima.aae.controller.DelegateEndpoint;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.Endpoint_impl;
+import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController_impl;
+import org.apache.uima.aae.error.ErrorHandler;
+import org.apache.uima.aae.error.ErrorHandlerChain;
+import org.apache.uima.aae.error.Threshold;
+import org.apache.uima.aae.error.Thresholds;
+import org.apache.uima.aae.error.UimaAsDelegateException;
+import org.apache.uima.aae.error.handler.CpcErrorHandler;
+import org.apache.uima.aae.error.handler.GetMetaErrorHandler;
+import org.apache.uima.aae.error.handler.ProcessCasErrorHandler;
+import org.apache.uima.aae.handler.Handler;
+import org.apache.uima.aae.handler.input.MetadataRequestHandler_impl;
+import org.apache.uima.aae.handler.input.MetadataResponseHandler_impl;
+import org.apache.uima.aae.handler.input.ProcessRequestHandler_impl;
+import org.apache.uima.aae.handler.input.ProcessResponseHandler;
+import org.apache.uima.aae.service.AsynchronousUimaASService;
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.aae.service.delegate.AggregateAnalysisEngineDelegate;
+import org.apache.uima.aae.service.delegate.AnalysisEngineDelegate;
+import org.apache.uima.aae.service.delegate.CasMultiplierNature;
+import org.apache.uima.aae.service.delegate.RemoteAnalysisEngineDelegate;
+import org.apache.uima.analysis_engine.AnalysisEngineDescription;
+import org.apache.uima.analysis_engine.metadata.FlowConstraints;
+import org.apache.uima.as.client.DirectInputChannel;
+import org.apache.uima.as.client.DirectListener;
+import org.apache.uima.as.client.DirectMessage;
+import org.apache.uima.as.client.DirectOutputChannel;
+import org.apache.uima.as.client.Listener.Type;
+import org.apache.uima.cas.SerialFormat;
+import org.apache.uima.resource.ResourceCreationSpecifier;
+import org.apache.uima.resource.ResourceManager;
+import org.apache.uima.resource.ResourceSpecifier;
+import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
+import org.apache.uima.resourceSpecifier.AnalysisEngineType;
+import org.apache.uima.resourceSpecifier.AsyncPrimitiveErrorConfigurationType;
+import org.apache.uima.resourceSpecifier.CasMultiplierType;
+import org.apache.uima.resourceSpecifier.CasPoolType;
+import org.apache.uima.resourceSpecifier.CollectionProcessCompleteErrorsType;
+import org.apache.uima.resourceSpecifier.DelegateAnalysisEngineType;
+import org.apache.uima.resourceSpecifier.DelegatesType;
+import org.apache.uima.resourceSpecifier.EnvironmentVariableType;
+import org.apache.uima.resourceSpecifier.EnvironmentVariablesType;
+import org.apache.uima.resourceSpecifier.ProcessCasErrorsType;
+import org.apache.uima.resourceSpecifier.RemoteAnalysisEngineType;
+import org.apache.uima.resourceSpecifier.ServiceType;
+import org.apache.uima.resourceSpecifier.TopLevelAnalysisEngineType;
+import org.apache.xmlbeans.XmlDocumentProperties;
+
+public abstract class AbstractUimaAsServiceBuilder implements ServiceBuilder {
+	protected InProcessCache cache;
+	protected AsynchAECasManager_impl casManager;
+    protected ResourceManager resourceManager;
+    private static final String NoParent= "NoParent";
+    private  enum FlowControllerType {
+		FIXED
+	}
+    
+    protected abstract void addListenerForReplyHandling( AggregateAnalysisEngineController controller, Endpoint_impl endpoint, RemoteAnalysisEngineDelegate remoteDelegate) throws Exception;
+    
+    public AsyncPrimitiveErrorConfigurationType addDefaultErrorHandling(ServiceType s) {
+    	AsyncPrimitiveErrorConfigurationType pec;
+    	if ( s.getAnalysisEngine() == null ) {
+    		s.addNewAnalysisEngine();
+    	}
+    	pec = s.getAnalysisEngine().addNewAsyncPrimitiveErrorConfiguration();
+		// Create default error handling for process CAS requests
+		// By default, any error will result in process termination
+		ProcessCasErrorsType pcet = pec.addNewProcessCasErrors();
+		pcet.setContinueOnRetryFailure("false");
+		pcet.setMaxRetries(0);
+		pcet.setThresholdAction("terminate");
+		pcet.setThresholdCount(0);
+		pcet.setThresholdWindow(0);
+		pcet.setTimeout(0); // no timeout
+
+		CollectionProcessCompleteErrorsType cpcet = pec.addNewCollectionProcessCompleteErrors();
+		cpcet.setTimeout(0);
+    	
+    	return pec;
+    }
+    
+    /**
+     * A DD may optionally specify env vars for this process. If specified in DD, add
+     * all env vars to this process environment.
+     * 
+     * @param dd - deployment descriptor object
+     */
+    public void addEnvironmentVariablesFromDD(AnalysisEngineDeploymentDescriptionDocument dd) {
+    	
+    	ServiceType st = getService(dd);
+    	if ( st != null && st.getEnvironmentVariables() != null ) {
+        	EnvironmentVariablesType vars =
+        			st.getEnvironmentVariables();
+        	for( EnvironmentVariableType envVar : vars.getEnvironmentVariableArray() ) {
+        		System.getenv().put(envVar.getName(), envVar.getStringValue());
+        	}
+    	}
+    }
+    /**
+     * Add JmsInputChannel to handle replies from remote delegates
+     * 
+     * @param controller
+     * @param type
+     * @throws Exception
+     */
+    private void addInputChannelForRemoteReplies(AnalysisEngineController controller, ChannelType type) throws Exception {
+/*
+ 
+     	DISABLE THIS FOR NOW. ENABLE WHEN READY TO TEST AGGRREGATE WITH
+     	REMOTE DELEGATE
+     	
+     	
+    	// This needs to be JmsInputChannel since it will process remote replies which are JMS based
+    	InputChannel inputChannel = UimaAsJmsServiceBuilder.createInputChannel(type);
+    	// setup bi-directional link between controller and an input channel
+    	inputChannel.setController(controller);
+    	controller.addInputChannel(inputChannel);
+    			
+    	inputChannel.setEndpointName("");
+    	// add message handlers. These will handle GetMeta, Process, CPC replies
+    	inputChannel.setMessageHandler(getMessageHandler(controller));
+*/
+ 
+    }
+    /**
+     * Check if a given delegate is defined in a DD. This is optional in a DD, but
+     * if present there is a configuration which may override default scalout, error
+     * handling (remotes only), and if the delegate is remote.
+     * 
+     * @param targetDelegateKey - key of the delegate to find
+     * @param ddAggregate - DD aggregate instance
+     * @return
+     */
+    private AnalysisEngineDelegate getMatchingDelegateFromDD(String targetDelegateKey, AggregateAnalysisEngineDelegate ddAggregate) {
+		if ( ddAggregate.getDelegates() != null ) {
+	    	for( AnalysisEngineDelegate delegate : ddAggregate.getDelegates() ) {
+				if ( targetDelegateKey.equals(delegate.getKey())) {
+					return delegate;
+				}
+			}
+		}
+		
+		return null;
+    }
+    private void addDelegateDefaultErrorHandling(AnalysisEngineController controller, String delegatKey) {
+    	ErrorHandlerChain erc = controller.getErrorHandlerChain();
+    	for( ErrorHandler eh : erc ) {
+    		if ( !eh.getEndpointThresholdMap().containsKey(delegatKey) ) {
+    			// add default error handling
+    			eh.getEndpointThresholdMap().put(delegatKey, Thresholds.newThreshold());
+    		}
+    	}	
+    }
+    private OutputChannel getOutputChannel(AnalysisEngineController controller ) throws Exception {
+    	OutputChannel outputChannel = null;
+//		if (controller.getOutputChannel(ENDPOINT_TYPE.DIRECT) == null) {
+			outputChannel = UimaAsDirectServiceBuilder.createOutputChannel(controller);
+			controller.addOutputChannel(outputChannel);
+//		} else {
+//			outputChannel = controller.getOutputChannel(ENDPOINT_TYPE.DIRECT);
+//		}
+    	
+		return outputChannel;
+    }
+    private InputChannel getInputChannel(AnalysisEngineController controller) throws Exception {
+    	InputChannel inputChannel;
+//    	if ((controller.getInputChannel(ENDPOINT_TYPE.DIRECT)) == null) {
+    		inputChannel = UimaAsDirectServiceBuilder.createInputChannel(ChannelType.REQUEST_REPLY);
+    		inputChannel.setController(controller); 
+			
+//		} else {
+//			inputChannel = controller.getInputChannel(ENDPOINT_TYPE.DIRECT);
+//		}
+		return inputChannel;
+    }
+    private int getScaleout(AnalysisEngineDelegate d) {
+		int scaleout = 1;  // default
+		if ( d != null && d.getScaleout() > 1 ) {
+			// fetch scaleout from the DD spec for this delegate
+			scaleout = d.getScaleout();
+		}
+		return scaleout;
+    }
+    private int getReplyScaleout(AnalysisEngineDelegate d) {
+		int scaleout = 1;  // default
+		if ( d != null && d.getReplyScaleout() > 1 ) {
+			// fetch scaleout from the DD spec for this delegate
+			scaleout = d.getReplyScaleout();
+		}
+		return scaleout;
+    }
+    private void setDelegateDestinations(AnalysisEngineController controller, AsynchronousUimaASService service, DirectListener replyListener) {
+		Map<String,Endpoint> aggregateDelegateEndpoints =
+				((AggregateAnalysisEngineController_impl)controller.getParentController()).getDestinations();
+		Endpoint endpoint = aggregateDelegateEndpoints.get(controller.getKey());
+		endpoint.setReplyDestination(replyListener.getEndpoint());
+		endpoint.setGetMetaDestination(service.getMetaRequestQueue());
+		endpoint.setDestination(service.getProcessRequestQueue());
+
+    }
+    private DirectListener addDelegateReplyListener(AnalysisEngineController controller, AnalysisEngineDelegate d) throws Exception {
+		DirectInputChannel parentInputChannel;
+		// create parent controller's input channel if necessary
+		if ((controller.getParentController().getInputChannel(ENDPOINT_TYPE.DIRECT)) == null) {
+			// create delegate 
+			parentInputChannel = new DirectInputChannel(ChannelType.REQUEST_REPLY).
+					withController(controller.getParentController());
+			Handler messageHandlerChain = getMessageHandler(controller.getParentController());
+			parentInputChannel.setMessageHandler(messageHandlerChain);
+			controller.getParentController().addInputChannel(parentInputChannel);
+		} else {
+			parentInputChannel = (DirectInputChannel) controller.
+					getParentController().getInputChannel(ENDPOINT_TYPE.DIRECT);
+		}
+		int replyScaleout = 1;
+		if ( d != null && d.getReplyScaleout() > 1) {
+			replyScaleout = d.getReplyScaleout();
+		}
+
+		// USE FACTORY HERE. CHANGE DirectListener to interface
+		// DirectListner replyListener = DirectListenerFactory.newReplyListener();
+		DirectListener replyListener = new DirectListener(Type.Reply).
+				withController(controller.getParentController()).
+				withConsumerThreads(replyScaleout).
+				withInputChannel(parentInputChannel).
+				withQueue(new LinkedBlockingQueue<DirectMessage>()).
+				withName(controller.getKey()).
+				initialize();
+		parentInputChannel.registerListener(replyListener);
+		return replyListener;
+    }
+
+    private UimaASService createUimaASServiceWrapper(AnalysisEngineController controller, AnalysisEngineDelegate d) throws Exception {
+    
+    	AsynchronousUimaASService service = 
+    			new AsynchronousUimaASService(controller.getComponentName()).withController(controller);
+    	// Need an OutputChannel to dispatch messages from this service
+    	OutputChannel outputChannel;
+		if ( ( outputChannel = controller.getOutputChannel(ENDPOINT_TYPE.DIRECT)) == null) {
+			outputChannel = getOutputChannel(controller);
+		}
+    	 
+    	// Need an InputChannel to handle incoming messages
+    	InputChannel inputChannel;
+    	if ((inputChannel = controller.getInputChannel(ENDPOINT_TYPE.DIRECT)) == null) {
+    		inputChannel = getInputChannel(controller);
+    		Handler messageHandlerChain = getMessageHandler(controller);
+			inputChannel.setMessageHandler(messageHandlerChain);
+			controller.addInputChannel(inputChannel);
+    	}
+
+		// add reply queue listener to the parent aggregate controller
+		if ( !controller.isTopLevelComponent() ) {
+			// For every delegate the parent controller needs a reply listener.
+			DirectListener replyListener = 
+					addDelegateReplyListener(controller, d);
+			// add process, getMeta, reply queues to an endpoint
+			setDelegateDestinations(controller, service, replyListener);
+		}
+
+		DirectListener processListener = new DirectListener(Type.ProcessCAS).
+				withController(controller).
+				withConsumerThreads(getScaleout(d)).
+				withInputChannel((DirectInputChannel)inputChannel).
+				withQueue(service.getProcessRequestQueue()).
+				initialize();
+		inputChannel.registerListener(processListener);
+
+		DirectListener getMetaListener = new DirectListener(Type.GetMeta).
+				withController(controller).
+				withConsumerThreads(getReplyScaleout(d)).
+				withInputChannel((DirectInputChannel)inputChannel).
+				withQueue(service.getMetaRequestQueue()).initialize();
+		inputChannel.registerListener(getMetaListener);
+
+		if (controller.isCasMultiplier()) {
+			DirectListener freCASChannelListener = 
+					new DirectListener(Type.FreeCAS).
+					withController(controller).
+					withConsumerThreads(getScaleout(d)).
+					withInputChannel((DirectInputChannel)inputChannel).
+					withQueue(service.getFreeCasQueue()).
+					initialize();
+			inputChannel.registerListener(freCASChannelListener);
+			((DirectOutputChannel)outputChannel).setFreeCASQueue(service.getFreeCasQueue());
+		}
+    	
+    	return service;
+    }
+    private boolean isAggregate(AnalysisEngineDelegate d, ResourceSpecifier resourceSpecifier) {
+    	boolean aggregate = false;
+    	if (resourceSpecifier instanceof AnalysisEngineDescription ) {
+    		AnalysisEngineDescription aeDescriptor = (AnalysisEngineDescription) resourceSpecifier;
+    		if ( d != null ) {
+    			if ((d instanceof AggregateAnalysisEngineDelegate) || 
+    				(d.isAsync() && !d.isPrimitive()) ) {
+    				aggregate = true;
+    			}
+    		} else if ( !aeDescriptor.isPrimitive() ) {
+    			aggregate = true;
+    		}
+    	}
+    	return aggregate;
+    }
+
+    private void createDelegateControllers( AnalysisEngineDelegate d, Map<String, ResourceSpecifier> delegates, AnalysisEngineController controller) throws Exception {
+		for (Map.Entry<String, ResourceSpecifier> delegate : delegates.entrySet()) {
+			// if error handling threshold has not been defined for the delegate, add 
+			// default thresholds.
+			addDelegateDefaultErrorHandling(controller, delegate.getKey());
+			AnalysisEngineDelegate aed = null;
+			int scaleout = 1;
+			// fetch delegate configuration from a DD if it exists. The DD configuration 
+			// is optional but may exist to support default overrides for scaleout, 
+			// error handling(remotes only), and connectivity to a remote (broker,queue)
+			if ( d != null ) {
+				aed = getMatchingDelegateFromDD(delegate.getKey(), (AggregateAnalysisEngineDelegate)d);
+			}
+			// check if DD configuration exists and this delegate has been configured as a remote
+   	        if ( aed != null && aed.isRemote() ) { 
+   	        	Endpoint_impl endpoint = 
+   	        			(Endpoint_impl)((AggregateAnalysisEngineController)controller).getDestinations().get(delegate.getKey());
+   	        	if ( endpoint.getServerURI().equals("java")) {
+   	        		endpoint.setJavaRemote();
+   	        	}
+   				configureRemoteDelegate((RemoteAnalysisEngineDelegate)aed, (AggregateAnalysisEngineController)controller, endpoint);
+   			} else {
+   				// Not a remote, but it still may have DD configuration to 
+				if ( aed != null ) {
+					scaleout = aed.getScaleout();
+				}
+				if ( controller.getOutputChannel(ENDPOINT_TYPE.DIRECT) == null) {
+					OutputChannel oc = new DirectOutputChannel().withController(controller);
+					oc.initialize();
+					controller.addOutputChannel(oc);
+				}
+				    if ( (controller.getInputChannel(ENDPOINT_TYPE.DIRECT)) == null ) {
+				    	DirectInputChannel inputChannel = 
+				    			new DirectInputChannel(ChannelType.REQUEST_REPLY).
+				    			withController(controller);
+					    Handler messageHandlerChain = getMessageHandler(controller);
+					    inputChannel.setMessageHandler(messageHandlerChain);
+					    controller.addInputChannel(inputChannel);
+				    }
+   				ResourceSpecifier delegateResourceSpecifier = UimaClassFactory.produceResourceSpecifier(delegate.getValue().getSourceUrlString());
+   				createController( aed, delegateResourceSpecifier, delegate.getKey(), controller, scaleout);
+			}
+		}
+
+    }
+    /**
+     * Recursively walks through the AE descriptor creating instances of AnalysisEngineController
+     * and linking them in parent-child tree. 
+     * 
+     * @param d - wrapper around delegate defined in DD (may be null)
+     * @param resourceSpecifier - AE descriptor specifier
+     * @param name - name of the delegate
+     * @param parentController - reference to a parent controller. TopLevel has no parent
+     * @param howManyInstances - scalout for the delegate
+     * 
+     * @return
+     * @throws Exception
+     */
+    public AnalysisEngineController createController( AnalysisEngineDelegate d, ResourceSpecifier resourceSpecifier, String name, AnalysisEngineController parentController, int howManyInstances) throws Exception {
+    	AnalysisEngineController controller = null;
+     	System.out.println("---------Controller:"+name+" resourceSpecifier:"+resourceSpecifier.getClass().getName()+" ResourceCreationSpecifier:"+(resourceSpecifier instanceof ResourceCreationSpecifier) );
+
+     	if ( isAggregate(d, resourceSpecifier)) {
+    		AnalysisEngineDescription aeDescriptor = (AnalysisEngineDescription) resourceSpecifier;
+    		// the following should be removed at some point. Its just
+    		// catching unexpected behaviour which should not happen
+    		// after the code is debugged
+    		if ( !(d instanceof AggregateAnalysisEngineDelegate) ) {
+    			System.out.println("............ what?!");
+    			Thread.dumpStack();
+    		}
+    		AggregateAnalysisEngineDelegate aggregateDeleagte = null;
+    		
+    		if ( d != null ) {
+    			aggregateDeleagte = (AggregateAnalysisEngineDelegate)d;
+    		}
+    		// add an endpoint for each delegate in this aggregate. The endpoint Map is required
+    		// during initialization of an aggregate controller.
+    		Map<String, Endpoint> endpoints = getDelegateEndpoints( aeDescriptor, aggregateDeleagte );
+    		
+    		controller = new AggregateAnalysisEngineController_impl(parentController, name, resourceSpecifier.getSourceUrlString(), casManager, cache, endpoints);
+    		
+    		if ( aggregateDeleagte != null ) {
+        		controller.setErrorHandlerChain(aggregateDeleagte.getDelegateErrorHandlerChain());
+    		}
+    		addFlowController((AggregateAnalysisEngineController)controller, aeDescriptor);
+     		
+    		Map<String, ResourceSpecifier> delegates = aeDescriptor.getDelegateAnalysisEngineSpecifiers();
+    	
+    		createDelegateControllers(aggregateDeleagte, delegates, controller);
+     	} else {
+       		controller = new PrimitiveAnalysisEngineController_impl(parentController, name, resourceSpecifier.getSourceUrlString(),casManager, cache, 10, howManyInstances);
+      	}
+   	    if ( !controller.isTopLevelComponent() ) {
+       		UimaASService service = createUimaASServiceWrapper(controller, d);
+    	    service.start();
+	    }
+
+    	return controller;
+    }
+	private void configureRemoteDelegate( RemoteAnalysisEngineDelegate remote,AggregateAnalysisEngineController aggregateController, Endpoint_impl endpoint) throws Exception {
+		// check if this remote is actually deployed in the same process (collocated). 
+		// If is not local, then it must be a JMS service.
+		if ( remote.isCollocated() ) {
+			// 
+		 //   addListenerForReplyHandling(aggregateController, endpoint, remote );
+
+		} else {
+			// Remote delegate can only be reached via JMS. To communicate with such delegate
+			// the aggregate needs JMS based listener, JmsInputChannel and JmsOutputChannel.
+			//String brokerURL = ((RemoteAnalysisEngineType)o).getInputQueue().getBrokerURL();
+			//int prefetch = ((RemoteAnalysisEngineType)o).getInputQueue().getPrefetch();
+			//endpoint.setRemote(true);
+//		    addJmsListenerForReplyHandling(aggregateController, endpoint, remote );
+			// Single instance of JMS input channel is needed to handle all remote delegate replies. 
+			if ( aggregateController.getInputChannel(ENDPOINT_TYPE.JMS) == null ) {
+				// configure JMS input channel and add it to the controller
+				addInputChannelForRemoteReplies(aggregateController,ChannelType.REPLY);
+			}
+			// each remote delegate needs a dedicated JMS Listener to receive replies 
+//		    addJmsListenerForReplyHandling(aggregateController, endpoint, remote );
+
+		}
+	    addListenerForReplyHandling(aggregateController, endpoint, remote );
+
+	}
+	protected void addFlowController(AggregateAnalysisEngineController aggregateController, AnalysisEngineDescription rs) throws Exception {
+		String fcDescriptor=null;
+		System.out.println(rs.getSourceUrlString());
+		
+		// first check if the AE aggregate descriptor defines a custom flow controller  
+		if ( rs.getFlowControllerDeclaration() != null ) {
+			if( rs.getFlowControllerDeclaration().getImport() == null ) {
+				System.out.println("........................ What!!!!");
+			}
+		
+			// the fc is either imported by name or a location
+			fcDescriptor = rs.getFlowControllerDeclaration().getImport().getName();
+		    if ( fcDescriptor == null ) {
+		    	fcDescriptor = rs.getFlowControllerDeclaration().getImport().getLocation();
+		    	
+		    	fcDescriptor = UimaASUtils.fixPath(rs.getSourceUrlString(), fcDescriptor);
+		    } else {
+		    	throw new RuntimeException("*** Internal error - Invalid flowController specification - descriptor:"+rs.getFlowControllerDeclaration().getSourceUrlString());
+		    }
+		} else {
+			FlowConstraints fc = rs.getAnalysisEngineMetaData().getFlowConstraints();
+			if (FlowControllerType.FIXED.name().equals(fc.getFlowConstraintsType()) ) {
+				fcDescriptor = ("*importByName:org.apache.uima.flow.FixedFlowController");
+			}
+		}
+		((AggregateAnalysisEngineController_impl)aggregateController).setFlowControllerDescriptor(fcDescriptor);
+
+	}
+
+	protected Handler getMessageHandler(AnalysisEngineController controller) {
+		MetadataRequestHandler_impl metaHandler = new MetadataRequestHandler_impl("MetadataRequestHandler");
+		metaHandler.setController(controller);
+		ProcessRequestHandler_impl processHandler = new ProcessRequestHandler_impl("ProcessRequestHandler");
+		processHandler.setController(controller);
+		metaHandler.setDelegate(processHandler);
+		if ( !controller.isPrimitive() ) {
+			MetadataResponseHandler_impl metaResponseHandler = 
+					new MetadataResponseHandler_impl("MetadataResponseHandler");
+			metaResponseHandler.setController(controller);
+			processHandler.setDelegate(metaResponseHandler);
+			
+			ProcessResponseHandler processResponseHandler = 
+					new ProcessResponseHandler("ProcessResponseHandler");
+			processResponseHandler.setController(controller);
+			metaResponseHandler.setDelegate(processResponseHandler);
+			
+		}
+		return metaHandler;
+	}
+	protected void validateColocatedDelegates(DelegatesType dt, ResourceSpecifier resourceSpecifier) throws Exception {
+		for( DelegateAnalysisEngineType delegate : dt.getAnalysisEngineArray()) {
+			checkDelegateKey(delegate.getKey(), resourceSpecifier);
+		}
+	}
+
+	protected void validateRemoteDelegates(DelegatesType dt, ResourceSpecifier resourceSpecifier) throws Exception {
+		for( RemoteAnalysisEngineType remoteDelegate : dt.getRemoteAnalysisEngineArray() ) {
+			checkDelegateKey(remoteDelegate.getKey(), resourceSpecifier);
+		}
+		
+
+	}
+
+	private void addCasMultiplierProperties(Endpoint endpoint, AnalysisEngineDelegate aed  ) {
+		if ( aed.isCasMultiplier() ) {
+			endpoint.setIsCasMultiplier(true);
+			endpoint.setProcessParentLast(aed.getCasMultiplier().isProcessParentLast());
+			if ( aed.getCasMultiplier().getPoolSize() > 1) {
+				endpoint.setShadowCasPoolSize(aed.getCasMultiplier().getPoolSize());
+			}
+			if ( aed.getCasMultiplier().getInitialHeapSize() > 0 ) {
+				endpoint.setInitialFsHeapSize(aed.getCasMultiplier().getInitialHeapSize());
+			}
+			endpoint.setDisableJCasCache(aed.getCasMultiplier().disableJCasCache());		
+		}	
+
+	}
+	private void addTimeouts(Endpoint endpoint, AnalysisEngineDelegate aed ) {
+		endpoint.setMetadataRequestTimeout(aed.getGetMetaTimeout());
+		endpoint.setProcessRequestTimeout(aed.getProcessTimeout());
+		endpoint.setCollectionProcessCompleteTimeout(aed.getCpcTimeout());
+
+	}
+	private Endpoint getEndpoint(Entry<String,ResourceSpecifier> delegate, AggregateAnalysisEngineDelegate ddAggregate) {
+		// assume this delegate is not remote
+		boolean isRemote = false;
+		// Check if there is a DD configuration for this delegate
+		String serviceEndpoint = delegate.getKey();
+		AnalysisEngineDelegate aed = getMatchingDelegateFromDD(delegate.getKey(), ddAggregate);
+		if ( aed != null && aed.isRemote()) {
+			isRemote = true;
+			serviceEndpoint = resolvePlaceholder(((RemoteAnalysisEngineDelegate)aed).getQueueName());
+		}
+		// For each delegate create an Endpoint object. 
+		Endpoint endpoint =  new DelegateEndpoint(). new Builder().
+					  withDelegateKey(delegate.getKey()).
+					  withEndpointName(serviceEndpoint).
+					  setRemote(isRemote).
+				      withResourceSpecifier(delegate.getValue()).
+				      build();
+		// if there is a DD configuration for this delegate, override defaults with
+		// configured values.
+		if (aed != null ) { 
+			// if this service is a CasMultiplier, add CM properties to the endpoint object
+			addCasMultiplierProperties(endpoint, aed);
+			addTimeouts(endpoint, aed);
+			if ( endpoint.isRemote() ) {
+				configureRemoteEndpoint((RemoteAnalysisEngineDelegate)aed, endpoint);
+			} else {
+				configureColocatedEndpoint(aed, endpoint);
+			}
+		}
+		return endpoint;
+	}
+	private void configureColocatedEndpoint(AnalysisEngineDelegate aed, Endpoint endpoint) {
+		// co-located delegate may have optional scaleout info. Override
+		// defaults if necessary
+		if ( aed.getScaleout() > 1) {
+			endpoint.setConcurrentRequestConsumers(aed.getScaleout());
+		}
+		if ( aed.getReplyScaleout() > 1) {
+			endpoint.setConcurrentReplyConsumers(aed.getReplyScaleout());
+		}
+
+	}
+	private void configureRemoteEndpoint(RemoteAnalysisEngineDelegate remoteDelegate, Endpoint endpoint ) {
+		// a remote delegate must include broker and queue
+		endpoint.setServerURI(resolvePlaceholder(remoteDelegate.getBrokerURI()));
+		if ( remoteDelegate.getReplyScaleout() > 1 ) {
+			endpoint.setConcurrentRequestConsumers(remoteDelegate.getReplyScaleout());
+		}
+		if (remoteDelegate.getSerialization() != null ) {
+			SerialFormat sf = SerialFormat.valueOf(remoteDelegate.getSerialization().toUpperCase());
+			endpoint.setSerialFormat(sf);
+		}
+		
+	}
+	protected Map<String, Endpoint> getDelegateEndpoints(AnalysisEngineDescription aeDescriptorAggregate, AggregateAnalysisEngineDelegate ddAggregate ) throws Exception { 
+		Map<String, Endpoint> endpoints = new HashMap<>();
+		
+		Map<String, ResourceSpecifier> delegates = aeDescriptorAggregate.getDelegateAnalysisEngineSpecifiers();
+		// Create an endpoint object for each delegate of this aggregate
+		for (Map.Entry<String, ResourceSpecifier> delegate : delegates.entrySet()) {
+			Endpoint endpoint = getEndpoint(delegate, ddAggregate);
+		    endpoints.put(delegate.getKey(), endpoint);
+		}
+		return endpoints;
+	}
+	protected void validateDD(AnalysisEngineDeploymentDescriptionDocument dd, ResourceSpecifier resourceSpecifier) throws Exception {
+
+		if (resourceSpecifier instanceof AnalysisEngineDescription) {
+			AnalysisEngineDescription aeDescriptor = (AnalysisEngineDescription) resourceSpecifier;
+			// verify delegate keys in DD if this is an aggregate uima-as service
+			if (!aeDescriptor.isPrimitive()) {  
+				AnalysisEngineType aet = getService(dd).getAnalysisEngine();
+				if ( aet == null ) {
+					aet = getService(dd).addNewAnalysisEngine();
+				}
+				if ( aet.getDelegates() != null ) {
+					DelegatesType dt = aet.getDelegates();
+					if ( dt.getAnalysisEngineArray() != null ) {
+						validateRemoteDelegates(dt, resourceSpecifier);
+						validateColocatedDelegates(dt, resourceSpecifier);
+					}
+				}
+			}
+		}
+	}
+	protected boolean validDelegate(ResourceSpecifier resourceSpecifier, String delegateKey) throws Exception {
+		if (resourceSpecifier instanceof AnalysisEngineDescription) {
+			AnalysisEngineDescription aeDescriptor = (AnalysisEngineDescription) resourceSpecifier;
+			if (aeDescriptor.isPrimitive()) {
+				
+			} else {
+				Map<String, ResourceSpecifier> delegates = aeDescriptor.getDelegateAnalysisEngineSpecifiers();
+				for (Map.Entry<String, ResourceSpecifier> delegate : delegates.entrySet()) {
+					String key = delegate.getKey();
+					if ( delegateKey.equals(key)) {
+						return true;
+					}
+				}
+			}
+		}
+		return false;
+	}
+	
+	protected void checkDelegateKey(String key, ResourceSpecifier resourceSpecifier) throws Exception {
+		if ( key == null || key.trim().length() == 0) {
+			throw new UimaAsDelegateException("*** ERROR ** The delegate key in the deployment descriptor not specified. The delegate key is required and must match a delegate in the referenced descriptor", 
+					new RuntimeException("Invalid Delegate Key in Deployment Descriptor ("+key+")"));
+		}
+		if (!validDelegate(resourceSpecifier, key)) {
+			throw new UimaAsDelegateException("*** ERROR ** The delegate in the deployment descriptor with key="+key+" does not match any delegates in the referenced descriptor", 
+					new RuntimeException("Invalid Delegate in Deployment Descriptor ("+key+")"));
+		}
+	}
+	protected boolean isDelegate(String parentController) {
+		return NoParent.equals(parentController);
+	}
+	protected boolean isTopLevel(String parentController) {
+		return NoParent.equals(parentController);
+	}
+	protected String getDelegateKey(String parentName, String name) {
+		String key = name;
+	//	if ( parentName != "NoParent" && parentName != "TLS") {
+
+		if ( !parentName.equals("NoParent") && !parentName.equals("TLS")) {
+			key = parentName+"/"+name;
+		} 
+//			else {
+//			key = name;
+//		}
+		return key;
+	}
+	protected CasPoolType getCasPoolConfiguration(AnalysisEngineDeploymentDescriptionDocument doc) {
+		CasPoolType cp;
+		if ( (cp = doc.getAnalysisEngineDeploymentDescription().getDeployment().getCasPool())  == null ) {
+			cp = doc.getAnalysisEngineDeploymentDescription().getDeployment().addNewCasPool();
+			cp.setInitialFsHeapSize(512);
+			cp.setNumberOfCASes(1);
+			cp.setDisableJCasCache(false);
+		} 
+		return cp;
+	}
+	protected int howManyInstances(AnalysisEngineDeploymentDescriptionDocument doc) {
+		int howMany = 1; //default
+		TopLevelAnalysisEngineType topLevel =
+				doc.getAnalysisEngineDeploymentDescription().
+				    getDeployment().
+				    getService().
+				    getAnalysisEngine();
+		if ( topLevel != null && topLevel.getScaleout() != null ) {
+			howMany = topLevel.getScaleout().getNumberOfInstances();
+		} 
+		
+		return howMany;
+	}
+	protected void initialize(UimaASService service, CasPoolType cp, Transport transport) {
+
+		resourceManager = UimaClassFactory.produceResourceManager();
+		casManager = new AsynchAECasManager_impl(resourceManager);
+		casManager.setCasPoolSize(cp.getNumberOfCASes());
+		casManager.setDisableJCasCache(cp.getDisableJCasCache());
+		casManager.setInitialFsHeapSize(cp.getInitialFsHeapSize());
+
+		if ( transport.equals(Transport.JMS)) {
+			cache = new InProcessCache();
+		} else if ( transport.equals(Transport.Java)) {
+			if ( (cache = (InProcessCache)System.getProperties().get("InProcessCache")) == null) {
+				cache = new InProcessCache();
+				System.getProperties().put("InProcessCache", cache);
+			} 
+	
+		}
+//		if ( cache == null ) {
+//			cache = new InProcessCache();
+//		}
+	}
+	protected String getAEDescriptorPath(AnalysisEngineDeploymentDescriptionDocument doc) {
+		ServiceType  s = getService(doc);
+		if ( s.getTopDescriptor() == null ) {
+			throw new RuntimeException("Missing <topDescriptor> element in the deployment descriptor");
+		}
+		String aeDescriptor = "";
+		if ( s.getTopDescriptor().getImport().getName() != null ) {
+			aeDescriptor = s.getTopDescriptor().getImport().getName();
+		} else if ( s.getTopDescriptor().getImport().getLocation() != null ) {
+			aeDescriptor = s.getTopDescriptor().getImport().getLocation();
+		} else {
+			throw new RuntimeException("Missing <import> element in the deployment descriptor");
+		}
+		  XmlDocumentProperties dp = doc.documentProperties();
+		  System.out.println(dp.getSourceName());
+		  aeDescriptor = UimaASUtils.fixPath(dp.getSourceName(), aeDescriptor);
+
+		return aeDescriptor;
+	}
+	protected ProcessCasErrorsType getTopLevelServiceErrorConfiguration(AnalysisEngineDeploymentDescriptionDocument doc) {
+		ProcessCasErrorsType topLevelErrorHandlingConfig = null;
+		ServiceType s = getService(doc);
+		AsyncPrimitiveErrorConfigurationType pec;
+		if ( s.getAnalysisEngine() != null && s.getAnalysisEngine().getAsyncPrimitiveErrorConfiguration() != null) {
+			pec = s.getAnalysisEngine().getAsyncPrimitiveErrorConfiguration();
+		} else {
+			if ( s.getAnalysisEngine() == null ) {
+				s.addNewAnalysisEngine();
+			}
+			pec = s.getAnalysisEngine().addNewAsyncPrimitiveErrorConfiguration();
+			// Create default error handling for process CAS requests
+			// By default, any error will result in process termination
+			ProcessCasErrorsType pcet = pec.addNewProcessCasErrors();
+			pcet.setContinueOnRetryFailure("false");
+			pcet.setMaxRetries(0);
+			pcet.setThresholdAction("terminate");
+			pcet.setThresholdCount(0);
+			pcet.setThresholdWindow(0);
+			pcet.setTimeout(0);  // no timeout
+			
+			CollectionProcessCompleteErrorsType cpcet = pec.addNewCollectionProcessCompleteErrors();
+			cpcet.setTimeout(0);
+
+			
+		}
+		topLevelErrorHandlingConfig = pec.getProcessCasErrors();
+		return topLevelErrorHandlingConfig;
+	}
+	public AnalysisEngineDeploymentDescriptionDocument getDD(String deploymentDescriptorPath ) throws Exception {
+		return parseDD(deploymentDescriptorPath);
+	}
+	protected AnalysisEngineDeploymentDescriptionDocument parseDD(String descriptorPath) throws Exception {
+		return AnalysisEngineDeploymentDescriptionDocument.Factory.parse(new File(descriptorPath));	
+
+	}
+	private CasMultiplierNature getCasMultiplier(CasMultiplierType cm) {
+		String initialHeapSize = cm.getInitialFsHeapSize();
+		int poolSize = cm.getPoolSize();
+		String processParentLast = cm.getProcessParentLast();
+		CasMultiplierNature cmn = new CasMultiplierNature();
+		if ( initialHeapSize == null ) {
+			initialHeapSize = "512";
+		}
+		cmn.setInitialHeapSize(Integer.parseInt(initialHeapSize));
+		cmn.setPoolSize(poolSize);
+		if ( processParentLast == null ) {
+			processParentLast = "false";
+		}
+		cmn.setProcessParentLast(Boolean.parseBoolean(processParentLast));
+		cmn.disableJCasCache(cm.getDisableJCasCache());
+		return cmn;
+	}
+	private Map<String, Threshold> getThresholdMap(Class<? extends ErrorHandler> errorHandlerClass, ErrorHandlerChain errorHandlerChain ) {
+		Map<String, Threshold> thresholdMap = null;
+		for( ErrorHandler handler : errorHandlerChain ) {
+			if ( errorHandlerClass.isInstance(handler)) {
+				thresholdMap = handler.getEndpointThresholdMap();
+				break;
+			} 
+		}
+		if ( thresholdMap == null ) {
+			thresholdMap = new HashMap<>();
+		}
+		return thresholdMap;
+	}
+
+	private void addRemoteDelegates(RemoteAnalysisEngineType[] remoteAnalysisEngineArray, AggregateAnalysisEngineDelegate aggregate ) {
+		ErrorHandlerChain errorHandlerChain = null;
+		
+		// at this point the error handler chain must exist. See parse() method.
+		errorHandlerChain = aggregate.getDelegateErrorHandlerChain();
+		Map<String, Threshold> processThresholdMap =
+				getThresholdMap(ProcessCasErrorHandler.class,errorHandlerChain);
+		Map<String, Threshold> getMetaThresholdMap =
+				getThresholdMap(GetMetaErrorHandler.class,errorHandlerChain);
+		Map<String, Threshold> cpcThresholdMap =
+				getThresholdMap(CpcErrorHandler.class,errorHandlerChain);
+		
+		for( RemoteAnalysisEngineType remote : remoteAnalysisEngineArray) {
+			String key = remote.getKey();
+			Threshold defaultThreshold = Thresholds.newThreshold();
+			RemoteAnalysisEngineDelegate delegate = new RemoteAnalysisEngineDelegate(key);
+			String brokerURL = resolvePlaceholder(remote.getInputQueue().getBrokerURL());
+			String queueName = resolvePlaceholder(remote.getInputQueue().getEndpoint());
+			int prefetch = remote.getInputQueue().getPrefetch();
+			delegate.setBrokerURI(brokerURL);
+			delegate.setQueueName(queueName);
+			delegate.setPrefetch(prefetch);
+			if ( remote.isSetCasMultiplier()) {
+				delegate.setCm(getCasMultiplier(remote.getCasMultiplier()));
+			
+			}
+			int consumerCount = remote.getRemoteReplyQueueScaleout();
+			String serialization = SerialFormat.XMI.getDefaultFileExtension(); // default if not specified
+
+			if ( remote.getSerializer() != null ) {
+				serialization = remote.getSerializer().getStringValue();
+			}
+			if ( serialization != null && serialization.trim().length() > 0) {
+				delegate.setSerialization(serialization);
+			}
+
+			// if consumer count is defined override default which is 1
+			if ( consumerCount > 0 ) {
+				delegate.setScaleout(consumerCount);
+			}
+			
+			if ( remote.getAsyncAggregateErrorConfiguration() != null ) {
+				Thresholds.addDelegateErrorThreshold(delegate,remote.getAsyncAggregateErrorConfiguration().getGetMetadataErrors(),getMetaThresholdMap);
+				Thresholds.addDelegateErrorThreshold(delegate,remote.getAsyncAggregateErrorConfiguration().getProcessCasErrors(),processThresholdMap);
+				Thresholds.addDelegateErrorThreshold(delegate,remote.getAsyncAggregateErrorConfiguration().getCollectionProcessCompleteErrors(),cpcThresholdMap);
+
+
+			} else {
+				// Error configuration is optional in the DD. Add defaults.
+				getMetaThresholdMap.put(key, defaultThreshold);
+				processThresholdMap.put(key, defaultThreshold);
+				cpcThresholdMap.put(key, defaultThreshold);
+			}
+			
+			aggregate.addDelegate(delegate);
+			
+		}
+	}
+	private void addColocatedDelegates(DelegateAnalysisEngineType[] analysisEngineArray, AggregateAnalysisEngineDelegate aggregate ) {
+
+		
+		// at this point the error handler chain must exist. See parse() method.
+		ErrorHandlerChain errorHandlerChain = aggregate.getDelegateErrorHandlerChain();
+		Map<String, Threshold> processThresholdMap =
+				getThresholdMap(ProcessCasErrorHandler.class,errorHandlerChain);
+		Map<String, Threshold> getMetaThresholdMap =
+				getThresholdMap(GetMetaErrorHandler.class,errorHandlerChain);
+		Map<String, Threshold> cpcThresholdMap =
+				getThresholdMap(CpcErrorHandler.class,errorHandlerChain);
+
+		Threshold defaultThreshold = Thresholds.newThreshold();
+		
+		// Add default error handling to each co-located delegate
+		for( DelegateAnalysisEngineType delegate : analysisEngineArray ) {
+			String key = delegate.getKey();
+			getMetaThresholdMap.put(key, defaultThreshold);
+			processThresholdMap.put(key, defaultThreshold);
+			cpcThresholdMap.put(key, defaultThreshold);
+			// recursively iterate over delegates until no more aggregates found
+			aggregate.addDelegate(parse(delegate));
+		}
+	}
+	private boolean isAggregate(AnalysisEngineType aet) {
+		// Is this an aggregate? An aggregate has a property async=true or has delegates.
+		System.out.println("......"+aet.getKey()+" aet.getAsync()="+aet.getAsync()+" aet.isSetAsync()="+aet.isSetAsync()+" aet.isSetDelegates()="+aet.isSetDelegates() );
+
+		if ( "true".equals(aet.getAsync()) || aet.isSetDelegates() ) {
+			return true;
+		}
+		return false;
+	}
+	private ErrorHandlerChain createServiceErrorHandlers() {
+		List<org.apache.uima.aae.error.ErrorHandler> errorHandlers = 
+				new ArrayList<>();
+		// There is a dedicated handler for each of GetMeta, ProcessCas, and CPC
+		errorHandlers.add(new GetMetaErrorHandler(new HashMap<String, Threshold>()));
+		errorHandlers.add(new ProcessCasErrorHandler(new HashMap<String, Threshold>()));
+		errorHandlers.add(new CpcErrorHandler(new HashMap<String, Threshold>()));
+		
+		return new ErrorHandlerChain(errorHandlers);
+	}
+	private boolean hasCollocatedDelegates( AnalysisEngineType aet) {
+		return ( aet.getDelegates() != null && aet.getDelegates().sizeOfAnalysisEngineArray() > 0 );
+	}
+	private boolean hasRemoteDelegates(AnalysisEngineType aet) {
+		return( aet.getDelegates() != null && aet.getDelegates().sizeOfRemoteAnalysisEngineArray() > 0);
+	}
+	protected AnalysisEngineDelegate parse(AnalysisEngineType aet) {
+		AnalysisEngineDelegate delegate = null;
+		
+		if ( isAggregate(aet) ) { 
+
+			delegate = new AggregateAnalysisEngineDelegate(aet.getKey());
+			
+			ErrorHandlerChain errorHandlerChain = createServiceErrorHandlers();
+			((AggregateAnalysisEngineDelegate)delegate).setDelegateErrorHandlerChain(errorHandlerChain);
+			
+			// The DD object maintains two arrays, one for co-located delegates and the other for remotes.
+			// First handle co-located delegates.
+			if ( hasCollocatedDelegates(aet) ) {
+				DelegateAnalysisEngineType[] localAnalysisEngineArray =
+						aet.getDelegates().getAnalysisEngineArray();
+				addColocatedDelegates(localAnalysisEngineArray,(AggregateAnalysisEngineDelegate)delegate);
+			}
+			// Next add remote delegates of this aggregate
+			if ( hasRemoteDelegates(aet) ) {
+				RemoteAnalysisEngineType[] remoteAnalysisEngineArray =
+						aet.getDelegates().getRemoteAnalysisEngineArray();
+				addRemoteDelegates(remoteAnalysisEngineArray, (AggregateAnalysisEngineDelegate)delegate);
+			}
+		} else {
+			// This is a primitive
+			delegate = new AnalysisEngineDelegate(aet.getKey());
+		}
+		if ( Boolean.parseBoolean(aet.getAsync()) ) {
+			delegate.setAsync(true);
+		}
+		
+		if ( aet.getInputQueueScaleout() != null ) {
+			delegate.setScaleout(Integer.valueOf(aet.getInputQueueScaleout()) );
+		}
+		if ( aet.getInternalReplyQueueScaleout() != null ) {
+			delegate.setReplyScaleout(Integer.valueOf(aet.getInternalReplyQueueScaleout()) );
+		}
+		
+		if ( aet.getScaleout() != null ) {
+			delegate.setScaleout(aet.getScaleout().getNumberOfInstances() );
+		}
+		
+		if ( aet.isSetCasMultiplier() ) {
+			delegate.setCm(getCasMultiplier(aet.getCasMultiplier()));
+		}
+
+		return delegate;
+	}
+
+	protected ServiceType getService(AnalysisEngineDeploymentDescriptionDocument doc) {
+		return doc.getAnalysisEngineDeploymentDescription().getDeployment().getService();
+	}
+	public static String resolvePlaceholder(String placeholderName) {
+        Pattern pattern = Pattern.compile("\\$\\{(.+?)\\}");
+        Matcher matcher = pattern.matcher(placeholderName);
+        StringBuffer buffer = new StringBuffer();
+
+        while (matcher.find()) {
+            String replacement = getValue(matcher.group(1));
+            if (replacement != null) {
+               matcher.appendReplacement(buffer, "");
+               buffer.append(replacement);
+            }
+        }
+        matcher.appendTail(buffer);
+        return buffer.toString();
+	}
+	public static String getValue(String placeholderName) {
+		System.out.println(".....Finding Match For Placeholder:"+placeholderName);
+		String value = System.getProperty(placeholderName);
+		if (value == null) {
+		// Check the environment
+		   value = System.getenv(placeholderName);
+		}
+		return value;
+	}
+	protected boolean isPrimitive(AnalysisEngineDescription aeDescriptor, ServiceType st) {
+		if ( aeDescriptor.isPrimitive()) {
+			return true;
+		}
+		if (st.getAnalysisEngine() != null && 
+				(st.getAnalysisEngine().getAsync() != null && 
+				st.getAnalysisEngine().getAsync().equals("true")) ||
+				st.getAnalysisEngine().getDelegates() != null
+			) {
+			return false;
+		}
+		
+		return true;
+	}
+	
+	protected boolean isRemote(AnalysisEngineType type) {
+		if ( type == null ) {
+			return false;
+		}
+		return (type instanceof RemoteAnalysisEngineType);
+	}
+	protected boolean isRemote(DelegatesType dt, String delegateKey) {
+		if ( dt == null || dt.getRemoteAnalysisEngineArray() == null ) {
+			return false;  // there are no remote delegates
+		}
+		RemoteAnalysisEngineType[] remoteDelegates = dt.getRemoteAnalysisEngineArray();
+		for( RemoteAnalysisEngineType remoteDelegate : remoteDelegates ) {
+			if ( remoteDelegate.getKey().equals(delegateKey)) {
+				return true;
+			}
+		}
+		return false;
+	}
+	protected class AnalysisEngineDeployment {
+		private Object deploymentConfiguration;
+		
+		public AnalysisEngineDeployment( Object aet ) {
+			deploymentConfiguration = aet;
+		}
+		
+		public boolean isRemote() {
+			if ( deploymentConfiguration == null || deploymentConfiguration instanceof AnalysisEngineType ) {
+				return false;
+			}
+			return true;
+		}
+		public boolean withConfiguration() {
+			return deploymentConfiguration != null;
+		}
+		public AnalysisEngineType asAnalysisEngineType() {
+			return (AnalysisEngineType)deploymentConfiguration;
+		}
+		public RemoteAnalysisEngineType asRemoteAnalysisEngineType() {
+			return (RemoteAnalysisEngineType)deploymentConfiguration;
+		}
+		public int getScaleout() {
+			int scaleout = 1; // default
+	    	if ( deploymentConfiguration != null && deploymentConfiguration instanceof AnalysisEngineType ) {
+	    		if ( ((AnalysisEngineType)deploymentConfiguration).getScaleout() != null ) {
+		    		scaleout = ((AnalysisEngineType)deploymentConfiguration).getScaleout().getNumberOfInstances();
+	    		}
+	    	}
+	    	return scaleout;
+		}
+	}
+	
+	public enum Serialization {
+		XMI, Binary;
+	}
+	
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/ServiceBuilder.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/ServiceBuilder.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/ServiceBuilder.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/ServiceBuilder.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.builder;
+
+public interface ServiceBuilder {
+
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.builder;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.management.ServiceNotFoundException;
+
+import org.apache.uima.aae.InputChannel;
+import org.apache.uima.aae.InputChannel.ChannelType;
+import org.apache.uima.aae.OutputChannel;
+import org.apache.uima.aae.UimaClassFactory;
+import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
+import org.apache.uima.aae.controller.ControllerCallbackListener;
+import org.apache.uima.aae.controller.Endpoint_impl;
+import org.apache.uima.aae.error.ErrorHandler;
+import org.apache.uima.aae.error.ErrorHandlerChain;
+import org.apache.uima.aae.error.Threshold;
+import org.apache.uima.aae.error.Thresholds;
+import org.apache.uima.aae.error.handler.CpcErrorHandler;
+import org.apache.uima.aae.error.handler.GetMetaErrorHandler;
+import org.apache.uima.aae.error.handler.ProcessCasErrorHandler;
+import org.apache.uima.aae.handler.Handler;
+import org.apache.uima.aae.service.AsynchronousUimaASService;
+import org.apache.uima.aae.service.ServiceRegistry;
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.aae.service.UimaAsServiceRegistry;
+import org.apache.uima.aae.service.delegate.AnalysisEngineDelegate;
+import org.apache.uima.aae.service.delegate.RemoteAnalysisEngineDelegate;
+import org.apache.uima.analysis_engine.AnalysisEngineDescription;
+import org.apache.uima.as.client.DirectInputChannel;
+import org.apache.uima.as.client.DirectListener;
+import org.apache.uima.as.client.DirectMessage;
+import org.apache.uima.as.client.DirectOutputChannel;
+import org.apache.uima.as.client.Listener.Type;
+import org.apache.uima.resource.ResourceSpecifier;
+import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
+import org.apache.uima.resourceSpecifier.AsyncPrimitiveErrorConfigurationType;
+import org.apache.uima.resourceSpecifier.CasPoolType;
+import org.apache.uima.resourceSpecifier.CollectionProcessCompleteErrorsType;
+import org.apache.uima.resourceSpecifier.ProcessCasErrorsType;
+import org.apache.uima.resourceSpecifier.ServiceType;
+
+public class UimaAsDirectServiceBuilder extends AbstractUimaAsServiceBuilder  {
+	private static final String NoParent = "NoParent";
+	/*
+	 * private static enum FlowControllerType { FIXED }
+	 */
+	static Map<String, Object> ddAEMap = new HashMap<>();
+
+	private int scaleout = 1;
+	public static void main(String[] args) {
+		try {
+			String tla = "C:/runtime/uima-as/2.8.1/apache-uima-as-2.8.1-SNAPSHOT/examples/descriptors/tutorial/ex4/MeetingDetectorTAEGovNameDetector.xml";
+			String ptDescriptor = "C:/runtime/uima-as/2.8.1/apache-uima-as-2.8.1-SNAPSHOT/examples/descriptors/analysis_engine/PersonTitleAnnotator.xml";
+			// "C:/runtime/uima-as/2.8.1/apache-uima-as-2.8.1-SNAPSHOT/examples/descriptors/tutorial/ex4/MeetingDetectorTAE.xml";
+
+			// String dd1 =
+			// "C:/uima/releases/builds/uima-as/2.8.1/uimaj-as-activemq/src/test/resources/deployment/Deploy_TopLevelBlueJAggregateCM.xml";
+			String dd2 = "C:/uima/releases/builds/uima-as/2.8.1/uimaj-as-activemq/src/test/resources/deployment/Deploy_TopAggregateWithInnerAggregateCM.xml";
+			String dd = "C:/runtime/uima-as/2.8.1/apache-uima-as-2.8.1-SNAPSHOT/examples/deploy/as/Deploy_MeetingDetectorTAE.xml";
+
+			String dd3 = "../uimaj-as-activemq/src/test/resources/deployment/Deploy_PersonTitleAnnotator.xml";
+			String dd4 = "../uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateAnnotator.xml";
+		
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+	}
+
+	
+	public UimaASService build(AnalysisEngineDeploymentDescriptionDocument dd, ControllerCallbackListener callback)
+			throws Exception {
+		// get the top level AnalysisEngine descriptor path
+		String aeDescriptorPath = getAEDescriptorPath(dd);
+		// parse AE descriptor
+		ResourceSpecifier resourceSpecifier = UimaClassFactory.produceResourceSpecifier(aeDescriptorPath);
+		validateDD(dd, resourceSpecifier);
+		ServiceType serviceDefinition = getService(dd);
+		AnalysisEngineDelegate topLevelService;
+		// in DD the analysisEngine specification is optional
+		if (serviceDefinition.getAnalysisEngine() == null) {
+			topLevelService = new AnalysisEngineDelegate();
+			topLevelService.setResourceSpecifier((AnalysisEngineDescription) resourceSpecifier);
+		} else {
+			topLevelService = parse(getService(dd).getAnalysisEngine());
+		}
+		// resolve if placeholder used
+		String endpoint = resolvePlaceholder(serviceDefinition.getInputQueue().getEndpoint());
+		
+		int howMany = 1;
+		AsynchronousUimaASService service = null;
+		
+		// is this the only one resource specifier type supported  by the current uima-as?
+		if (resourceSpecifier instanceof AnalysisEngineDescription) {
+			AnalysisEngineDescription aeDescriptor = (AnalysisEngineDescription) resourceSpecifier;
+			// Create a Top Level Service (TLS) wrapper. This wrapper may contain
+			// references to multiple TLS service instances if the TLS is scaled
+			// up.
+			service = new AsynchronousUimaASService(endpoint)
+					.withName(aeDescriptor.getAnalysisEngineMetaData().getName())
+					.withResourceSpecifier(resourceSpecifier).withScaleout(howMany);
+
+			this.buildAndDeploy(dd, topLevelService, service, callback);
+			
+
+		}
+		return service;
+	}
+
+	public UimaASService buildAndDeploy(AnalysisEngineDeploymentDescriptionDocument doc, AnalysisEngineDelegate delegate,
+			AsynchronousUimaASService service, ControllerCallbackListener callback) throws Exception {
+		// get top level CAS pool to
+		CasPoolType cp = getCasPoolConfiguration(doc);
+
+		super.addEnvironmentVariablesFromDD(doc);
+
+		System.setProperty("BrokerURI", "Direct");
+		// NEED TO INJECT shared InProcessCache here
+		//UimaAsServiceRegistry.getInstance().lookupById("");
+		initialize(service, cp, Transport.Java); 
+		
+		service.withInProcessCache(super.cache);
+		// Number of Analysis Engine instances
+		int howMany = howManyInstances(doc);
+		if ( howMany == 0 ) {
+			throw new IllegalArgumentException("Number of instances should be greater than zero - check dd");
+		}
+		AnalysisEngineController topLevelController = createController(delegate, service.getResourceSpecifier(),
+				service.getEndpoint(), null, howMany);
+		topLevelController.addControllerCallbackListener(callback);
+
+		topLevelController.setServiceId(service.getId());
+
+		ServiceType s = getService(doc);
+
+		AsyncPrimitiveErrorConfigurationType pec;
+		if (s.getAnalysisEngine().getAsyncPrimitiveErrorConfiguration() != null) {
+			pec = s.getAnalysisEngine().getAsyncPrimitiveErrorConfiguration();
+
+		} else {
+			pec = addDefaultErrorHandling(s);
+		}
+
+		configureTopLevelService(topLevelController, service, pec, howMany);
+
+		return service;
+	}
+
+	private void addErrorHandling(AnalysisEngineController topLevelController, AsyncPrimitiveErrorConfigurationType pec) {
+		if (!topLevelController.isPrimitive() && pec != null) {
+
+			ErrorHandlerChain chain = topLevelController.getErrorHandlerChain();
+			Iterator<ErrorHandler> handlers = chain.iterator();
+			while (handlers.hasNext()) {
+				ErrorHandler eh = handlers.next();
+				Map<String, Threshold> map = eh.getEndpointThresholdMap();
+				if (eh instanceof ProcessCasErrorHandler) {
+					if (pec.getProcessCasErrors() != null) {
+						map.put("", Thresholds.getThreshold(pec.getProcessCasErrors().getThresholdAction(),
+								pec.getProcessCasErrors().getMaxRetries()));
+					} else {
+						map.put("", Thresholds.newThreshold());
+					}
+				} else if (eh instanceof GetMetaErrorHandler) {
+					if (pec.getCollectionProcessCompleteErrors() != null) {
+						map.put("", Thresholds.getThreshold("terminate", 0));
+					}
+				} else if (eh instanceof CpcErrorHandler) {
+					map.put("", Thresholds.getThreshold("", 0));
+				}
+			}
+			
+		}
+	}
+	private void configureTopLevelService(AnalysisEngineController topLevelController,
+			AsynchronousUimaASService service, AsyncPrimitiveErrorConfigurationType pec, int howMany) throws Exception {
+		addErrorHandling(topLevelController, pec);
+
+
+		// create a single instance of OutputChannel for Direct communication if
+		// necessary
+		DirectOutputChannel outputChannel = null;
+		if (topLevelController.getOutputChannel(ENDPOINT_TYPE.DIRECT) == null) {
+			outputChannel = new DirectOutputChannel().withController(topLevelController);
+			topLevelController.addOutputChannel(outputChannel);
+		} else {
+			outputChannel = (DirectOutputChannel) topLevelController.getOutputChannel(ENDPOINT_TYPE.DIRECT);
+		}
+
+		DirectInputChannel inputChannel;
+		if ((topLevelController.getInputChannel(ENDPOINT_TYPE.DIRECT)) == null) {
+			inputChannel = new DirectInputChannel(ChannelType.REQUEST_REPLY).withController(topLevelController);
+			Handler messageHandlerChain = getMessageHandler(topLevelController);
+			inputChannel.setMessageHandler(messageHandlerChain);
+			topLevelController.addInputChannel(inputChannel);
+		} else {
+			inputChannel = (DirectInputChannel) topLevelController.getInputChannel(ENDPOINT_TYPE.DIRECT);
+		}
+
+		if ( topLevelController instanceof AggregateAnalysisEngineController ) {
+			((AggregateAnalysisEngineController_impl)topLevelController).setServiceEndpointName(service.getEndpoint());
+		}
+		BlockingQueue<DirectMessage> pQ = null; // service.getProcessRequestQueue();
+		BlockingQueue<DirectMessage> mQ = null; //service.getMetaRequestQueue();
+
+		// Lookup queue name in service registry. If this queue exists, the new service
+		// being
+		// created here will share the same queue to balance the load.
+		UimaASService s;
+		try {
+			s = UimaAsServiceRegistry.getInstance().lookupByEndpoint(service.getEndpoint());
+			if ( s instanceof AsynchronousUimaASService) {
+//				if (s != null && s instanceof AsynchronousUimaASService) {
+				pQ = ((AsynchronousUimaASService) s).getProcessRequestQueue();
+				mQ = ((AsynchronousUimaASService) s).getMetaRequestQueue();
+			}
+
+		} catch( Exception ee) {
+			pQ = service.getProcessRequestQueue();
+			mQ = service.getMetaRequestQueue();
+		}
+
+
+		scaleout = howMany;
+		DirectListener processListener = new DirectListener(Type.ProcessCAS).withController(topLevelController)
+				.withConsumerThreads(scaleout).withInputChannel(inputChannel).withQueue(pQ).
+				// withQueue(service.getProcessRequestQueue()).
+				initialize();
+
+		DirectListener getMetaListener = new DirectListener(Type.GetMeta).withController(topLevelController)
+				.withConsumerThreads(1).withInputChannel(inputChannel).
+				// withQueue(service.getMetaRequestQueue()).
+				withQueue(mQ).initialize();
+
+		addFreeCASListener(service, topLevelController, inputChannel, outputChannel, scaleout );
+
+		inputChannel.registerListener(getMetaListener);
+		inputChannel.registerListener(processListener);
+
+		service.withController(topLevelController);
+		
+	}
+
+	private void addFreeCASListener( AsynchronousUimaASService service, AnalysisEngineController controller, 
+			DirectInputChannel inputChannel, DirectOutputChannel outputChannel, int scaleout ) throws Exception {
+		DirectListener freCASChannelListener = null;
+		if (controller.isCasMultiplier()) {
+			freCASChannelListener = new DirectListener(Type.FreeCAS).withController(controller)
+					.withConsumerThreads(scaleout).withInputChannel(inputChannel).withQueue(service.getFreeCasQueue())
+					.initialize();
+			inputChannel.registerListener(freCASChannelListener);
+			outputChannel.setFreeCASQueue(service.getFreeCasQueue());
+		}
+	}
+	public static InputChannel createInputChannel(ChannelType type) {
+		return new DirectInputChannel(type);
+	}
+
+	
+	public static OutputChannel createOutputChannel(AnalysisEngineController controller) {
+		return  new DirectOutputChannel().withController(controller);
+	}
+	private void createDirectOutputChannel(AggregateAnalysisEngineController controller) throws Exception {
+		// there should be one instance of OutputChannel for DIRECT. Create it, if one does not exist 
+		if ( controller.getOutputChannel(ENDPOINT_TYPE.DIRECT) == null) {
+			OutputChannel oc = createOutputChannel(controller);
+			oc.initialize();
+			controller.addOutputChannel(oc);
+		}
+	}
+	/*
+	private DirectInputChannel createDirectInputChannel(AggregateAnalysisEngineController controller) throws Exception {
+		DirectInputChannel inputChannel;
+	    if ( (controller.getInputChannel(ENDPOINT_TYPE.DIRECT)) == null ) {
+	    	inputChannel = (DirectInputChannel)createInputChannel(ChannelType.REQUEST_REPLY);
+	    	((DirectInputChannel)inputChannel).withController(controller);
+		    Handler messageHandlerChain = getMessageHandler(controller);
+		    inputChannel.setMessageHandler(messageHandlerChain);
+		    controller.addInputChannel(inputChannel);
+	    } else {
+	    	inputChannel = (DirectInputChannel)controller.getInputChannel(ENDPOINT_TYPE.DIRECT );
+	    }
+	    return inputChannel;
+	}
+*/
+	@Override
+	protected void addListenerForReplyHandling( AggregateAnalysisEngineController controller, Endpoint_impl endpoint, RemoteAnalysisEngineDelegate remoteDelegate) throws Exception {
+		// there should be one instance of OutputChannel for DIRECT. Create it, if one does not exist 
+		createDirectOutputChannel(controller);
+		
+	    DirectInputChannel inputChannel;
+	    if ( (controller.getInputChannel(ENDPOINT_TYPE.DIRECT)) == null ) {
+	    	inputChannel = 
+	    			new DirectInputChannel(ChannelType.REQUEST_REPLY).
+	    			withController(controller);
+		    Handler messageHandlerChain = getMessageHandler(controller);
+		    inputChannel.setMessageHandler(messageHandlerChain);
+		    controller.addInputChannel(inputChannel);
+	    } else {
+	    	inputChannel = (DirectInputChannel)controller.getInputChannel(ENDPOINT_TYPE.DIRECT );
+	    }
+	    BlockingQueue<DirectMessage> replyQueue = 
+	    		new LinkedBlockingQueue<>();
+	    DirectListener processListener = 
+	    		new DirectListener(Type.Reply).
+	    			withController(controller).
+	    			withConsumerThreads(remoteDelegate.getReplyScaleout()).
+	    			withInputChannel(inputChannel).
+		            withQueue(replyQueue).
+		            initialize(); 
+	    inputChannel.registerListener(processListener);
+	    
+	    UimaASService service =
+	    		UimaAsServiceRegistry.getInstance().lookupByEndpoint(endpoint.getEndpoint());
+	    if ( service != null && service instanceof AsynchronousUimaASService ) {
+		    endpoint.setGetMetaDestination(((AsynchronousUimaASService)service).getMetaRequestQueue());
+		    endpoint.setDestination(((AsynchronousUimaASService)service).getProcessRequestQueue());
+		    endpoint.setReplyDestination(replyQueue);
+	    }
+	    
+	}
+
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.InProcessCache;
+import org.apache.uima.aae.InProcessCache.CacheEntry;
+import org.apache.uima.aae.SerializerCache;
+import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.UimaSerializer;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
+import org.apache.uima.aae.delegate.Delegate;
+import org.apache.uima.aae.error.AsynchAEException;
+import org.apache.uima.aae.error.ErrorContext;
+import org.apache.uima.aae.jmx.ServicePerformance;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.UIMAMessage;
+import org.apache.uima.aae.monitor.Monitor;
+import org.apache.uima.aae.monitor.statistics.DelegateStats;
+import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
+import org.apache.uima.aae.monitor.statistics.TimerStats;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.cas.Marker;
+import org.apache.uima.cas.SerialFormat;
+import org.apache.uima.cas.impl.BinaryCasSerDes6.ReuseInfo;
+import org.apache.uima.cas.impl.Serialization;
+import org.apache.uima.cas.impl.XmiSerializationSharedData;
+import org.apache.uima.util.Level;
+
+public abstract class AbstractUimaAsCommand implements UimaAsCommand {
+	protected AnalysisEngineController controller;
+	private Object mux = new Object();
+
+	protected AbstractUimaAsCommand(AnalysisEngineController controller) {
+		this.controller = controller;
+	}
+
+	protected String getCasReferenceId(Class<?> concreteClassName, MessageContext aMessageContext) throws AsynchAEException {
+		if (!aMessageContext.propertyExists(AsynchAEMessage.CasReference)) {
+			if (UIMAFramework.getLogger(concreteClassName).isLoggable(Level.INFO)) {
+				UIMAFramework.getLogger(concreteClassName).logrb(Level.INFO, concreteClassName.getName(),
+						"getCasReferenceId", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+						"UIMAEE_message_has_cas_refid__INFO",
+						new Object[] { aMessageContext.getEndpoint().getEndpoint() });
+			}
+			return null;
+		}
+		return aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
+	}
+
+	protected CacheEntry getCacheEntryForCas(String casReferenceId) {
+		try {
+			return controller.getInProcessCache().getCacheEntryForCAS(casReferenceId);
+		} catch (AsynchAEException e) {
+			return new InProcessCache.UndefinedCacheEntry();
+		}
+	}
+
+	protected CasStateEntry createCasStateEntry(String casReferenceId) {
+		return controller.getLocalCache().createCasStateEntry(casReferenceId);
+	}
+
+	protected CasStateEntry getCasStateEntry(String casReferenceId) {
+		CasStateEntry casStateEntry = null;
+		if ((casStateEntry = controller.getLocalCache().lookupEntry(casReferenceId)) == null) {
+			// Create a new entry in the local cache for the CAS received from the remote
+			casStateEntry = createCasStateEntry(casReferenceId);
+		}
+		return casStateEntry;
+	}
+
+	protected boolean isTopLevelAggregate() {
+		return (controller.isTopLevelComponent() && controller instanceof AggregateAnalysisEngineController);
+	}
+
+	protected void handleError(Exception e, CacheEntry cacheEntry, MessageContext mc) {
+		if (UIMAFramework.getLogger(getClass()).isLoggable(Level.WARNING)) {
+			UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(), "handleError",
+					UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_service_exception_WARNING",
+					controller.getComponentName());
+
+			UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(), "handleError",
+					UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+		}
+		ErrorContext errorContext = new ErrorContext();
+		errorContext.add(AsynchAEMessage.Endpoint, mc.getEndpoint());
+		errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
+		errorContext.add(AsynchAEMessage.CasReference, cacheEntry.getCasReferenceId());
+		controller.dropCAS(cacheEntry.getCas());
+		controller.getErrorHandlerChain().handle(e, errorContext, controller);
+
+	}
+
+	protected void saveStats(CacheEntry entry, long inTime, long t1, long timeWaitingForCAS) {
+		long timeToDeserializeCAS = controller.getCpuTime() - t1;
+		controller.incrementDeserializationTime(timeToDeserializeCAS);
+		entry.incrementTimeToDeserializeCAS(timeToDeserializeCAS);
+		entry.incrementTimeWaitingForCAS(timeWaitingForCAS);
+
+		LongNumericStatistic statistic;
+		if ((statistic = controller.getMonitor().getLongNumericStatistic("", Monitor.TotalDeserializeTime)) != null) {
+			statistic.increment(timeToDeserializeCAS);
+		}
+		if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
+			UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "saveStats",
+					UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_deserialize_cas_time_FINE",
+					new Object[] { (double) timeToDeserializeCAS / 1000000.0 });
+		}
+
+		// Update Stats
+		ServicePerformance casStats = controller.getCasStatistics(entry.getCasReferenceId());
+		casStats.incrementCasDeserializationTime(timeToDeserializeCAS);
+		if (controller.isTopLevelComponent()) {
+			synchronized (mux) {
+				controller.getServicePerformance().incrementCasDeserializationTime(timeToDeserializeCAS);
+			}
+		}
+		controller.saveTime(inTime, entry.getCasReferenceId(), controller.getName());
+		if (!controller.isPrimitive()) {
+			DelegateStats stats = new DelegateStats();
+			if (entry.getStat() == null) {
+				entry.setStat(stats);
+				// Add entry for self (this aggregate). MessageContext.getEndpointName()
+				// returns the name of the queue receiving the message.
+				stats.put(controller.getServiceEndpointName(), new TimerStats());
+			} else {
+				if (!stats.containsKey(controller.getServiceEndpointName())) {
+					stats.put(controller.getServiceEndpointName(), new DelegateStats());
+				}
+			}
+		}
+
+	}
+
+	protected static ErrorContext populateErrorContext(MessageContext aMessageCtx) {
+		ErrorContext errorContext = new ErrorContext();
+		if (aMessageCtx != null) {
+			try {
+				if (aMessageCtx.propertyExists(AsynchAEMessage.Command)) {
+					errorContext.add(AsynchAEMessage.Command,
+							aMessageCtx.getMessageIntProperty(AsynchAEMessage.Command));
+				}
+
+				if (aMessageCtx.propertyExists(AsynchAEMessage.MessageType)) {
+					errorContext.add(AsynchAEMessage.MessageType,
+							aMessageCtx.getMessageIntProperty(AsynchAEMessage.MessageType));
+				}
+
+				if (aMessageCtx.propertyExists(AsynchAEMessage.CasReference)) {
+					errorContext.add(AsynchAEMessage.CasReference,
+							aMessageCtx.getMessageStringProperty(AsynchAEMessage.CasReference));
+				}
+				errorContext.add(UIMAMessage.RawMsg, aMessageCtx.getRawMessage());
+			} catch (Exception e) { /* ignore */
+			}
+		}
+		return errorContext;
+	}
+
+	protected Endpoint fetchParentCasOrigin(String parentCasId) throws AsynchAEException {
+		Endpoint endpoint = null;
+		String parentId = parentCasId;
+		// Loop through the parent tree until an origin is found
+		while (parentId != null) {
+			// Check if the current parent has an associated origin. Only input CAS
+			// has an origin of the request. The origin is an endpoint of a client
+			// who sent an input CAS for processing
+			endpoint = ((AggregateAnalysisEngineController) controller).getMessageOrigin(parentId);
+			// Check if there is an origin. If so, we are done
+			if (endpoint != null) {
+				break;
+			}
+			// The current parent has no origin, get its parent and try again
+			CacheEntry entry = controller.getInProcessCache().getCacheEntryForCAS(parentId);
+			parentId = entry.getInputCasReferenceId();
+		}
+		return endpoint;
+	}
+
+	protected CAS getNewCAS(CASFactory factory, String casRequestorOrigin) { 
+		CAS cas = null;
+		// Aggregate time spent waiting for a CAS in the service cas pool
+		controller.getServicePerformance().beginWaitOnCASPool();
+		if (UIMAFramework.getLogger(this.getClass()).isLoggable(Level.FINE)) {
+			UIMAFramework.getLogger(this.getClass()).logrb(Level.FINE, this.getClass().getName(), "deserializeChildCAS",
+					UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_granted__FINE",
+					new Object[] { casRequestorOrigin });
+		}
+		cas = factory.newCAS();
+
+		if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
+			UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "deserializeChildCAS",
+					UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_granted_cm__FINE",
+					new Object[] { casRequestorOrigin });
+		}
+
+		controller.getServicePerformance().endWaitOnCASPool();
+
+		ServicePerformance sp = controller.getServicePerformance();
+		sp.incrementCasPoolWaitTime(sp.getTimeWaitingForCAS());
+
+		return cas;
+	}
+
+	protected SerializationResult deserializeChildCAS(String casMultiplierDelegateKey, Endpoint endpoint,
+			MessageContext mc) throws Exception {
+		SerializationResult result = new SerializationResult();
+
+		// Aggregate time spent waiting for a CAS in the shadow cas pool
+		((AggregateAnalysisEngineController) controller).getDelegateServicePerformance(casMultiplierDelegateKey)
+				.beginWaitOnShadowCASPool();
+
+		long t1 = controller.getCpuTime();
+		CAS cas = getNewCAS(new ChildCASFactory(casMultiplierDelegateKey), casMultiplierDelegateKey);
+		result.setCas(cas);
+
+		result.setTimeWaitingForCAS(controller.getCpuTime() - t1);
+
+		((AggregateAnalysisEngineController) controller).getDelegateServicePerformance(casMultiplierDelegateKey)
+				.endWaitOnShadowCASPool();
+
+		// Check if we are still running
+		if (controller.isStopped()) {
+			// The Controller is in shutdown state.
+			controller.dropCAS(cas);
+			return null;
+		}
+		// Create deserialized wrapper for XMI, BINARY, COMPRESSED formats. To add
+		// a new serialization format add a new class which implements
+		// UimaASDeserializer and modify DeserializerFactory class.
+		UimaASDeserializer deserializer = DeserializerFactory.newDeserializer(endpoint, mc);
+		deserializer.deserialize(result);
+
+		return result;
+	}
+
+	protected SerializationResult deserializeInputCAS( MessageContext mc)
+			throws Exception {
+		SerializationResult result = new SerializationResult();
+		String origin = mc.getEndpoint().getEndpoint();
+		Endpoint endpoint = mc.getEndpoint();
+		
+		// Time how long we wait on Cas Pool to fetch a new CAS
+		long t1 = controller.getCpuTime();
+		CAS cas = getNewCAS(new InputCASFactory(), origin);
+		result.setCas(cas);
+
+		result.setTimeWaitingForCAS(controller.getCpuTime() - t1);
+
+		// Check if we are still running
+		if (controller.isStopped()) {
+			// The Controller is in shutdown state.
+			controller.dropCAS(cas);
+			return null;
+		}
+
+		UimaASDeserializer deserializer = DeserializerFactory.newDeserializer(endpoint, mc);
+		deserializer.deserialize(result);
+
+		return result;
+	}
+	protected Delegate getDelegate(MessageContext mc) throws AsynchAEException {
+		String delegateKey = null;
+		if (mc.getEndpoint().getEndpoint() == null || mc.getEndpoint().getEndpoint().trim().length() == 0) {
+			String fromEndpoint = mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+			delegateKey = ((AggregateAnalysisEngineController) controller)
+					.lookUpDelegateKey(fromEndpoint);
+		} else {
+			delegateKey = ((AggregateAnalysisEngineController) controller)
+					.lookUpDelegateKey(mc.getEndpoint().getEndpoint());
+		}
+		return ((AggregateAnalysisEngineController) controller).lookupDelegate(delegateKey);
+	}
+
+	public static class DeserializerFactory {
+		// only static reference allowed
+		private DeserializerFactory() {
+		}
+		public static UimaASDeserializer newDeserializer(Endpoint endpoint, MessageContext mc) throws AsynchAEException {
+			switch (endpoint.getSerialFormat()) {
+			case XMI:
+				return new XMIDeserializer(mc.getStringMessage());
+			case BINARY:
+				return new BinaryDeserializer(mc.getByteMessage(), endpoint);
+			case COMPRESSED_FILTERED:
+				return new CompressedFilteredDeserializer(mc.getByteMessage(), endpoint);
+			default:
+				throw new AsynchAEException("Never Happen");
+
+			}
+		}
+	}
+
+	public static interface UimaASDeserializer {
+		public void deserialize(SerializationResult result) throws Exception;
+	}
+
+	public static class CompressedFilteredDeserializer implements UimaASDeserializer {
+		byte[] binarySource;
+		Endpoint endpoint;
+
+		public CompressedFilteredDeserializer(byte[] binarySource, Endpoint endpoint) {
+			this.binarySource = binarySource;
+			this.endpoint = endpoint;
+		}
+
+		public void deserialize(SerializationResult result) throws Exception {
+			ByteArrayInputStream bais = new ByteArrayInputStream(binarySource);
+			ReuseInfo reuseInfo = Serialization
+					.deserializeCAS(result.getCas(), bais, endpoint.getTypeSystemImpl(), null).getReuseInfo();
+			result.setReuseInfo(reuseInfo);
+		}
+	}
+
+	public static class BinaryDeserializer implements UimaASDeserializer {
+		byte[] binarySource;
+		Endpoint endpoint;
+
+		public BinaryDeserializer(byte[] binarySource, Endpoint endpoint) {
+			this.binarySource = binarySource;
+			this.endpoint = endpoint;
+		}
+
+		public void deserialize(SerializationResult result) throws Exception {
+			UimaSerializer uimaSerializer = SerializerCache.lookupSerializerByThreadId();
+			// BINARY format may be COMPRESSED etc, so update it upon reading
+			SerialFormat serialFormat = 
+					uimaSerializer.deserializeCasFromBinary(binarySource, result.getCas());
+			// BINARY format may be COMPRESSED etc, so update it upon reading
+			endpoint.setSerialFormat(serialFormat);
+		}
+	}
+
+	public static class XMIDeserializer implements UimaASDeserializer {
+		String xmi;
+
+		public XMIDeserializer(String xmi) {
+			this.xmi = xmi;
+		}
+
+		public void deserialize(SerializationResult result) throws Exception {
+			UimaSerializer uimaSerializer = SerializerCache.lookupSerializerByThreadId();
+			XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
+			uimaSerializer.deserializeCasFromXmi(xmi, result.getCas(), deserSharedData, true, -1);
+			result.setDeserSharedData(deserSharedData);
+		}
+
+	}
+
+	public static class SerializationResult {
+		Marker marker = null;
+		CAS cas = null;
+		XmiSerializationSharedData deserSharedData = null;
+		ReuseInfo reuseInfo = null;
+		boolean acceptsDeltaCas = false;
+		long timeWaitingForCAS = 0;
+
+		public long getTimeWaitingForCAS() {
+			return timeWaitingForCAS;
+		}
+
+		public void setTimeWaitingForCAS(long timeWaitingForCAS) {
+			this.timeWaitingForCAS = timeWaitingForCAS;
+		}
+
+		public boolean acceptsDeltaCas() {
+			return acceptsDeltaCas;
+		}
+
+		public void setAcceptsDeltaCas(boolean acceptsDeltaCas) {
+			this.acceptsDeltaCas = acceptsDeltaCas;
+		}
+
+		public Marker getMarker() {
+			return marker;
+		}
+
+		public void setMarker(Marker marker) {
+			this.marker = marker;
+		}
+
+		public CAS getCas() {
+			return cas;
+		}
+
+		public void setCas(CAS cas) {
+			this.cas = cas;
+		}
+
+		public XmiSerializationSharedData getDeserSharedData() {
+			return deserSharedData;
+		}
+
+		public void setDeserSharedData(XmiSerializationSharedData deserSharedData) {
+			this.deserSharedData = deserSharedData;
+		}
+
+		public ReuseInfo getReuseInfo() {
+			return reuseInfo;
+		}
+
+		public void setReuseInfo(ReuseInfo reuseInfo) {
+			this.reuseInfo = reuseInfo;
+		}
+
+	}
+
+	public interface CASFactory {
+		public CAS newCAS();
+
+	}
+
+	public class ChildCASFactory implements CASFactory {
+		String casMultiplierDelegateKey;
+
+		public ChildCASFactory(String casMultiplierDelegateKey) {
+			this.casMultiplierDelegateKey = casMultiplierDelegateKey;
+		}
+
+		public CAS newCAS() {
+			return controller.getCasManagerWrapper().getNewCas(casMultiplierDelegateKey);
+		}
+
+	}
+
+	public class InputCASFactory implements CASFactory {
+		public CAS newCAS() {
+			return controller.getCasManagerWrapper().getNewCas();
+		}
+
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.message.MessageContext;
+
+public class CollectionProcessCompleteRequestCommand  extends AbstractUimaAsCommand {
+	private MessageContext mc;
+	
+	public CollectionProcessCompleteRequestCommand(MessageContext mc, AnalysisEngineController controller) {
+		super(controller);
+		this.mc = mc;
+	}
+	public void execute() throws Exception {
+        Endpoint endpoint = mc.getEndpoint();
+        controller.collectionProcessComplete(endpoint);
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.delegate.Delegate;
+import org.apache.uima.aae.error.ErrorContext;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+
+public class CollectionProcessCompleteResponseCommand  extends AbstractUimaAsCommand {
+	private MessageContext mc;
+	
+	public CollectionProcessCompleteResponseCommand(MessageContext mc, AnalysisEngineController controller) {
+		super(controller);
+		this.mc = mc;
+	}
+	public void execute() throws Exception {
+		Delegate delegate = super.getDelegate(mc);
+	    try {
+	          ((AggregateAnalysisEngineController)controller)
+	                  .processCollectionCompleteReplyFromDelegate(delegate.getKey(), true);
+	      } catch (Exception e) {
+	        ErrorContext errorContext = new ErrorContext();
+	        errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.CollectionProcessComplete);
+	        errorContext.add(AsynchAEMessage.Endpoint, mc.getEndpoint());
+	        controller.getErrorHandlerChain().handle(e, errorContext, controller);
+	      }
+	}
+}