You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/02/26 18:54:13 UTC

svn commit: r1825401 [10/11] - in /uima/uima-as/branches/uima-as-3: uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ uimaj-as-activemq/src/main/java/org/apache/uim...

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasResponseCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasResponseCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasResponseCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasResponseCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.InProcessCache.CacheEntry;
+import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
+import org.apache.uima.aae.delegate.Delegate;
+import org.apache.uima.aae.error.AsynchAEException;
+import org.apache.uima.aae.jmx.ServicePerformance;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.monitor.Monitor;
+import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
+import org.apache.uima.util.Level;
+
+public class ProcessInputCasResponseCommand  extends AbstractUimaAsCommand {
+
+	private MessageContext mc;
+
+	/**
+	 * Creates a command bound to the message that carries a process-CAS reply.
+	 *
+	 * @param mc
+	 *            message context whose properties are read when the command executes
+	 * @param controller
+	 *            controller handed to the {@code AbstractUimaAsCommand} base class
+	 */
+	public ProcessInputCasResponseCommand(MessageContext mc, AnalysisEngineController controller) {
+		super(controller);
+		this.mc = mc;
+	}
+
+	/**
+	 * Routes a process-CAS reply for an input CAS to the matching handler.
+	 * <p>
+	 * A message without a CAS reference id is logged and ignored. When the payload is
+	 * {@code AsynchAEMessage.CASRefID} the reply is handled by {@code executeDirectRequest};
+	 * every other payload is passed to {@code executeRemoteRequest}.
+	 *
+	 * @throws Exception
+	 *             if a message property cannot be read or the reply handling fails
+	 */
+	public void execute() throws Exception {
+
+		int payload = mc.getMessageIntProperty(AsynchAEMessage.Payload);
+		String casReferenceId = super.getCasReferenceId(this.getClass(), mc);
+		// Trace through the UIMA logger rather than printing debug output to stdout
+		if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
+			UIMAFramework.getLogger(getClass()).log(Level.FINE,
+					"Controller:" + controller.getComponentName()
+							+ " in ProcessInputCasResponseCommand.execute() - Input CAS:" + casReferenceId
+							+ " from " + mc.getMessageStringProperty(AsynchAEMessage.MessageFrom));
+		}
+
+		if (casReferenceId == null) {
+			if (UIMAFramework.getLogger(getClass()).isLoggable(Level.WARNING)) {
+				UIMAFramework.getLogger(getClass()).log(Level.WARNING,
+						"ProcessInputCasResponseCommand.execute() - CasReferenceId is missing in MessageContext - Ignoring Message");
+			}
+			return; // Nothing to do
+		}
+		// Cas was passed by reference meaning InProcessCache must have it
+		if (AsynchAEMessage.CASRefID == payload) {
+			executeDirectRequest(casReferenceId);
+		} else {
+			executeRemoteRequest(casReferenceId);
+		}
+
+	}
+
+	/**
+	 * Handles a process-CAS reply from a delegate colocated in the same process.
+	 * <p>
+	 * The CAS is looked up in the global cache, the reply is recorded on the local cache
+	 * entry when one exists, the CAS id is removed from the replying delegate's
+	 * outstanding list and the CAS is handed back to the aggregate controller. Any
+	 * exception raised on the way is passed to {@code handleError}. The delegate process
+	 * count is incremented whether or not the reply was handled successfully.
+	 *
+	 * @param casReferenceId
+	 *            id of the CAS; it is re-read from the message's CasReference property
+	 *            before use
+	 * @throws AsynchAEException
+	 *             presumably only when error handling itself fails - TODO confirm against
+	 *             {@code handleError}
+	 */
+	private void executeDirectRequest(String casReferenceId) throws AsynchAEException {
+		CacheEntry globalEntry = null;
+		Delegate replySource = null;
+		try {
+			casReferenceId = mc.getMessageStringProperty(AsynchAEMessage.CasReference);
+			// the global cache must know this CAS
+			globalEntry = super.getCacheEntryForCas(casReferenceId);
+			if (globalEntry == null) {
+				throw new AsynchAEException("CasReferenceId:" + casReferenceId + " Not Found in the Cache.");
+			}
+
+			// this aggregate's local cache may or may not hold an entry for the CAS
+			CasStateEntry localEntry = controller.getLocalCache().lookupEntry(casReferenceId);
+			// the delegate that sent the reply
+			replySource = super.getDelegate(mc);
+			if (localEntry != null) {
+				localEntry.setReplyReceived();
+				localEntry.setLastDelegate(replySource);
+			}
+			// A delegate tracks the ids of the CASes dispatched to it; a reply means
+			// this CAS is no longer outstanding.
+			replySource.removeCasFromOutstandingList(casReferenceId);
+
+			if (globalEntry.getCas() == null) {
+				if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
+					UIMAFramework.getLogger(getClass()).logrb(Level.INFO, getClass().getName(), "executeDirectRequest",
+							UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cas_not_in_cache__INFO",
+							new Object[] { controller.getName(), casReferenceId, mc.getEndpoint().getEndpoint() });
+				}
+				throw new AsynchAEException(
+						"CAS with Reference Id:" + casReferenceId + " Not Found in CasManager's CAS Cache");
+			}
+
+			computeStats(mc, globalEntry);
+			((AggregateAnalysisEngineController) controller).process(globalEntry.getCas(), casReferenceId);
+		} catch (Exception e) {
+			super.handleError(e, globalEntry, mc);
+		} finally {
+			incrementDelegateProcessCount(mc);
+			if (replySource != null) {
+				handleAbortedCasMultiplier(replySource, globalEntry);
+			}
+		}
+
+	}
+
+	/**
+	 * Waits on the controller latch until the controller is initialized, unless the
+	 * given cache entry is flagged as a warm-up CAS.
+	 *
+	 * @param casCacheEntry
+	 *            cache entry of the CAS about to be processed
+	 * @throws AsynchAEException
+	 *             if waiting on the latch fails
+	 */
+	private void blockIfControllerNotReady(CacheEntry casCacheEntry) throws AsynchAEException {
+		if (casCacheEntry.isWarmUp()) {
+			return;
+		}
+		controller.getControllerLatch().waitUntilInitialized();
+
+	}
+
+	// Stub: execute() routes every reply whose payload is not AsynchAEMessage.CASRefID
+	// here, and the body is empty, so such replies are currently ignored.
+	private void executeRemoteRequest(String casReferenceId) {
+
+	}
+
+	/**
+	 * Registers a cache-empty callback when the replying delegate is a CAS multiplier
+	 * and the input CAS was aborted while the in-process cache still holds entries.
+	 * <p>
+	 * Failures are logged at WARNING level and never propagated, because this method is
+	 * called from a {@code finally} block.
+	 *
+	 * @param delegate
+	 *            delegate that sent the reply
+	 * @param cacheEntry
+	 *            cache entry of the input CAS
+	 */
+	private void handleAbortedCasMultiplier(Delegate delegate, CacheEntry cacheEntry) {
+		try {
+			// Check if the multiplier aborted during processing of this input CAS
+			if (delegate.getEndpoint() != null && delegate.getEndpoint().isCasMultiplier() && cacheEntry.isAborted()) {
+				// NOTE(review): when the cache is already empty nothing happens; the author
+				// left a call to getEventListener().onCacheEmpty() disabled here, questioning
+				// whether it is needed for an aborted CAS.
+				if (!((AggregateAnalysisEngineController) controller).getInProcessCache().isEmpty()) {
+					((AggregateAnalysisEngineController) controller).getInProcessCache()
+							.registerCallbackWhenCacheEmpty(controller.getEventListener());
+				}
+			}
+
+		} catch (Exception e) {
+			if (UIMAFramework.getLogger(getClass()).isLoggable(Level.WARNING)) {
+				// report this method, not executeDirectRequest, as the source of the record
+				UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(),
+						"handleAbortedCasMultiplier", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+						"UIMAEE_service_exception_WARNING", controller.getComponentName());
+
+				UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(),
+						"handleAbortedCasMultiplier", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+						"UIMAEE_exception__WARNING", e);
+			}
+		}
+	}
+
+	/**
+	 * Increments the process-count statistic of the delegate that owns the endpoint of
+	 * the given message. Does nothing when the message has no endpoint or the controller
+	 * is not an aggregate. Any failure is logged at INFO level and otherwise ignored.
+	 *
+	 * @param aMessageContext
+	 *            message whose endpoint identifies the delegate
+	 */
+	private void incrementDelegateProcessCount(MessageContext aMessageContext) {
+		Endpoint endpoint = aMessageContext.getEndpoint();
+		if (endpoint == null || !(controller instanceof AggregateAnalysisEngineController)) {
+			return;
+		}
+		AggregateAnalysisEngineController aggregate = (AggregateAnalysisEngineController) controller;
+		try {
+			String delegateKey = aggregate.lookUpDelegateKey(endpoint.getEndpoint());
+			LongNumericStatistic processCount = controller.getMonitor().getLongNumericStatistic(delegateKey,
+					Monitor.ProcessCount);
+			processCount.increment();
+		} catch (Exception e) {
+			if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
+				UIMAFramework.getLogger(getClass()).logrb(Level.INFO, getClass().getName(),
+						"incrementDelegateProcessCount", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+						"UIMAEE_delegate_key_for_endpoint_not_found__INFO",
+						new Object[] { controller.getComponentName(), endpoint.getEndpoint() });
+			}
+		}
+
+	}
+
+	/**
+	 * Logs timing figures for a reply and, for an aggregate controller, folds the
+	 * statistics carried by the message into the delegate and CAS statistics.
+	 * <p>
+	 * The timing figures are only computed when the message carries the
+	 * {@code TimeInService} property, and only logged at FINE level.
+	 *
+	 * @param aMessageContext
+	 *            reply message carrying the statistics properties
+	 * @param cacheEntry
+	 *            cache entry of the CAS the reply belongs to
+	 * @throws AsynchAEException
+	 *             if a message property cannot be read or the aggregation fails
+	 */
+	protected void computeStats(MessageContext aMessageContext, CacheEntry cacheEntry) throws AsynchAEException {
+		if (aMessageContext.propertyExists(AsynchAEMessage.TimeInService)) {
+			long departureTime = controller.getTime(cacheEntry.getCasReferenceId(),
+					aMessageContext.getEndpoint().getEndpoint());
+			long currentTime = System.nanoTime();
+			long roundTrip = currentTime - departureTime;
+			long timeInService = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeInService);
+			// Time in comms is what is left of the round trip once the time the delegate
+			// reported spending in service is taken out. Assumes TimeInService uses the
+			// same nanosecond unit as the local clock (both are scaled by 1e6 below).
+			long totalTimeInComms = roundTrip - timeInService;
+
+			if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
+				UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "computeStats",
+						UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_roundtrip_time__FINE",
+						new Object[] { cacheEntry.getCasReferenceId(), aMessageContext.getEndpoint(),
+								(double) roundTrip / (double) 1000000 });
+
+				UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "computeStats",
+						UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_time_spent_in_delegate__FINE",
+						new Object[] { cacheEntry.getCasReferenceId(), (double) timeInService / (double) 1000000,
+								aMessageContext.getEndpoint() });
+
+				UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "computeStats",
+						UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_time_spent_in_comms__FINE",
+						new Object[] { cacheEntry.getCasReferenceId(), (double) totalTimeInComms / (double) 1000000,
+								aMessageContext.getEndpoint() });
+			}
+		}
+
+		if (controller instanceof AggregateAnalysisEngineController) {
+			aggregateDelegateStats(aMessageContext, cacheEntry);
+		}
+	}
+
+	/**
+	 * Folds the statistics properties of a delegate reply into the performance objects
+	 * of the delegate, the controller, the CAS and (where applicable) its parent CAS.
+	 * <p>
+	 * Each statistic is optional and handled only when the message carries the
+	 * corresponding property. Delegate statistics are skipped when no
+	 * {@code ServicePerformance} object exists for the delegate.
+	 *
+	 * @param aMessageContext
+	 *            reply message carrying the statistics properties
+	 * @param cacheEntry
+	 *            cache entry of the CAS the reply belongs to
+	 * @throws AsynchAEException
+	 *             on any failure; other exception types are wrapped
+	 */
+	protected synchronized void aggregateDelegateStats(MessageContext aMessageContext, CacheEntry cacheEntry)
+			throws AsynchAEException {
+		try {
+			AggregateAnalysisEngineController aggregate = (AggregateAnalysisEngineController) controller;
+			Endpoint endpoint = aMessageContext.getEndpoint();
+			String endpointName = endpoint.getEndpoint();
+
+			// Fall back to the MessageFrom property when the endpoint carries no name
+			String delegateKey;
+			if (endpointName == null || endpointName.trim().length() == 0) {
+				String fromEndpoint = aMessageContext.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+				delegateKey = aggregate.lookUpDelegateKey(fromEndpoint);
+			} else {
+				delegateKey = aggregate.lookUpDelegateKey(endpointName);
+			}
+
+			String parentCasReferenceId = cacheEntry.getInputCasReferenceId();
+			ServicePerformance casStats = aggregate.getCasStatistics(cacheEntry.getCasReferenceId());
+
+			// The parent entry is only used when the CAS was produced by a valid delegate
+			CacheEntry parentCasEntry = null;
+			if (parentCasReferenceId != null && controller.getInProcessCache().entryExists(parentCasReferenceId)) {
+				String casProducerKey = cacheEntry.getCasProducerKey();
+				if (casProducerKey != null && aggregate.isDelegateKeyValid(casProducerKey)) {
+					// Get entry for the parent CAS
+					parentCasEntry = controller.getInProcessCache().getCacheEntryForCAS(parentCasReferenceId);
+				}
+			}
+			ServicePerformance delegateServicePerformance = aggregate.getServicePerformance(delegateKey);
+
+			if (aMessageContext.propertyExists(AsynchAEMessage.TimeToSerializeCAS)) {
+				long timeToSerializeCAS = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeToSerializeCAS);
+				if (timeToSerializeCAS > 0 && delegateServicePerformance != null) {
+					delegateServicePerformance.incrementCasSerializationTime(timeToSerializeCAS);
+				}
+			}
+			if (aMessageContext.propertyExists(AsynchAEMessage.TimeToDeserializeCAS)) {
+				long timeToDeserializeCAS = aMessageContext
+						.getMessageLongProperty(AsynchAEMessage.TimeToDeserializeCAS);
+				if (timeToDeserializeCAS > 0 && delegateServicePerformance != null) {
+					delegateServicePerformance.incrementCasDeserializationTime(timeToDeserializeCAS);
+				}
+			}
+
+			if (aMessageContext.propertyExists(AsynchAEMessage.IdleTime)) {
+				long idleTime = aMessageContext.getMessageLongProperty(AsynchAEMessage.IdleTime);
+				if (idleTime > 0 && delegateServicePerformance != null && endpoint.isRemote()) {
+					delegateServicePerformance.incrementIdleTime(idleTime);
+				}
+			}
+
+			if (aMessageContext.propertyExists(AsynchAEMessage.TimeWaitingForCAS)) {
+				long timeWaitingForCAS = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeWaitingForCAS);
+				if (timeWaitingForCAS > 0 && endpoint.isRemote()) {
+					cacheEntry.incrementTimeWaitingForCAS(timeWaitingForCAS);
+					// Same null guard as the other statistics: a delegate without a
+					// performance object must not abort the remaining bookkeeping.
+					if (delegateServicePerformance != null) {
+						delegateServicePerformance.incrementCasPoolWaitTime(
+								timeWaitingForCAS - delegateServicePerformance.getRawCasPoolWaitTime());
+					}
+					if (parentCasEntry != null) {
+						parentCasEntry.incrementTimeWaitingForCAS(timeWaitingForCAS);
+					}
+				}
+			}
+			if (aMessageContext.propertyExists(AsynchAEMessage.TimeInProcessCAS)) {
+				long timeInProcessCAS = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeInProcessCAS);
+				if (endpoint.isRemote()) {
+					if (delegateServicePerformance != null) {
+						// calculate the time spent in analysis. The remote service returns total time
+						// spent in the analysis. Compute the delta.
+						long dt = timeInProcessCAS - delegateServicePerformance.getRawAnalysisTime();
+						// increment total time in analysis
+						delegateServicePerformance.incrementAnalysisTime(dt);
+						controller.getServicePerformance().incrementAnalysisTime(dt);
+					}
+				} else {
+					controller.getServicePerformance().incrementAnalysisTime(timeInProcessCAS);
+				}
+				casStats.incrementAnalysisTime(timeInProcessCAS);
+
+				if (parentCasReferenceId != null) {
+					ServicePerformance inputCasStats = aggregate.getCasStatistics(parentCasReferenceId);
+					// Update processing time for this CAS
+					if (inputCasStats != null) {
+						inputCasStats.incrementAnalysisTime(timeInProcessCAS);
+					}
+				}
+
+			}
+		} catch (AsynchAEException e) {
+			throw e;
+		} catch (Exception e) {
+			throw new AsynchAEException(e);
+		}
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ReleaseCASRequestCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ReleaseCASRequestCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ReleaseCASRequestCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ReleaseCASRequestCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.util.Level;
+
+/**
+ * Command that asks the controller to release the CAS named by the CasReference
+ * property of a release-CAS request message.
+ */
+public class ReleaseCASRequestCommand extends AbstractUimaAsCommand {
+	private final MessageContext mc;
+
+	/**
+	 * @param mc
+	 *            message context of the release request
+	 * @param controller
+	 *            controller that owns the CAS to release
+	 */
+	public ReleaseCASRequestCommand(MessageContext mc, AnalysisEngineController controller) {
+		super(controller);
+		this.mc = mc;
+	}
+
+	/**
+	 * Reads the CAS reference id from the message, logs the request at FINE level and
+	 * calls {@code releaseNextCas} on the controller.
+	 *
+	 * @throws Exception
+	 *             if the message property cannot be read or the release fails
+	 */
+	public void execute() throws Exception {
+		String casReferenceId = mc.getMessageStringProperty(AsynchAEMessage.CasReference);
+		// The request is traced through the logger only; the former stdout debug line
+		// duplicated this record.
+		if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
+			UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(),
+					"execute", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+					"UIMAEE_release_cas_req__FINE",
+					new Object[] { controller.getName(), casReferenceId });
+		}
+		controller.releaseNextCas(casReferenceId);
+
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/StopRequestCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/StopRequestCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/StopRequestCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/StopRequestCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.util.Level;
+
+/**
+ * Command that handles a stop request for the CAS named by the CasReference property
+ * of the message. Failures are logged at WARNING level and not propagated.
+ */
+public class StopRequestCommand extends AbstractUimaAsCommand {
+	private MessageContext mc;
+
+	/**
+	 * @param mc
+	 *            message context of the stop request
+	 * @param controller
+	 *            controller processing the CAS to stop
+	 */
+	public StopRequestCommand(MessageContext mc, AnalysisEngineController controller) {
+		super(controller);
+		this.mc = mc;
+	}
+
+	/**
+	 * A primitive controller records the CAS id as aborted. An aggregate marks the CAS
+	 * state entry as failed and stops its CAS multipliers.
+	 */
+	public void execute() throws Exception {
+
+		try {
+			String stoppedCasId = mc.getMessageStringProperty(AsynchAEMessage.CasReference);
+			if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
+				UIMAFramework.getLogger(getClass()).logrb(Level.INFO, getClass().getName(), "execute",
+						UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_received_stop_request__INFO",
+						new Object[] { controller.getComponentName(), stoppedCasId });
+			}
+
+			if (!controller.isPrimitive()) {
+				CasStateEntry stoppedCasState = super.getCasStateEntry(stoppedCasId);
+				// The CAS is flagged as failed although no exception is attached to it, so
+				// this is not a real failure of the CAS or of its children; the failure
+				// path is simply reused. The aggregate's replyToClient() tells a stopped CAS
+				// from a failed one by the absence of exceptions and then returns the input
+				// CAS as if it had been fully processed.
+				stoppedCasState.setFailed();
+				((AggregateAnalysisEngineController_impl) controller).stopCasMultipliers();
+			} else {
+				controller.addAbortedCasReferenceId(stoppedCasId);
+			}
+		} catch (Exception e) {
+			if (!UIMAFramework.getLogger(this.getClass()).isLoggable(Level.WARNING)) {
+				return;
+			}
+			if (controller != null) {
+				UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, this.getClass().getName(), "execute",
+						UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_service_exception_WARNING",
+						controller.getComponentName());
+			}
+
+			UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, getClass().getName(), "execute",
+					UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+		}
+
+	}
+
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/UimaAsCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/UimaAsCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/UimaAsCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/UimaAsCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+/**
+ * A single executable unit of work.
+ */
+public interface UimaAsCommand {
+	/**
+	 * Runs the command.
+	 *
+	 * @throws Exception
+	 *             if the command cannot be completed
+	 */
+	void execute() throws Exception;
+
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/AggregateAnalysisEngineDelegate.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/AggregateAnalysisEngineDelegate.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/AggregateAnalysisEngineDelegate.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/AggregateAnalysisEngineDelegate.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.delegate;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.uima.aae.error.ErrorHandlerChain;
+
+/**
+ * Delegate description for an aggregate: a non-primitive delegate that holds a list of
+ * child delegates and an optional error handler chain.
+ */
+public class AggregateAnalysisEngineDelegate extends AnalysisEngineDelegate {
+	private List<AnalysisEngineDelegate> delegates = new LinkedList<>();
+	private ErrorHandlerChain delegateErrorHandlerChain;
+
+	/**
+	 * Creates an aggregate delegate and marks it as non-primitive.
+	 *
+	 * @param key
+	 *            key of this delegate
+	 */
+	public AggregateAnalysisEngineDelegate(String key) {
+		super(key);
+		setPrimitive(false);
+	}
+
+	/** Appends a child delegate to this aggregate. */
+	public void addDelegate(AnalysisEngineDelegate delegate) {
+		this.delegates.add(delegate);
+	}
+
+	/** Returns the live list of child delegates, in the order they were added. */
+	public List<AnalysisEngineDelegate> getDelegates() {
+		return this.delegates;
+	}
+
+	/** Returns the error handler chain, or {@code null} when none was set. */
+	public ErrorHandlerChain getDelegateErrorHandlerChain() {
+		return this.delegateErrorHandlerChain;
+	}
+
+	/** Sets the error handler chain. */
+	public void setDelegateErrorHandlerChain(ErrorHandlerChain delegateErrorHandlerChain) {
+		this.delegateErrorHandlerChain = delegateErrorHandlerChain;
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/AnalysisEngineDelegate.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/AnalysisEngineDelegate.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/AnalysisEngineDelegate.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/AnalysisEngineDelegate.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.delegate;
+
+import org.apache.uima.analysis_engine.AnalysisEngineDescription;
+
+/**
+ * Plain holder for the settings of one delegate: its key, remote/async/primitive
+ * flags, scaleout values, timeouts, resource specifier and optional CAS multiplier
+ * settings.
+ */
+public class AnalysisEngineDelegate {
+	private String key;
+	private AnalysisEngineDescription resourceSpecifier;
+	private CasMultiplierNature cm;
+
+	// flags; all default to false
+	private boolean remote;
+	private boolean primitive;
+	private boolean async;
+
+	// scaleout values default to 1
+	private int scaleout = 1;
+	private int replyScaleout = 1;
+
+	// timeouts default to 0
+	private int getMetaTimeout;
+	private int processTimeout;
+	private int cpcTimeout;
+
+	/** Creates a delegate with an empty key. */
+	public AnalysisEngineDelegate() {
+		this("");
+	}
+
+	/**
+	 * @param key
+	 *            key of this delegate
+	 */
+	public AnalysisEngineDelegate(String key) {
+		setKey(key);
+	}
+
+	/** Returns {@code true} when CAS multiplier settings have been set. */
+	public boolean isCasMultiplier() {
+		return cm != null;
+	}
+
+	public String getKey() {
+		return this.key;
+	}
+
+	public void setKey(String key) {
+		this.key = key;
+	}
+
+	public boolean isRemote() {
+		return this.remote;
+	}
+
+	public void setRemote(boolean remote) {
+		this.remote = remote;
+	}
+
+	public int getScaleout() {
+		return this.scaleout;
+	}
+
+	public void setScaleout(int scaleout) {
+		this.scaleout = scaleout;
+	}
+
+	public boolean isPrimitive() {
+		return this.primitive;
+	}
+
+	public void setPrimitive(boolean primitive) {
+		this.primitive = primitive;
+	}
+
+	public AnalysisEngineDescription getResourceSpecifier() {
+		return this.resourceSpecifier;
+	}
+
+	public void setResourceSpecifier(AnalysisEngineDescription resourceSpecifier) {
+		this.resourceSpecifier = resourceSpecifier;
+	}
+
+	/** Returns the CAS multiplier settings, or {@code null} when none were set. */
+	public CasMultiplierNature getCasMultiplier() {
+		return this.cm;
+	}
+
+	/** Sets the CAS multiplier settings returned by {@link #getCasMultiplier()}. */
+	public void setCm(CasMultiplierNature cm) {
+		this.cm = cm;
+	}
+
+	public boolean isAsync() {
+		return this.async;
+	}
+
+	public void setAsync(boolean async) {
+		this.async = async;
+	}
+
+	public int getGetMetaTimeout() {
+		return this.getMetaTimeout;
+	}
+
+	public void setGetMetaTimeout(int getMetaTimeout) {
+		this.getMetaTimeout = getMetaTimeout;
+	}
+
+	public int getProcessTimeout() {
+		return this.processTimeout;
+	}
+
+	public void setProcessTimeout(int processTimeout) {
+		this.processTimeout = processTimeout;
+	}
+
+	public int getCpcTimeout() {
+		return this.cpcTimeout;
+	}
+
+	public void setCpcTimeout(int cpcTimeout) {
+		this.cpcTimeout = cpcTimeout;
+	}
+
+	public int getReplyScaleout() {
+		return this.replyScaleout;
+	}
+
+	public void setReplyScaleout(int replyScaleout) {
+		this.replyScaleout = replyScaleout;
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/CasMultiplierNature.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/CasMultiplierNature.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/CasMultiplierNature.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/CasMultiplierNature.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.delegate;
+
+/**
+ * Settings of a CAS multiplier delegate: pool size (default 1), initial heap size
+ * (default 500) and two flags that default to {@code false}.
+ */
+public class CasMultiplierNature {
+	int poolSize = 1;
+	int initialHeapSize = 500;
+	boolean processParentLast;
+	boolean disableJCasCache;
+
+	public int getPoolSize() {
+		return this.poolSize;
+	}
+
+	public void setPoolSize(int poolSize) {
+		this.poolSize = poolSize;
+	}
+
+	public int getInitialHeapSize() {
+		return this.initialHeapSize;
+	}
+
+	public void setInitialHeapSize(int initialHeapSize) {
+		this.initialHeapSize = initialHeapSize;
+	}
+
+	public boolean isProcessParentLast() {
+		return this.processParentLast;
+	}
+
+	public void setProcessParentLast(boolean processParentLast) {
+		this.processParentLast = processParentLast;
+	}
+
+	/** Getter: returns the current value of the disable-JCas-cache flag. */
+	public boolean disableJCasCache() {
+		return this.disableJCasCache;
+	}
+
+	/** Setter: stores the given value of the disable-JCas-cache flag. */
+	public void disableJCasCache(boolean disableJCasCache) {
+		this.disableJCasCache = disableJCasCache;
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/RemoteAnalysisEngineDelegate.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/RemoteAnalysisEngineDelegate.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/RemoteAnalysisEngineDelegate.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/RemoteAnalysisEngineDelegate.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.delegate;
+
+import org.apache.uima.aae.service.builder.AbstractUimaAsServiceBuilder.Serialization;
+
+/**
+ * An {@link AnalysisEngineDelegate} that is flagged as remote at construction
+ * time and additionally carries a queue name, a broker URI, a prefetch value
+ * and a serialization name.
+ */
+public class RemoteAnalysisEngineDelegate extends AnalysisEngineDelegate {
+	/** Queue name; {@code null} until {@link #setQueueName(String)} is called. */
+	private String queueName;
+	/** Broker URI; {@code null} until {@link #setBrokerURI(String)} is called. */
+	private String brokerURI;
+	/** Prefetch value; defaults to 0. */
+	private int prefetch=0;
+	/** Serialization name; defaults to the string form of {@code Serialization.XMI}. */
+	private String serialization = Serialization.XMI.toString();
+
+	/**
+	 * Creates the delegate and marks it as remote via {@code setRemote(true)}.
+	 *
+	 * @param key
+	 *            - delegate key handed to the superclass constructor
+	 */
+	public RemoteAnalysisEngineDelegate( String key ) {
+		super(key);
+		super.setRemote(true);
+	}
+	/** @return the queue name, or {@code null} if none has been set */
+	public String getQueueName() {
+		return queueName;
+	}
+	/** @param queueName the queue name to store */
+	public void setQueueName(String queueName) {
+		this.queueName = queueName;
+	}
+	/** @return the broker URI, or {@code null} if none has been set */
+	public String getBrokerURI() {
+		return brokerURI;
+	}
+	/** @param brokerURI the broker URI to store */
+	public void setBrokerURI(String brokerURI) {
+		this.brokerURI = brokerURI;
+	}
+	/**
+	 * Tells whether the broker URI is the literal {@code "java"} (compared
+	 * ignoring case).
+	 *
+	 * @return {@code true} if the broker URI equals "java" ignoring case;
+	 *         {@code false} otherwise, including when no broker URI has been set
+	 */
+	public boolean isCollocated() {
+		// Compare from the constant side: brokerURI stays null until
+		// setBrokerURI() is called and must not cause a NullPointerException.
+		return "java".equalsIgnoreCase(brokerURI);
+	}
+	/** @return the serialization name */
+	public String getSerialization() {
+		return serialization;
+	}
+	/** @param serialization the serialization name to store */
+	public void setSerialization(String serialization) {
+		this.serialization = serialization;
+	}
+	/** @return the prefetch value */
+	public int getPrefetch() {
+		return prefetch;
+	}
+	/** @param prefetch the prefetch value to store */
+	public void setPrefetch(int prefetch) {
+		this.prefetch = prefetch;
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectInputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectInputChannel.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectInputChannel.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectInputChannel.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.client;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.uima.aae.InputChannel;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.error.AsynchAEException;
+import org.apache.uima.aae.handler.Handler;
+import org.apache.uima.aae.jmx.ServiceInfo;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.UIMAMessage;
+import org.apache.uima.aae.service.command.CommandFactory;
+import org.apache.uima.aae.service.command.UimaAsCommand;
+
+/**
+ * {@link InputChannel} for the {@code DIRECT} endpoint type.
+ * <p>
+ * A {@link DirectMessage} passed to {@link #onMessage(DirectMessage)} is wrapped
+ * in a {@link DirectMessageContext}, validated (message type, command, payload,
+ * staleness) and, when valid, executed through a {@link UimaAsCommand} obtained
+ * from {@link CommandFactory}. Diagnostics currently go to {@code System.out}.
+ * Many of the remaining {@link InputChannel} operations are empty stubs here.
+ */
+public class DirectInputChannel implements InputChannel {
+	private static final Class<?> CLASS_NAME = DirectInputChannel.class;
+	AnalysisEngineController controller;
+	Handler handler;
+	String endpointName;
+	private ServiceInfo serviceInfo = null;
+	private boolean isStopped;
+	private List<Listener> listeners = new ArrayList<>();
+	private ENDPOINT_TYPE type = ENDPOINT_TYPE.DIRECT;
+	private ChannelType channelType;
+
+	/**
+	 * @param type
+	 *            - channel type reported by {@link #getChannelType()}
+	 */
+	public DirectInputChannel(ChannelType type) {
+		this.channelType = type;
+	}
+
+	/** @return the channel type given to the constructor */
+	public ChannelType getChannelType() {
+		return channelType;
+	}
+
+	/** @return the endpoint type of this channel (always {@code ENDPOINT_TYPE.DIRECT}) */
+	public ENDPOINT_TYPE getType() {
+		return type;
+	}
+
+	/**
+	 * Fluent variant of {@link #setController(AnalysisEngineController)}.
+	 *
+	 * @param controller
+	 *            - controller to associate with this channel
+	 * @return this channel
+	 */
+	public DirectInputChannel withController(AnalysisEngineController controller) {
+		this.controller = controller;
+		return this;
+	}
+
+	/**
+	 * @param controller
+	 *            - controller to associate with this channel
+	 */
+	public void setController(AnalysisEngineController controller) {
+		this.controller = controller;
+	}
+
+	/**
+	 * Wraps the message in a {@link DirectMessageContext}, validates it and, if
+	 * valid, creates and executes the matching command. Any exception is caught
+	 * and its stack trace printed; nothing is propagated to the caller.
+	 *
+	 * @param message
+	 *            - message to process
+	 */
+	public void onMessage(DirectMessage message) {
+		try {
+			MessageContext context = new DirectMessageContext(message, "", getController().getComponentName());
+			if (!validMessage(context)) {
+				return;
+			}
+			UimaAsCommand command = CommandFactory.newCommand(context, controller);
+			command.execute();
+		} catch (Exception t) {
+			t.printStackTrace();
+		}
+	}
+
+	/** @return the controller associated with this channel, possibly {@code null} */
+	public AnalysisEngineController getController() {
+		return controller;
+	}
+
+	/**
+	 * Checks that the message carries a Command property holding one of the
+	 * supported commands (Process, GetMeta, ReleaseCAS, Stop, Ping, ServiceInfo,
+	 * CollectionProcessComplete).
+	 * 
+	 * @param aMessage
+	 *            - message to check
+	 * @return - true if the command received is a valid one, false otherwise
+	 * @throws Exception
+	 */
+	private boolean validCommand(MessageContext aMessage) throws Exception {
+		if (!aMessage.propertyExists(AsynchAEMessage.Command)) {
+			// TODO: replace console output with the UIMAFramework logger
+			// (resource bundle key UIMAJMS_command_notin_message__INFO)
+			System.out.println(CLASS_NAME + ".validCommand() - command not in message");
+			return false;
+		}
+		int command = aMessage.getMessageIntProperty(AsynchAEMessage.Command);
+		switch (command) {
+		case AsynchAEMessage.Process:
+		case AsynchAEMessage.GetMeta:
+		case AsynchAEMessage.ReleaseCAS:
+		case AsynchAEMessage.Stop:
+		case AsynchAEMessage.Ping:
+		case AsynchAEMessage.ServiceInfo:
+		case AsynchAEMessage.CollectionProcessComplete:
+			return true;
+		default:
+			// TODO: replace console output with the UIMAFramework logger
+			// (resource bundle key UIMAJMS_invalid_command_in_message__INFO)
+			System.out.println(CLASS_NAME + ".validCommand() - invalid command in message " + command);
+			return false;
+		}
+	}
+
+	/**
+	 * Validates contents of the message. It checks, in this order, the message
+	 * type, the command and the payload, and finally rejects stale messages.
+	 * Only {@link DirectMessageContext} instances can be valid.
+	 * 
+	 * @param aMessage
+	 *            - message to validate
+	 * @return - true if message is valid, false otherwise
+	 * @throws Exception
+	 */
+	public boolean validMessage(MessageContext aMessage) throws Exception {
+		if (!(aMessage instanceof DirectMessageContext)) {
+			return false;
+		}
+		if (!validMessageType(aMessage)) {
+			int msgType = aMessage.propertyExists(AsynchAEMessage.MessageType)
+					? aMessage.getMessageIntProperty(AsynchAEMessage.MessageType)
+					: 0;
+			// TODO: replace console output with the UIMAFramework logger
+			// (resource bundle key UIMAJMS_invalid_msg_type__INFO)
+			System.out.println(CLASS_NAME + ".validMessage() - invalid message type " + msgType);
+			return false;
+		}
+		if (!validCommand(aMessage)) {
+			int command = aMessage.propertyExists(AsynchAEMessage.Command)
+					? aMessage.getMessageIntProperty(AsynchAEMessage.Command)
+					: 0;
+			// TODO: replace console output with the UIMAFramework logger
+			// (resource bundle key UIMAJMS_invalid_cmd_type__INFO)
+			System.out.println(CLASS_NAME + ".validMessage() - invalid command in message " + command);
+			return false;
+		}
+		if (!validPayload(aMessage)) {
+			int payload = aMessage.propertyExists(AsynchAEMessage.Payload)
+					? aMessage.getMessageIntProperty(AsynchAEMessage.Payload)
+					: 0;
+			// TODO: replace console output with the UIMAFramework logger
+			// (resource bundle key UIMAJMS_invalid_payload_type__INFO)
+			System.out.println(CLASS_NAME + ".validMessage() - invalid payload type " + payload);
+			return false;
+		}
+		return !isStaleMessage(aMessage);
+	}
+
+	/**
+	 * Checks that the message carries a MessageType property whose value is
+	 * either Request or Response.
+	 * 
+	 * @param aMessage
+	 *            - message to check
+	 * @return - true if message Type is valid, false otherwise
+	 * @throws Exception
+	 */
+	private boolean validMessageType(MessageContext aMessage) throws Exception {
+		if (!aMessage.propertyExists(AsynchAEMessage.MessageType)) {
+			// TODO: replace console output with the UIMAFramework logger
+			// (resource bundle key UIMAJMS_msgtype_notin_message__INFO)
+			System.out.println(CLASS_NAME + ".validMessageType() - message type not in message");
+			return false;
+		}
+		int msgType = aMessage.getMessageIntProperty(AsynchAEMessage.MessageType);
+		if (msgType == AsynchAEMessage.Response || msgType == AsynchAEMessage.Request) {
+			return true;
+		}
+		// TODO: replace console output with the UIMAFramework logger
+		// (resource bundle key UIMAJMS_invalid_msgtype_in_message__INFO)
+		System.out.println(CLASS_NAME + ".validMessageType() - invalid message type " + msgType);
+		return false;
+	}
+
+	/**
+	 * @return true only if the message has both MessageType and Command
+	 *         properties and they are Request and Process respectively
+	 */
+	private boolean isProcessRequest(MessageContext aMessage) throws Exception {
+		boolean hasBoth = aMessage.propertyExists(AsynchAEMessage.MessageType)
+				&& aMessage.propertyExists(AsynchAEMessage.Command);
+		if (!hasBoth) {
+			return false;
+		}
+		int msgType = aMessage.getMessageIntProperty(AsynchAEMessage.MessageType);
+		int command = aMessage.getMessageIntProperty(AsynchAEMessage.Command);
+		return msgType == AsynchAEMessage.Request && command == AsynchAEMessage.Process;
+	}
+
+	/**
+	 * @return true if the message has no CasSequence property, has MessageType,
+	 *         Command and ServerURI properties, its server URI does not start
+	 *         with "vm", and it is a Process or CollectionProcessComplete request
+	 */
+	private boolean isRemoteRequest(MessageContext aMessage) throws Exception {
+		// Dont do checkpoints if a message was sent from a Cas Multiplier
+		if (aMessage.propertyExists(AsynchAEMessage.CasSequence)) {
+			return false;
+		}
+		boolean hasRequiredProperties = aMessage.propertyExists(AsynchAEMessage.MessageType)
+				&& aMessage.propertyExists(AsynchAEMessage.Command)
+				&& aMessage.propertyExists(UIMAMessage.ServerURI);
+		if (!hasRequiredProperties) {
+			return false;
+		}
+		int msgType = aMessage.getMessageIntProperty(AsynchAEMessage.MessageType);
+		int command = aMessage.getMessageIntProperty(AsynchAEMessage.Command);
+		boolean isRemote = !aMessage.getMessageStringProperty(UIMAMessage.ServerURI).startsWith("vm");
+		boolean isProcessOrCpc = command == AsynchAEMessage.Process
+				|| command == AsynchAEMessage.CollectionProcessComplete;
+		return isRemote && msgType == AsynchAEMessage.Request && isProcessOrCpc;
+	}
+
+	/**
+	 * @return the value of the AcceptsDeltaCas property, or false when the
+	 *         property is absent
+	 */
+	private boolean acceptsDeltaCas(MessageContext aMessage) throws Exception {
+		return aMessage.propertyExists(AsynchAEMessage.AcceptsDeltaCas)
+				&& aMessage.getMessageBooleanProperty(AsynchAEMessage.AcceptsDeltaCas);
+	}
+
+	/** No-op in this implementation. */
+	public void abort() {
+	}
+
+	/**
+	 * Translates an int value of the MessageType, Command or Payload property
+	 * into a readable name.
+	 *
+	 * @param aTypeToDecode
+	 *            - name of the property the value belongs to
+	 * @param aValueToDecode
+	 *            - value to translate
+	 * @return the readable name, or "UNKNOWN" if the property or value is not
+	 *         recognized
+	 */
+	private String decodeIntToString(String aTypeToDecode, int aValueToDecode) {
+		if (AsynchAEMessage.MessageType.equals(aTypeToDecode)) {
+			return decodeMessageType(aValueToDecode);
+		}
+		if (AsynchAEMessage.Command.equals(aTypeToDecode)) {
+			return decodeCommand(aValueToDecode);
+		}
+		if (AsynchAEMessage.Payload.equals(aTypeToDecode)) {
+			return decodePayload(aValueToDecode);
+		}
+		return "UNKNOWN";
+	}
+
+	/** Readable name of a MessageType value, or "UNKNOWN". */
+	private static String decodeMessageType(int value) {
+		switch (value) {
+		case AsynchAEMessage.Request:
+			return "Request";
+		case AsynchAEMessage.Response:
+			return "Response";
+		default:
+			return "UNKNOWN";
+		}
+	}
+
+	/** Readable name of a Command value, or "UNKNOWN". */
+	private static String decodeCommand(int value) {
+		switch (value) {
+		case AsynchAEMessage.Process:
+			return "Process";
+		case AsynchAEMessage.GetMeta:
+			return "GetMetadata";
+		case AsynchAEMessage.CollectionProcessComplete:
+			return "CollectionProcessComplete";
+		case AsynchAEMessage.ReleaseCAS:
+			return "ReleaseCAS";
+		case AsynchAEMessage.Stop:
+			return "Stop";
+		case AsynchAEMessage.Ping:
+			return "Ping";
+		case AsynchAEMessage.ServiceInfo:
+			return "ServiceInfo";
+		default:
+			return "UNKNOWN";
+		}
+	}
+
+	/** Readable name of a Payload value, or "UNKNOWN" (XCAS payload is not handled). */
+	private static String decodePayload(int value) {
+		switch (value) {
+		case AsynchAEMessage.XMIPayload:
+			return "XMIPayload";
+		case AsynchAEMessage.BinaryPayload:
+			return "BinaryPayload";
+		case AsynchAEMessage.CASRefID:
+			return "CASRefID";
+		case AsynchAEMessage.Metadata:
+			return "Metadata";
+		case AsynchAEMessage.Exception:
+			return "Exception";
+		case AsynchAEMessage.None:
+			return "None";
+		default:
+			return "UNKNOWN";
+		}
+	}
+
+	/**
+	 * A message is stale when this channel is stopped, when there is no
+	 * controller or no in-process cache, or when it is a Process response whose
+	 * CAS reference id has no entry in the controller's in-process cache.
+	 *
+	 * @param aMessage
+	 *            - message to check
+	 * @return true if the message should be dropped
+	 * @throws AsynchAEException
+	 */
+	private boolean isStaleMessage(MessageContext aMessage) throws AsynchAEException {
+		// Both properties are read up front, before the shutdown check.
+		int command = aMessage.getMessageIntProperty(AsynchAEMessage.Command);
+		int msgType = aMessage.getMessageIntProperty(AsynchAEMessage.MessageType);
+		if (isStopped() || getController() == null || getController().getInProcessCache() == null) {
+			// Shutting down
+			return true;
+		}
+		if (command != AsynchAEMessage.Process || msgType != AsynchAEMessage.Response) {
+			return false;
+		}
+		String casReferenceId = aMessage.getMessageStringProperty(AsynchAEMessage.CasReference);
+		if (getController().getInProcessCache().entryExists(casReferenceId)) {
+			return false;
+		}
+		// TODO: replace console output with the UIMAFramework logger
+		// (resource bundle key UIMAJMS_stale_message__FINE)
+		System.out.println(CLASS_NAME + ".isStaleMessage() - stale message rec'd - CasRefId:" + casReferenceId);
+		return true;
+	}
+
+	/**
+	 * Validates the payload property. Messages whose command is GetMeta,
+	 * CollectionProcessComplete, Stop, Ping, ServiceInfo or ReleaseCAS are
+	 * accepted without looking at the payload; otherwise the Payload property
+	 * must exist and be one of XMIPayload, BinaryPayload, CASRefID, Exception or
+	 * Metadata.
+	 * 
+	 * @param aMessage
+	 *            - message to check
+	 * @return - true if the payload is valid, false otherwise
+	 * @throws Exception
+	 */
+	private boolean validPayload(MessageContext aMessage) throws Exception {
+		if (aMessage.propertyExists(AsynchAEMessage.Command)) {
+			switch (aMessage.getMessageIntProperty(AsynchAEMessage.Command)) {
+			case AsynchAEMessage.GetMeta:
+			case AsynchAEMessage.CollectionProcessComplete:
+			case AsynchAEMessage.Stop:
+			case AsynchAEMessage.Ping:
+			case AsynchAEMessage.ServiceInfo:
+			case AsynchAEMessage.ReleaseCAS:
+				// No payload check for these commands
+				return true;
+			default:
+				break;
+			}
+		}
+
+		if (!aMessage.propertyExists(AsynchAEMessage.Payload)) {
+			// TODO: replace console output with the UIMAFramework logger
+			// (resource bundle key UIMAJMS_payload_notin_message__INFO)
+			System.out.println(CLASS_NAME + ".validPayload() - Payload not in message");
+			return false;
+		}
+
+		int payload = aMessage.getMessageIntProperty(AsynchAEMessage.Payload);
+		switch (payload) {
+		case AsynchAEMessage.XMIPayload:
+		case AsynchAEMessage.BinaryPayload:
+		case AsynchAEMessage.CASRefID:
+		case AsynchAEMessage.Exception:
+		case AsynchAEMessage.Metadata:
+			return true;
+		default:
+			// TODO: replace console output with the UIMAFramework logger
+			// (resource bundle key UIMAJMS_invalid_payload_in_message__INFO)
+			System.out.println(CLASS_NAME + ".validPayload() - Invalid Payload in message:" + payload);
+			return false;
+		}
+	}
+
+	/**
+	 * @param aHandler
+	 *            - handler to remember; it is only stored by this class
+	 */
+	public void setMessageHandler(Handler aHandler) {
+		handler = aHandler;
+	}
+
+	/**
+	 * Adds a listener.
+	 *
+	 * @param listener
+	 *            - listener to add
+	 * @return the channel's internal (live) listener list
+	 */
+	public List<Listener> registerListener(Listener listener) {
+		listeners.add(listener);
+		return listeners;
+	}
+
+	/**
+	 * Despite its name this appends the listener; existing ones are kept.
+	 *
+	 * @param listener
+	 *            - listener to add
+	 */
+	public void setListener(DirectListener listener) {
+		listeners.add(listener);
+	}
+
+	/**
+	 * Despite its name this appends all given listeners; existing ones are kept.
+	 *
+	 * @param list
+	 *            - listeners to add
+	 */
+	public void setListeners(List<DirectListener> list) {
+		System.out.println("... DirectInputChannel adding listeners - howMany:" + list.size());
+		listeners.addAll(list);
+	}
+
+	/** @return a new list holding the currently registered listeners */
+	public List<Listener> getListeners() {
+		return new ArrayList<>(listeners);
+	}
+
+	/** Calls {@code stop()} on every registered listener, in registration order. */
+	private void stopAllListeners() throws Exception {
+		for (Listener registered : listeners) {
+			registered.stop();
+		}
+	}
+
+	/**
+	 * Marks the channel as stopped and stops every registered listener. The
+	 * argument is not used.
+	 */
+	@Override
+	public void stop(boolean shutdownNow) throws Exception {
+		isStopped = true;
+		stopAllListeners();
+	}
+
+	/**
+	 * Stops every registered listener. Neither argument is used and, unlike
+	 * {@link #stop(boolean)}, the stopped flag is left untouched.
+	 */
+	@Override
+	public void stop(int channelsToStop, boolean shutdownNow) throws Exception {
+		stopAllListeners();
+	}
+
+	/** Stub: always returns {@code null}. */
+	@Override
+	public String getName() {
+		return null;
+	}
+
+	/** Stub: always returns 0. */
+	@Override
+	public int getSessionAckMode() {
+		return 0;
+	}
+
+	/** Stub: does nothing. */
+	@Override
+	public void ackMessage(MessageContext aMessageContext) {
+	}
+
+	/** Stub: always returns {@code null}. */
+	@Override
+	public String getServerUri() {
+		return null;
+	}
+
+	/** Stub: the argument is ignored. */
+	@Override
+	public void setServerUri(String aServerUri) {
+	}
+
+	/** Stub: always returns {@code null}. */
+	@Override
+	public String getInputQueueName() {
+		return null;
+	}
+
+	/**
+	 * Lazily creates the {@link ServiceInfo} on first call (broker URL "Direct",
+	 * input queue name taken from {@link #getName()}) and returns the same
+	 * instance afterwards. Without a controller the state is set to
+	 * INITIALIZING; with a CAS multiplier controller the info is flagged
+	 * accordingly.
+	 *
+	 * @return the cached service info
+	 */
+	public ServiceInfo getServiceInfo() {
+		if (serviceInfo != null) {
+			return serviceInfo;
+		}
+		serviceInfo = new ServiceInfo(false, controller);
+		serviceInfo.setBrokerURL("Direct");
+		serviceInfo.setInputQueueName(getName());
+		if (controller == null) {
+			serviceInfo.setState(ServiceState.INITIALIZING.name());
+		} else if (controller.isCasMultiplier()) {
+			serviceInfo.setCASMultiplier();
+		}
+		return serviceInfo;
+	}
+
+	/** @return true once {@link #stop(boolean)} has been called */
+	@Override
+	public boolean isStopped() {
+		return isStopped;
+	}
+
+	/** Stub: always returns 0. */
+	public int getConcurrentConsumerCount() {
+		return 0;
+	}
+
+	/** Stub: does nothing. */
+	@Override
+	public void destroyListener(String anEndpointName, String aDelegateKey) {
+	}
+
+	/** Stub: does nothing. */
+	@Override
+	public void createListener(String aDelegateKey, Endpoint endpointToUpdate) throws Exception {
+	}
+
+	/** Stub: always returns false. */
+	@Override
+	public boolean isFailed(String aDelegateKey) {
+		return false;
+	}
+
+	/** Stub: always returns false. */
+	@Override
+	public boolean isListenerForDestination(String anEndpointName) {
+		return false;
+	}
+
+	/** Stub: does nothing. */
+	@Override
+	public void removeDelegateFromFailedList(String aDelegateKey) {
+	}
+
+	/** Stub: does nothing. */
+	@Override
+	public void setTerminating() {
+	}
+
+	/** Stub: does nothing. */
+	@Override
+	public void terminate() {
+	}
+
+	/** Stub: does nothing. */
+	@Override
+	public void disconnectListenersFromQueue() throws Exception {
+	}
+
+	/** Stub: does nothing. */
+	@Override
+	public void disconnectListenerFromQueue(Listener listener) throws Exception {
+	}
+
+	/** Stub: the argument is ignored (the {@code endpointName} field is not updated). */
+	@Override
+	public void setEndpointName(String name) {
+	}
+
+	/** Stub: does nothing. */
+	@Override
+	public void createListenerForTargetedMessages() throws Exception {
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectListener.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectListener.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectListener.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectListener.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.client;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.uima.aae.InputChannel;
+import org.apache.uima.aae.UimaAsThreadFactory;
+import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.spi.transport.vm.UimaVmQueue;
+import org.springframework.core.task.TaskExecutor;
+
+/**
+ * Listener that takes {@link DirectMessage}s from a {@link BlockingQueue} on a
+ * dedicated thread and hands each one to a {@link DirectInputChannel} through
+ * an {@link ExecutorService}.
+ * <p>
+ * Typical sequence: configure (controller, queue, input channel, consumer
+ * threads), {@link #initialize()}, {@link #start()}, and finally
+ * {@link #stop()}. A message whose command is {@code AsynchAEMessage.PoisonPill}
+ * ends the consuming loop.
+ */
+public class DirectListener implements Listener, JavaQueueListener {
+	private BlockingQueue<DirectMessage> inQueue;
+	private DirectInputChannel ic;
+	private int consumerCount = 1;
+	private ExecutorService executor;
+	private AnalysisEngineController controller;
+	private int scaleout;
+	private Thread msgHandlerThread;
+	private BlockingQueue<Runnable> workQueue = null;
+	private CountDownLatch latchToCountNumberOfInitedThreads;
+	private CountDownLatch latchToCountNumberOfTerminatedThreads;
+	private Type type;
+	private Transport transport = Transport.Java;
+	private String name=null;
+	private volatile boolean started = false;
+
+	/**
+	 * @param t
+	 *            - listener type
+	 */
+	public DirectListener(Type t) {
+		this.type = t;
+		System.out.println("DirectListener c'tor - type:"+type.name());
+	}
+
+	/**
+	 * @param ctlr
+	 *            - controller this listener serves
+	 * @param t
+	 *            - listener type
+	 * @param scaleout
+	 *            - number of threads used by the executor built in
+	 *            {@link #initialize()} for a primitive controller
+	 */
+	public DirectListener(AnalysisEngineController ctlr, Type t, int scaleout) {
+		this.controller = ctlr;
+		this.scaleout = scaleout;
+		this.type = t;
+		System.out.println("DirectListener c'tor - type:"+type.name());
+	}
+	/** @return the transport of this listener (always {@code Transport.Java}) */
+	public Transport getTransport() {
+	  return transport;
+	}
+	/** @return the queue this listener consumes from, possibly {@code null} */
+	public BlockingQueue<DirectMessage> getEndpoint() {
+		return inQueue;
+	}
+	/** @return the listener type given to the constructor */
+	public Type getType() {
+		return type;
+	}
+	/** @param ctlr controller this listener serves */
+	public void setController(AnalysisEngineController ctlr) {
+		this.controller = ctlr;
+	}
+	/** Fluent variant of {@link #setController(AnalysisEngineController)}. */
+	public DirectListener withController(AnalysisEngineController ctlr) {
+		this.controller = ctlr;
+		return this;
+	}
+	/**
+	 * Sets the scaleout and creates fresh "initialized" and "terminated" latches
+	 * counting that many threads.
+	 *
+	 * @param howMany
+	 *            - number of threads
+	 */
+	public void setConsumerThreads(int howMany) {
+		this.scaleout = howMany;
+		latchToCountNumberOfInitedThreads = new CountDownLatch(howMany);
+		latchToCountNumberOfTerminatedThreads = new CountDownLatch(howMany);
+	}
+	/** Fluent variant of {@link #setConsumerThreads(int)}. */
+	public DirectListener withConsumerThreads(int howMany) {
+		setConsumerThreads(howMany);
+		return this;
+	}
+	/** @param name name returned by {@link #getName()} instead of the controller key */
+	public DirectListener withName(String name) {
+		this.name = name;
+		return this;
+	}
+	/** @param inQueue queue to consume messages from */
+	public void setQueue(BlockingQueue<DirectMessage> inQueue) throws Exception {
+		this.inQueue = inQueue;
+	}
+	/** Fluent variant of {@link #setQueue(BlockingQueue)}. */
+	public DirectListener withQueue(BlockingQueue<DirectMessage> inQueue) throws Exception {
+		this.inQueue = inQueue;
+		return this;
+	}
+	/** @param howMany size of the fixed thread pool created by {@link #initialize()} */
+	public void setConsumers(int howMany) {
+		consumerCount = howMany;
+	}
+
+	/** Fluent variant of {@link #setInputChannel(DirectInputChannel)}. */
+	public DirectListener withInputChannel(DirectInputChannel ic) {
+		this.ic = ic;
+		return this;
+	}
+	/** @param ic input channel that receives every consumed message */
+	public void setInputChannel(DirectInputChannel ic) {
+		this.ic = ic;
+	}
+
+	/**
+	 * Creates the executor that runs message handling.
+	 * <p>
+	 * For a ProcessCAS listener on a primitive controller a
+	 * {@link ThreadPoolExecutor} with {@code scaleout} daemon threads is built on
+	 * a {@link UimaAsThreadFactory}, all core threads are pre-started and this
+	 * method blocks until the "initialized" latch reaches zero. In every other
+	 * case a fixed thread pool of {@code consumerCount} threads is used.
+	 *
+	 * @return this listener
+	 * @throws Exception
+	 */
+	public DirectListener initialize() throws Exception {
+		if (Type.ProcessCAS.equals(type)) {
+			workQueue = new UimaVmQueue();
+			if ( controller instanceof PrimitiveAnalysisEngineController ) {
+				ThreadGroup threadGroup = new ThreadGroup("VmThreadGroup" + 1 + "_" + controller.getComponentName());
+				executor = new ThreadPoolExecutor(scaleout, scaleout, Long.MAX_VALUE, TimeUnit.DAYS, workQueue);
+				// The three-argument constructor records scaleout without creating
+				// the latches; create them here so await() below cannot hit null.
+				if ( latchToCountNumberOfInitedThreads == null ) {
+					latchToCountNumberOfInitedThreads = new CountDownLatch(scaleout);
+				}
+				if ( latchToCountNumberOfTerminatedThreads == null ) {
+					latchToCountNumberOfTerminatedThreads = new CountDownLatch(scaleout);
+				}
+				UimaAsThreadFactory tf = new UimaAsThreadFactory().
+						withThreadGroup(threadGroup).
+						withPrimitiveController((PrimitiveAnalysisEngineController)controller).
+						withTerminatedThreadsLatch(latchToCountNumberOfTerminatedThreads).
+						withInitedThreadsLatch(latchToCountNumberOfInitedThreads);
+				tf.setDaemon(true);
+				((ThreadPoolExecutor)executor).setThreadFactory(tf);
+				((ThreadPoolExecutor)executor).prestartAllCoreThreads();
+				latchToCountNumberOfInitedThreads.await();
+				System.out.println("Executor Started - All Process Threads Initialized");
+			} else {
+				 executor = Executors.newFixedThreadPool(consumerCount);
+			}
+		} else {
+			 executor = Executors.newFixedThreadPool(consumerCount);
+		}
+		return this;
+	}
+
+	/** @return true if the message's command is the PoisonPill marker */
+	private boolean stopConsumingMessages(DirectMessage message ) {
+		return message.getAsInt(AsynchAEMessage.Command) == AsynchAEMessage.PoisonPill;
+	}
+
+	/**
+	 * Starts the message handling thread. Calling it again after a successful
+	 * start is a no-op.
+	 */
+	public synchronized void start() {
+		if ( started ) {
+			return;
+		}
+		System.out.println(">>> "+controller.getComponentName()+" DirectListener.start()");
+		msgHandlerThread = new Thread() {
+			public void run() {
+				consumeMessages();
+			}
+		};
+		// Set the flag while still holding the monitor. Setting it from inside
+		// the new thread (as before) let a second start() call slip past the
+		// guard above and launch a second consumer thread.
+		started = true;
+		msgHandlerThread.start();
+	}
+
+	/**
+	 * Body of the message handling thread: takes messages from the queue and
+	 * submits each to the executor until the controller reports stopped, a
+	 * poison pill arrives, the thread is interrupted or an error occurs.
+	 */
+	private void consumeMessages() {
+		boolean stop = false;
+
+		while( !controller.isStopped() && !stop) {
+			try {
+				final DirectMessage message = inQueue.take(); //blocks if empty
+				if ( stopConsumingMessages(message)) {
+					System.out.println(">>> "+controller.getComponentName()+" Got END message - Stopping Queue Consumer");
+					stop = true;
+				} else {
+					executor.submit(new Runnable() {
+						public void run() {
+							try {
+								System.out.println(">>> "+controller.getComponentName()+" Got new message - processing on thread "+Thread.currentThread().getName()+" channel:"+getType());
+								ic.onMessage(message);
+							} catch( Exception e) {
+								e.printStackTrace();
+							}
+						}
+					});
+				}
+			} catch( InterruptedException e) {
+				// Restore the interrupt status before leaving the loop
+				Thread.currentThread().interrupt();
+
+				System.out.println(getType()+ " Listener Thread Interrupted - Stop Listening");
+				stop = true;
+			} catch (Exception e) {
+				e.printStackTrace();
+				stop = true;
+			}
+		}
+	}
+
+	/**
+	 * Stops the listener: interrupts the message handling thread, enqueues a
+	 * poison pill message and shuts the executor down. Each step is skipped when
+	 * the corresponding resource was never created, so calling this before
+	 * {@link #start()} or before a queue was set is safe.
+	 */
+	public void stop() {
+		if ( msgHandlerThread != null ) {
+			msgHandlerThread.interrupt();
+		}
+
+		if ( inQueue != null ) {
+			DirectMessage message = new DirectMessage().withCommand(AsynchAEMessage.PoisonPill);
+			try {
+				inQueue.put(message);
+			} catch( InterruptedException e) {
+				Thread.currentThread().interrupt();
+			}
+		}
+
+		if ( executor != null ) {
+			executor.shutdown();
+		}
+	}
+
+	/** @return always {@code null}; this listener does not expose a TaskExecutor */
+	public TaskExecutor getTaskExecutor() {
+		return null;
+	}
+
+	/**
+	 * @return the name set via {@link #withName(String)}, otherwise the
+	 *         controller's key (a controller must be set in that case)
+	 */
+	public String getName() {
+		if ( name != null ) {
+			return name;
+		}
+		return controller.getKey();
+	}
+
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessage.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessage.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessage.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessage.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.client;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
+
+/**
+ * A message passed between components through in-process queues. All state
+ * is kept in a single key/value map; the {@code withXxx} methods store one
+ * property each under a key defined in {@link AsynchAEMessage} and return
+ * {@code this} so that calls can be chained.
+ *
+ * <p>The backing map is a plain {@link HashMap}; this class performs no
+ * synchronization of its own.
+ */
+public class DirectMessage {
+
+	// NOTE(review): this class does not implement Serializable, so the field
+	// below is currently unused.
+	private static final long serialVersionUID = 1L;
+
+	/** Message properties keyed by property name. */
+	private final Map<String, Object> stateMap = new HashMap<>();
+
+	/** Stores (or overwrites) a single property. */
+	private void store(String key, Object value) {
+		stateMap.put(key, value);
+	}
+
+	/** Stores the CAS reference id under {@code AsynchAEMessage.CasReference}. */
+	public DirectMessage withCasReferenceId(String casReferenceId) {
+		store(AsynchAEMessage.CasReference, casReferenceId);
+		return this;
+	}
+
+	/** Stores the endpoint name under {@code AsynchAEMessage.EndpointName}. */
+	public DirectMessage withEndpointName(String endpointName) {
+		store(AsynchAEMessage.EndpointName, endpointName);
+		return this;
+	}
+
+	/** Stores the parent CAS id under {@code AsynchAEMessage.InputCasReference}. */
+	public DirectMessage withParentCasReferenceId(String parentCasReferenceId) {
+		store(AsynchAEMessage.InputCasReference, parentCasReferenceId);
+		return this;
+	}
+
+	/**
+	 * Stores the sequence number under {@code AsynchAEMessage.CasSequence}.
+	 * The value is stored boxed as a {@code Long}.
+	 */
+	public DirectMessage withSequenceNumber(long seqNo) {
+		store(AsynchAEMessage.CasSequence, seqNo);
+		return this;
+	}
+
+	/**
+	 * Stores a statistic under the caller-supplied key. The value is stored
+	 * boxed as a {@code Long}.
+	 */
+	public DirectMessage withStat(String statKey, long stat) {
+		store(statKey, stat);
+		return this;
+	}
+
+	/** Stores an error under {@code AsynchAEMessage.ErrorCause}. */
+	public DirectMessage withError(Throwable t) {
+		store(AsynchAEMessage.ErrorCause, t);
+		return this;
+	}
+
+	/** Stores a queue under {@code AsynchAEMessage.FreeCASQueue}. */
+	public DirectMessage withFreeCASQueue(BlockingQueue<DirectMessage> freeCASQueue) {
+		store(AsynchAEMessage.FreeCASQueue, freeCASQueue);
+		return this;
+	}
+
+	/**
+	 * Stores the command under {@code AsynchAEMessage.Command}. For a Process
+	 * command, additionally stores {@code "UimaAsCasTracking" -> "enable"}
+	 * when the system property {@code UimaAsCasTracking} is defined.
+	 */
+	public DirectMessage withCommand(int command) {
+		store(AsynchAEMessage.Command, command);
+		if (command == AsynchAEMessage.Process && System.getProperty("UimaAsCasTracking") != null) {
+			store("UimaAsCasTracking", "enable");
+		}
+		return this;
+	}
+
+	/** Stores the message type under {@code AsynchAEMessage.MessageType}. */
+	public DirectMessage withMessageType(int messageType) {
+		store(AsynchAEMessage.MessageType, messageType);
+		return this;
+	}
+
+	/** Stores the sender under {@code AsynchAEMessage.MessageFrom}. */
+	public DirectMessage withOrigin(String origin) {
+		store(AsynchAEMessage.MessageFrom, origin);
+		return this;
+	}
+
+	/** Stores the payload type under {@code AsynchAEMessage.Payload}. */
+	public DirectMessage withPayload(int payloadType) {
+		store(AsynchAEMessage.Payload, payloadType);
+		return this;
+	}
+
+	/**
+	 * Stores the reply queue under {@code AsynchAEMessage.ReplyToEndpoint}.
+	 * This is the same key {@link #withReplyDestination(Object)} writes, so
+	 * whichever of the two is called last wins.
+	 */
+	public DirectMessage withReplyQueue(BlockingQueue<DirectMessage> replyQueue ) {
+		store(AsynchAEMessage.ReplyToEndpoint, replyQueue);
+		return this;
+	}
+
+	/** Stores service metadata under {@code AsynchAEMessage.AEMetadata}. */
+	public DirectMessage withMetadata(ProcessingResourceMetaData aProcessingResourceMetadata) {
+		store(AsynchAEMessage.AEMetadata, aProcessingResourceMetadata);
+		return this;
+	}
+
+	/** Stores the serialization type under {@code AsynchAEMessage.SERIALIZATION}. */
+	public DirectMessage withSerializationType(int serializationType) {
+		store(AsynchAEMessage.SERIALIZATION, serializationType);
+		return this;
+	}
+
+	/**
+	 * Stores the reply destination under {@code AsynchAEMessage.ReplyToEndpoint}.
+	 * This is the same key {@link #withReplyQueue(BlockingQueue)} writes, so
+	 * whichever of the two is called last wins.
+	 */
+	public DirectMessage withReplyDestination(Object replyDestination) {
+		store(AsynchAEMessage.ReplyToEndpoint, replyDestination);
+		return this;
+	}
+
+	/** Stores the delegate key under {@code AsynchAEMessage.DelegateKey}. */
+	public DirectMessage withDelegateKey(Object delegateKey) {
+		store(AsynchAEMessage.DelegateKey, delegateKey);
+		return this;
+	}
+
+	/**
+	 * Returns the property as a String.
+	 *
+	 * @return the stored value, or {@code null} when the key is absent
+	 * @throws ClassCastException if the stored value is not a String
+	 */
+	public String getAsString(String key) {
+		return (String)stateMap.get(key);
+	}
+
+	/**
+	 * Returns the property as an int.
+	 *
+	 * @throws NullPointerException if no non-null value is stored under the
+	 *         key (previously this surfaced as a message-less NPE from
+	 *         auto-unboxing; the exception type is unchanged)
+	 * @throws ClassCastException if the stored value is not an Integer
+	 */
+	public int getAsInt(String key) {
+		Object value = stateMap.get(key);
+		if (value == null) {
+			throw new NullPointerException("DirectMessage has no value for key:" + key);
+		}
+		return (Integer) value;
+	}
+
+	/**
+	 * Returns the property as a queue of messages.
+	 *
+	 * @return the stored value, or {@code null} when the key is absent
+	 * @throws ClassCastException if the stored value is not a BlockingQueue
+	 */
+	public BlockingQueue<DirectMessage> getFreeQueue(String key) {
+		// Element type cannot be checked at runtime; the only writer of such
+		// queues in this class is withFreeCASQueue()/withReplyQueue().
+		@SuppressWarnings("unchecked")
+		BlockingQueue<DirectMessage> queue = (BlockingQueue<DirectMessage>) stateMap.get(key);
+		return queue;
+	}
+
+	/** Returns the raw stored value, or {@code null} when the key is absent. */
+	public Object get(String key) {
+		return stateMap.get(key);
+	}
+
+	/** Returns true when a value (possibly null) is stored under the key. */
+	public boolean propertyExists(String key) {
+		return stateMap.containsKey(key);
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessageContext.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessageContext.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessageContext.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessageContext.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.client;
+
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.Endpoint_impl;
+import org.apache.uima.aae.error.AsynchAEException;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.cas.SerialFormat;
+
+public class DirectMessageContext implements MessageContext {
+	private Endpoint endpoint;
+
+	private long messageArrivalTime = 0L;
+
+	private String endpointName;
+
+	DirectMessage message;
+	
+	public DirectMessageContext() {
+	}
+
+	public DirectMessageContext(DirectMessage message, String anEndpointName, String controllerName ) {
+		this();
+		this.message = message;
+		endpoint =  new Endpoint_impl();
+		endpointName = anEndpointName;
+		endpoint.setSerialFormat(SerialFormat.UNKNOWN);
+		endpoint.setServerURI("java");
+		endpoint.setEndpoint(anEndpointName);
+		endpoint.setReplyDestination(message.get(AsynchAEMessage.ReplyToEndpoint));
+		endpoint.setDelegateKey(message.getAsString(AsynchAEMessage.DelegateKey));
+		StringBuilder sb = new StringBuilder();
+		if ( controllerName != null && !controllerName.trim().isEmpty()) {
+			sb.append("Service:"+controllerName+" ");
+			sb.append("Delegate Key:"+endpoint.getDelegateKey()+"\t");
+
+		}
+		sb.append(this.getClass().getSimpleName()).append("\n").append(" - message from:").
+		append(message.get(AsynchAEMessage.MessageFrom)).append("\n").
+		append(" - ServerURI:").append("java").append("\n");
+		if ( message.propertyExists(AsynchAEMessage.CasReference) ) {
+			sb.append(" - CasReferenceId:").
+			append(message.getAsString(AsynchAEMessage.CasReference))
+			.append("\n");
+		}
+		
+		System.out.println(sb.toString());
+	}
+
+	public String getMessageStringProperty(String aMessagePropertyName) throws AsynchAEException {
+		return (String)message.get(aMessagePropertyName);
+	}
+
+	public int getMessageIntProperty(String aMessagePropertyName) throws AsynchAEException {
+		return (Integer)message.get(aMessagePropertyName);
+	}
+
+	public long getMessageLongProperty(String aMessagePropertyName) throws AsynchAEException {
+		return (Long)message.get(aMessagePropertyName);
+	}
+
+	public Object getMessageObjectProperty(String aMessagePropertyName) throws AsynchAEException {
+		return message.get(aMessagePropertyName);
+	}
+	
+	public boolean getMessageBooleanProperty(String aMessagePropertyName) throws AsynchAEException {
+		return (Boolean)message.get(aMessagePropertyName);
+	}
+
+	public Endpoint getEndpoint() {
+		return endpoint;
+	}
+
+	private Object getMessageCargo() {
+		return message.get("Cargo");
+	}
+	public String getStringMessage() throws AsynchAEException {
+		return (String)getMessageCargo();
+	}
+
+	public Object getObjectMessage() throws AsynchAEException {
+		return getMessageCargo();
+	}
+
+	public byte[] getByteMessage() throws AsynchAEException {
+		return null;   // this class returns XMI only
+	}
+
+	public Object getRawMessage() {
+		return message;
+	}
+
+	public boolean propertyExists(String aKey) throws AsynchAEException {
+		return message.propertyExists(aKey);
+	}
+
+	public void setMessageArrivalTime(long anArrivalTime) {
+		messageArrivalTime = anArrivalTime;
+	}
+
+	public long getMessageArrivalTime() {
+		return messageArrivalTime;
+	}
+
+	public String getEndpointName() {
+		return endpointName;
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectOutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectOutputChannel.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectOutputChannel.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectOutputChannel.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.client;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.uima.aae.OutputChannel;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
+import org.apache.uima.aae.error.AsynchAEException;
+import org.apache.uima.aae.error.UimaEEServiceException;
+import org.apache.uima.aae.jmx.ServicePerformance;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
+
+/**
+ * {@link OutputChannel} that delivers requests and replies by adding
+ * {@link DirectMessage} objects to in-process {@link BlockingQueue}s obtained
+ * from the target {@link Endpoint}.
+ */
+public class DirectOutputChannel implements OutputChannel {
+	private AnalysisEngineController controller;
+	private volatile boolean aborting = false;
+	private BlockingQueue<DirectMessage> freeCASQueue;
+
+	/** Sets the owning controller and returns this channel for chaining. */
+	public DirectOutputChannel withController(AnalysisEngineController controller) {
+		this.controller = controller;
+		return this;
+	}
+
+	/**
+	 * Sets the queue that is attached to child-CAS messages sent by
+	 * {@link #sendReply(CasStateEntry, Endpoint)}.
+	 */
+	public void setFreeCASQueue(BlockingQueue<DirectMessage> queue) {
+		freeCASQueue = queue;
+	}
+
+	/** @return always {@code ENDPOINT_TYPE.DIRECT} */
+	public ENDPOINT_TYPE getType() {
+		return ENDPOINT_TYPE.DIRECT;
+	}
+
+	/** Marks this channel as stopping; {@code shutdownNow} is not used. */
+	@Override
+	public void stop(boolean shutdownNow) throws Exception {
+		aborting = true;
+	}
+
+	/**
+	 * No-op.
+	 * NOTE(review): unlike the other stop() variants this one does not set the
+	 * aborting flag - confirm that is intended.
+	 */
+	@Override
+	public void stop(int channelsToStop, boolean shutdownNow) throws Exception {
+
+	}
+
+	/** @return always {@code null}; this channel has no name */
+	@Override
+	public String getName() {
+		return null;
+	}
+
+	@Override
+	public void setController(AnalysisEngineController aController) {
+		withController(aController);
+	}
+
+	/** No-op; this channel needs no initialization. */
+	@Override
+	public void initialize() throws AsynchAEException {
+
+	}
+
+	/**
+	 * Builds a request message for the given command and adds it to the
+	 * delegate's queue.
+	 *
+	 * @param aCommand one of CollectionProcessComplete, ReleaseCAS, GetMeta,
+	 *        Stop or Process
+	 * @param aCasReferenceId CAS id attached to ReleaseCAS, Stop and Process
+	 *        requests; not used for the other commands
+	 * @param anEndpoint supplies the target queue (the meta destination for
+	 *        GetMeta, the destination otherwise) and the reply destination
+	 * @throws AsynchAEException if the command is not one of the supported
+	 *         ones or the endpoint has no target queue. Both cases previously
+	 *         ended in a bare NullPointerException.
+	 */
+	@Override
+	public void sendRequest(int aCommand, String aCasReferenceId, Endpoint anEndpoint) throws AsynchAEException {
+
+		String origin = controller.getServiceEndpointName();
+		if ( !controller.isTopLevelComponent()) {
+			origin = controller.getKey();
+		}
+		if ( origin == null ) {
+			origin = controller.getComponentName();
+		}
+		DirectMessage dm = new DirectMessage()
+				.withMessageType(AsynchAEMessage.Request)
+				.withCommand(aCommand)
+				.withDelegateKey(anEndpoint.getDelegateKey())
+				.withOrigin(origin);
+
+		BlockingQueue<DirectMessage> requestQueue;
+
+		switch (aCommand) {
+		case AsynchAEMessage.CollectionProcessComplete:
+			requestQueue = requireRequestQueue(anEndpoint.getDestination(), aCommand, anEndpoint);
+			dm.withReplyDestination(anEndpoint.getReplyDestination());
+			break;
+		case AsynchAEMessage.ReleaseCAS:
+			System.out.println(">>>>>>>>>>>>>>>>>>> DirectOutputChannel.sendRequest() - Sending ReleaseCAS Request");
+			dm.withCasReferenceId( aCasReferenceId);
+			requestQueue = requireRequestQueue(anEndpoint.getDestination(), aCommand, anEndpoint);
+			break;
+		case AsynchAEMessage.GetMeta:
+			// every delegate has an endpoint containing a replyTo queue where
+			// a listener waiting for reply msgs.
+			dm.withReplyDestination( anEndpoint.getReplyDestination());
+
+			// resolved before logging: the log line dereferences the queue
+			requestQueue = requireRequestQueue(anEndpoint.getMetaDestination(), aCommand, anEndpoint);
+			System.out.println(">>>>>>>>>>>>>>>>>>> Service:" + controller.getComponentName()
+			+ " DirectOutputChannel.sendRequest() - Sending GetMeta Request to delegate:"+anEndpoint.getDelegateKey()+" target queue.hashcode():"+requestQueue.hashCode());
+			break;
+		case AsynchAEMessage.Stop:
+			requestQueue = requireRequestQueue(anEndpoint.getDestination(), aCommand, anEndpoint);
+			dm.withCasReferenceId(aCasReferenceId);
+			break;
+
+		case AsynchAEMessage.Process:
+			// Delegate must reply to a given endpoint
+			dm.withReplyDestination(anEndpoint.getReplyDestination());
+			// passing CAS by reference
+			dm.withPayload(AsynchAEMessage.CASRefID);
+			// id of a CAS which is needed to find it in a global cache
+			dm.withCasReferenceId(aCasReferenceId);
+			dm.withEndpointName(anEndpoint.getEndpoint());
+			requestQueue = requireRequestQueue(anEndpoint.getDestination(), aCommand, anEndpoint);
+			break;
+
+		default:
+			// No target queue is defined for any other command, so there is
+			// nowhere to deliver the message.
+			throw new AsynchAEException("DirectOutputChannel.sendRequest() - unsupported command:" + aCommand
+					+ " delegate:" + anEndpoint.getDelegateKey());
+		}
+
+		requestQueue.add(dm);
+	}
+
+	/**
+	 * Casts an endpoint destination to a message queue, failing with a
+	 * descriptive exception when the endpoint has none.
+	 */
+	@SuppressWarnings("unchecked")
+	private BlockingQueue<DirectMessage> requireRequestQueue(Object destination, int aCommand, Endpoint anEndpoint)
+			throws AsynchAEException {
+		if (destination == null) {
+			throw new AsynchAEException("DirectOutputChannel.sendRequest() - no target queue for command:" + aCommand
+					+ " delegate:" + anEndpoint.getDelegateKey());
+		}
+		return (BlockingQueue<DirectMessage>) destination;
+	}
+
+	/**
+	 * Sends a payload-less response for the given command to the endpoint's
+	 * reply destination. {@code notifyOnJmsException} is not used.
+	 */
+	@Override
+	public void sendReply(int aCommand, Endpoint anEndpoint, String aCasReferenceId, boolean notifyOnJmsException)
+			throws AsynchAEException {
+		System.out.println(".... Controller:" + controller.getComponentName()
+				+ " DirectOutputChannel.sendReply() - CAS Id:" + aCasReferenceId+" Origin:"+controller.getServiceEndpointName());
+		@SuppressWarnings("unchecked")
+		BlockingQueue<DirectMessage> replyQueue =
+				(BlockingQueue<DirectMessage>) anEndpoint.getReplyDestination();
+		DirectMessage replyMessage = new DirectMessage().withCommand(aCommand).withMessageType(AsynchAEMessage.Response)
+				.withOrigin(getOrigin()).withPayload(AsynchAEMessage.None);
+
+		replyQueue.add(replyMessage);
+
+	}
+
+	/**
+	 * Sends the given CAS back through the endpoint's reply destination.
+	 * When this controller is a CAS multiplier and the CAS is subordinate,
+	 * the CAS is sent as a Process request carrying its ancestor id, its
+	 * sequence number and the free-CAS queue; otherwise it is sent as a
+	 * Process response. Timing statistics are attached in both cases.
+	 *
+	 * <p>The child-CAS branch dereferences the free-CAS queue, so
+	 * {@link #setFreeCASQueue(BlockingQueue)} must have been called first.
+	 */
+	public void sendReply(CasStateEntry casStateEntry, Endpoint anEndpoint) throws AsynchAEException {
+		@SuppressWarnings("unchecked")
+		BlockingQueue<DirectMessage> replyQueue = (BlockingQueue<DirectMessage>) anEndpoint.getReplyDestination();
+
+		DirectMessage message;
+
+		ServicePerformance casStats =
+				controller.getCasStatistics(casStateEntry.getCasReferenceId());
+
+		if (controller.isCasMultiplier() && casStateEntry.isSubordinate()) {
+			// a top level component reports the input CAS as the ancestor,
+			// any other component reports the direct parent
+			String casAncestor = casStateEntry.getParentCasReferenceId();
+			if ( controller.isTopLevelComponent() ) {
+				casAncestor = casStateEntry.getInputCasReferenceId();
+			}
+			// This CM will send a child CAS to the Client for processing.
+			message = new DirectMessage().withCommand(AsynchAEMessage.Process)
+					.withMessageType(AsynchAEMessage.Request).withOrigin(getOrigin())
+					.withPayload(AsynchAEMessage.CASRefID).withCasReferenceId(casStateEntry.getCasReferenceId())
+					.withParentCasReferenceId(casAncestor).withSequenceNumber(casStateEntry.getSequenceNumber())
+					.withFreeCASQueue(freeCASQueue);
+			System.out.println("..... Service:" + controller.getComponentName()
+					+ " Sending Child CAS and FreeCAS Queue:" + freeCASQueue.hashCode()+" For CAS:"+casStateEntry.getCasReferenceId());
+
+		} else {
+			message = new DirectMessage().withCommand(AsynchAEMessage.Process)
+					.withMessageType(AsynchAEMessage.Response).withOrigin(getOrigin())
+					.withPayload(AsynchAEMessage.CASRefID)
+					.withCasReferenceId(casStateEntry.getCasReferenceId());
+		}
+		// add timing stats associated with the CAS processing
+		message.withStat(AsynchAEMessage.TimeToSerializeCAS, casStats.getRawCasSerializationTime())
+			.withStat(AsynchAEMessage.TimeToDeserializeCAS, casStats.getRawCasDeserializationTime())
+			.withStat(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime())
+			.withStat(AsynchAEMessage.IdleTime,
+				controller.getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process));
+
+		replyQueue.add(message);
+
+	}
+
+	/**
+	 * Walks up the parent chain in the controller's local cache and returns
+	 * the id of the first non-subordinate CAS, or {@code null} as soon as an
+	 * id is not present in the cache. Currently not called from this class.
+	 */
+	private String getTopParentCasReferenceId(String casReferenceId) {
+		if (!controller.getLocalCache().containsKey(casReferenceId)) {
+			return null;
+		}
+		CasStateEntry casStateEntry = controller.getLocalCache().lookupEntry(casReferenceId);
+
+		if (casStateEntry.isSubordinate()) {
+			// Recurse until the top CAS reference Id is found
+			return getTopParentCasReferenceId(casStateEntry.getParentCasReferenceId());
+		}
+		// Return the top ancestor CAS id
+		return casStateEntry.getCasReferenceId();
+	}
+
+	/**
+	 * Sends a GetMeta response carrying the given metadata to the endpoint's
+	 * reply destination. The message origin is the controller's name;
+	 * {@code serialize} is not used.
+	 */
+	@Override
+	public void sendReply(ProcessingResourceMetaData aProcessingResourceMetadata, Endpoint anEndpoint,
+			boolean serialize) throws AsynchAEException {
+		System.out.println("Service:" + controller.getName()
+				+ " DirectOutputChannel.sendReply() - sending GetMeta Reply - MsgFrom:"+anEndpoint.getDelegateKey());
+		String msgFrom = controller.getName();
+		DirectMessage getMetaReplyMessage = new DirectMessage().withCommand(AsynchAEMessage.GetMeta)
+				.withMessageType(AsynchAEMessage.Response).withOrigin(msgFrom)
+				.withPayload(AsynchAEMessage.Metadata);
+
+		@SuppressWarnings("unchecked")
+		BlockingQueue<DirectMessage> replyQueue = (BlockingQueue<DirectMessage>) anEndpoint.getReplyDestination();
+
+		getMetaReplyMessage.withSerializationType(AsynchAEMessage.None);
+		getMetaReplyMessage.withMetadata(aProcessingResourceMetadata);
+		replyQueue.add(getMetaReplyMessage);
+	}
+
+	/**
+	 * Returns the throwable as a {@link UimaEEServiceException}. An
+	 * {@link AsynchAEException} that has a cause is replaced by a wrapper
+	 * around that cause; any other throwable is wrapped directly.
+	 */
+	private UimaEEServiceException wrapErrorInUimaEEServiceException(Throwable t) {
+		if (!(t instanceof UimaEEServiceException)) {
+			UimaEEServiceException wrapper;
+			// Strip off AsyncAEException and replace with UimaEEServiceException
+			if (t instanceof AsynchAEException && t.getCause() != null) {
+				wrapper = new UimaEEServiceException(t.getCause());
+			} else {
+				wrapper = new UimaEEServiceException(t);
+			}
+			return wrapper;
+		} else {
+			return (UimaEEServiceException) t;
+		}
+	}
+
+	/**
+	 * Returns the sender id used in replies: the service endpoint name for a
+	 * top level component, the controller's key otherwise.
+	 */
+	private String getOrigin() {
+		String origin = controller.getServiceEndpointName();
+
+		if ( !controller.isTopLevelComponent()) {
+			origin = controller.getKey();
+		}
+		return origin;
+	}
+
+	/**
+	 * Sends an error response for the given CAS to the endpoint's reply
+	 * destination and drops the CAS statistics held by the controller. The
+	 * reply is always built as a Process response; {@code aCommand} is not
+	 * used.
+	 */
+	public void sendReply(Throwable t, String aCasReferenceId, String aParentCasReferenceId, Endpoint anEndpoint,
+			int aCommand) throws AsynchAEException {
+		anEndpoint.setReplyEndpoint(true);
+		// the client expects an error wrapped in UimaEEServiceException
+		UimaEEServiceException errorWrapper = wrapErrorInUimaEEServiceException(t);
+
+		// construct reply msg containing the error
+		DirectMessage errorReplyMessage = new DirectMessage().withCommand(AsynchAEMessage.Process)
+				.withMessageType(AsynchAEMessage.Response).withOrigin(getOrigin())
+				.withCasReferenceId(aCasReferenceId).withParentCasReferenceId(aParentCasReferenceId)
+				.withError(errorWrapper).withPayload(AsynchAEMessage.Exception);
+		@SuppressWarnings("unchecked")
+		BlockingQueue<DirectMessage> replyQueue = (BlockingQueue<DirectMessage>) anEndpoint.getReplyDestination();
+		// NOTE(review): getName() returns null in this class, so null is what
+		// dropStats() receives as its second argument - confirm it copes.
+		controller.dropStats(aCasReferenceId, getName());
+
+		replyQueue.add(errorReplyMessage);
+
+	}
+
+	/** No-op; not implemented for this channel. */
+	@Override
+	public void bindWithClientEndpoint(Endpoint anEndpoint) throws Exception {
+		// TODO Auto-generated method stub
+
+	}
+
+	/** No-op; not implemented for this channel. */
+	@Override
+	public void setServerURI(String aServerURI) {
+		// TODO Auto-generated method stub
+
+	}
+
+	/** Marks this channel as stopping. */
+	@Override
+	public void stop() {
+		aborting = true;
+	}
+
+	/** No-op; not implemented for this channel. */
+	@Override
+	public void cancelTimers() {
+		// TODO Auto-generated method stub
+
+	}
+
+	/** Returns true once {@link #stop()} or {@link #stop(boolean)} was called. */
+	public boolean isStopping() {
+		return aborting;
+	}
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/JavaQueueListener.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/JavaQueueListener.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/JavaQueueListener.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/JavaQueueListener.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.client;
+
+/**
+ * Implemented by listeners that accept a consumer count.
+ */
+public interface JavaQueueListener {
+
+	/**
+	 * Supplies a consumer count to the listener.
+	 *
+	 * @param howMany presumably the number of consumers the listener should
+	 *        use - TODO confirm against an implementation
+	 */
+	void setConsumers(int howMany);
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/LifecycleListener.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/LifecycleListener.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/LifecycleListener.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/LifecycleListener.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.client;
+
+/**
+ * Implemented by components that can be started and stopped.
+ */
+public interface LifecycleListener {
+
+	/** Starts the component. */
+	void start();
+
+	/** Stops the component. */
+	void stop();
+}