You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/02/26 18:54:13 UTC

svn commit: r1825401 [9/11] - in /uima/uima-as/branches/uima-as-3: uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ uimaj-as-activemq/src/main/java/org/apache/uima...

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CommandFactory.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CommandFactory.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CommandFactory.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CommandFactory.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.error.AsynchAEException;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+
+public class CommandFactory {
+	// Can't instantiate this factory. Use static methods only
+	private CommandFactory() {
+
+	}
+	public static UimaAsCommand newCommand(MessageContext mc, AnalysisEngineController controller)
+			throws AsynchAEException {
+		// Message type is either Request or Response
+		int messageType = mc.getMessageIntProperty(AsynchAEMessage.MessageType);
+
+		if (messageType == AsynchAEMessage.Request) {
+			return newRequestCommand(mc, controller);
+		} else if (messageType == AsynchAEMessage.Response) {
+			return newResponseCommand(mc, controller);
+		}
+
+		return CommandBuilder.createNoOpCommand(mc, controller);
+	}
+	private static UimaAsCommand newRequestCommand(MessageContext mc, AnalysisEngineController controller)
+			throws AsynchAEException {
+		int command = mc.getMessageIntProperty(AsynchAEMessage.Command);
+		UimaAsCommand command2Run;
+		switch (command) {
+		case AsynchAEMessage.Process:
+			if (mc.propertyExists(AsynchAEMessage.CasSequence)) {
+				command2Run = CommandBuilder.createProcessChildCasRequestCommand(mc, controller);
+			} else {
+				command2Run = CommandBuilder.createProcessInputCasRequestCommand(mc, controller);
+			}
+			break;
+		case AsynchAEMessage.GetMeta:
+			command2Run = CommandBuilder.createGetMetaRequestCommand(mc, controller);
+			break;
+		case AsynchAEMessage.CollectionProcessComplete:
+			command2Run = CommandBuilder.createCollectionProcessCompleteRequestCommand(mc, controller);
+			break;
+
+		case AsynchAEMessage.ReleaseCAS:
+			command2Run = CommandBuilder.createReleaseCASRequestCommand(mc, controller);
+			break;
+		default:
+			command2Run = CommandBuilder.createNoOpCommand(mc, controller);
+			break;
+		}
+		return command2Run;
+	}
+
+	private static UimaAsCommand newResponseCommand(MessageContext mc, AnalysisEngineController controller)
+			throws AsynchAEException {
+		int command = mc.getMessageIntProperty(AsynchAEMessage.Command);
+		UimaAsCommand command2Run;
+		switch (command) {
+		case AsynchAEMessage.Process:
+			if (mc.propertyExists(AsynchAEMessage.CasSequence)) {
+				command2Run = CommandBuilder.createProcessChildCasResponseCommand(mc, controller);
+			} else {
+				command2Run = CommandBuilder.createProcessInputCasResponseCommand(mc, controller);
+			}
+			break;
+		case AsynchAEMessage.GetMeta:
+			command2Run = CommandBuilder.createGetMetaResponseCommand(mc, controller);
+			break;
+
+		case AsynchAEMessage.CollectionProcessComplete:
+			command2Run = CommandBuilder.createCollectionProcessCompleteResponseCommand(mc, controller);
+			break;
+		default:
+			command2Run = CommandBuilder.createNoOpCommand(mc, controller);
+			break;
+		}
+		return command2Run;
+	}
+
+
+
+	private static class CommandBuilder {
+		// Can't instantiate the builder. Use static calls only
+		private CommandBuilder() {
+
+		}
+		static UimaAsCommand createProcessChildCasRequestCommand(MessageContext mc,
+				AnalysisEngineController controller) {
+			return new ProcessChildCasRequestCommand(mc, controller);
+		}
+
+		static UimaAsCommand createProcessInputCasRequestCommand(MessageContext mc,
+				AnalysisEngineController controller) {
+			return new ProcessInputCasRequestCommand(mc, controller);
+		}
+
+		static UimaAsCommand createProcessChildCasResponseCommand(MessageContext mc,
+				AnalysisEngineController controller) {
+			return new ProcessChildCasResponseCommand(mc, controller);
+		}
+
+		static UimaAsCommand createProcessInputCasResponseCommand(MessageContext mc,
+				AnalysisEngineController controller) {
+			return new ProcessInputCasResponseCommand(mc, controller);
+		}
+
+		static UimaAsCommand createGetMetaResponseCommand(MessageContext mc, AnalysisEngineController controller) {
+			return new GetMetaResponseCommand(mc, controller);
+		}
+
+		static UimaAsCommand createGetMetaRequestCommand(MessageContext mc, AnalysisEngineController controller) {
+			return new GetMetaRequestCommand(mc, controller);
+		}
+
+		static UimaAsCommand createReleaseCASRequestCommand(MessageContext mc, AnalysisEngineController controller) {
+			return new ReleaseCASRequestCommand(mc, controller);
+		}
+
+		static UimaAsCommand createNoOpCommand(MessageContext mc, AnalysisEngineController controller) {
+			return new NoOpCommand(mc, controller);
+		}
+
+		static UimaAsCommand createCollectionProcessCompleteRequestCommand(MessageContext mc,
+				AnalysisEngineController controller) {
+			return new CollectionProcessCompleteRequestCommand(mc, controller);
+		}
+
+		static UimaAsCommand createCollectionProcessCompleteResponseCommand(MessageContext mc,
+				AnalysisEngineController controller) {
+			return new CollectionProcessCompleteResponseCommand(mc, controller);
+		}
+
+	}
+
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaRequestCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaRequestCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaRequestCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaRequestCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.util.Level;
+
+public class GetMetaRequestCommand extends AbstractUimaAsCommand  {
+	private MessageContext mc;
+	
+	public GetMetaRequestCommand(MessageContext mc, AnalysisEngineController controller) {
+		super(controller);
+		this.mc = mc;
+	}
+	public void execute() throws Exception {
+        Endpoint endpoint = mc.getEndpoint();
+        if (controller.isTopLevelComponent()) {
+          endpoint.setCommand(AsynchAEMessage.GetMeta);
+          controller.cacheClientEndpoint(endpoint);
+        }
+        if (UIMAFramework.getLogger(this.getClass()).isLoggable(Level.FINEST)) {
+          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(), "execute",
+                  UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                  "UIMAEE_handling_metadata_request__FINEST",
+                  new Object[] { endpoint.getEndpoint() });
+        }
+        controller.getControllerLatch().waitUntilInitialized();
+        // Check to see if the controller hasnt been aborted while we were waiting on the latch
+        if (!controller.isStopped()) {
+System.out.println("............................ Service:"+controller.getComponentName()+" Dispatching Metadata to Client");
+        	controller.sendMetadata(endpoint);
+        }
+	}
+
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaResponseCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaResponseCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaResponseCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaResponseCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.resource.metadata.ResourceMetaData;
+import org.apache.uima.util.XMLInputSource;
+
+/**
+ * Command run when a GetMeta response arrives from a delegate. The delegate's
+ * metadata is taken from the message, the delegate is marked as running and
+ * its type system is merged into the aggregate controller.
+ * <p>
+ * The controller is cast to {@link AggregateAnalysisEngineController}; a
+ * controller of another type results in a {@link ClassCastException}.
+ */
+public class GetMetaResponseCommand extends AbstractUimaAsCommand {
+	// message that carried the GetMeta response
+	private MessageContext mc;
+
+	public GetMetaResponseCommand(MessageContext mc, AnalysisEngineController controller) {
+		super(controller);
+		this.mc = mc;
+	}
+
+	/**
+	 * Processes the GetMeta response. A response whose payload is
+	 * {@code AsynchAEMessage.Exception} is ignored.
+	 *
+	 * @throws Exception
+	 *             propagated from reading the message or from the controller calls
+	 */
+	public void execute() throws Exception {
+		System.out.println(".......... GetMetaResponseCommand.execute()- handling GetMeta response - Controller:"+controller.getName());
+		int payload = mc.getMessageIntProperty(AsynchAEMessage.Payload);
+		if (AsynchAEMessage.Exception == payload) {
+			return;
+		}
+
+		String fromEndpoint = mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+
+		// Cast once instead of at every call site
+		AggregateAnalysisEngineController aggregate = (AggregateAnalysisEngineController) controller;
+		String delegateKey = aggregate.lookUpDelegateKey(fromEndpoint);
+
+		// Serialization support is currently fixed to None; reading the
+		// AsynchAEMessage.SERIALIZATION property from the message is disabled.
+		int serializationSupportedByRemote = AsynchAEMessage.None;
+
+		ResourceMetaData resource;
+		if (serializationSupportedByRemote == AsynchAEMessage.None) {
+			// metadata travels as an object property of the message
+			resource = (ResourceMetaData) mc.getMessageObjectProperty(AsynchAEMessage.AEMetadata);
+		} else {
+			// metadata travels as XML text in the message body; use an explicit
+			// charset so parsing does not depend on the platform default
+			String analysisEngineMetadata = mc.getStringMessage();
+			ByteArrayInputStream bis = new ByteArrayInputStream(
+					analysisEngineMetadata.getBytes(StandardCharsets.UTF_8));
+			XMLInputSource in1 = new XMLInputSource(bis, null);
+			resource = UIMAFramework.getXMLParser().parseResourceMetaData(in1);
+		}
+
+		String fromServer = null;
+		if (mc.propertyExists(AsynchAEMessage.EndpointServer)) {
+			fromServer = mc.getMessageStringProperty(AsynchAEMessage.EndpointServer);
+		}
+		aggregate.changeCollocatedDelegateState(delegateKey, ServiceState.RUNNING);
+		// If old service does not echo back the external broker name then the queue name must
+		// be unique.
+		// The ServerURI set by the service may be its local name for the broker, e.g.
+		// tcp://localhost:61616
+		aggregate.mergeTypeSystem(resource, fromEndpoint, fromServer);
+		aggregate.setRemoteSerializationSupported(serializationSupportedByRemote, fromEndpoint, fromServer);
+	}
+
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/NoOpCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/NoOpCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/NoOpCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/NoOpCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+
+public class NoOpCommand extends AbstractUimaAsCommand {
+	MessageContext mc;
+	public NoOpCommand(MessageContext mc, AnalysisEngineController controller) {
+		super(controller);
+		this.mc = mc;
+	}
+	public void execute() throws Exception {
+		System.out.println("*******************************************************"
+				+ "\nNoOpCommand.execute() - Either wrong command or message type - Command:"+
+				mc.getMessageIntProperty(AsynchAEMessage.Command) + " MessageType:"+
+				mc.getMessageIntProperty(AsynchAEMessage.MessageType) + " Service:"+controller.getComponentName() +
+				"\n*******************************************************");
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/PingRequestCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/PingRequestCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/PingRequestCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/PingRequestCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.util.Level;
+
+/**
+ * Command that answers a Ping request by sending a Ping reply to the endpoint
+ * the request came from. Failures are logged at WARNING level and not rethrown.
+ */
+public class PingRequestCommand extends AbstractUimaAsCommand  {
+	// message that carried the Ping request
+	private MessageContext mc;
+
+	public PingRequestCommand(MessageContext mc, AnalysisEngineController controller) {
+		super(controller);
+		this.mc = mc;
+	}
+
+	/**
+	 * Sends the Ping reply through the output channel that matches the endpoint:
+	 * JMS for a remote endpoint, DIRECT otherwise.
+	 */
+	public void execute() throws Exception {
+		try {
+			ENDPOINT_TYPE channelType = mc.getEndpoint().isRemote() ? ENDPOINT_TYPE.JMS : ENDPOINT_TYPE.DIRECT;
+			controller.getOutputChannel(channelType).sendReply(AsynchAEMessage.Ping, mc.getEndpoint(), null, false);
+		} catch (Exception e) {
+			if (!UIMAFramework.getLogger(this.getClass()).isLoggable(Level.WARNING)) {
+				return;
+			}
+			// Name the failing service first, when a controller is available
+			if (controller != null) {
+				UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, this.getClass().getName(), "execute",
+						UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_service_exception_WARNING",
+						controller.getComponentName());
+			}
+			// Then log the exception itself
+			UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, getClass().getName(), "execute",
+					UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+		}
+	}
+
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasRequestCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasRequestCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasRequestCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasRequestCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.InProcessCache.CacheEntry;
+import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.Endpoint_impl;
+import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
+import org.apache.uima.aae.delegate.Delegate;
+import org.apache.uima.aae.error.AsynchAEException;
+import org.apache.uima.aae.jmx.ServicePerformance;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.cas.Marker;
+import org.apache.uima.util.Level;
+
+public class ProcessChildCasRequestCommand extends AbstractUimaAsCommand {
+	// Message that carried the child CAS process request; its properties are
+	// read by the methods below.
+	private MessageContext mc;
+
+	/**
+	 * Creates the command for a single incoming message.
+	 *
+	 * @param mc
+	 *            message context of the request
+	 * @param controller
+	 *            controller passed on to the superclass
+	 */
+	public ProcessChildCasRequestCommand(MessageContext mc, AnalysisEngineController controller) {
+		super(controller);
+		this.mc = mc;
+	}
+
+	/**
+	 * Validates the incoming child CAS request and hands it to the direct or the
+	 * remote handler.
+	 * <p>
+	 * The message is ignored when the parent CAS id or the child CAS id is
+	 * missing. A payload of {@code AsynchAEMessage.CASRefID} is handled by
+	 * {@code executeDirectRequest} (ignored if the message has no endpoint);
+	 * any other payload is handled by {@code executeRemoteRequest}, unless it is
+	 * an XMI payload without a message body, which is ignored.
+	 *
+	 * @throws Exception
+	 *             propagated from reading the message or from the handlers
+	 */
+	public void execute() throws Exception {
+		int payload = mc.getMessageIntProperty(AsynchAEMessage.Payload);
+		String casReferenceId = super.getCasReferenceId(this.getClass(), mc);
+		String parentCasReferenceId = mc.getMessageStringProperty(AsynchAEMessage.InputCasReference);
+		System.out.println(">>>>>>>>>>>>>>> Controller:"+controller.getComponentName()+
+				" in ProcessChildCasRequestCommand.execute() - Child CAS:"+casReferenceId+
+				" Parent CAS:"+parentCasReferenceId+
+				" from "+mc
+                .getMessageStringProperty(AsynchAEMessage.MessageFrom)
+				);
+
+		if (parentCasReferenceId == null) {
+			if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
+				UIMAFramework.getLogger(getClass()).logrb(
+						Level.INFO,
+						getClass().getName(),
+						"execute",
+						UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+						"UIMAEE_input_cas_invalid__INFO",
+						new Object[] { controller.getComponentName(), mc.getEndpointName(),
+								parentCasReferenceId });
+			}
+
+			// LOG THIS
+			System.out.println(
+					"ProcessChildCasRequestCommand.execute() - Parent CasReferenceId is missing in MessageContext - Ignoring Message");
+			return; // Nothing to do
+		}
+		if (casReferenceId == null) {
+			// LOG THIS
+			System.out.println(
+					"ProcessChildCasRequestCommand.execute() - CasReferenceId is missing in MessageContext - Ignoring Message");
+			return; // Nothing to do
+		}
+		// Save Process command in the client endpoint.
+		Endpoint clientEndpoint = controller.getClientEndpoint();
+		if (clientEndpoint != null) {
+			clientEndpoint.setCommand(AsynchAEMessage.Process);
+		}
+
+		// Cas was passed by reference meaning InProcessCache must have it
+		if (AsynchAEMessage.CASRefID == payload) {
+			System.out.println(
+					"ProcessChildCasRequestCommand.execute() - Child CasReferenceId:"+casReferenceId+" From Co-located CM");
+
+			if (mc.getEndpoint() == null) {
+				if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
+					// report this method as the log source; the direct handler has
+					// not been entered yet
+					UIMAFramework.getLogger(getClass()).logrb(Level.INFO, getClass().getName(), "execute",
+							UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_endpoint_for_reply__INFO",
+							new Object[] { casReferenceId });
+				}
+				return;
+			}
+
+			executeDirectRequest(casReferenceId, parentCasReferenceId);
+		} else {
+			// Check if there is an XMI cargo in the message. The payload value
+			// read at the top of this method is reused here.
+			if (payload == AsynchAEMessage.XMIPayload
+					&& mc.getStringMessage() == null) {
+				return; // No XMI just return
+			}
+			System.out.println(
+					"ProcessChildCasRequestCommand.execute() - Child CasReferenceId:"+casReferenceId+" From Remote CM");
+
+			executeRemoteRequest(casReferenceId, parentCasReferenceId);
+		}
+	}
+	/**
+	 * If the message carries a FreeCASQueue property, builds an endpoint that
+	 * points at that queue and stores it in the child CAS entry as its free CAS
+	 * notification endpoint. Does nothing when the property is absent.
+	 *
+	 * @param childCasStateEntry
+	 *            state entry of the child CAS that receives the endpoint
+	 * @throws AsynchAEException
+	 *             propagated from reading the message property
+	 */
+	private void saveFreeCasDestination(CasStateEntry childCasStateEntry) throws AsynchAEException {
+		if ( mc.getMessageObjectProperty(AsynchAEMessage.FreeCASQueue) == null ) {
+			return;
+		}
+		Object queue = mc.getMessageObjectProperty(AsynchAEMessage.FreeCASQueue);
+		Endpoint notificationTarget = new Endpoint_impl();
+		notificationTarget.setServerURI("java");
+		notificationTarget.setFreeCasEndpoint(true);
+		notificationTarget.setJavaRemote();
+		notificationTarget.setDestination(queue);
+		// the aggregate will send FREE CAS request to the delegate deployed
+		// in the same JVM
+		childCasStateEntry.setFreeCasNotificationEndpoint(notificationTarget);
+	}
+	/**
+	 * Looks up the state entry of the parent CAS in the controller's local cache.
+	 *
+	 * @param parentCasReferenceId
+	 *            id of the parent CAS
+	 * @param delegateCasMultiplier
+	 *            not used by this method; kept so existing callers stay unchanged
+	 * @return the parent CAS state entry, never {@code null}
+	 * @throws AsynchAEException
+	 *             if the lookup returns no entry for the parent CAS
+	 */
+	private CasStateEntry getParentCasStateEntry(String parentCasReferenceId, Delegate delegateCasMultiplier ) throws AsynchAEException {
+		// fetch parent CAS entry from this aggregate's local cache. This must
+		// exist if we receive a child CAS generated from the parent
+		CasStateEntry parentCasStateEntry = controller.getLocalCache().lookupEntry(parentCasReferenceId);
+		if ( parentCasStateEntry == null ) {
+			// the lookup is against the local cache, so name that cache (and the
+			// controller that owns it) in the error
+			throw new AsynchAEException("Parent CAS "+parentCasReferenceId+ " Not Found in Local Cache of Controller:"+controller.getComponentName());
+		}
+		return parentCasStateEntry;
+	}
+	/**
+	 * Registers the origin endpoint of the parent CAS as the message origin of
+	 * the child CAS. If no origin is found for the parent, this is logged at INFO
+	 * level and nothing is registered.
+	 * <p>
+	 * The origin is checked for {@code null} before it is registered and it is
+	 * registered exactly once, whether or not this controller is a cas
+	 * multiplier.
+	 *
+	 * @param childCasReferenceId
+	 *            id of the child CAS
+	 * @param parentCasReferenceId
+	 *            id of the parent CAS whose origin is looked up
+	 * @param delegateCasMultiplier
+	 *            delegate whose key is reported in the FINEST log entry
+	 * @throws AsynchAEException
+	 *             declared by the original signature; presumably raised by the
+	 *             origin lookup - verify against fetchParentCasOrigin
+	 */
+	private void associateInputCASOriginWithChildCAS(String childCasReferenceId, String parentCasReferenceId, Delegate delegateCasMultiplier ) 
+	throws AsynchAEException {
+
+		Endpoint inputCasOrigin = fetchParentCasOrigin(parentCasReferenceId);
+		if (inputCasOrigin == null) {
+			if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
+				UIMAFramework.getLogger(getClass()).logrb(Level.INFO, getClass().getName(), "executeDirectRequest",
+						UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_msg_origin_not_found__INFO",
+						new Object[] { controller.getComponentName(), parentCasReferenceId });
+			}
+			return;
+		}
+		// associate the origin of the parent CAS with the child CAS. If this
+		// aggregate is a CM, the child CAS may be sent to that client.
+		((AggregateAnalysisEngineController) controller).addMessageOrigin(childCasReferenceId, inputCasOrigin);
+		if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINEST)) {
+			UIMAFramework.getLogger(getClass()).logrb(Level.FINEST, getClass().getName(), "executeDirectRequest",
+					UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_msg_origin_added__FINEST", new Object[] {
+							controller.getComponentName(), childCasReferenceId, delegateCasMultiplier.getKey() });
+		}
+	}
+	/**
+	 * Handles a child CAS that was passed by reference. The child CAS state
+	 * entry is linked to its delegate, parent CAS, client endpoint and input CAS,
+	 * and the CAS is then fetched from the in-process cache and handed to the
+	 * aggregate controller for processing.
+	 * <p>
+	 * Processing stops early if the parent CAS is marked as failed (the
+	 * controller is then called with a null CAS for the child id) or if the
+	 * controller has been stopped. Any exception raised here is passed to
+	 * {@code handleException} instead of being propagated to the caller.
+	 *
+	 * @param childCasReferenceId
+	 *            id of the child CAS to process
+	 * @param parentCasReferenceId
+	 *            id of the CAS the child was produced from
+	 */
+	private void executeDirectRequest(String childCasReferenceId, String parentCasReferenceId)  {
+		CAS cas = null;
+		try {
+			// if the child CAS was produced by a co-located delegate CM
+			// of this aggregate, the CasStateEntry instance has already
+			// been created for the child by the CM controller in its 
+			// process() method. If the child CAS was
+			// produced by an *aggregate* CM delegate, a new CasStateEntry
+			// needs to be created. The CasStateEntry is local to each
+			// aggregate.
+			CasStateEntry childCasStateEntry = getCasStateEntry(childCasReferenceId);
+
+			// get the delegate CM where the child CAS was produced
+			Delegate delegateCasMultiplier = getLastDelegate(childCasStateEntry);
+			// associate the delegate CM with the child CAS
+			setLastDelegate(delegateCasMultiplier, childCasStateEntry);
+
+			// the parent CAS must be in this controller's local cache
+			CasStateEntry parentCasStateEntry = 
+					getParentCasStateEntry(parentCasReferenceId, delegateCasMultiplier );
+			
+			// if a child CAS has been created in a primitive CM delegate of
+			// this aggregate the parent CAS id has already been assigned by
+			// the CM. If the child CAS was created by a remote java CM delegate
+			// we need to associate the child with its parent CAS here. Remote 
+			// java CM is deployed in the same JVM but is considered as remote
+			if (childCasStateEntry.getParentCasReferenceId() == null) {
+				childCasStateEntry.setParentCasReferenceId(parentCasReferenceId);
+			}
+			// Check if the parent CAS is in a failed state first
+			if (parentCasStateEntry.isFailed()) {
+				// handle CAS release
+				controller.process(null, childCasReferenceId);
+				return;
+			}
+			// if this aggregate is a CM, we may send this child CAS to the aggregate's client.
+			// Associate this aggregate client with the child CAS
+			childCasStateEntry.setClientEndpoint(parentCasStateEntry.getClientEndpoint());
+			// every child CAS is associated with an input CAS. An input CAS may be different
+			// from the child CAS' parent CAS if this aggregate has more than one CM. An input 
+			// CAS is a top ancestor from which child CASes are produced.
+			childCasStateEntry.setInputCasReferenceId(parentCasStateEntry.getInputCasReferenceId());
+
+			// flag the endpoint the message came from as a cas multiplier
+			mc.getEndpoint().setIsCasMultiplier(true);
+			associateInputCASOriginWithChildCAS(childCasReferenceId, parentCasReferenceId, delegateCasMultiplier );
+
+			// NOTE(review): the lookup below presumably returns null when no
+			// destination is registered under the delegate key; e.isJavaRemote()
+			// would then throw and end up in the catch block - confirm.
+			Endpoint e = ((AggregateAnalysisEngineController_impl)controller).
+					getDestinations().get(delegateCasMultiplier.getKey());
+
+			// delegate of this aggregate can be deployed in the same JVM but its deployed
+			// with a Java queue instead of JMS. Such service is called JavaRemote delegate.
+			// This is an independent CM service and we need to send explicit FreeCAS request
+			// for every child CAS received from there. Its like a remote but runs in the
+			// same JVM as this aggregate.
+			if ( e.isJavaRemote() ) {
+				saveFreeCasDestination(childCasStateEntry);
+				// Increment parent CAS child count. This is needed to determine when the parent
+				// can be released. Only if child count is zero, the parent CAS can be released.
+				parentCasStateEntry.incrementSubordinateCasInPlayCount();
+				//System.out.println("..Controller:"+controller.getComponentName()+" Processing Child CAS:"+childCasReferenceId+" Incremented Parent CAS:"+parentCasReferenceId+ " Child Count to:"+parentCasStateEntry.getSubordinateCasInPlayCount());
+			}
+			// record the arrival time of the child CAS under this controller's name
+			long arrivalTime = System.nanoTime();
+			controller.saveTime(arrivalTime, childCasReferenceId, controller.getName());
+
+			// Save Process command in the client endpoint.
+			// (execute() performs the same update before calling this method)
+			Endpoint clientEndpoint = controller.getClientEndpoint();
+			if (clientEndpoint != null) {
+				clientEndpoint.setCommand(AsynchAEMessage.Process);
+			}
+
+			// do not start processing if the controller has been stopped
+			if (controller.isStopped()) {
+				return;
+			}
+			cas = controller.getInProcessCache().getCasByReference(childCasReferenceId);
+			if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
+				UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "executeDirectRequest",
+						UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_analyzing_cas__FINE",
+						new Object[] { childCasReferenceId });
+			}
+			// trace output written to stdout
+			StringBuilder sb = new StringBuilder();
+			sb.append("############ Controller:").append(controller.getComponentName()).
+			   append(" Processing Child CAS:").append(childCasReferenceId).append(" From Parent CAS:").
+			   append(parentCasReferenceId).append(" Child Count to:").append(parentCasStateEntry.getSubordinateCasInPlayCount());
+			System.out.println(sb.toString());
+			// hand the child CAS to the aggregate, identifying the delegate CM it came from
+			((AggregateAnalysisEngineController) controller).process(cas, parentCasReferenceId,
+					childCasReferenceId, delegateCasMultiplier.getKey());
+
+		} catch (Exception e) {
+			// log the failure and hand it to the error handling in handleException
+			handleException(e);
+		}
+	}
+	/**
+	 * Reports a failure raised while handling a child CAS request.
+	 * <p>
+	 * The exception is logged at WARNING level (when that level is enabled), wrapped in an
+	 * AsynchAEException unless it already is one, and then handed to the controller's error
+	 * handler chain together with an error context populated from the message context.
+	 *
+	 * @param e the exception to report
+	 */
+	private void handleException(Exception e) {
+		if (UIMAFramework.getLogger(getClass()).isLoggable(Level.WARNING)) {
+			UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(), "executeDirectRequest",
+					UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_service_exception_WARNING",
+					controller.getComponentName());
+
+			UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(), "executeDirectRequest",
+					UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+		}
+		// Wrap into a separate local rather than reassigning the parameter.
+		Exception reported = (e instanceof AsynchAEException) ? e : new AsynchAEException(e);
+		// The exception has already been passed to the logger above, so it is not additionally
+		// dumped to stderr with printStackTrace(); that output would bypass the logging setup.
+		controller.getErrorHandlerChain().handle(reported, super.populateErrorContext(mc), controller);
+	}
+	/**
+	 * Handles a child CAS whose content arrived serialized in the message.
+	 * <p>
+	 * Steps, in order: store the free-CAS notification endpoint on the parent CAS state entry,
+	 * update statistics, redirect the message endpoint to the origin of the parent CAS (when
+	 * known), link the child state entry to its parent and producing delegate, deserialize the
+	 * CAS, register it in the in-process cache and finally pass it to the aggregate controller.
+	 * Any exception is routed to {@code super.handleError(...)}.
+	 *
+	 * @param childCasReferenceId  reference id of the CAS carried by this message
+	 * @param parentCasReferenceId reference id of the CAS the child was produced from
+	 * @throws AsynchAEException declared for callers; failures inside the body are passed to
+	 *                           {@code super.handleError(...)}
+	 */
+	private void executeRemoteRequest(String childCasReferenceId, String parentCasReferenceId)
+			throws AsynchAEException {
+		CacheEntry childCacheEntry = null;
+		long inTime = System.nanoTime();
+
+		try {
+			CasStateEntry parentCasStateEntry = saveFreeCasEndpointInParentCasStateEntry(parentCasReferenceId);
+
+			computeStats(mc, parentCasReferenceId);
+
+			mc.getEndpoint().setDestination(null);
+
+			// This CAS came in from a CAS Multiplier. Treat it differently than the
+			// input CAS. In case the Aggregate needs to send this CAS to the
+			// client, retrieve the client destination by looking up the client endpoint
+			// using input CAS reference id. Fetch Cache entry for the parent CAS
+			CacheEntry parentCasCacheEntry = controller.getInProcessCache().getCacheEntryForCAS(parentCasReferenceId);
+			Endpoint replyToEndpoint = parentCasCacheEntry.getMessageOrigin();
+			// The message context contains a Cas Multiplier endpoint. Since we don't want to
+			// send a generated CAS back to the CM, override it with the endpoint provided by
+			// the client of this service. Client endpoint is attached to an input Cas cache entry.
+			if (replyToEndpoint != null) {
+				mc.getEndpoint().setEndpoint(replyToEndpoint.getEndpoint());
+				mc.getEndpoint().setServerURI(replyToEndpoint.getServerURI());
+			}
+			// create local cache entry for the child CAS
+			CasStateEntry childCasStateEntry = super.getCasStateEntry(childCasReferenceId);
+			// associate parent CAS with the child
+			childCasStateEntry.setParentCasReferenceId(parentCasReferenceId);
+			Delegate delegate = getLastDelegate(childCasStateEntry);
+			setLastDelegate(delegate, childCasStateEntry);
+
+			// If there is one thread receiving messages from Cas Multiplier increment
+			// number of child CASes of the parent CAS. If there are more threads (consumers)
+			// a special object ConcurrentMessageListener has already incremented the count.
+			// This special object enforces order of processing for CASes
+			// coming in from the Cas Multiplier.
+			if (!delegate.hasConcurrentConsumersOnReplyQueue()) {
+				parentCasStateEntry.incrementSubordinateCasInPlayCount();
+			}
+
+			// Time how long we wait on Cas Pool to fetch a new CAS
+			long t1 = controller.getCpuTime();
+
+			// When deserialization throws there is no result to register in the cache, so the
+			// exception goes straight to the handler below. Previously a finally block
+			// dereferenced the (still null) result and replaced the real cause with a
+			// NullPointerException.
+			SerializationResult result = deserializeChildCAS(delegate.getKey(), mc.getEndpoint(), mc);
+
+			// create child CAS cache entry
+			childCacheEntry = controller.getInProcessCache().register(result.getCas(), mc,
+					result.getDeserSharedData(), result.getReuseInfo(), childCasReferenceId, result.getMarker(),
+					result.acceptsDeltaCas());
+			childCacheEntry.setInputCasReferenceId(parentCasReferenceId);
+			childCacheEntry.setFreeCasEndpoint(parentCasCacheEntry.getFreeCasEndpoint());
+			saveStats(childCacheEntry, inTime, t1, result.getTimeWaitingForCAS());
+
+			// Check and set up for Delta CAS reply
+			// NOTE(review): the marker is created after the cache entry was registered with
+			// result.getMarker(); confirm the entry does not need the new marker.
+			if (mc.propertyExists(AsynchAEMessage.AcceptsDeltaCas)) {
+				boolean acceptsDeltaCas = mc.getMessageBooleanProperty(AsynchAEMessage.AcceptsDeltaCas);
+				if (acceptsDeltaCas) {
+					Marker marker = result.getCas().createMarker();
+					result.setAcceptsDeltaCas(acceptsDeltaCas);
+					result.setMarker(marker);
+				}
+			}
+
+			if (controller.isStopped()) {
+				// The Controller is in shutdown state, release the CAS
+				controller.dropCAS(childCacheEntry.getCasReferenceId(), true);
+				return;
+			}
+			if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
+				UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "executeRemoteRequest",
+						UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_deserialized_cas_ready_to_process_FINE",
+						new Object[] { mc.getEndpoint().getEndpoint() });
+			}
+
+			((AggregateAnalysisEngineController) controller).process(result.getCas(), parentCasReferenceId,
+					childCasReferenceId, delegate.getKey());
+
+		} catch (Exception e) {
+			// childCacheEntry is null when the failure happened before registration
+			super.handleError(e, childCacheEntry, mc);
+		}
+
+	}
+	private void setLastDelegate(Delegate delegate, CasStateEntry childCasStateEntry) {
+		// Save the last delegate handling this CAS
+		childCasStateEntry.setLastDelegate(delegate);
+		controller.getInProcessCache().setCasProducer(childCasStateEntry.getCasReferenceId(), delegate.getKey());
+
+	}
+	private Delegate getLastDelegate(CasStateEntry childCasStateEntry) throws AsynchAEException {
+		String cmEndpointName = mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+		String newCASProducedBy = ((AggregateAnalysisEngineController) controller).lookUpDelegateKey(cmEndpointName);
+		Delegate delegate = ((AggregateAnalysisEngineController) controller).lookupDelegate(newCASProducedBy);
+
+		return delegate;
+	}
+
+	private CasStateEntry saveFreeCasEndpointInParentCasStateEntry(String parentCasReferenceId)
+			throws AsynchAEException {
+		// Fetch the name of the Cas Multiplier's input queue
+		// String cmEndpointName = aMessageContext.getEndpoint().getEndpoint();
+		String cmEndpointName = mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+		String newCASProducedBy = ((AggregateAnalysisEngineController) controller).lookUpDelegateKey(cmEndpointName);
+		Endpoint casMultiplierEndpoint = ((AggregateAnalysisEngineController) controller)
+				.lookUpEndpoint(newCASProducedBy, false);
+		Endpoint freeCasEndpoint = mc.getEndpoint();
+		// Clone an endpoint where Free Cas Request will be sent
+		freeCasEndpoint = (Endpoint) ((Endpoint_impl) freeCasEndpoint).clone();
+
+		if (casMultiplierEndpoint != null) {
+			// Save the URL of the broker managing the Free Cas Notification queue.
+			// This is needed when we try to establish a connection to the broker.
+			freeCasEndpoint.setServerURI(casMultiplierEndpoint.getServerURI());
+		}
+		CasStateEntry parentCasStateEntry = ((AggregateAnalysisEngineController) controller).getLocalCache()
+				.lookupEntry(parentCasReferenceId);
+
+		// Associate Free Cas Notification Endpoint with an input Cas
+		parentCasStateEntry.setFreeCasNotificationEndpoint(freeCasEndpoint);
+		return parentCasStateEntry;
+	}
+
+	/**
+	 * Logs timing figures for a reply (at FINE level) and then aggregates delegate statistics.
+	 * <p>
+	 * The timing figures are only derived when the message carries the {@code TimeInService}
+	 * property: round trip is "now minus departure time", and time in comms is the round trip
+	 * minus the time reported by the service. Values are logged in milliseconds
+	 * (nanoseconds / 1000000).
+	 *
+	 * @param aMessageContext message whose properties carry the timings
+	 * @param aCasReferenceId reference id of the CAS the timings belong to
+	 * @throws AsynchAEException if a property cannot be read or aggregation fails
+	 */
+	protected void computeStats(MessageContext aMessageContext, String aCasReferenceId) throws AsynchAEException {
+		if (aMessageContext.propertyExists(AsynchAEMessage.TimeInService)) {
+			long departureTime = controller.getTime(aCasReferenceId, aMessageContext.getEndpoint().getEndpoint());
+			long currentTime = System.nanoTime();
+			long roundTrip = currentTime - departureTime;
+			long timeInService = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeInService);
+			// Time in comms is what remains of the round trip once the time spent inside the
+			// service is taken out. The former expression
+			// currentTime - (departureTime - timeInService) added the service time instead.
+			long totalTimeInComms = roundTrip - timeInService;
+
+			if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
+				UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "computeStats",
+						UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_roundtrip_time__FINE",
+						new Object[] { aCasReferenceId, aMessageContext.getEndpoint(),
+								(double) roundTrip / (double) 1000000 });
+
+				UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "computeStats",
+						UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_time_spent_in_delegate__FINE",
+						new Object[] { aCasReferenceId, (double) timeInService / (double) 1000000,
+								aMessageContext.getEndpoint() });
+
+				UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "computeStats",
+						UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_time_spent_in_comms__FINE",
+						new Object[] { aCasReferenceId, (double) totalTimeInComms / (double) 1000000,
+								aMessageContext.getEndpoint() });
+			}
+		}
+		aggregateDelegateStats(aMessageContext, aCasReferenceId);
+
+	}
+
+	/**
+	 * Folds the timing properties carried by a message into the statistics kept for the
+	 * sending delegate, for this controller, and for the CAS and its input CAS.
+	 * <p>
+	 * Each property ({@code TimeToSerializeCAS}, {@code TimeToDeserializeCAS},
+	 * {@code IdleTime}, {@code TimeWaitingForCAS}, {@code TimeInProcessCAS}) is processed only
+	 * when present on the message. Delegate statistics are updated only when a
+	 * ServicePerformance object exists for the delegate key.
+	 *
+	 * @param aMessageContext message whose properties carry the timings
+	 * @param aCasReferenceId reference id of the CAS the message refers to
+	 * @throws AsynchAEException if the CAS is not in the cache, or wrapping any other failure
+	 */
+	protected synchronized void aggregateDelegateStats(MessageContext aMessageContext, String aCasReferenceId)
+			throws AsynchAEException {
+		try {
+			AggregateAnalysisEngineController aggregate = (AggregateAnalysisEngineController) controller;
+			String delegateKey = aggregate.lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
+			CacheEntry entry = controller.getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+			if (entry == null) {
+				throw new AsynchAEException("CasReferenceId:" + aCasReferenceId + " Not Found in the Cache.");
+			}
+			CacheEntry inputCasEntry = null;
+			String inputCasReferenceId = entry.getInputCasReferenceId();
+			ServicePerformance casStats = aggregate.getCasStatistics(aCasReferenceId);
+			if (inputCasReferenceId != null && controller.getInProcessCache().entryExists(inputCasReferenceId)) {
+				String casProducerKey = entry.getCasProducerKey();
+				if (casProducerKey != null && aggregate.isDelegateKeyValid(casProducerKey)) {
+					// Get entry for the input CAS
+					inputCasEntry = controller.getInProcessCache().getCacheEntryForCAS(inputCasReferenceId);
+				}
+			}
+			ServicePerformance delegateServicePerformance = aggregate.getServicePerformance(delegateKey);
+
+			if (aMessageContext.propertyExists(AsynchAEMessage.TimeToSerializeCAS)) {
+				long timeToSerializeCAS = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeToSerializeCAS);
+				if (timeToSerializeCAS > 0 && delegateServicePerformance != null) {
+					delegateServicePerformance.incrementCasSerializationTime(timeToSerializeCAS);
+				}
+			}
+			if (aMessageContext.propertyExists(AsynchAEMessage.TimeToDeserializeCAS)) {
+				long timeToDeserializeCAS = aMessageContext
+						.getMessageLongProperty(AsynchAEMessage.TimeToDeserializeCAS);
+				if (timeToDeserializeCAS > 0 && delegateServicePerformance != null) {
+					delegateServicePerformance.incrementCasDeserializationTime(timeToDeserializeCAS);
+				}
+			}
+
+			if (aMessageContext.propertyExists(AsynchAEMessage.IdleTime)) {
+				long idleTime = aMessageContext.getMessageLongProperty(AsynchAEMessage.IdleTime);
+				if (idleTime > 0 && delegateServicePerformance != null) {
+					Endpoint endp = aMessageContext.getEndpoint();
+					if (endp != null && endp.isRemote()) {
+						delegateServicePerformance.incrementIdleTime(idleTime);
+					}
+				}
+			}
+
+			if (aMessageContext.propertyExists(AsynchAEMessage.TimeWaitingForCAS)) {
+				long timeWaitingForCAS = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeWaitingForCAS);
+				if (timeWaitingForCAS > 0 && aMessageContext.getEndpoint().isRemote()) {
+					entry.incrementTimeWaitingForCAS(timeWaitingForCAS);
+					// Guarded like every other use of delegateServicePerformance in this method;
+					// without the check a missing delegate entry surfaced as a wrapped
+					// NullPointerException.
+					if (delegateServicePerformance != null) {
+						delegateServicePerformance.incrementCasPoolWaitTime(
+								timeWaitingForCAS - delegateServicePerformance.getRawCasPoolWaitTime());
+					}
+					if (inputCasEntry != null) {
+						inputCasEntry.incrementTimeWaitingForCAS(timeWaitingForCAS);
+					}
+				}
+			}
+			if (aMessageContext.propertyExists(AsynchAEMessage.TimeInProcessCAS)) {
+				long timeInProcessCAS = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeInProcessCAS);
+				Endpoint endp = aMessageContext.getEndpoint();
+				if (endp != null && endp.isRemote()) {
+					if (delegateServicePerformance != null) {
+						// calculate the time spent in analysis. The remote service returns total time
+						// spent in the analysis. Compute the delta.
+						long dt = timeInProcessCAS - delegateServicePerformance.getRawAnalysisTime();
+						// increment total time in analysis
+						delegateServicePerformance.incrementAnalysisTime(dt);
+						controller.getServicePerformance().incrementAnalysisTime(dt);
+					}
+				} else {
+					controller.getServicePerformance().incrementAnalysisTime(timeInProcessCAS);
+				}
+				casStats.incrementAnalysisTime(timeInProcessCAS);
+
+				if (inputCasReferenceId != null) {
+					ServicePerformance inputCasStats = aggregate.getCasStatistics(inputCasReferenceId);
+					// Update processing time for the input CAS
+					if (inputCasStats != null) {
+						inputCasStats.incrementAnalysisTime(timeInProcessCAS);
+					}
+				}
+
+			}
+		} catch (AsynchAEException e) {
+			throw e;
+		} catch (Exception e) {
+			throw new AsynchAEException(e);
+		}
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasResponseCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasResponseCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasResponseCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasResponseCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.InProcessCache.CacheEntry;
+import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
+import org.apache.uima.aae.delegate.Delegate;
+import org.apache.uima.aae.error.AsynchAEException;
+import org.apache.uima.aae.error.ErrorContext;
+import org.apache.uima.aae.jmx.ServicePerformance;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.monitor.Monitor;
+import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.util.Level;
+
+public class ProcessChildCasResponseCommand extends AbstractUimaAsCommand  {
+	// Message context this command operates on; set once in the constructor.
+	private MessageContext mc;
+
+	/**
+	 * Creates the command for the given message.
+	 *
+	 * @param mc         message context stored for use by this command
+	 * @param controller controller passed on to the superclass constructor
+	 */
+	public ProcessChildCasResponseCommand(MessageContext mc, AnalysisEngineController controller) {
+		super(controller);
+		this.mc = mc;
+	}
+	/**
+	 * Entry point of the command.
+	 * <p>
+	 * NOTE: the whole body is commented out, so invoking this command currently does nothing.
+	 * The disabled code below would dispatch to executeDirectRequest or executeRemoteRequest
+	 * depending on the message payload type.
+	 *
+	 * @throws Exception declared by the signature; the current (empty) body throws nothing
+	 */
+	public void execute() throws Exception {
+//		System.out.println(">>>>>>>>>>>>>>> in ProcessChildCasResponseCommand.execute(");
+//		int payload = mc.getMessageIntProperty(AsynchAEMessage.Payload);
+//		String casReferenceId = super.getCasReferenceId(this.getClass(), mc);
+//		System.out.println(">>>>>>>>>>>>>>> Controller:"+controller.getComponentName()+
+//				" in ProcessChildCasResponseCommand.execute() - Child CAS:"+casReferenceId+
+//				" from "+mc
+//                .getMessageStringProperty(AsynchAEMessage.MessageFrom)
+//				);
+//
+//		if (casReferenceId == null) {
+//			// LOG THIS
+//			System.out.println(
+//					"ProcessInputCasResponseCommand.execute() - CasReferenceId is missing in MessageContext - Ignoring Message");
+//			return; // Nothing to do
+//		}
+//		// Save Process command in the client endpoint.
+//		Endpoint clientEndpoint = controller.getClientEndpoint();
+//		if (clientEndpoint != null) {
+//			clientEndpoint.setCommand(AsynchAEMessage.Process);
+//		}
+//
+//		// Cas was passed by reference meaning InProcessCache must have it
+//		if (AsynchAEMessage.CASRefID == payload) {
+//			executeDirectRequest(casReferenceId);
+//		} else {
+//			executeRemoteRequest(casReferenceId);
+//		}
+	}
+	/**
+	 * Handles a reply whose CAS is passed by reference and is looked up in the in-process cache.
+	 * <p>
+	 * The CAS reference id is re-read from the message, the sending delegate is resolved (from
+	 * the message endpoint name, or from the {@code MessageFrom} property when that name is
+	 * blank), the reply is recorded on the CAS state entry and the CAS is removed from the
+	 * delegate's outstanding list. Failures are passed to the controller's error handler chain.
+	 * The delegate process count is incremented in all cases.
+	 * <p>
+	 * NOTE: within this class the method is only referenced from commented-out code.
+	 *
+	 * @param casReferenceId reference id supplied by the caller; replaced by the value of the
+	 *                       {@code CasReference} message property once that has been read
+	 * @throws AsynchAEException declared for callers; failures inside the body are passed to
+	 *                           the error handler chain
+	 */
+	private void executeDirectRequest(String casReferenceId) throws AsynchAEException {
+		CacheEntry cacheEntry = null;
+
+		try {
+			casReferenceId = mc.getMessageStringProperty(AsynchAEMessage.CasReference);
+			cacheEntry = controller.getInProcessCache().getCacheEntryForCAS(casReferenceId);
+			CasStateEntry casStateEntry = ((AggregateAnalysisEngineController) controller)
+					.getLocalCache().lookupEntry(casReferenceId);
+
+			CAS cas = cacheEntry.getCas();
+			String delegateKey = null;
+			if (mc.getEndpoint().getEndpoint() == null || mc.getEndpoint().getEndpoint().trim().length() == 0) {
+				// No endpoint name on the message; fall back to the sender property
+				String fromEndpoint = mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+				delegateKey = ((AggregateAnalysisEngineController) controller).lookUpDelegateKey(fromEndpoint);
+			} else {
+				delegateKey = ((AggregateAnalysisEngineController) controller)
+						.lookUpDelegateKey(mc.getEndpoint().getEndpoint());
+			}
+			Delegate delegate = ((AggregateAnalysisEngineController) controller).lookupDelegate(delegateKey);
+			if (casStateEntry != null) {
+				casStateEntry.setReplyReceived();
+				casStateEntry.setLastDelegate(delegate);
+			}
+			delegate.removeCasFromOutstandingList(casReferenceId);
+
+			if (cas != null) {
+				cancelTimerAndProcess(mc, casReferenceId, cas);
+			} else {
+				if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
+					UIMAFramework.getLogger(getClass()).logrb(
+							Level.INFO,
+							getClass().getName(),
+							"handleProcessResponseWithCASReference",
+							UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+							"UIMAEE_cas_not_in_cache__INFO",
+							new Object[] { controller.getName(), casReferenceId,
+									mc.getEndpoint().getEndpoint() });
+				}
+				throw new AsynchAEException("CAS with Reference Id:" + casReferenceId
+						+ " Not Found in CasManager's CAS Cache");
+			}
+		} catch (Exception e) {
+
+			ErrorContext errorContext = new ErrorContext();
+			errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
+			errorContext.add(AsynchAEMessage.CasReference, casReferenceId);
+			errorContext.add(AsynchAEMessage.Endpoint, mc.getEndpoint());
+			controller.getErrorHandlerChain().handle(e, errorContext, controller);
+		} finally {
+			incrementDelegateProcessCount(mc);
+			if (controller instanceof AggregateAnalysisEngineController) {
+				try {
+					String endpointName = mc.getEndpoint().getEndpoint();
+					String delegateKey = ((AggregateAnalysisEngineController) controller)
+							.lookUpDelegateKey(endpointName);
+					if (delegateKey != null) {
+						Endpoint endpoint = ((AggregateAnalysisEngineController) controller)
+								.lookUpEndpoint(delegateKey, false);
+
+						// Check if the multiplier aborted during processing of this input CAS.
+						// cacheEntry is still null when the cache lookup in the try block failed;
+						// without the null check that case produced a second, misleading
+						// NullPointerException warning from the catch below.
+						if (endpoint != null && endpoint.isCasMultiplier()
+								&& cacheEntry != null && cacheEntry.isAborted()) {
+							if (!controller.getInProcessCache().isEmpty()) {
+								controller.getInProcessCache().registerCallbackWhenCacheEmpty(
+										controller.getEventListener());
+							}
+							// Open question carried over from the original code: when the cache is
+							// already empty, should the listener's onCacheEmpty() be invoked for an
+							// aborted CAS? The call is disabled:
+//							getController().getEventListener().onCacheEmpty();
+						}
+
+					}
+				} catch (Exception e) {
+					if (UIMAFramework.getLogger(getClass()).isLoggable(Level.WARNING)) {
+						UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(),
+								"handleProcessResponseWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+								"UIMAEE_service_exception_WARNING", controller.getComponentName());
+
+						UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(),
+								"handleProcessResponseWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+								"UIMAEE_exception__WARNING", e);
+					}
+				}
+			}
+		}
+
+	}
+	 private void incrementDelegateProcessCount(MessageContext aMessageContext) {
+		    Endpoint endpoint = aMessageContext.getEndpoint();
+		    if (endpoint != null && controller instanceof AggregateAnalysisEngineController) {
+		      try {
+		        String delegateKey = ((AggregateAnalysisEngineController) controller)
+		                .lookUpDelegateKey(endpoint.getEndpoint());
+		        LongNumericStatistic stat = controller.getMonitor().getLongNumericStatistic(
+		                delegateKey, Monitor.ProcessCount);
+		        stat.increment();
+		      } catch (Exception e) {
+		        if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
+		            UIMAFramework.getLogger(getClass()).logrb(Level.INFO, getClass().getName(),
+		                    "incrementDelegateProcessCount", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+		                    "UIMAEE_delegate_key_for_endpoint_not_found__INFO", new Object[] { controller.getComponentName(), endpoint.getEndpoint() });
+		        }
+		      }
+		    }
+
+		  }
+	/**
+	 * Placeholder for handling a reply whose CAS arrives in the message body.
+	 * The body is empty: calling this method has no effect.
+	 *
+	 * @param casReferenceId reference id of the CAS (unused)
+	 * @throws AsynchAEException declared by the signature; never thrown by the empty body
+	 */
+	private void executeRemoteRequest(String casReferenceId) throws AsynchAEException {
+
+	}
+    /**
+     * Called by executeDirectRequest once the CAS has been found in the cache.
+     * NOTE: the body is entirely commented out, so this method currently does nothing; the
+     * disabled code below would compute statistics and hand the CAS on for processing.
+     *
+     * @param aMessageContext message the reply arrived in (unused)
+     * @param aCasReferenceId reference id of the CAS (unused)
+     * @param aCAS            the CAS taken from the cache entry (unused)
+     * @throws AsynchAEException declared by the signature; never thrown by the empty body
+     */
+    private void cancelTimerAndProcess(MessageContext aMessageContext, String aCasReferenceId,
+           CAS aCAS) throws AsynchAEException {
+//      computeStats(aMessageContext, aCasReferenceId);
+//      ((AggregateAnalysisEngineController) controller).process(aCAS, anInputCasReferenceId,
+//              aNewCasReferenceId, aNewCasProducedBy);
+//
+//      super.invokeProcess(aCAS, aCasReferenceId, null, aMessageContext, null);
+
+    }
+	  /**
+	   * Logs timing figures for a reply (at FINE level) and, when the controller is an
+	   * aggregate, aggregates delegate statistics.
+	   * <p>
+	   * The timing figures are only derived when the message carries the {@code TimeInService}
+	   * property: round trip is "now minus departure time", and time in comms is the round trip
+	   * minus the time reported by the service. Values are logged in milliseconds
+	   * (nanoseconds / 1000000).
+	   *
+	   * @param aMessageContext message whose properties carry the timings
+	   * @param aCasReferenceId reference id of the CAS the timings belong to
+	   * @throws AsynchAEException if a property cannot be read or aggregation fails
+	   */
+	  protected void computeStats(MessageContext aMessageContext, String aCasReferenceId)
+	          throws AsynchAEException {
+	    if (aMessageContext.propertyExists(AsynchAEMessage.TimeInService)) {
+	      long departureTime = controller.getTime(aCasReferenceId,
+	              aMessageContext.getEndpoint().getEndpoint());
+	      long currentTime = System.nanoTime();
+	      long roundTrip = currentTime - departureTime;
+	      long timeInService = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeInService);
+	      // Time in comms is what remains of the round trip once the time spent inside the
+	      // service is taken out. The former expression
+	      // currentTime - (departureTime - timeInService) added the service time instead.
+	      long totalTimeInComms = roundTrip - timeInService;
+
+	      if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
+	        UIMAFramework.getLogger(getClass()).logrb(
+	                Level.FINE,
+	                getClass().getName(),
+	                "computeStats",
+	                UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+	                "UIMAEE_show_roundtrip_time__FINE",
+	                new Object[] { aCasReferenceId, aMessageContext.getEndpoint(),
+	                    (double) roundTrip / (double) 1000000 });
+
+	        UIMAFramework.getLogger(getClass()).logrb(
+	                Level.FINE,
+	                getClass().getName(),
+	                "computeStats",
+	                UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+	                "UIMAEE_show_time_spent_in_delegate__FINE",
+	                new Object[] { aCasReferenceId, (double) timeInService / (double) 1000000,
+	                    aMessageContext.getEndpoint() });
+
+	        UIMAFramework.getLogger(getClass()).logrb(
+	                Level.FINE,
+	                getClass().getName(),
+	                "computeStats",
+	                UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+	                "UIMAEE_show_time_spent_in_comms__FINE",
+	                new Object[] { aCasReferenceId, (double) totalTimeInComms / (double) 1000000,
+	                    aMessageContext.getEndpoint() });
+	      }
+	    }
+
+	    if (controller instanceof AggregateAnalysisEngineController) {
+	      aggregateDelegateStats(aMessageContext, aCasReferenceId);
+	    }
+	  }
+	  /**
+	   * Folds the timing properties carried by a message into the statistics kept for the
+	   * sending delegate, for this controller, and for the CAS and its input CAS.
+	   * <p>
+	   * The delegate is resolved from the message endpoint name, or from the
+	   * {@code MessageFrom} property when that name is null or blank. Each timing property is
+	   * processed only when present on the message, and delegate statistics are updated only
+	   * when a ServicePerformance object exists for the delegate key.
+	   *
+	   * @param aMessageContext message whose properties carry the timings
+	   * @param aCasReferenceId reference id of the CAS the message refers to
+	   * @throws AsynchAEException if the CAS is not in the cache, or wrapping any other failure
+	   */
+	  protected synchronized void aggregateDelegateStats(MessageContext aMessageContext,
+	          String aCasReferenceId) throws AsynchAEException {
+	    try {
+	      AggregateAnalysisEngineController aggregate = (AggregateAnalysisEngineController) controller;
+	      String delegateKey;
+	      if (aMessageContext.getEndpoint().getEndpoint() == null
+	              || aMessageContext.getEndpoint().getEndpoint().trim().length() == 0) {
+	        // No endpoint name on the message; fall back to the sender property
+	        String fromEndpoint = aMessageContext.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+	        delegateKey = aggregate.lookUpDelegateKey(fromEndpoint);
+	      } else {
+	        delegateKey = aggregate.lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
+	      }
+	      CacheEntry entry = controller.getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+	      if (entry == null) {
+	        throw new AsynchAEException("CasReferenceId:" + aCasReferenceId
+	                + " Not Found in the Cache.");
+	      }
+	      CacheEntry inputCasEntry = null;
+	      String inputCasReferenceId = entry.getInputCasReferenceId();
+	      ServicePerformance casStats = aggregate.getCasStatistics(aCasReferenceId);
+	      if (inputCasReferenceId != null
+	              && controller.getInProcessCache().entryExists(inputCasReferenceId)) {
+	        String casProducerKey = entry.getCasProducerKey();
+	        if (casProducerKey != null && aggregate.isDelegateKeyValid(casProducerKey)) {
+	          // Get entry for the input CAS
+	          inputCasEntry = controller.getInProcessCache().getCacheEntryForCAS(inputCasReferenceId);
+	        }
+	      }
+	      ServicePerformance delegateServicePerformance = aggregate.getServicePerformance(delegateKey);
+
+	      if (aMessageContext.propertyExists(AsynchAEMessage.TimeToSerializeCAS)) {
+	        long timeToSerializeCAS = aMessageContext
+	                .getMessageLongProperty(AsynchAEMessage.TimeToSerializeCAS);
+	        if (timeToSerializeCAS > 0 && delegateServicePerformance != null) {
+	          delegateServicePerformance.incrementCasSerializationTime(timeToSerializeCAS);
+	        }
+	      }
+	      if (aMessageContext.propertyExists(AsynchAEMessage.TimeToDeserializeCAS)) {
+	        long timeToDeserializeCAS = aMessageContext
+	                .getMessageLongProperty(AsynchAEMessage.TimeToDeserializeCAS);
+	        if (timeToDeserializeCAS > 0 && delegateServicePerformance != null) {
+	          delegateServicePerformance.incrementCasDeserializationTime(timeToDeserializeCAS);
+	        }
+	      }
+
+	      if (aMessageContext.propertyExists(AsynchAEMessage.IdleTime)) {
+	        long idleTime = aMessageContext.getMessageLongProperty(AsynchAEMessage.IdleTime);
+	        if (idleTime > 0 && delegateServicePerformance != null) {
+	          Endpoint endp = aMessageContext.getEndpoint();
+	          if (endp != null && endp.isRemote()) {
+	            delegateServicePerformance.incrementIdleTime(idleTime);
+	          }
+	        }
+	      }
+
+	      if (aMessageContext.propertyExists(AsynchAEMessage.TimeWaitingForCAS)) {
+	        long timeWaitingForCAS = aMessageContext
+	                .getMessageLongProperty(AsynchAEMessage.TimeWaitingForCAS);
+	        if (timeWaitingForCAS > 0 && aMessageContext.getEndpoint().isRemote()) {
+	          entry.incrementTimeWaitingForCAS(timeWaitingForCAS);
+	          // Guarded like every other use of delegateServicePerformance in this method;
+	          // without the check a missing delegate entry surfaced as a wrapped
+	          // NullPointerException.
+	          if (delegateServicePerformance != null) {
+	            delegateServicePerformance.incrementCasPoolWaitTime(timeWaitingForCAS
+	                    - delegateServicePerformance.getRawCasPoolWaitTime());
+	          }
+	          if (inputCasEntry != null) {
+	            inputCasEntry.incrementTimeWaitingForCAS(timeWaitingForCAS);
+	          }
+	        }
+	      }
+	      if (aMessageContext.propertyExists(AsynchAEMessage.TimeInProcessCAS)) {
+	        long timeInProcessCAS = aMessageContext
+	                .getMessageLongProperty(AsynchAEMessage.TimeInProcessCAS);
+	        Endpoint endp = aMessageContext.getEndpoint();
+	        if (endp != null && endp.isRemote()) {
+	          if (delegateServicePerformance != null) {
+	            // calculate the time spent in analysis. The remote service returns total time
+	            // spent in the analysis. Compute the delta.
+	            long dt = timeInProcessCAS - delegateServicePerformance.getRawAnalysisTime();
+	            // increment total time in analysis
+	            delegateServicePerformance.incrementAnalysisTime(dt);
+	            controller.getServicePerformance().incrementAnalysisTime(dt);
+	          }
+	        } else {
+	          controller.getServicePerformance().incrementAnalysisTime(timeInProcessCAS);
+	        }
+	        casStats.incrementAnalysisTime(timeInProcessCAS);
+
+	        if (inputCasReferenceId != null) {
+	          ServicePerformance inputCasStats = aggregate.getCasStatistics(inputCasReferenceId);
+	          // Update processing time for the input CAS
+	          if (inputCasStats != null) {
+	            inputCasStats.incrementAnalysisTime(timeInProcessCAS);
+	          }
+	        }
+
+	      }
+	    } catch (AsynchAEException e) {
+	      throw e;
+	    } catch (Exception e) {
+	      throw new AsynchAEException(e);
+	    }
+	  }
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasRequestCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasRequestCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasRequestCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasRequestCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.InProcessCache.CacheEntry;
+import org.apache.uima.aae.InProcessCache.UndefinedCacheEntry;
+import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
+import org.apache.uima.aae.error.AsynchAEException;
+import org.apache.uima.aae.handler.input.ProcessRequestHandler_impl;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.monitor.Monitor;
+import org.apache.uima.util.Level;
+
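+/**
+ * Handles a Process request that carries an input CAS. The CAS is either
+ * passed by reference (and looked up in the InProcessCache) or arrives
+ * serialized in the message, in which case it is deserialized and registered
+ * in the cache before being processed.
+ */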
+public class ProcessInputCasRequestCommand extends AbstractUimaAsCommand  {
+	private static final Class<?> CLASS_NAME = ProcessInputCasRequestCommand.class;
+
+	private MessageContext mc;
+	// Guards access to the aggregate's semaphore, which
+	// throttles processing of CASes from the service input queue.
+	private Object lock = new Object();
+
+	/**
+	 * Creates a command bound to the given message context.
+	 *
+	 * @param mc         message context of the incoming request; kept for use by
+	 *                   {@link #execute()}
+	 * @param controller controller handed to the superclass constructor
+	 */
+	public ProcessInputCasRequestCommand(MessageContext mc, AnalysisEngineController controller) {
+		super(controller);
+		this.mc = mc;
+	}
+
+	/**
+	 * Entry point of the command. Reads the payload type and the CAS reference id
+	 * from the message context, records the Process command on the controller's
+	 * client endpoint (when there is one) and dispatches to the by-reference or
+	 * the serialized-CAS code path.
+	 *
+	 * @throws Exception if reading the message properties or handling the request fails
+	 */
+	public void execute() throws Exception {
+
+		final int payloadType = mc.getMessageIntProperty(AsynchAEMessage.Payload);
+		final String casId = super.getCasReferenceId(this.getClass(), mc);
+		if (casId == null) {
+			// LOG THIS
+			System.out.println(
+					"ProcessInputCasRequestCommand.execute() - CasReferenceId is missing in MessageContext - Ignoring Message");
+			return; // without a CAS id there is nothing to process
+		}
+
+		// Remember on the client endpoint that the last command seen was Process.
+		final Endpoint client = controller.getClientEndpoint();
+		if (client != null) {
+			client.setCommand(AsynchAEMessage.Process);
+		}
+		System.out.println("Controller:"+controller.getComponentName()+
+				" ProcessInputCasRequestCommand.execute()");
+
+		if (payloadType != AsynchAEMessage.CASRefID) {
+			// Any other payload type: the CAS has to be deserialized from the message.
+			executeRemoteRequest(casId);
+		} else {
+			// The CAS was passed by reference, so the global InProcessCache is
+			// expected to hold an entry that the client created for it.
+			executeDirectRequest(casId);
+		}
+	}
+
+	/**
+	 * Waits on the controller latch until the controller is initialized, unless
+	 * the given cache entry is flagged as warm-up.
+	 *
+	 * @param casCacheEntry cache entry of the CAS about to be processed
+	 * @throws AsynchAEException propagated from the latch wait
+	 */
+	private void blockIfControllerNotReady(CacheEntry casCacheEntry) throws AsynchAEException {
+		if (casCacheEntry.isWarmUp()) {
+			return; // warm-up entries skip the wait
+		}
+		controller.getControllerLatch().waitUntilInitialized();
+	}
+
+	private void saveReplyTo()  throws AsynchAEException {
+		// !!!!!!!!!!!!!!!!! HACK !!!!!!!!!!!!!!!!!!!
+		// Save true replyTo endpoint to the service sending the request
+		// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+		Object replyTo = mc.getMessageObjectProperty(AsynchAEMessage.ReplyToEndpoint);
+		mc.getEndpoint().setReplyDestination(replyTo);
+
+	}
+	private void saveDelegateKey() throws AsynchAEException{
+		String delegateKey = mc.getMessageStringProperty(AsynchAEMessage.DelegateKey);
+		if ( delegateKey == null ) {
+			delegateKey =  mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+		}
+		mc.getEndpoint().setDelegateKey(delegateKey);
+
+	}
+	private void saveEndpointName() throws AsynchAEException {
+		String endpointName = mc.getMessageStringProperty(AsynchAEMessage.EndpointName);
+		if (endpointName == null ) {
+			endpointName = mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+		}
+		mc.getEndpoint().setEndpoint(endpointName);
+
+	}
+	private void addMessageOrigin(CacheEntry inputCasCacheEntry) {
+		if (!controller.isPrimitive()) {
+			   ((AggregateAnalysisEngineController) controller).addMessageOrigin(inputCasCacheEntry.getCasReferenceId(), mc.getEndpoint());
+			}
+
+	}
+	/**
+	 * Handles a Process request whose CAS was passed by reference. Looks the CAS
+	 * up in the InProcessCache, records arrival time and reply information on the
+	 * message's endpoint, creates the local CAS state entry and hands the CAS to
+	 * {@link #process(CacheEntry, boolean)}.
+	 * <p>
+	 * Failures are not rethrown from the try block: they are passed to the
+	 * controller's error handler chain and logged. The number of CASes processed
+	 * is incremented in the {@code finally} clause, so it is bumped on every exit
+	 * path, including the early returns.
+	 *
+	 * @param inputCasReferenceId id of the CAS expected to be in the InProcessCache
+	 * @throws AsynchAEException declared for the calls made while handling a failure
+	 */
+	private void executeDirectRequest(String inputCasReferenceId) throws AsynchAEException {
+		try {
+			// this is a request from a colocated client, the InProcessCache
+			// must have an entry for the given casReferenceId
+			if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "executeDirectRequest",
+						UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_analyzing_cas__FINE",
+						new Object[] { inputCasReferenceId });
+			}
+			CacheEntry inputCasCacheEntry = super.getCacheEntryForCas(inputCasReferenceId);
+			if (inputCasCacheEntry instanceof UndefinedCacheEntry) {
+				// LOG THIS
+				System.out.println("Controller:"+controller.getComponentName()+
+						" ProcessInputCasRequestCommand.execute() - Cache Entry for CasReferenceId:"
+								+ inputCasReferenceId + " Not Found in InProcessCache - Ignoring Message");
+				return; // Nothing to do
+			}
+
+			blockIfControllerNotReady(inputCasCacheEntry);
+			long arrivalTime = System.nanoTime();
+			controller.saveTime(arrivalTime, inputCasCacheEntry.getCasReferenceId(), controller.getName());
+
+			saveReplyTo();
+			saveDelegateKey();
+			saveEndpointName();
+			addMessageOrigin(inputCasCacheEntry);
+
+			// Create a CasStateEntry in a local cache
+			CasStateEntry localStateEntry = getCasStateEntry(inputCasCacheEntry.getCasReferenceId());
+			// associate client endpoint with the input CAS. We need to reply to this client
+			localStateEntry.setClientEndpoint(mc.getEndpoint());
+			localStateEntry.setInputCasReferenceId(inputCasCacheEntry.getCasReferenceId());
+
+			if (controller.isStopped()) {
+				return;
+			}
+			// process() increments Monitor.ProcessCount itself; incrementing it here
+			// as well counted every by-reference request twice.
+			process(inputCasCacheEntry, false);
+
+		} catch (AsynchAEException e) {
+			controller.getErrorHandlerChain().handle(e, super.populateErrorContext(mc), controller);
+			if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "executeDirectRequest",
+						UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+			}
+		} catch (Exception e) {
+			// Wrap so the error handler chain receives an AsynchAEException; use a
+			// separate local instead of overwriting the catch parameter.
+			Exception wrapped = new AsynchAEException(e);
+			controller.getErrorHandlerChain().handle(wrapped, super.populateErrorContext(mc), controller);
+			if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "executeDirectRequest",
+						UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", wrapped);
+			}
+		} finally {
+			// Increment number of CASes processed by this service
+			controller.getServicePerformance().incrementNumberOfCASesProcessed();
+		}
+
+	}
+
+	/**
+	 * Handles a Process request whose CAS arrives serialized in the message.
+	 * Duplicate requests (a CAS id already present in the cache) are ignored.
+	 * Otherwise the CAS is deserialized, registered in the InProcessCache, its
+	 * statistics are saved and it is handed to
+	 * {@link #process(CacheEntry, boolean)}. For a non-primitive controller the
+	 * completion semaphore is attached to the cache entry, and the calling thread
+	 * waits for completion when the CAS state entry is not a subordinate.
+	 * <p>
+	 * Any exception is routed to {@code handleError} of the superclass; the cache
+	 * entry passed there is {@code null} when the failure happened before
+	 * registration.
+	 *
+	 * @param inputCasReferenceId id of the incoming CAS
+	 * @throws AsynchAEException declared for the error-handling call
+	 */
+	private void executeRemoteRequest(String inputCasReferenceId) throws AsynchAEException {
+
+		CacheEntry inputCasCacheEntry = null;
+
+		try {
+			// The following applies to input CAS from remote clients only.
+			// To prevent processing multiple messages with the same CasReferenceId, check
+			// the CAS cache to see if the message with a given CasReferenceId is already
+			// being processed. If it is, the message contains the same request possibly
+			// issued by the caller due to a timeout. Also this mechanism helps with
+			// dealing with scenario when this service is not up when the client sends
+			// request. The client can keep re-sending the same request until its timeout
+			// thresholds are exceeded. By that time, there may be multiple messages in
+			// this service queue with the same CasReferenceId. When the service finally
+			// comes back up, it will have multiple messages in its queue possibly from
+			// the same client. Only the first message for any given CasReferenceId
+			// should be processed.
+			if (isCasInTheCacheAlready(inputCasReferenceId)) {
+				// isCasInTheCacheAlready() has already logged and printed the
+				// duplicate-request message; repeating it here reported every
+				// duplicate twice.
+				return; // Nothing to do
+			}
+			long inTime = System.nanoTime();
+
+			SerializationResult result = super.deserializeInputCAS(mc);
+
+			// CPU time snapshot handed to saveStats() below
+			long t1 = controller.getCpuTime();
+
+			inputCasCacheEntry = controller.getInProcessCache().register(result.getCas(), mc, result.getDeserSharedData(),
+					result.getReuseInfo(), inputCasReferenceId, result.getMarker(), result.acceptsDeltaCas());
+
+			saveStats(inputCasCacheEntry, inTime, t1, result.getTimeWaitingForCAS());
+
+			boolean waitForCompletion = false;
+
+			// create an entry for the CAS in a local cache
+			CasStateEntry cse = getCasStateEntry(inputCasReferenceId);
+
+			if (!controller.isPrimitive()) {
+				addCompletionSemaphore(inputCasCacheEntry);
+				// only a non-subordinate CAS makes the receiving thread wait
+				if (cse != null && !cse.isSubordinate()) {
+					waitForCompletion = true;
+				}
+			}
+
+			addMessageOrigin(inputCasCacheEntry);
+
+			if (controller.isStopped() ) {
+				// the controller stopped after the CAS was registered; drop it
+				controller.dropCAS(inputCasCacheEntry.getCasReferenceId(), true);
+				return;
+			}
+			if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "executeRemoteRequest",
+						UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_deserialized_cas_ready_to_process_FINE",
+						new Object[] { mc.getEndpoint().getEndpoint() });
+			}
+			process(inputCasCacheEntry, waitForCompletion);
+
+		} catch (Exception e) {
+			super.handleError(e, inputCasCacheEntry, mc);
+		}
+
+	}
+	
+	/**
+	 * Counts the request in the controller's monitor and hands the CAS to the
+	 * controller. A primitive controller receives the CAS together with the
+	 * message's endpoint. A non-primitive controller receives only the CAS and
+	 * its id, after which the calling thread optionally waits on the completion
+	 * semaphore.
+	 *
+	 * @param entry             cache entry holding the CAS and its reference id
+	 * @param waitForCompletion whether to wait on the completion semaphore after
+	 *                          handing the CAS to a non-primitive controller
+	 */
+	private void process(CacheEntry entry, boolean waitForCompletion) {
+		// Top level component stats are stored under the empty-string key.
+		controller.getMonitor().incrementCount("", Monitor.ProcessCount);
+
+		if (controller.isPrimitive()) {
+			controller.process(entry.getCas(), entry.getCasReferenceId(), mc.getEndpoint());
+			return;
+		}
+
+		controller.process(entry.getCas(), entry.getCasReferenceId());
+
+		// Aggregate only: the CAS has been handed off to a delegate. When asked
+		// to, block the receiving thread until the semaphore is acquired or the
+		// controller stops.
+		if (!waitForCompletion) {
+			return;
+		}
+		waitForCompletionSemaphore(entry);
+	}
+
+	/**
+	 * Blocks the calling thread until a permit is acquired from the entry's
+	 * completion semaphore, the controller is stopped, or the entry has no
+	 * semaphore. The semaphore is polled with a 500 ms timeout so that a stopped
+	 * controller is noticed.
+	 * <p>
+	 * If the thread is interrupted while waiting, the wait ends and the thread's
+	 * interrupt status is restored so callers further up the stack can see it.
+	 *
+	 * @param entry cache entry carrying the completion semaphore
+	 */
+	private void waitForCompletionSemaphore(CacheEntry entry) {
+		try {
+			synchronized (lock) {
+				while (!controller.isStopped()) {
+					// Read the semaphore once per iteration so the null check and
+					// the acquire operate on the same reference.
+					Semaphore completion = entry.getThreadCompletionSemaphore();
+					if (completion == null) {
+						break;
+					}
+					if (completion.tryAcquire(500, TimeUnit.MILLISECONDS)) {
+						break;
+					}
+				}
+			}
+		} catch (InterruptedException ex) {
+			// Do not swallow the interruption: re-assert the flag for the caller.
+			Thread.currentThread().interrupt();
+		}
+
+	}
+
+
+
+	private void addCompletionSemaphore(CacheEntry entry) {
+
+		synchronized (lock) {
+			// lazily create a Semaphore on the first Process request. This semaphore
+			// will throttle ingestion of CASes from service input queue.
+			if (((AggregateAnalysisEngineController_impl) controller).semaphore == null) {
+				((AggregateAnalysisEngineController_impl) controller).semaphore = new Semaphore(
+						((AggregateAnalysisEngineController) controller).getServiceCasPoolSize() - 1);
+			}
+		}
+		entry.setThreadCompletionSemaphore(((AggregateAnalysisEngineController_impl) controller).semaphore);
+	}
+
+	private boolean isCasInTheCacheAlready(String casReferenceId) {
+		// the InProcessCache lookup either returns UndefindCacheEntry or CacheEntry
+		// instance
+		CacheEntry casCacheEntry = super.getCacheEntryForCas(casReferenceId);
+		if (!(casCacheEntry instanceof UndefinedCacheEntry)) {
+			// LOG THIS
+			if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "executeRemoteRequest",
+						UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_duplicate_request__INFO",
+						new Object[] { casReferenceId });
+			}
+
+			System.out.println(
+					"ProcessInputCasRequestCommand.executeRemoteRequest() - Duplicate request recv'd for CasReferenceId:"
+							+ casReferenceId + " - Ignoring Message");
+			return true;
+		}
+		return false;
+	}
+}