You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/02/26 18:54:13 UTC

svn commit: r1825401 [5/11] - in /uima/uima-as/branches/uima-as-3: uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ uimaj-as-activemq/src/main/java/org/apache/uima...

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java Mon Feb 26 18:54:11 2018
@@ -38,6 +38,7 @@ import org.apache.uima.aae.client.UimaAS
 import org.apache.uima.aae.client.UimaASProcessStatusImpl;
 import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
 import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
 import org.apache.uima.aae.error.ServiceShutdownException;
 import org.apache.uima.aae.error.UimaASPingTimeout;
 import org.apache.uima.aae.error.UimaASProcessCasTimeout;
@@ -114,9 +115,62 @@ public abstract class BaseTestSupport ex
 
   protected long failedCasCountDueToBrokerFailure = 0;
   
+  protected String deployService(Transport transport, BaseUIMAAsynchronousEngine_impl eeUimaEngine,
+          String aDeploymentDescriptorPath) throws Exception  {
+	  String serviceId = null;
+	  if ( transport.equals(Transport.Java)) {
+		  serviceId = deployJavaService(eeUimaEngine, aDeploymentDescriptorPath);
+	  } else if ( transport.equals(Transport.JMS)) {
+		  serviceId = deployJmsService(eeUimaEngine, aDeploymentDescriptorPath);
+	  }
+	  return serviceId;
+  }
+  protected String deployJavaService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
+          String aDeploymentDescriptorPath) throws Exception  {
+    System.setProperty("Provider", "java");
+    System.setProperty("Protocol", "java");
+    System.setProperty("defaultBrokerURL", "java");
+    System.setProperty("DefaultBrokerURL", "java");
+
+    Map<String, Object> appCtx = new HashMap<>();
+    appCtx.put( AsynchAEMessage.Transport, Transport.Java);
+ 
+    return deployService(eeUimaEngine, aDeploymentDescriptorPath, appCtx);
+  }
+  protected String deployJmsService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
+          String aDeploymentDescriptorPath)  throws Exception  {
+     System.setProperty("Provider", "activemq");
+     System.setProperty("Protocol", "jms");
+     System.setProperty("defaultBrokerURL", "tcp://localhost:61617");
+     String defaultBrokerURL = System.getProperty("defaultBrokerURL");
+     if (defaultBrokerURL != null) {
+       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+           String msg = ">>> runTest: Setting defaultBrokerURL to:" + defaultBrokerURL;
+           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "deployService",
+                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
+                   new Object[] { msg });
+       }
+       System.setProperty("defaultBrokerURL", defaultBrokerURL);
+     } else {
+       System.setProperty("defaultBrokerURL", "tcp://localhost:8118");
+     }
+
+     Map<String, Object> appCtx = new HashMap<>();
+	 appCtx.put( AsynchAEMessage.Transport, Transport.JMS);
+	 
+	 return deployService(eeUimaEngine, aDeploymentDescriptorPath, appCtx);
+  }
   protected String deployService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
           String aDeploymentDescriptorPath) throws Exception {
-    String defaultBrokerURL = System.getProperty("BrokerURL");
+	  return deployService(eeUimaEngine, aDeploymentDescriptorPath, new HashMap<String, Object>());
+  }
+  protected String deployService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
+          String aDeploymentDescriptorPath,  Map<String, Object> appCtx) throws Exception {
+
+ // protected String deployService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
+ //         String aDeploymentDescriptorPath) throws Exception {
+//    String defaultBrokerURL = System.getProperty("BrokerURL");
+    String defaultBrokerURL = System.getProperty("defaultBrokerURL");
     if (defaultBrokerURL != null) {
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
           String msg = ">>> runTest: Setting defaultBrokerURL to:" + defaultBrokerURL;
@@ -129,15 +183,15 @@ public abstract class BaseTestSupport ex
       System.setProperty("defaultBrokerURL", "tcp://localhost:8118");
     }
 
-    Map<String, Object> appCtx = new HashMap();
-    appCtx.put(UimaAsynchronousEngine.DD2SpringXsltFilePath,
-            "../src/main/scripts/dd2spring.xsl".replace('/', FS));
-    appCtx.put(UimaAsynchronousEngine.SaxonClasspath,
-            "file:../src/main/saxon/saxon8.jar".replace('/', FS));
+    //Map<String, Object> appCtx = new HashMap();
+//    appCtx.put(UimaAsynchronousEngine.DD2SpringXsltFilePath,
+//            "../src/main/scripts/dd2spring.xsl".replace('/', FS));
+//    appCtx.put(UimaAsynchronousEngine.SaxonClasspath,
+//            "file:../src/main/saxon/saxon8.jar".replace('/', FS));
     // appCtx.put(UimaAsynchronousEngine.UimaEeDebug, UimaAsynchronousEngine.UimaEeDebug);
-    String containerId = null;
+    String serviceId = null;
     try {
-      containerId = eeUimaEngine.deploy(aDeploymentDescriptorPath, appCtx);
+    	serviceId = eeUimaEngine.deploy(aDeploymentDescriptorPath, appCtx);
     } catch (ResourceInitializationException e) {
       if (!ignoreException(ResourceInitializationException.class)) {
         System.out
@@ -151,7 +205,7 @@ public abstract class BaseTestSupport ex
       System.out.println(">>>>>>>>>>> runTest: Exception:" + e.getClass().getName());
       throw e;
     }
-    return containerId;
+    return serviceId;
   }
 
   protected void addExceptionToignore(Class anExceptionToIgnore) {
@@ -196,19 +250,21 @@ public abstract class BaseTestSupport ex
     return (url == null ? null : url.getPath());
   }
 
-  protected Map<String, Object> buildContext(String aTopLevelServiceBrokerURI, String aTopLevelServiceQueueName)
+  protected Map<String, Object> buildContext(String aTopLevelServiceServiceURI, String aTopLevelServiceQueueName)
           throws Exception {
-    return buildContext(aTopLevelServiceBrokerURI, aTopLevelServiceQueueName, 0);
+    return buildContext(aTopLevelServiceServiceURI, aTopLevelServiceQueueName, 0);
   }
 
-  protected Map<String, Object> buildContext(String aTopLevelServiceBrokerURI, String aTopLevelServiceQueueName,
+  protected Map<String, Object> buildContext(String aTopLevelServiceServiceURI, String aTopLevelServiceQueueName,
           int timeout) throws Exception {
     Map<String, Object> appCtx = new HashMap<String, Object>();
-    appCtx.put(UimaAsynchronousEngine.ServerUri, aTopLevelServiceBrokerURI);
+    appCtx.put(UimaAsynchronousEngine.ServerUri, aTopLevelServiceServiceURI);
     appCtx.put(UimaAsynchronousEngine.ENDPOINT, aTopLevelServiceQueueName);
     appCtx.put(UimaAsynchronousEngine.CasPoolSize, Integer.valueOf(4));
     appCtx.put(UimaAsynchronousEngine.ReplyWindow, 15);
     appCtx.put(UimaAsynchronousEngine.Timeout, timeout);
+    appCtx.put(UimaAsynchronousEngine.GetMetaTimeout,0);
+
     return appCtx;
   }
 
@@ -316,7 +372,7 @@ public abstract class BaseTestSupport ex
     isStopping = false;
     final BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
     // Deploy Uima EE Primitive Service
-    final String containerId = deployService(eeUimaEngine, serviceDeplyDescriptor);
+    final String containerId = deployJavaService(eeUimaEngine, serviceDeplyDescriptor);
 
     engine = eeUimaEngine;
 
@@ -577,6 +633,9 @@ public abstract class BaseTestSupport ex
         // Wait until the CPC Thread is ready.
         waitOnMonitor(ctrlSemaphore);
         if (!isStopped) {
+           	StringBuilder sb = new StringBuilder().append("=================> Client sending ").
+        			append(i+1).append(" CAS of ").append(howMany);
+        	System.out.println(sb.toString());
           // Send an in CAS to the top level service
           sendCAS(aUimaEeEngine, 1, sendCasAsynchronously);
         }
@@ -633,6 +692,7 @@ public abstract class BaseTestSupport ex
           boolean sendCasAsynchronously) throws Exception {
     engine = eeUimaEngine;
     for (int i = 0; i < howMany; i++) {
+ 
     	if (isStopping) {
     		break;
     	}
@@ -685,8 +745,8 @@ public abstract class BaseTestSupport ex
     }
     public synchronized void onBeforeMessageSend(UimaASProcessStatus status) {
       casSent = status.getCasReferenceId();
-//      System.out.println("runTest: Received onBeforeMessageSend() Notification With CAS:"
-//              + status.getCasReferenceId());
+      System.out.println("runTest: Received onBeforeMessageSend() Notification With CAS:"
+              + status.getCasReferenceId());
     }
     public void onUimaAsServiceExit(EventTrigger cause) {
         System.out.println("runTest: Received onUimaAsServiceExit() Notification With Cause:"

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java Mon Feb 26 18:54:11 2018
@@ -70,8 +70,15 @@ public class InProcessCache implements I
 
   private BaseAnalysisEngineController controller;
   
+  private boolean registeredWithJMX = false;
   
+  public boolean isRegisteredWithJMX() {
+	  return registeredWithJMX;
+  }
   
+  public void setRegisteredWithJMX() {
+	  registeredWithJMX = true;
+  }
   /**
 	  Register controller to call when the cache becomes empty.
     This call is made when the controller enters quiesce
@@ -225,41 +232,51 @@ public class InProcessCache implements I
   }
 
   public synchronized void dumpContents(String aControllerName) {
-//    int count = 0;
-    /*
-     * if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) ) { Iterator it =
-     * cache.keySet().iterator(); StringBuffer sb = new StringBuffer("\n");
-     * 
-     * while( it.hasNext() ) { String key = (String) it.next(); CacheEntry entry =
-     * (CacheEntry)cache.get(key); count++; if ( entry.isSubordinate()) { sb.append(key+
-     * " Number Of Child CASes In Play:"
-     * +entry.getSubordinateCasInPlayCount()+" Parent CAS id:"+entry.getInputCasReferenceId()); }
-     * else { sb.append(key+
-     * " *** Input CAS. Number Of Child CASes In Play:"+entry.getSubordinateCasInPlayCount()); }
-     * 
-     * // if ( entry.isWaitingForRelease() ) // { //
-     * sb.append(" <<< Reached Final State in Controller:"+aControllerName); // }
-     * 
-     * sb.append("\n"); } UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST,
-     * CLASS_NAME.getName(), "dumpContents", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-     * "UIMAEE_show_cache_entry_key__FINEST", new Object[] { aControllerName, count, sb.toString()
-     * });
-     * 
-     * sb.setLength(0); } else if ( UIMAFramework.getLogger().isLoggable(Level.FINE) ) { Iterator it
-     * = cache.keySet().iterator(); StringBuffer sb = new StringBuffer("\n"); int inFinalState=0;
-     * 
-     * while( it.hasNext() ) { String key = (String) it.next(); CacheEntry entry =
-     * (CacheEntry)cache.get(key); count++;
-     * 
-     * //if ( entry.isWaitingForRelease() ) //{ //inFinalState++; //}
-     * 
-     * } UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "dumpContents",
-     * UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_abbrev_cache_stats___FINE", new
-     * Object[] { aControllerName, count, inFinalState });
-     * 
-     * 
-     * }
-     */
+    int count = 0;
+    
+    //  if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) ) { 
+    	  Iterator it =
+             cache.keySet().iterator(); 
+    	  StringBuilder sb = new StringBuilder("\n");
+      
+          while( it.hasNext() ) { 
+        	  String key = (String) it.next(); 
+        	  CacheEntry entry =
+                  (CacheEntry)cache.get(key); 
+        	  count++; 
+        	  sb.append("CAS ").append(entry.getCasReferenceId()).append(" Parent:").append(entry.getInputCasReferenceId());
+        	  /*
+        	  if ( entry.isSubordinate()) { 
+        		  sb.append(key+ " Number Of Child CASes In Play:"
+                     +entry.getSubordinateCasInPlayCount()+" Parent CAS id:"+entry.getInputCasReferenceId()); 
+        	  } else { sb.append(key+
+      " *** Input CAS. Number Of Child CASes In Play:"+entry.getSubordinateCasInPlayCount()); 
+        	  }
+      
+      // if ( entry.isWaitingForRelease() ) // { //
+      sb.append(" <<< Reached Final State in Controller:"+aControllerName); // }
+      
+      sb.append("\n"); } UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST,
+      CLASS_NAME.getName(), "dumpContents", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+      "UIMAEE_show_cache_entry_key__FINEST", new Object[] { aControllerName, count, sb.toString()
+      });
+      
+      sb.setLength(0); } else if ( UIMAFramework.getLogger().isLoggable(Level.FINE) ) { Iterator it
+      = cache.keySet().iterator(); StringBuffer sb = new StringBuffer("\n"); int inFinalState=0;
+      
+      while( it.hasNext() ) { String key = (String) it.next(); CacheEntry entry =
+      (CacheEntry)cache.get(key); count++;
+      
+      //if ( entry.isWaitingForRelease() ) //{ //inFinalState++; //}
+      
+      } UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "dumpContents",
+      UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_abbrev_cache_stats___FINE", new
+      Object[] { aControllerName, count, inFinalState });
+      
+      */
+     // }
+      }
+      System.out.println(sb.toString());
   }
 
   public synchronized void remove(String aCasReferenceId) {
@@ -366,10 +383,15 @@ public class InProcessCache implements I
   }
 
   public MessageContext getMessageAccessorByReference(String aCasReferenceId) {
-    if (!cache.containsKey(aCasReferenceId)) {
+//    if (!cache.containsKey(aCasReferenceId)) {
+//      return null;
+//    }
+    CacheEntry casRefEntry = getEntry(aCasReferenceId);
+    if (casRefEntry == null) {
+    	System.out.println("... CAS "+aCasReferenceId+" Not Found In InprocessCache");
       return null;
     }
-    CacheEntry casRefEntry = getEntry(aCasReferenceId);
+
     return casRefEntry.getMessageAccessor();
   }
 
@@ -528,7 +550,13 @@ public class InProcessCache implements I
     }
     return casRefEntry;
   }
+  public static class UndefinedCacheEntry extends CacheEntry {
 
+	  public UndefinedCacheEntry() {
+		  super(null, null, null);
+	  }
+	  
+  }
   public static class CacheEntry {
     public static final int FINAL_STATE = 1;
 

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java Mon Feb 26 18:54:11 2018
@@ -19,12 +19,26 @@
 
 package org.apache.uima.aae;
 
+import java.util.List;
+
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
 import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.handler.Handler;
 import org.apache.uima.aae.jmx.ServiceInfo;
 import org.apache.uima.aae.message.MessageContext;
 import org.apache.uima.aae.message.MessageWrapper;
+import org.apache.uima.as.client.Listener;
 
 public interface InputChannel extends Channel {
+  // The REPLY type is only for handling replies from remote services.
+  public enum ChannelType {REPLY, REQUEST_REPLY };
+	
+  public void setController(AnalysisEngineController controller) throws Exception;
+  public void setEndpointName(String name);
+  public void setMessageHandler(Handler handler);
+  
+  public ChannelType getChannelType();
   public int getSessionAckMode();
 
   public void ackMessage(MessageContext aMessageContext);
@@ -39,7 +53,7 @@ public interface InputChannel extends Ch
 
   public boolean isStopped();
 
-  public int getConcurrentConsumerCount();
+ // public int getConcurrentConsumerCount();
 
   public void destroyListener(String anEndpointName, String aDelegateKey);
 
@@ -47,6 +61,15 @@ public interface InputChannel extends Ch
 
   public void createListenerForTargetedMessages() throws Exception;
   
+  public List<Listener> getListeners();
+  
+//  public void addListener(Listener listener);
+  public List<Listener> registerListener(Listener messageListener);
+  
+  public void disconnectListenersFromQueue() throws Exception;
+
+  public void disconnectListenerFromQueue(Listener listener) throws Exception;
+
   public boolean isFailed(String aDelegateKey);
 
   public boolean isListenerForDestination(String anEndpointName);
@@ -57,7 +80,8 @@ public interface InputChannel extends Ch
   
   public void terminate();
   
-  public void disconnectListenersFromQueue() throws Exception;
+ 
+ // public void onMessage(MessageWrapper message);
   
-  public void onMessage(MessageWrapper message);
+  public ENDPOINT_TYPE getType();
 }

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/OutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/OutputChannel.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/OutputChannel.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/OutputChannel.java Mon Feb 26 18:54:11 2018
@@ -21,11 +21,16 @@ package org.apache.uima.aae;
 
 import org.apache.uima.aae.InProcessCache.CacheEntry;
 import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
+import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
 import org.apache.uima.aae.controller.Endpoint;
 import org.apache.uima.aae.error.AsynchAEException;
 import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
 
 public interface OutputChannel extends Channel {
+	
+  public ENDPOINT_TYPE getType();
+	
   public void setController(AnalysisEngineController aContainer);
 
   public void initialize() throws AsynchAEException;
@@ -36,7 +41,7 @@ public interface OutputChannel extends C
   public void sendReply(int aCommand, Endpoint anEndpoint, String aCasReferenceId, boolean notifyOnJmsException)
   throws AsynchAEException;
 
-  public void sendReply(CacheEntry entry, Endpoint anEndpoint) throws AsynchAEException;
+  public void sendReply(CasStateEntry casStateEntry, Endpoint anEndpoint) throws AsynchAEException;
 
   public void sendReply(ProcessingResourceMetaData aProcessingResourceMetadata,
           Endpoint anEndpoint, boolean serialize) throws AsynchAEException;

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaASUtils.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaASUtils.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaASUtils.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaASUtils.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,48 @@
+package org.apache.uima.aae;
+
+import java.io.File;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class UimaASUtils {
+	public static boolean isAbsolutePath(String filePath) {
+		return new File(filePath).isAbsolute();
+	}
+	public static String getBaseDir(String filePath) {
+		// match the end of the path. An example:
+		// /user/path/foo.xml
+		// the pattern matches /foo.xml
+		String pattern = "/[^/]*?\\.[xX][mM][lL]";
+	    Pattern r = Pattern.compile(pattern);
+
+	    // Now create matcher object.
+	    Matcher m = r.matcher(filePath);
+	    if (m.find( )) {
+	    	// strip from a given path the filename and
+	    	// return parent path.
+	    	return filePath.replace(m.group(0),"");
+	    }
+	    return null;
+	}
+	public static String replaceBackslashesWithForwardSlashes(String filePath) {
+		return filePath.replace("\\", "/");
+	}
+	public static String fixPath( String parentPath, String childPath ) {
+		// operate on a copy of childPath. In case the childPath is absolute
+		// path we will return the original passed in as an arg
+		String adjustedPath = childPath;
+		
+    	if ( childPath.startsWith("file:")) {
+    		adjustedPath = childPath.substring(6);
+    	}
+    	adjustedPath = UimaASUtils.replaceBackslashesWithForwardSlashes(adjustedPath);
+    	if ( !UimaASUtils.isAbsolutePath(adjustedPath)) {
+    		// relative path to the enclosing descriptor
+    		String baseDir = UimaASUtils.getBaseDir(parentPath);
+    		adjustedPath = baseDir+'/'+adjustedPath;
+    	} else {
+    		adjustedPath = childPath;
+    	}
+    	return adjustedPath;
+	}
+}

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java Mon Feb 26 18:54:11 2018
@@ -57,19 +57,40 @@ public class UimaAsThreadFactory impleme
   
   private volatile boolean initFailed=false;
   
+  private CountDownLatch latchToCountNumberOfInitedThreads;
+
+  public UimaAsThreadFactory() {
+	  
+  }
   public UimaAsThreadFactory(ThreadGroup tGroup) {
     this(tGroup,null);
   }
  
   public UimaAsThreadFactory(ThreadGroup tGroup, PrimitiveAnalysisEngineController aController) {
-    this( tGroup, aController, null);
+    this( tGroup, aController, null, null);
   }
-  public UimaAsThreadFactory(ThreadGroup tGroup, PrimitiveAnalysisEngineController aController, CountDownLatch latchToCountNumberOfTerminatedThreads) {
+  public UimaAsThreadFactory(ThreadGroup tGroup, PrimitiveAnalysisEngineController aController, CountDownLatch latchToCountNumberOfTerminatedThreads, CountDownLatch latchToCountNumberOfInitedThreads) {
     controller = aController;
     theThreadGroup = tGroup;
     this.latchToCountNumberOfTerminatedThreads = latchToCountNumberOfTerminatedThreads;
+    this.latchToCountNumberOfInitedThreads = latchToCountNumberOfInitedThreads;
   }
-  
+  public UimaAsThreadFactory withThreadGroup(ThreadGroup tGroup) {
+	  theThreadGroup = tGroup;
+	  return this;
+  }
+  public UimaAsThreadFactory withPrimitiveController(PrimitiveAnalysisEngineController aController) {
+	  controller = aController;
+	  return this;
+  }
+  public UimaAsThreadFactory withTerminatedThreadsLatch(CountDownLatch latchToCountNumberOfTerminatedThreads) {
+	    this.latchToCountNumberOfTerminatedThreads = latchToCountNumberOfTerminatedThreads;
+	  return this;
+  }
+  public UimaAsThreadFactory withInitedThreadsLatch(CountDownLatch latchToCountNumberOfInitedThreads) {
+	    this.latchToCountNumberOfInitedThreads = latchToCountNumberOfInitedThreads;
+	  return this;
+}
   public void setThreadNamePrefix(String prefix) {
     threadNamePrefix = prefix;
   }
@@ -117,6 +138,10 @@ public class UimaAsThreadFactory impleme
             		  initFailed = true;
             		  e.printStackTrace();
             		  throw e;
+            	  }  finally {
+            		  if ( latchToCountNumberOfInitedThreads != null ) {
+                		  latchToCountNumberOfInitedThreads.countDown();
+            		  }
             	  }
               } else {
             	  return; // there was failure previously so just return

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java Mon Feb 26 18:54:11 2018
@@ -44,7 +44,7 @@ import org.apache.uima.resource.metadata
  * registered, and {@link #initialize(Map)} method is called, the application may call
  * {@link #process()} method.
  * 
- * 
+ * <p>
  * This API enables the application to dynamically deploy UIMA AS services that it intends to use
  * for processing. These services are deployed in a container and are collocated in the same JVM as
  * the application. The services are considered private and used exclusively by the application. To
@@ -53,24 +53,31 @@ import org.apache.uima.resource.metadata
  * descriptor or an array thereof. The application must deploy its "private" services *before*
  * calling {@link #initialize(Map)} method.
  * 
- * 
+ * <p>
  * The application may stop the UIMA AS client in the middle of processing by calling
  * {@link #stop()} method.
  * 
- * 
+ * <p>
  * Listeners can register with the <code>UimaAsynchronousEngine</code> by calling the
  * {@link #addStatusCallbackListener(UimaAsBaseCallbackListener)} method. These listeners receive
  * status callbacks during the processing. An exception to that is the synchronous processing via
  * {@link #sendAndReceiveCAS(CAS)} method. This method returns either a CAS containing results of
  * analysis or an exception. No callbacks are made while processing CASes synchronously.
- * 
+ * <p>
  * An application may choose to implement parallelization of the processing, calling either
  * {@link #sendAndReceiveCAS(CAS)} or {@link #sendCAS(CAS)} methods from multiple threads.
- * 
+ * <p>
  * 
  * 
  */
 public interface UimaAsynchronousEngine {
+	
+	public enum Transport {JMS, Java};
+	
+	public Transport transportType = Transport.Java;
+	
+	public final String ClientTransport = "ClientTransport";
+
   /**
    * @deprecated
    */

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java Mon Feb 26 18:54:11 2018
@@ -35,11 +35,12 @@ import org.apache.uima.aae.jmx.ServiceEr
 import org.apache.uima.aae.jmx.ServiceInfo;
 import org.apache.uima.aae.jmx.ServicePerformance;
 import org.apache.uima.flow.FinalStep;
+import org.apache.uima.resource.metadata.ResourceMetaData;
 
 public interface AggregateAnalysisEngineController extends AnalysisEngineController {
-  public void mergeTypeSystem(String aTypeSystem, String fromDestination) throws AsynchAEException;
+//  public void mergeTypeSystem(String aTypeSystem, String fromDestination) throws AsynchAEException;
 
-  public void mergeTypeSystem(String aTypeSystem, String fromDestination, String fromServer)
+  public void mergeTypeSystem(ResourceMetaData aProcessingResourceMetadata, String fromDestination, String fromServer)
           throws AsynchAEException;
 
   public void setRemoteSerializationSupported(int code, String fromDestination, String fromServer);
@@ -113,7 +114,7 @@ public interface AggregateAnalysisEngine
 
   public void setRequestForMetaSentToRemotes();
 
-  public Map getDestinations();
+  public Map<String, Endpoint>  getDestinations();
 
   public ServicePerformance getServicePerformance(String aDelegateKey);
 
@@ -124,7 +125,7 @@ public interface AggregateAnalysisEngine
   public boolean delayCasIfDelegateInTimedOutState(String aCasReferenceId, String aDelegateKey, long casHashcode)
           throws AsynchAEException;
 
-  public List getChildControllerList();
+  public List<AnalysisEngineController> getChildControllerList();
 
   public void stopCasMultiplier(Delegate casMultiplier, String aCasReferenceId);
 

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Mon Feb 26 18:54:11 2018
@@ -100,7 +100,8 @@ import org.apache.uima.util.Logger;
 import org.apache.uima.util.TypeSystemUtil;
 import org.apache.uima.util.XMLInputSource;
 
-public class AggregateAnalysisEngineController_impl extends BaseAnalysisEngineController implements
+public class AggregateAnalysisEngineController_impl extends BaseAnalysisEngineController 
+implements
         AggregateAnalysisEngineController, AggregateAnalysisEngineController_implMBean {
 
   /**
@@ -217,6 +218,8 @@ public class AggregateAnalysisEngineCont
               new Object[] { getComponentName(), aChildController.getComponentName() });
     }
     synchronized(childControllerList) {
+    	System.out.println(":::::::::::::::::: Aggregate:"+getComponentName()+" Registering Child Controller:"+aChildController.getComponentName());;
+
       childControllerList.add(aChildController);
     }
   }
@@ -228,6 +231,7 @@ public class AggregateAnalysisEngineCont
 
  
   public void addMessageOrigin(String aCasReferenceId, Endpoint anEndpoint) {
+//	  Thread.dumpStack();
     if (anEndpoint == null) {
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "addMessageOrigin",
@@ -236,23 +240,26 @@ public class AggregateAnalysisEngineCont
       }
       return;
     }
+    System.out.println(".... Aggregate "+getComponentName()+" addMessageOrigin() - Adding endoint with hashCode:"+anEndpoint.hashCode()+" For CAS:"+aCasReferenceId);
+
     originMap.put(aCasReferenceId, anEndpoint);
-    if (UIMAFramework.getLogger().isLoggable(Level.FINE)) {
+//    if (UIMAFramework.getLogger().isLoggable(Level.FINE)) {
       Iterator it = originMap.keySet().iterator();
-      StringBuffer sb = new StringBuffer();
+      StringBuilder sb = new StringBuilder();
       while (it.hasNext()) {
         String key = (String) it.next();
         Endpoint e = (Endpoint) originMap.get(key);
         if (e != null) {
           sb.append("\t\nCAS:" + key + " Origin:" + e.getEndpoint());
         }
-      }
+ //     }
       /*
        * UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
        * "addMessageOrigin", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
        * "UIMAEE_dump_msg_origin__FINE", new Object[] {getComponentName(), sb.toString()});
        */
     }
+      System.out.println(".... Aggregate "+getComponentName()+" addMessageOrigin() - Origin Map Contents:\n"+sb.toString());
   }
 
   public boolean isDelegateDisabled(String aDelegateKey) {
@@ -270,6 +277,7 @@ public class AggregateAnalysisEngineCont
 
 
   public void setServiceEndpointName(String anEndpointName) {
+	  //Thread.currentThread().dumpStack();
     serviceEndpointName = anEndpointName;
     if (this.isTopLevelComponent()) {
       // This is done so that the collocated client application can determine where to send messages
@@ -293,6 +301,8 @@ public class AggregateAnalysisEngineCont
  
   public Endpoint getMessageOrigin(String aCasReferenceId) {
       if (originMap.containsKey(aCasReferenceId)) {
+  	    System.out.println(".... Aggregate "+getComponentName()+" getMessageOrigin() - endoint with hashCode:"+((Endpoint) originMap.get(aCasReferenceId)).hashCode()+" For CAS:"+aCasReferenceId);
+
         return (Endpoint) originMap.get(aCasReferenceId);
     }
     return null;
@@ -335,9 +345,9 @@ public class AggregateAnalysisEngineCont
 
   public void mapEndpointsToKeys(ConcurrentHashMap aDestinationMap) {
     destinationMap = aDestinationMap;
-    Set set = destinationMap.entrySet();
-    for (Iterator it = set.iterator(); it.hasNext();) {
-      Map.Entry entry = (Map.Entry) it.next();
+    Set<Map.Entry<String, Endpoint>> set = destinationMap.entrySet();
+    for (Iterator<Map.Entry<String, Endpoint>> it = set.iterator(); it.hasNext();) {
+      Map.Entry<String, Endpoint> entry =  it.next();
       Endpoint endpoint = (Endpoint) entry.getValue();
       if (endpoint != null) {
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
@@ -346,6 +356,7 @@ public class AggregateAnalysisEngineCont
                   "UIMAEE_endpoint_to_key_map__FINE",
                   new Object[] { getName(), (String) entry.getKey(), endpoint.getEndpoint() });
         }
+        System.out.println("............ Aggregate:"+getName()+" Delegate:"+entry.getKey()+" Endpoint:"+endpoint.getEndpoint() );
         if (destinationToKeyMap == null) {
           destinationToKeyMap = new HashMap();
         }
@@ -462,6 +473,7 @@ public class AggregateAnalysisEngineCont
     if (aClientEndpoint == null) {
       aClientEndpoint = getClientEndpoint();
     }
+    /*
     if (!aClientEndpoint.isRemote()) {
       UimaTransport transport = getTransport(aClientEndpoint.getEndpoint());
       UimaMessage message = transport.produceMessage(AsynchAEMessage.CollectionProcessComplete,
@@ -471,7 +483,8 @@ public class AggregateAnalysisEngineCont
     } else {
       getOutputChannel().sendReply(AsynchAEMessage.CollectionProcessComplete, aClientEndpoint, null, false);
     }
-
+*/
+    getOutputChannel(aClientEndpoint).sendReply(AsynchAEMessage.CollectionProcessComplete, aClientEndpoint, null, false);
     clearStats();
   }
 
@@ -486,10 +499,12 @@ public class AggregateAnalysisEngineCont
       boolean cacheNotEmpty = true;
       boolean shownOnce = false;
       final Object localMux = new Object();
+      
+      /*
       while (cacheNotEmpty) {
         InProcessCache cache = getInProcessCache();
         if (!shownOnce) {
-          shownOnce = true;
+          //shownOnce = true;
           cache.dumpContents(getComponentName());
         }
 
@@ -501,6 +516,7 @@ public class AggregateAnalysisEngineCont
           }
         }
       }
+      */
     } catch (Exception e) {
       throw new AsynchAEException(e);
     }
@@ -563,10 +579,11 @@ public class AggregateAnalysisEngineCont
         }
       }
     } else {
-      Set set = destinationMap.entrySet();
-      for (Iterator it = set.iterator(); it.hasNext();) {
-        Map.Entry entry = (Map.Entry) it.next();
+      Set<?> set = destinationMap.entrySet();
+      for (Iterator<?> it = set.iterator(); it.hasNext();) {
+        Map.Entry<String, Endpoint> entry = (Map.Entry) it.next();
         Endpoint endpoint = (Endpoint) entry.getValue();
+        /*
         if (endpoint != null && endpoint.getStatus() == Endpoint.OK) {
 
           if (!endpoint.isRemote()) {
@@ -593,6 +610,10 @@ public class AggregateAnalysisEngineCont
             endpoint.startCollectionProcessCompleteTimer();
           }
         }
+        */
+        getOutputChannel(endpoint).sendRequest(AsynchAEMessage.CollectionProcessComplete, null, endpoint);
+        endpoint.startCollectionProcessCompleteTimer();
+
       }
     }
   }
@@ -609,7 +630,7 @@ public class AggregateAnalysisEngineCont
     return false;
   }
 
-  public Map getDestinations() {
+  public Map<String, Endpoint> getDestinations() {
     return destinationMap;
   }
 
@@ -639,23 +660,23 @@ public class AggregateAnalysisEngineCont
 
   private void stopListener(String key, Endpoint endpoint) throws Exception {
     // Stop the Listener on endpoint that has been disabled
-    InputChannel iC = null;
+//    InputChannel iC = null;
     String destName = null;
     if (endpoint.getDestination() != null) {
       destName = endpoint.getDestination().toString();
-      iC = getInputChannel(destName);
+ //     iC = getInputChannel(destName);
     } else {
       destName = endpoint.getReplyToEndpoint();
-      iC = getInputChannel(destName);
+//      iC = getInputChannel(destName);
     }
-    if (iC != null) {
+//    if (iC != null) {
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "stopListener",
                 UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_stopping_listener__INFO",
                 new Object[] { getComponentName(), destName, key });
       }
-      iC.destroyListener(destName, key);
-    }
+      inputChannel.destroyListener(destName, key);
+//    }
   }
 
   public void disableDelegates(List aDelegateList) throws AsynchAEException {
@@ -882,7 +903,7 @@ public class AggregateAnalysisEngineCont
           if (localCache.lookupEntry(aNewCasReferenceId) == null) {
             // Add this Cas Id to the local cache. Every input CAS goes through here
             CasStateEntry casStateEntry = localCache.createCasStateEntry(aNewCasReferenceId);
-            casStateEntry.setInputCasReferenceId(anInputCasReferenceId);
+            casStateEntry.setParentCasReferenceId(anInputCasReferenceId);
           }
 
           // Save the subordinate Flow Object in a cache. Flow exists in the
@@ -947,7 +968,7 @@ public class AggregateAnalysisEngineCont
       CacheEntry cacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
       Endpoint replyEndpoint = getReplyEndpoint(cacheEntry, casStateEntry);
       if (replyEndpoint != null) {
-        getOutputChannel().sendReply(new ServiceShutdownException(), aCasReferenceId, null,
+        getOutputChannel(replyEndpoint).sendReply(new ServiceShutdownException(), aCasReferenceId, null,
                 replyEndpoint, AsynchAEMessage.Process);
       }
     } catch (Exception ex) {
@@ -970,7 +991,7 @@ public class AggregateAnalysisEngineCont
       // Check if this CAS has a parent
       if (casStateEntry.isSubordinate()) {
         // Fetch parent's cache entry
-        parentCasStateEntry = getLocalCache().lookupEntry(casStateEntry.getInputCasReferenceId());
+        parentCasStateEntry = getLocalCache().lookupEntry(casStateEntry.getParentCasReferenceId());
         // Check the state of the parent CAS. If it is marked as failed, it means that
         // one of its child CASes failed and error handling was configured to fail the
         // CAS. Such failure of a child CAS causes a failure of the parent CAS. All child
@@ -990,7 +1011,7 @@ public class AggregateAnalysisEngineCont
               fcEndpoint.setReplyEndpoint(true);
               fcEndpoint.setIsCasMultiplier(true);
               fcEndpoint.setFreeCasEndpoint(true);
-              getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, entry.getCasReferenceId(),
+              getOutputChannel(fcEndpoint).sendRequest(AsynchAEMessage.ReleaseCAS, entry.getCasReferenceId(),
                       fcEndpoint);
             }
             // Check if a request to stop generation of new CASes from the parent of
@@ -1490,7 +1511,8 @@ public class AggregateAnalysisEngineCont
   }
 
   public void sendRequestForMetadataToRemoteDelegates() throws AsynchAEException {
-    synchronized(childControllerList) {
+    /*
+	  synchronized(childControllerList) {
       //  Add a delay of 100ms before sending requests for metadata to remote delegates.
       //  This is done to give the broker enough time to 'finalize' creation of
       //  temp reply queues for each remote delegate. It's been observed (on MAC OS only) that AMQ
@@ -1513,6 +1535,9 @@ public class AggregateAnalysisEngineCont
         }
       }
     }
+    */
+	    System.out.println("........... Aggregate.sendRequestForMetadataToRemoteDelegates():"+getName());
+
     Endpoint[] delegateEndpoints = new Endpoint[destinationMap.size()];
 
     // First copy endpoints to an array so that we dont get Concurrent access problems
@@ -1574,6 +1599,8 @@ public class AggregateAnalysisEngineCont
             return;
           }
           if (delegateEndpoints[i].getStatus() == Endpoint.OK ) {
+              System.out.println("----- Aggregate Service:"+getName()+ " dispatching GetMeta request to remote "+delegateEndpoints[i].getEndpoint());
+
             dispatchMetadataRequest(delegateEndpoints[i]);
           }
         }
@@ -1583,8 +1610,9 @@ public class AggregateAnalysisEngineCont
         // collocated delegate
         delegateEndpoints[i].initialize();
         delegateEndpoints[i].setController(this);
-
         delegateEndpoints[i].setWaitingForResponse(true);
+        getOutputChannel(ENDPOINT_TYPE.DIRECT).sendRequest(AsynchAEMessage.GetMeta, null, delegateEndpoints[i]);
+/*
         try {
           UimaMessage message = getTransport(delegateEndpoints[i].getEndpoint()).produceMessage(
                   AsynchAEMessage.GetMeta, AsynchAEMessage.Request, getName());
@@ -1593,6 +1621,7 @@ public class AggregateAnalysisEngineCont
         } catch (Exception e) {
           throw new AsynchAEException(e);
         }
+        */
       }
     }
   }
@@ -1600,7 +1629,7 @@ public class AggregateAnalysisEngineCont
   private CasStateEntry fetchParentCasFromLocalCache(CasStateEntry casStateEntry) throws Exception {
     // Lookup parent CAS in the local cache
     CasStateEntry parentCasStateEntry = localCache.lookupEntry(casStateEntry
-            .getInputCasReferenceId());
+            .getParentCasReferenceId());
     if (parentCasStateEntry == null) {
 
       if (UIMAFramework.getLogger().isLoggable(Level.INFO)) {
@@ -1623,7 +1652,7 @@ public class AggregateAnalysisEngineCont
     try {
       // Fetch the parent Cas cache entry
       parentCASCacheEntry = getInProcessCache().getCacheEntryForCAS(
-              casStateEntry.getInputCasReferenceId());
+              casStateEntry.getParentCasReferenceId());
     } catch (Exception ex) {
 
       if (UIMAFramework.getLogger().isLoggable(Level.INFO)) {
@@ -1633,7 +1662,7 @@ public class AggregateAnalysisEngineCont
                 "fetchParentCasFromGlobalCache",
                 UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                 "UIMAEE_cas_not_found__INFO",
-                new Object[] { getComponentName(), casStateEntry.getInputCasReferenceId(),
+                new Object[] { getComponentName(), casStateEntry.getParentCasReferenceId(),
                     "InProcessCache" });
       }
     }
@@ -1641,6 +1670,7 @@ public class AggregateAnalysisEngineCont
   }
 
   private boolean casHasChildrenInPlay(CasStateEntry casStateEntry) throws Exception {
+	  System.out.println(".. FinalState.casHasChildrenInPlay()-CAS:"+casStateEntry.getCasReferenceId()+" Number of Child CASes in Play:"+casStateEntry.getSubordinateCasInPlayCount());
     if (casStateEntry.getSubordinateCasInPlayCount() > 0) {
       // This CAS has child CASes still in play. This CAS will remain in the cache
       // until all its children are fully processed.
@@ -1716,8 +1746,8 @@ public class AggregateAnalysisEngineCont
     }
     // Found entries in caches for a given CAS id
     try {
-      endpoint = getInProcessCache().getEndpoint(null, aCasReferenceId);
-
+     // endpoint = getInProcessCache().getEndpoint(null, aCasReferenceId);
+    	endpoint = getMessageOrigin(aCasReferenceId);
       synchronized (super.finalStepMux) {
         // Check if the global cache still contains the CAS. It may have been deleted by another
         // thread already
@@ -1766,12 +1796,15 @@ public class AggregateAnalysisEngineCont
                   new Object[] { getComponentName(), aCasReferenceId });
         }
         // Determine if this CAS is a child of some CAS
-        isSubordinate = casStateEntry.getInputCasReferenceId() != null;
+        isSubordinate = casStateEntry.getParentCasReferenceId() != null;
 
         if (isSubordinate) {
           // fetch the destination of a CM that produced this CAS, so that we know where to send
           // Free Cas Notification
-          freeCasEndpoint = cacheEntry.getFreeCasEndpoint();
+         // freeCasEndpoint = cacheEntry.getFreeCasEndpoint();
+          freeCasEndpoint = casStateEntry.getFreeCasNotificationEndpoint();
+     	  System.out.println(".........Service:"+getComponentName()+" Cas:"+aCasReferenceId+" is subordinate - freeCasEndpoint="+freeCasEndpoint);
+
           parentCasStateEntry = fetchParentCasFromLocalCache(casStateEntry);
           parentCASCacheEntry = fetchParentCasFromGlobalCache(casStateEntry);
           doDecrementChildCount = true;
@@ -1796,6 +1829,7 @@ public class AggregateAnalysisEngineCont
             if (isSubordinate) {
               // drop the flow since we no longer need it
               dropFlow(aCasReferenceId, true);
+              System.out.println(">>>>>>>>>>>> Controller:"+getComponentName()+" Dropping CAS:"+aCasReferenceId);
               // Drop the CAS and remove cache entry for it
               dropCAS(aCasReferenceId, true);
               casDropped = true;
@@ -1817,14 +1851,18 @@ public class AggregateAnalysisEngineCont
           cEndpoint = replyToClient(cacheEntry, casStateEntry);
           replySentToClient = true;
           
-          if (cEndpoint != null && cEndpoint.isRemote()) {
+          if (cEndpoint != null && cEndpoint.isRemote() && !cEndpoint.getServerURI().equals("java")) {
             // if this service is a Cas Multiplier don't remove the CAS. It will be removed
             // when a remote client sends explicit Release CAS Request
             if (!isCasMultiplier()) {
+            	System.out.println(".... Aggregate:"+getComponentName()+" Releasing CAS:"+aCasReferenceId +" Client is Remote");
+
               // Drop the CAS and remove cache entry for it
               dropCAS(aCasReferenceId, true);
             }
             casDropped = true;
+          } else if (cEndpoint != null && cEndpoint.getServerURI().equals("java")) {
+        	  casDropped = true;
           } else {
             // Remove entry from the local cache for this CAS. If the client
             // is remote the entry was removed in replyToClient()
@@ -1839,12 +1877,13 @@ public class AggregateAnalysisEngineCont
         }
 
         if (parentCasStateEntry == null && isSubordinate) {
-          parentCasStateEntry = localCache.lookupEntry(casStateEntry.getInputCasReferenceId());
+          parentCasStateEntry = localCache.lookupEntry(casStateEntry.getParentCasReferenceId());
         }
         if (doDecrementChildCount) {
           // Child CAS has been fully processed, decrement its parent count of active child CASes
           if (parentCasStateEntry != null) {
             parentCasStateEntry.decrementSubordinateCasInPlayCount();
+        	  System.out.println("... Controller:"+getComponentName()+" CAS:"+parentCasStateEntry.getCasReferenceId()+" Decremented Child Count - Courrent Count:"+parentCasStateEntry.getSubordinateCasInPlayCount());
             // If debug level=FINEST dump the entire cache
             localCache.dumpContents();
             if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
@@ -1932,8 +1971,10 @@ public class AggregateAnalysisEngineCont
           freeCasEndpoint.setReplyEndpoint(true);
           freeCasEndpoint.setIsCasMultiplier(true);
           freeCasEndpoint.setFreeCasEndpoint(true);
+          System.out.println("....Aggregate "+getComponentName()+" Sending ReleaseCAS to remote CM - CASID:"+aCasReferenceId);
+
           // send Free CAS Notification to a Cas Multiplier
-          getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, aCasReferenceId,
+          getOutputChannel(freeCasEndpoint).sendRequest(AsynchAEMessage.ReleaseCAS, aCasReferenceId,
                   freeCasEndpoint);
         } catch (Exception e) {
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
@@ -1984,7 +2025,8 @@ public class AggregateAnalysisEngineCont
   }
 
   private boolean forceToDropTheCas(CasStateEntry entry, CacheEntry cacheEntry, FinalStep aStep) {
-    // Get the key of the Cas Producer
+   
+	  // Get the key of the Cas Producer
     String casProducer = cacheEntry.getCasProducerAggregateName();
     // CAS is considered new from the point of view of this service IF it was produced by it
     boolean isNewCas = (cacheEntry.isNewCas() && casProducer != null && getComponentName().equals(
@@ -2025,10 +2067,13 @@ public class AggregateAnalysisEngineCont
               new Object[] { getComponentName(), casStateEntry.getCasReferenceId(),
                   replyEndpoint.getEndpoint() });
     }
+    getOutputChannel(replyEndpoint).sendReply(casStateEntry.getErrors().get(0),
+            casStateEntry.getCasReferenceId(), null, replyEndpoint, AsynchAEMessage.Process);
+/*
     if (replyEndpoint.isRemote()) {
       // this is an input CAS that has been marked as failed. Return the input CAS
       // and an exception to the client.
-      getOutputChannel().sendReply(casStateEntry.getErrors().get(0),
+      getOutputChannel(replyEndpoint).sendReply(casStateEntry.getErrors().get(0),
               casStateEntry.getCasReferenceId(), null, replyEndpoint, AsynchAEMessage.Process);
     } else {
       replyEndpoint.setReplyEndpoint(true);
@@ -2056,6 +2101,7 @@ public class AggregateAnalysisEngineCont
       vmTransport.getUimaMessageDispatcher(replyEndpoint.getEndpoint()).dispatch(message);
       dropStats(casStateEntry.getCasReferenceId(),getName());
     }
+    */
   }
 
   private boolean sendExceptionToClient(CacheEntry cacheEntry, CasStateEntry casStateEntry,
@@ -2070,7 +2116,7 @@ public class AggregateAnalysisEngineCont
 
       // Fetch the top ancestor CAS of this CAS.
       CasStateEntry topAncestorCasStateEntry = getLocalCache().getTopCasAncestor(
-              casStateEntry.getInputCasReferenceId());
+              casStateEntry.getParentCasReferenceId());
       if ( topAncestorCasStateEntry != null ) {
     	// check the state
           if (topAncestorCasStateEntry.isFailed() && casHasExceptions(casStateEntry)
@@ -2125,7 +2171,7 @@ public class AggregateAnalysisEngineCont
       sendReplyWithException(cacheEntry, casStateEntry, replyEndpoint);
     } else {
       // Send response to a given endpoint
-      getOutputChannel().sendReply(cacheEntry, replyEndpoint);
+      getOutputChannel(replyEndpoint).sendReply(casStateEntry, replyEndpoint);
     }
     // Drop the CAS only if the client is remote and the CAS is an input CAS OR
     // the CAS is a child but there was a failure delivering it to a client. The client
@@ -2139,7 +2185,7 @@ public class AggregateAnalysisEngineCont
 		if ( casStateEntry.isSubordinate()) {
 			try {
 				
-				String inputCasId = casStateEntry.getInputCasReferenceId();
+				String inputCasId = casStateEntry.getParentCasReferenceId();
 				if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
 					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
 							"sendReplyToRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
@@ -2175,7 +2221,9 @@ public class AggregateAnalysisEngineCont
                 "UIMAEE_client_dead__FINE",
                 new Object[] { getComponentName(), replyEndpoint.getDestination().toString(), casStateEntry.getCasReferenceId()});
         }
-    	dropCAS(casStateEntry.getCasReferenceId(), true);
+    	if ( !replyEndpoint.getServerURI().equals("java")) {
+        	dropCAS(casStateEntry.getCasReferenceId(), true);
+    	}
     	// If the cache is empty change the state of the Aggregate to idle
     	if (getInProcessCache().isEmpty()) {
     	  endProcess(AsynchAEMessage.Process);
@@ -2223,23 +2271,29 @@ public class AggregateAnalysisEngineCont
         // client perspective, this Cas Multiplier Aggregate is a black box,
         // all CASes produced here must be linked with the input CAS.
         // Find the top ancestor of this CAS. It is the input CAS sent by the client
-        String inputCasId = getLocalCache().lookupInputCasReferenceId(casStateEntry);
+//        String inputCasId = getLocalCache().lookupInputCasReferenceId(casStateEntry);
+        String inputCasId = casStateEntry.getInputCasReferenceId(); //getLocalCache().lookupInputCasReferenceId(casStateEntry);
         // Modify the parent of this CAS.
         if (inputCasId != null ) {
-          if ( !inputCasId.equals(casStateEntry.getInputCasReferenceId())) {
-            cacheEntry.setInputCasReferenceId(inputCasId);
-          }
-          // Update counters in the parents controller local cache. 
-          CasStateEntry parentCasStateEntry = 
-            parentController.getLocalCache().lookupEntry(inputCasId);
-          if ( parentCasStateEntry != null ) {
-            parentCasStateEntry.incrementSubordinateCasInPlayCount();
-            parentCasStateEntry.incrementOutstandingFlowCounter();
+//          if ( !inputCasId.equals(casStateEntry.getParentCasReferenceId())) {
+//            cacheEntry.setInputCasReferenceId(inputCasId);
+//          }
+          if ( parentController != null ) {
+        	  // Update counters in the parents controller local cache. 
+              CasStateEntry parentCasStateEntry = 
+                parentController.getLocalCache().lookupEntry(inputCasId);
+              if ( parentCasStateEntry != null ) {
+                parentCasStateEntry.incrementSubordinateCasInPlayCount();
+                parentCasStateEntry.incrementOutstandingFlowCounter();
+              } 
           }
+         
         }
       }
       // Send CAS to a given reply endpoint
-      sendVMMessage(mType, replyEndpoint, cacheEntry);
+//      sendVMMessage(mType, replyEndpoint, cacheEntry);
+      getOutputChannel(replyEndpoint).sendReply(casStateEntry, replyEndpoint);
+
     }
   }
 
@@ -2263,7 +2317,7 @@ public class AggregateAnalysisEngineCont
       HashMap map = new HashMap();
       map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
       map.put(AsynchAEMessage.CasReference, casStateEntry.getCasReferenceId());
-      handleError(map, new UnknownDestinationException());
+      handleError(map, new UnknownDestinationException("Controller:"+getComponentName()+" CasReferenceId:"+casStateEntry.getCasReferenceId()+" Destination Not Found"));
       return false;
     }
     // Dont send a reply to the client if the client is a CAS multiplier
@@ -2276,7 +2330,14 @@ public class AggregateAnalysisEngineCont
 
   private Endpoint replyToClient(CacheEntry cacheEntry, CasStateEntry casStateEntry)
           throws Exception {
-    Endpoint endpoint = getReplyEndpoint(cacheEntry, casStateEntry);
+	 // Thread.dumpStack();
+   // Endpoint endpoint = getReplyEndpoint(cacheEntry, casStateEntry);
+    
+    CasStateEntry inputCasCasStateEntry = getLocalCache().get(casStateEntry.getInputCasReferenceId());
+    
+    Endpoint endpoint = inputCasCasStateEntry.getClientEndpoint();
+   // Endpoint endpoint = getMessageOrigin(cacheEntry.getCasReferenceId());
+    
     if (!validEndpoint(endpoint, casStateEntry)) {
       return null; // the reason has already been logged
     }
@@ -2294,8 +2355,12 @@ public class AggregateAnalysisEngineCont
     if (!isStopped()) {
       try {
         if (endpoint.isRemote()) {
+       	  System.out.println("......replyToClient()[Remote] - Controller:"+getComponentName()+" CasID:"+cacheEntry.getCasReferenceId());
+
           sendReplyToRemoteClient(cacheEntry, casStateEntry, endpoint);
         } else {
+        	  System.out.println("......replyToClient()[Direct] - Controller:"+getComponentName()+" CasID:"+cacheEntry.getCasReferenceId());
+
           sendReplyToCollocatedClient(cacheEntry, casStateEntry, endpoint);
         }
       } catch ( Exception e) {
@@ -2323,6 +2388,7 @@ public class AggregateAnalysisEngineCont
 	  }
 	  return false;
   }
+  /*
   private void sendVMMessage(int messageType, Endpoint endpoint, CacheEntry cacheEntry)
           throws Exception {
     // If the CAS was produced by this aggregate send the request message to the client
@@ -2352,7 +2418,7 @@ public class AggregateAnalysisEngineCont
     dropStats(cacheEntry.getCasReferenceId(), getName());
 
   }
-
+*/
   private Endpoint getReplyEndpoint(CacheEntry cacheEntry, CasStateEntry casStateEntry)
           throws Exception {
     Endpoint endpoint = null;
@@ -2481,6 +2547,8 @@ public class AggregateAnalysisEngineCont
 
   private void dispatch(CacheEntry entry, Endpoint anEndpoint) throws AsynchAEException {
     if (!anEndpoint.isRemote()) {
+        getOutputChannel(ENDPOINT_TYPE.DIRECT).sendRequest(AsynchAEMessage.Process, entry.getCasReferenceId(), anEndpoint);
+/*
       try {
         UimaTransport transport = getTransport(anEndpoint.getEndpoint());
         UimaMessage message = transport.produceMessage(AsynchAEMessage.Process,
@@ -2499,6 +2567,7 @@ public class AggregateAnalysisEngineCont
                   "UIMAEE_exception__WARNING", e);
         }
       }
+      */
     } else {
       // Check delegate's state before sending it a CAS. The delegate
       // may have previously timed out and is in a process of pinging
@@ -2508,7 +2577,7 @@ public class AggregateAnalysisEngineCont
       // delayed CASes will be dispatched to the delegate.
       if (!delayCasIfDelegateInTimedOutState(entry.getCasReferenceId(), anEndpoint.getDelegateKey(), entry.getCas().hashCode())) {
         // The delegate is in the normal state so send it this CAS
-        getOutputChannel().sendRequest(AsynchAEMessage.Process, entry.getCasReferenceId(), anEndpoint);
+        getOutputChannel(anEndpoint).sendRequest(AsynchAEMessage.Process, entry.getCasReferenceId(), anEndpoint);
       }
     }
   }
@@ -2612,7 +2681,7 @@ public class AggregateAnalysisEngineCont
         if (endpoint.getSerialFormat() == SerialFormat.BINARY) {
           endpoint.setSerialFormat(SerialFormat.XMI);  
         }
-        getOutputChannel().sendRequest(AsynchAEMessage.Process, entry.getCasReferenceId(), endpoint);
+        getOutputChannel(endpoint).sendRequest(AsynchAEMessage.Process, entry.getCasReferenceId(), endpoint);
       }    
     }
 
@@ -2729,15 +2798,20 @@ public class AggregateAnalysisEngineCont
                                                                                                      SerialFormat.COMPRESSED_FILTERED);
     }
   }
-  
-  public void mergeTypeSystem(String aTypeSystem, String fromDestination) throws AsynchAEException {
-    mergeTypeSystem(aTypeSystem, fromDestination, null);
-  }
+//  
+//  public void mergeTypeSystem(String aTypeSystem, String fromDestination) throws AsynchAEException {
+//    mergeTypeSystem(aTypeSystem, fromDestination, null);
+//  }
 
 //  public synchronized void mergeTypeSystem(String aTypeSystem, String fromDestination,
-  public void mergeTypeSystem(String aTypeSystem, String fromDestination,
+//  public void mergeTypeSystem(String aTypeSystem, String fromDestination,
+ //         String fromServer) throws AsynchAEException {
+  public void mergeTypeSystem(ResourceMetaData resource, String fromDestination,
           String fromServer) throws AsynchAEException {
+
     mergeLock.lock();
+    System.out.println("AggregateAnalysisEngineController.mergeTypeSystem() - from "+fromDestination);
+
     try {
       // Find the endpoint for this service, given its input queue name and broker URI.
       // We now allow endpoints managed by different servers to have the same queue name.
@@ -2763,9 +2837,10 @@ public class AggregateAnalysisEngineCont
         if ( endpoint.getServiceInfo() != null ) {
           endpoint.getServiceInfo().setState(ServiceState.RUNNING.name());
         }
-        ResourceMetaData resource = null;
+//        ResourceMetaData resource = null;
         ServiceInfo remoteDelegateServiceInfo = null;
-        if (aTypeSystem.trim().length() > 0) {
+//        if (aTypeSystem.trim().length() > 0) {
+        if ( resource != null ) {  
           if (endpoint.isRemote()) {
             if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
               UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
@@ -2779,10 +2854,12 @@ public class AggregateAnalysisEngineCont
                     "mergeTypeSystem", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                     "UIMAEE_merge_ts_from_delegate__CONFIG", new Object[] { fromDestination });
           }
+          /*
           ByteArrayInputStream bis = new ByteArrayInputStream(aTypeSystem.getBytes());
           XMLInputSource in1 = new XMLInputSource(bis, null);
 
           resource = UIMAFramework.getXMLParser().parseResourceMetaData(in1);
+          */
           if (isStopped()) {
             return;
           }
@@ -2832,7 +2909,11 @@ public class AggregateAnalysisEngineCont
 
         //  
         if (collocatedAggregate || resource instanceof ProcessingResourceMetaData) {
-          if (allTypeSystemsMerged()) {
+        	System.out.println("#############  Aggregate:"+getName()+" Merged Typesystem from "+fromDestination);//+" allTypeSystemsMerged():"+allTypeSystemsMerged());
+
+        	if (allTypeSystemsMerged()) {
+                System.out.println("!!!!! AGGREGATE:"+getName()+" Got Metadata from ALL delegates");
+
             if (!isStopped()) {
               try {
                 completeInitialization();
@@ -2862,6 +2943,7 @@ public class AggregateAnalysisEngineCont
         }
       }
     } catch (Exception e) {
+    	e.printStackTrace();
       throw new AsynchAEException(e);
     } finally {
     	mergeLock.unlock();
@@ -3007,11 +3089,14 @@ public class AggregateAnalysisEngineCont
 	  return aggTypePriorities;
   }
   protected void startProcessing() throws Exception {
-	  
+	  System.out.println(",,,,,,,,,,, Controller "+getName()+" Opening Latch ,,,,,,,,,,,,");
+
 	    // Open latch to allow messages to be processed. The
 	    // latch was closed to prevent messages from entering
 	    // the controller before it is initialized.
 	    latch.openLatch(getName(), isTopLevelComponent(), true);
+		  System.out.println(",,,,,,,,,,, Controller "+getName()+" Latch is Opened,,,,,,,,,,,, Latch hashcode:"+latch.hashCode());
+
 	    initialized = true;
 	    // Notify client listener that the initialization of the controller was successfull
 	    notifyListenersWithInitializationStatus(null);
@@ -3055,6 +3140,8 @@ public class AggregateAnalysisEngineCont
       }
       delegateCount++;
     }
+    System.out.println("+++++++++++++ delegateCount="+delegateCount+" destinationMap.size()="+destinationMap.size());
+
     if (delegateCount == destinationMap.size()) {
       return true; // All delegates responded to GetMeta request
     }
@@ -3110,7 +3197,7 @@ public class AggregateAnalysisEngineCont
 
       }
     }
-    getOutputChannel().sendRequest(AsynchAEMessage.GetMeta, null, anEndpoint);
+    getOutputChannel(anEndpoint).sendRequest(AsynchAEMessage.GetMeta, null, anEndpoint);
   }
 
   public void retryMetadataRequest(Endpoint anEndpoint) throws AsynchAEException {
@@ -3163,7 +3250,7 @@ public class AggregateAnalysisEngineCont
         getInProcessCache().getEndpoint(anEndpoint, casReferenceId).cancelTimer();
         Endpoint requestOrigin = cachedEntries[i].getMessageOrigin();
         try {
-          getOutputChannel().sendReply(
+          getOutputChannel(requestOrigin).sendReply(
                   new UimaEEServiceException("Delegates Not Found To Process CAS on Endpoint:"
                           + anEndpoint), casReferenceId, parentCasReferenceId, requestOrigin,
                   AsynchAEMessage.Process);
@@ -3186,7 +3273,7 @@ public class AggregateAnalysisEngineCont
 
   public void retryLastCommand(int aCommand, Endpoint anEndpoint, String aCasReferenceId) {
     try {
-      getOutputChannel().sendRequest(aCommand, aCasReferenceId, anEndpoint);
+      getOutputChannel(anEndpoint).sendRequest(aCommand, aCasReferenceId, anEndpoint);
     } catch (AsynchAEException e) {
 
     }
@@ -3206,7 +3293,7 @@ public class AggregateAnalysisEngineCont
       // block in getInputChannel() on the latch
       if (isTopLevelComponent() && getInputChannel() != null) {
         serviceInfo.setInputQueueName(getInputChannel().getName());
-        serviceInfo.setBrokerURL(super.getBrokerURL());
+     //   serviceInfo.setBrokerURL(super.getBrokerURL());
       } else {
         serviceInfo.setInputQueueName(getName());
         serviceInfo.setBrokerURL("vm://localhost");
@@ -3299,8 +3386,12 @@ public class AggregateAnalysisEngineCont
       cache.destroy();
     }
   }
-
+ 
   public void stop() {
+	  for( AnalysisEngineController delegate : getChildControllerList() ) {
+		  delegate.stop();
+	  }
+
 	  super.stop(true);  // shutdown now
 	  
 	  // enable blocked threads to finish // https://issues.apache.org/jira/browse/UIMA-3433
@@ -3327,7 +3418,7 @@ public class AggregateAnalysisEngineCont
     
   }
 
-  public List getChildControllerList() {
+  public List<AnalysisEngineController> getChildControllerList() {
     return childControllerList;
   }
 
@@ -3417,9 +3508,10 @@ public class AggregateAnalysisEngineCont
   }
   
   public void changeCollocatedDelegateState( String delegateKey, ServiceState state ) throws Exception {
-    if ( delegateKey != null && state != null ) {
+	  System.out.println("............. changeCollocatedDelegateState - delegateKey:"+delegateKey+" state is Null:"+(state==null));
+	  if ( delegateKey != null && state != null ) {
       synchronized(childControllerList) {
-        if ( childControllerList.size() > 0 ) {
+        if ( !childControllerList.isEmpty() ) {
           for( AnalysisEngineController childController : childControllerList ) {
             if ( delegateKey.equals(childController.getKey())) {
               childController.changeState(state);

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java Mon Feb 26 18:54:11 2018
@@ -19,6 +19,7 @@
 
 package org.apache.uima.aae.controller;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
@@ -29,6 +30,7 @@ import org.apache.uima.aae.InputChannel;
 import org.apache.uima.aae.OutputChannel;
 import org.apache.uima.aae.UimaAsContext;
 import org.apache.uima.aae.UimaEEAdminContext;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
 import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
 import org.apache.uima.aae.error.AsynchAEException;
 import org.apache.uima.aae.error.ErrorContext;
@@ -41,7 +43,10 @@ import org.apache.uima.aae.monitor.Monit
 import org.apache.uima.aae.spi.transport.UimaMessageListener;
 import org.apache.uima.aae.spi.transport.UimaTransport;
 import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
+import org.apache.uima.as.client.DirectInputChannel;
+import org.apache.uima.as.client.Listener;
 import org.apache.uima.cas.CAS;
+import org.apache.uima.resource.ResourceSpecifier;
 
 public interface AnalysisEngineController extends ControllerLifecycle {
   public static final String CasPoolSize = "CasPoolSize";
@@ -54,17 +59,29 @@ public interface AnalysisEngineControlle
 
   public void setInputChannel(InputChannel anInputChannel) throws Exception;
 
+  public void setDirectInputChannel(DirectInputChannel anInputChannel) throws Exception;
+  
+  public void setJmsInputChannel(InputChannel anInputChannel) throws Exception;
+
+  public InputChannel getInputChannel(ENDPOINT_TYPE et);
+
   public void addInputChannel(InputChannel anInputChannel) throws Exception;
 
   public String getServiceEndpointName();
 
+  public void setServiceId(String name);
+
+  public String getServiceId();
+
   public void handleDelegateLifeCycleEvent(String anEndpoint, int aDelegateCount);
 
   public void takeAction(String anAction, String anEndpointName, ErrorContext anErrorContext);
 
   public InputChannel getInputChannel();
 
-  public InputChannel getInputChannel(String aQueueName);
+  public List<Listener> getAllListeners();
+
+  //public InputChannel getInputChannel(String aQueueName);
 
   public void saveReplyTime(long snapshot, String aKey);
 
@@ -109,10 +126,16 @@ public interface AnalysisEngineControlle
   public long getTime(String aCasReferenceId, String anEndpointName);
 
   public ErrorHandlerChain getErrorHandlerChain();
-
+/*
   public void setOutputChannel(OutputChannel anOutputChannel) throws Exception;
 
   public OutputChannel getOutputChannel();
+*/
+  public void addOutputChannel(OutputChannel anOutputChannel) throws Exception;
+
+  public OutputChannel getOutputChannel(Endpoint anEndpoint);
+  
+  public OutputChannel getOutputChannel(ENDPOINT_TYPE et);
 
   public void setCasManager(AsynchAECasManager aCasManager);
 
@@ -195,13 +218,13 @@ public interface AnalysisEngineControlle
 
   public UimaMessageListener getUimaMessageListener(String aDelegateKey);
 
-  public UimaTransport getTransport(UimaAsContext aContext, String aKey) throws Exception;
-
-  public UimaTransport getTransport(String aKey) throws Exception;
-
-  public void initializeVMTransport(int parentControllerReplyConsumerCount) throws Exception;
-
-  public InputChannel getReplyInputChannel(String aDelegateKey);
+//  public UimaTransport getTransport(UimaAsContext aContext, String aKey) throws Exception;
+//
+//  public UimaTransport getTransport(String aKey) throws Exception;
+//
+//  public void initializeVMTransport(int parentControllerReplyConsumerCount) throws Exception;
+//
+//  public InputChannel getReplyInputChannel(String aDelegateKey);
 
   public LocalCache getLocalCache();
 
@@ -235,6 +258,10 @@ public interface AnalysisEngineControlle
   
   public void addUimaObject(String objectName ) throws Exception;
   
+  public void setErrorHandlerChain(ErrorHandlerChain ehc);
+
+  public ResourceSpecifier getResourceSpecifier();
+
 }