You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/02/26 18:54:13 UTC

svn commit: r1825401 [6/11] - in /uima/uima-as/branches/uima-as-3: uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ uimaj-as-activemq/src/main/java/org/apache/uima...

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Mon Feb 26 18:54:11 2018
@@ -19,7 +19,6 @@
 
 package org.apache.uima.aae.controller;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.lang.management.ManagementFactory;
 import java.lang.management.OperatingSystemMXBean;
@@ -52,11 +51,11 @@ import org.apache.uima.aae.EECasManager_
 import org.apache.uima.aae.InProcessCache;
 import org.apache.uima.aae.InProcessCache.CacheEntry;
 import org.apache.uima.aae.InputChannel;
+import org.apache.uima.aae.InputChannel.ChannelType;
 import org.apache.uima.aae.OutputChannel;
 import org.apache.uima.aae.UIDGenerator;
 import org.apache.uima.aae.UIMAEE_Constants;
 import org.apache.uima.aae.UimaASApplicationEvent.EventTrigger;
-import org.apache.uima.aae.UimaAsContext;
 import org.apache.uima.aae.UimaAsVersion;
 import org.apache.uima.aae.UimaClassFactory;
 import org.apache.uima.aae.UimaEEAdminContext;
@@ -69,7 +68,6 @@ import org.apache.uima.aae.error.ErrorCo
 import org.apache.uima.aae.error.ErrorHandler;
 import org.apache.uima.aae.error.ErrorHandlerChain;
 import org.apache.uima.aae.error.ForcedMessageTimeoutException;
-import org.apache.uima.aae.error.ServiceShutdownException;
 import org.apache.uima.aae.error.UimaAsUncaughtExceptionHandler;
 import org.apache.uima.aae.error.handler.ProcessCasErrorHandler;
 import org.apache.uima.aae.jmx.JmxManagement;
@@ -83,15 +81,16 @@ import org.apache.uima.aae.monitor.Monit
 import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
 import org.apache.uima.aae.monitor.statistics.Statistic;
 import org.apache.uima.aae.monitor.statistics.Statistics;
-import org.apache.uima.aae.spi.transport.UimaMessage;
 import org.apache.uima.aae.spi.transport.UimaMessageListener;
 import org.apache.uima.aae.spi.transport.UimaTransport;
-import org.apache.uima.aae.spi.transport.vm.VmTransport;
 import org.apache.uima.analysis_engine.AnalysisEngine;
 import org.apache.uima.analysis_engine.AnalysisEngineDescription;
 import org.apache.uima.analysis_engine.impl.AnalysisEngineManagementImpl;
 import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
 import org.apache.uima.analysis_engine.metadata.SofaMapping;
+import org.apache.uima.as.client.DirectInputChannel;
+import org.apache.uima.as.client.Listener;
+import org.apache.uima.as.client.Listener.Type;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.collection.CollectionReaderDescription;
 import org.apache.uima.resource.PearSpecifier;
@@ -104,10 +103,16 @@ import org.apache.uima.util.Level;
 
 public abstract class BaseAnalysisEngineController extends Resource_ImplBase implements
         AnalysisEngineController, EventSubscriber {
-  private static final Class CLASS_NAME = BaseAnalysisEngineController.class;
+  public static enum ENDPOINT_TYPE {  // keys inputChannelMap; getOutputChannel(ENDPOINT_TYPE) looks channels up by name()
+	JMS,     // picked by getOutputChannel(Endpoint) when the server URI contains "http" or "tcp"
+	DIRECT   // picked by getOutputChannel(Endpoint) for every other server URI
+  };
+  private static final Class<?> CLASS_NAME = BaseAnalysisEngineController.class;
   private static final String JMS_PROVIDER_HOME = "ACTIVEMQ_HOME";
-  public static enum ServiceState { INITIALIZING, RUNNING, DISABLED, STOPPING, FAILED };
+  public enum ServiceState { INITIALIZING, RUNNING, DISABLED, STOPPING, FAILED };
   public static final boolean NO_RECOVERY = true;
+  public static final ENDPOINT_TYPE DEFAULT_OUTPUTCHANNEL_TYPE = ENDPOINT_TYPE.JMS;
+
   // Semaphore use only when quiesceAndStop is called
   // When the cache becomes empty the semaphore is released.
   private Semaphore quiesceSemaphore = new Semaphore(0);
@@ -148,9 +153,10 @@ public abstract class BaseAnalysisEngine
 
   protected long errorCount = 0;
 
-  protected List<InputChannel> inputChannelList = new ArrayList<InputChannel>();
+//  protected List<InputChannel> inputChannelList = new ArrayList<InputChannel>();
 
-  protected ConcurrentHashMap<String, InputChannel> inputChannelMap = new ConcurrentHashMap<String, InputChannel>();
+  protected ConcurrentHashMap<ENDPOINT_TYPE, InputChannel> inputChannelMap = 
+		  new ConcurrentHashMap<>();
 
   private UimaEEAdminContext adminContext;
 
@@ -219,6 +225,9 @@ public abstract class BaseAnalysisEngine
   
   private Object lock = new Object();
 
+  protected Map<String, OutputChannel> outputChannelMap = 
+		  new HashMap<String, OutputChannel>();
+ 
   // Local cache for this controller only. This cache stores state of
   // each CAS. The actual CAS is still stored in the global cache. The
   // local cache is used to determine when each CAS can be removed as
@@ -260,6 +269,8 @@ public abstract class BaseAnalysisEngine
   
   private String serviceName=null;
   
+  private String serviceId="";
+  
   protected UimaContext uimaContext=null;
   
   public abstract void dumpState(StringBuffer buffer, String lbl1);
@@ -297,7 +308,10 @@ public abstract class BaseAnalysisEngine
           int aComponentCasPoolSize, long anInitialCasHeapSize, String anEndpointName,
           String aDescriptor, AsynchAECasManager aCasManager, InProcessCache anInProcessCache,
           Map aDestinationMap, JmxManagement aJmxManagement,boolean disableJCasCache) throws Exception {
-    casManager = aCasManager;
+    
+	System.out.println("C'tor Called Descriptor:"+aDescriptor);
+
+	casManager = aCasManager;
     inProcessCache = anInProcessCache;
     localCache = new LocalCache(this);
     aeDescriptor = aDescriptor;
@@ -456,11 +470,25 @@ public abstract class BaseAnalysisEngine
     }
 
     // Register InProcessCache with JMX under the top level component
-    if (inProcessCache != null && isTopLevelComponent()) {
-      inProcessCache.setName(jmxManagement.getJmxDomain() + jmxContext + ",name="
-              + inProcessCache.getName());
-      ObjectName on = new ObjectName(inProcessCache.getName());
-      jmxManagement.registerMBean(inProcessCache, on);
+//    if (inProcessCache != null && isTopLevelComponent()) {
+//      inProcessCache.setName(jmxManagement.getJmxDomain() + jmxContext + ",name="
+//              + inProcessCache.getName());
+//      ObjectName on = new ObjectName(inProcessCache.getName());
+//      jmxManagement.registerMBean(inProcessCache, on);
+//    }
+    if (inProcessCache != null && !inProcessCache.isRegisteredWithJMX() && isTopLevelComponent()) {
+  	  
+    	if ( jmxManagement.isRegistered(new ObjectName("org.apache.uima:type=ee.jms.services,s=Test Aggregate TAE Uima EE Service,p0=Test Aggregate TAE Components,name=InProcessCache"))) {
+    		System.out.println("!!!!!!!!!!!!!!!!!! InProcessCache Already Registered"); 
+    	} else {
+    		System.out.println(">>>>>>>>> Registering InProcessCache with JMX");
+        	inProcessCache.setName(jmxManagement.getJmxDomain() + jmxContext + ",name="
+                    + inProcessCache.getName());
+            System.out.println("-------->>>>>>>>> InProcessCache Object Name:"+inProcessCache.getName());
+            ObjectName on = new ObjectName(inProcessCache.getName());
+            jmxManagement.registerMBean(inProcessCache, on);
+            inProcessCache.setRegisteredWithJMX();
+    	}
     }
     initializeServiceStats();
 
@@ -582,7 +610,7 @@ public abstract class BaseAnalysisEngine
   public AnalysisEngineController getParentController() {
     return parentController;
   }
-
+/*
   public UimaTransport getTransport(String aKey) throws Exception {
     return getTransport(null, aKey);
   }
@@ -609,10 +637,11 @@ public abstract class BaseAnalysisEngine
 
     return transport;
   }
-
+*/
   /**
    * Initializes transport used for internal messaging between collocated Uima AS services.
    */
+  /*
   public void initializeVMTransport(int parentControllerReplyConsumerCount) throws Exception {
     // If this controller is an Aggregate Controller, force delegates to initialize
     // their internal transports.
@@ -686,7 +715,7 @@ public abstract class BaseAnalysisEngine
     }
 
   }
-
+*/
   public synchronized UimaMessageListener getUimaMessageListener(String aDelegateKey) {
     return messageListeners.get(aDelegateKey);
   }
@@ -818,6 +847,10 @@ public abstract class BaseAnalysisEngine
   public void addUimaObject(String objectName ) throws Exception {
 	  jmxManagement.addObject(objectName);
   }
+  /** Stores the channel in outputChannelMap under the name of its type, replacing any earlier entry for that name. */
+  public void addOutputChannel(OutputChannel outputChannel) throws Exception {
+	  outputChannelMap.put( outputChannel.getType().name(), outputChannel);  
+  }
   /**
    * Register a component with a given name with JMX MBeanServer
    * 
@@ -867,6 +900,7 @@ public abstract class BaseAnalysisEngine
 
     registerWithAgent(servicePerformance, name);
     servicePerformance.setIdleTime(System.nanoTime());
+    /*
     ServiceInfo serviceInfo = null;
     if (remote) {
       serviceInfo = getInputChannel().getServiceInfo();
@@ -886,6 +920,7 @@ public abstract class BaseAnalysisEngine
         }
       }
     }
+    */
     ServiceInfo pServiceInfo = null;
 
     if (this instanceof PrimitiveAnalysisEngineController) {
@@ -906,8 +941,12 @@ public abstract class BaseAnalysisEngine
       }
 
       
+//      name = jmxManagement.getJmxDomain() + key_value_list + ",name=" + thisComponentName + "_"
+//              + serviceInfo.getLabel();
+      
       name = jmxManagement.getJmxDomain() + key_value_list + ",name=" + thisComponentName + "_"
-              + serviceInfo.getLabel();
+              + pServiceInfo.getLabel();
+
       if (!isTopLevelComponent()) {
         pServiceInfo.setBrokerURL("Embedded Broker");
       } else {
@@ -1092,7 +1131,7 @@ public abstract class BaseAnalysisEngine
 
   public void setInputChannel(InputChannel anInputChannel) throws Exception {
     inputChannel = anInputChannel;
-    inputChannelList.add(anInputChannel);
+//    inputChannelList.add(anInputChannel);
 
     inputChannelLatch.countDown();
     if (!registeredWithJMXServer) {
@@ -1102,6 +1141,7 @@ public abstract class BaseAnalysisEngine
   }
 
   public void addInputChannel(InputChannel anInputChannel) {
+	  /*
 	  inputChannelLatch.countDown();
     if (!inputChannelMap.containsKey(anInputChannel.getInputQueueName())) {
       inputChannelMap.put(anInputChannel.getInputQueueName(), anInputChannel);
@@ -1109,18 +1149,65 @@ public abstract class BaseAnalysisEngine
         inputChannelList.add(anInputChannel);
       }
     }
-  }
+    */
+    if (!inputChannelMap.containsKey(anInputChannel.getType())) {
+  	  inputChannelMap.put(anInputChannel.getType(),anInputChannel);
+        inputChannelLatch.countDown();
+        if (!registeredWithJMXServer) {
+          registerServiceWithJMX(jmxContext, false);
+          registeredWithJMXServer = true;
+
+        }
+
+    }
+    if ( anInputChannel.getChannelType().equals(ChannelType.REQUEST_REPLY)) {
+  	  inputChannel = anInputChannel;
+    }
 
+  }
   public InputChannel getInputChannel() {
-    try {
+	  return inputChannel;
+  }
+  public InputChannel getInputChannel(ENDPOINT_TYPE type) {
+    /*
+	  try {
       inputChannelLatch.await();
 
     } catch (Exception e) {
     }
 
     return inputChannel;
+    */
+    return inputChannelMap.get(type);
+  }
+  public InputChannel getInputChannel(Class<?> clz) {
+	  return inputChannel;
+  }
+  public List<Listener> getAllListeners() {  // listeners of every registered input channel: JMS first, then DIRECT
+    List<Listener> listeners = new ArrayList<Listener>();
+    for (ENDPOINT_TYPE type : ENDPOINT_TYPE.values()) {  // declaration order of the enum
+      InputChannel channel = getInputChannel(type);  // single lookup: the null check and the use see the same channel
+      if (channel != null) {
+        listeners.addAll(channel.getListeners());
+      }
+    }
+    return listeners;
+  }
+  public void setDirectInputChannel(DirectInputChannel anInputChannel) throws Exception {
+    // Hands the channel to addInputChannel(), which stores it in inputChannelMap if its type is new
+    // and makes it the default inputChannel when its channel type is REQUEST_REPLY.
+    // NOTE(review): for a new type addInputChannel() already counts down the latch and registers with JMX.
+	addInputChannel(anInputChannel);
+    // Counts the latch down (again, see note) and registers the service with JMX if that has not happened yet.
+    inputChannelLatch.countDown();
+    if (!registeredWithJMXServer) {
+      registeredWithJMXServer = true;
+      registerServiceWithJMX(jmxContext, false);
+    }
+  }
+  public void setJmsInputChannel(InputChannel anInputChannel) throws Exception {
+	   addInputChannel(anInputChannel);
   }
-
   public void dropCAS(CAS aCAS) {
     if (aCAS != null) {
       // Check if this method was called while another thread is stopping the service.
@@ -1180,7 +1267,7 @@ public abstract class BaseAnalysisEngine
           if (!isStopped()) {
             Endpoint endpoint = (Endpoint) anErrorContext.get(AsynchAEMessage.Endpoint);
             if ( endpoint != null && !"WarmupDelegate".equals(endpoint.getDelegateKey() ) ) {
-              getOutputChannel().sendReply((Throwable) anErrorContext.get(ErrorContext.THROWABLE_ERROR), 
+              getOutputChannel(endpoint).sendReply((Throwable) anErrorContext.get(ErrorContext.THROWABLE_ERROR), 
                       casReferenceId, parentCasReferenceId,
                       endpoint, AsynchAEMessage.Process);
             }
@@ -1346,7 +1433,39 @@ public abstract class BaseAnalysisEngine
     }
 
   }
-
+  /**
+   * Drops this controller's local state for a CAS: releases the entry's thread completion semaphore
+   * (non-primitive controllers only), marks and removes the local cache entry and the per-CAS stats.
+   * A CAS missing from the in-process cache is ignored; failures are logged, never propagated.
+   */
+  public void dropCASFromLocal(String aCasReferenceId) {
+    try {
+      CacheEntry entry = inProcessCache.entryExists(aCasReferenceId)
+              ? inProcessCache.getCacheEntryForCAS(aCasReferenceId) : null;
+      if (entry == null) {
+        return;
+      }
+      // Release semaphore which throttles ingestion of CASes from service input queue.
+      Semaphore semaphore = isPrimitive() ? null : entry.getThreadCompletionSemaphore();
+      if (semaphore != null) {
+        semaphore.release();
+      }
+      if (localCache.containsKey(aCasReferenceId)) {
+        try {
+          localCache.lookupEntry(aCasReferenceId).setDropped(true);
+        } catch (Exception e) {
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "dropCASFromLocal",
+                  UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_parent_cas_notin_cache__INFO",
+                  new Object[] { getComponentName(), aCasReferenceId });
+        }
+        localCache.remove(aCasReferenceId);
+      }
+      perCasStatistics.remove(aCasReferenceId); // drop the CAS specific stats; no-op when absent
+    } catch (Exception e) {
+      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "dropCASFromLocal",
+              UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+    }
+  }
   public void forceTimeoutOnPendingCases(String key) {
     Delegate delegate = ((AggregateAnalysisEngineController) this).lookupDelegate(key);
     // Cancel the delegate timer. No more responses are expected
@@ -1546,15 +1665,33 @@ public abstract class BaseAnalysisEngine
 
     return false;
   }
-
+/*
   public OutputChannel getOutputChannel() {
     return outputChannel;
   }
-
+*/
   public void setOutputChannel(OutputChannel outputChannel) throws Exception {
-    this.outputChannel = outputChannel;
+//    this.outputChannel = outputChannel;
+    outputChannelMap.put(outputChannel.getType().name(), outputChannel);
+
+  }
+  /** Returns the channel stored in outputChannelMap under the given type's name, or null when there is none. */
+  public OutputChannel getOutputChannel(ENDPOINT_TYPE et) {
+	return outputChannelMap.get(et.name());
+  }
+  /** Picks the output channel for an endpoint: JMS when its server URI contains "http" or "tcp", else DIRECT. */
+  public OutputChannel getOutputChannel(Endpoint anEndpoint) {
+    // Read the URI once. An endpoint without a server URI cannot name a broker, so it falls through
+    // to DIRECT instead of failing with a NullPointerException.
+    String serverURI = anEndpoint.getServerURI();
+    ENDPOINT_TYPE et = ENDPOINT_TYPE.DIRECT;
+    if (serverURI != null && (serverURI.contains("http") || serverURI.contains("tcp"))) {
+      et = ENDPOINT_TYPE.JMS;
+    }
+    return outputChannelMap.get(et.name());
   }
 
+
   public AsynchAECasManager getCasManagerWrapper() {
     return casManager;
   }
@@ -1567,10 +1704,15 @@ public abstract class BaseAnalysisEngine
     return inProcessCache;
   }
 
-  protected ResourceSpecifier getResourceSpecifier() {
+  public ResourceSpecifier getResourceSpecifier() {
     return resourceSpecifier;
   }
-
+  public void setServiceId(String sid) {
+	  serviceId = sid;
+  }
+  public String getServiceId() {
+	  return serviceId;
+  }
   public String getName() {
     return endpointName;
   }
@@ -1794,12 +1936,12 @@ public abstract class BaseAnalysisEngine
 
   public String getBrokerURL() {
     // Wait until the connection factory is injected by Spring
-    while (System.getProperty("BrokerURI") == null) {
-      try {
-        Thread.sleep(50);
-      } catch (InterruptedException ex) {
-      }
-    }
+//    while (System.getProperty("BrokerURI") == null) {
+//      try {
+//        Thread.sleep(50);
+//      } catch (InterruptedException ex) {
+//      }
+//    }
     return System.getProperty("BrokerURI");
   }
 
@@ -1909,12 +2051,11 @@ public abstract class BaseAnalysisEngine
     /*
      * Send an exception to the client if this is a top level service
      */
-    if (cause != null && aCasReferenceId != null && getOutputChannel() != null
-            && isTopLevelComponent()) {
+    if (cause != null && aCasReferenceId != null && isTopLevelComponent()) {
       Endpoint clientEndpoint = null;
       if ((clientEndpoint = getClientEndpoint()) != null) {
         try {
-          getOutputChannel().sendReply(cause, aCasReferenceId, null, clientEndpoint,
+          getOutputChannel(clientEndpoint).sendReply(cause, aCasReferenceId, null, clientEndpoint,
                   clientEndpoint.getCommand());
         } catch (Exception e) {
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
@@ -1937,11 +2078,16 @@ public abstract class BaseAnalysisEngine
               UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_stop__INFO",
               new Object[] { getComponentName() });
     }
-    if (getOutputChannel() != null) {
-      getOutputChannel().cancelTimers();
+    for( Entry<String, OutputChannel> oce : outputChannelMap.entrySet()) {
+    	if ( oce.getValue() != null ) {
+    		oce.getValue().cancelTimers();
+    	}
     }
+//    if (getOutputChannel() != null) {
+//      getOutputChannel().cancelTimers();
+//    }
     
-    if (this instanceof PrimitiveAnalysisEngineController) {
+    if (isPrimitive()) {
       getControllerLatch().release();
       // Stops the input channel of this service
       stopInputChannels(InputChannel.CloseAllChannels, shutdownNow);
@@ -1986,8 +2132,17 @@ public abstract class BaseAnalysisEngine
       adminContext = null;
     } else {
       // Stop output channel
-      getOutputChannel().stop();
-      
+        for( Entry<String, OutputChannel> oce : outputChannelMap.entrySet()) {
+        	if ( oce.getValue() != null ) {
+        	    oce.getValue().stop();
+        	}
+          }
+      //getOutputChannel().stop();
+        try {
+            // Remove all MBeans registered by this service
+            jmxManagement.destroy();
+          } catch (Exception e) {
+          }
       try {
         getInProcessCache().destroy();
       } catch (Exception e) {
@@ -2002,9 +2157,9 @@ public abstract class BaseAnalysisEngine
     if (statsMap != null) {
       statsMap.clear();
     }
-    if (inputChannelList != null) {
-      inputChannelList.clear();
-    }
+//    if (inputChannelList != null) {
+//      inputChannelList.clear();
+//    }
     //inputChannel = null;
 
     if (serviceErrorMap != null) {
@@ -2031,9 +2186,9 @@ public abstract class BaseAnalysisEngine
     if (threadStateMap != null) {
       threadStateMap.clear();
     }
-    if (inputChannelMap != null) {
-      inputChannelMap.clear();
-    }
+//    if (inputChannelMap != null) {
+//      inputChannelMap.clear();
+//    }
     if (controllerListeners != null) {
       controllerListeners.clear();
     }
@@ -2082,10 +2237,18 @@ public abstract class BaseAnalysisEngine
       // we proceed with the shutdown of delegates and finally of the top level service.
       if (isTopLevelComponent()) {
           getInputChannel().setTerminating();
-
+          try {
+              // Stops all input channels of this service, but keep temp reply queue input channels open
+              // to process replies.
+              stopReceivingCASes(false);  // dont kill listeners on temp queues. The remotes may send replies
+ 	  
+          } catch( Exception e) {
+        	  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+                      "quiesceAndStop", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                      "UIMAEE_exception__WARNING", e);
+          }
         // Stops all input channels of this service, but keep temp reply queue input channels open
         // to process replies.
-        stopReceivingCASes(false);  // dont kill listeners on temp queues. The remotes may send replies
         if ( this instanceof PrimitiveAnalysisEngineController_impl &&
         		((PrimitiveAnalysisEngineController_impl)this).aeInstancePool != null ) {
         	//	Since we are quiescing, destroy all AEs that are in AE pool. Those that
@@ -2120,8 +2283,11 @@ public abstract class BaseAnalysisEngine
           stopReceivingCASes(true);
           stopInputChannels(InputChannel.InputChannels, true);  
           System.out.println("UIMA-AS Service is Stopping, All CASes Have Been Processed");
-        } catch( InterruptedException e) {
-          
+        } catch( Exception e) {
+           	UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+                    "quiesceAndStop", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                    "UIMAEE_exception__WARNING", e);
+
         }
         stop(true); 
       }
@@ -2163,16 +2329,26 @@ public abstract class BaseAnalysisEngine
       ((BaseAnalysisEngineController) parentController).stop();
     } else if (!isStopped()) {
       stopDelegateTimers();
-      getOutputChannel().cancelTimers();
-      InputChannel iC = getInputChannel(endpointName);
+      for( Entry<String, OutputChannel> oce : outputChannelMap.entrySet()) {
+      	if ( oce.getValue() != null ) {
+      		oce.getValue().cancelTimers();
+      	}
+      }
+     // getOutputChannel().cancelTimers();
+      InputChannel iC = getInputChannel();
       if ( iC != null) {
           iC.setTerminating();
       }
-      // Stop the inflow of new input CASes
-      stopInputChannel(true);  // shutdownNow
-       if ( iC != null ) {
-        iC.terminate();
+      try {
+    	  // Stop the inflow of new input CASes
+          stopInputChannel(true);  // shutdownNow
+           if ( iC != null ) {
+            iC.terminate();
+          }
+      } catch (Exception e) {
+    	  
       }
+
       stopCasMultipliers();
       stopTransportLayer();
       if (cause != null && aCasReferenceId != null) {
@@ -2247,7 +2423,7 @@ public abstract class BaseAnalysisEngine
             Endpoint freeCasNotificationEndpoint = casEntry.getFreeCasNotificationEndpoint();
             if (freeCasNotificationEndpoint != null) {
               freeCasNotificationEndpoint.setCommand(AsynchAEMessage.Stop);
-              getOutputChannel().sendRequest(AsynchAEMessage.Stop, aCasReferenceId,
+              getOutputChannel(freeCasNotificationEndpoint).sendRequest(AsynchAEMessage.Stop, aCasReferenceId,
                       freeCasNotificationEndpoint);
             }
             if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
@@ -2298,7 +2474,14 @@ public abstract class BaseAnalysisEngine
    * Stops a listener on the main input channel
    * @param shutdownNow stop
    */
-  protected void stopInputChannel(boolean shutdownNow) {
+  protected void stopInputChannel(boolean shutdownNow) throws Exception {
+	  for( Listener listener : inputChannel.getListeners()) {
+		  if ( listener.getType().equals(Type.GetMeta) || 
+			   listener.getType().equals(Type.ProcessCAS)) {
+			  inputChannel.disconnectListenerFromQueue(listener);
+		  }
+	  }
+	  /*
     InputChannel iC = getInputChannel(endpointName);
     if (iC != null && !iC.isStopped()) {
       try {
@@ -2312,15 +2495,27 @@ public abstract class BaseAnalysisEngine
         }
       }
     }
+    */
   }
   private void setInputChannelForNoRecovery() {
+	  /*
 	  if ( inputChannelMap.size() > 0 ) {
 		  InputChannel iC = getInputChannel();
 		  iC.setTerminating();
 	  }
+	  */
+	  inputChannel.setTerminating();
   }
-  protected void stopReceivingCASes(boolean stopAllListeners)  {
-	  
+  private boolean isTempListener( Listener listener ) {
+	  return listener.getType().equals(Type.Reply) || listener.getType().equals(Type.FreeCAS);
+  }
+  protected void stopReceivingCASes(boolean stopAllListeners) throws Exception {
+	  for( Listener listener : inputChannel.getListeners()) {
+		  if ( stopAllListeners || !isTempListener(listener) ) {
+			  inputChannel.disconnectListenerFromQueue(listener);
+		  }
+	  }
+/*
 	    InputChannel iC = null;
 	    setInputChannelForNoRecovery();
 	    Iterator<String> it = inputChannelMap.keySet().iterator();
@@ -2357,10 +2552,21 @@ public abstract class BaseAnalysisEngine
 	        }
 	      }
 	    }
-	  
+	  */
   }
   protected void stopInputChannels( int channelsToStop, boolean shutdownNow) {   //, boolean norecovery) {
-	    InputChannel iC = null;
+	  try {
+		  for( Entry<BaseAnalysisEngineController.ENDPOINT_TYPE, InputChannel> ic : inputChannelMap.entrySet()) {
+			  ic.getValue().stop(shutdownNow);
+		  }
+		 // inputChannel.stop(shutdownNow);
+	  } catch( Exception e ) {
+		  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+                  "stopInputChannels", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                  "UIMAEE_exception__WARNING", e); 
+	  }
+	  /*
+	  InputChannel iC = null;
 	    setInputChannelForNoRecovery();
 	    Iterator it = inputChannelMap.keySet().iterator();
 	    int i = 1;
@@ -2401,7 +2607,7 @@ public abstract class BaseAnalysisEngine
 	        }
 	      }
 	    }
-	  
+	  */
   }
   /**
    * Aggregates have more than one Listener channel. This method stops all configured input channels
@@ -2426,7 +2632,7 @@ public abstract class BaseAnalysisEngine
     }
     return null;
   }
-
+/*
   public InputChannel getInputChannel(String anEndpointName) {
 
     for (int i = 0; inputChannelList != null && i < inputChannelList.size(); i++) {
@@ -2437,7 +2643,7 @@ public abstract class BaseAnalysisEngine
     }
     return null;
   }
-
+*/
 //  public InputChannel getReplyInputChannel(String aDelegateKey) {
   public InputChannel getReplyInputChannel(String aDestination) {
 	  InputChannel IC = null;
@@ -2916,6 +3122,7 @@ public abstract class BaseAnalysisEngine
     	  if ( anEndpoint.getServerURI().equals("vm://localhost?broker.persistent=false")) {
     		  anEndpoint.setRemote(true);
     	  }
+    	  /*
         if (!anEndpoint.isRemote()) {
           ByteArrayOutputStream bos = new ByteArrayOutputStream();
           try {
@@ -2946,8 +3153,12 @@ public abstract class BaseAnalysisEngine
         } else {
           getOutputChannel().sendReply(metadata, anEndpoint, true);
         }
+        */
+          getOutputChannel(anEndpoint).sendReply(metadata, anEndpoint, true);
+
       }
     } catch (Exception e) {
+    	e.printStackTrace();
       HashMap map = new HashMap();
       map.put(AsynchAEMessage.Endpoint, anEndpoint);
       map.put(AsynchAEMessage.MessageType, Integer.valueOf(AsynchAEMessage.Request));

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ControllerStatusListener.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ControllerStatusListener.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ControllerStatusListener.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ControllerStatusListener.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,35 @@
+package org.apache.uima.aae.controller;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.uima.aae.UimaASApplicationEvent.EventTrigger;
+/** Controller callback that counts down a caller-supplied latch each time a controller reports successful initialization. */
+public class ControllerStatusListener implements ControllerCallbackListener  {
+	CountDownLatch latch;  // supplied by the constructor; counted down in notifyOnInitializationSuccess(AnalysisEngineController)
+	public ControllerStatusListener(CountDownLatch latch) {
+		this.latch = latch;
+	}
+	public void notifyOnTermination(String aMessage, EventTrigger cause) {
+	}
+	// No-op. NOTE(review): the latch is not counted down on failure, so a thread awaiting it is not released here - confirm.
+	public void notifyOnInitializationFailure(AnalysisEngineController aController, Exception e) {
+	}
+	// Prints the controller's name to stdout, then counts the latch down by one.
+	public void notifyOnInitializationSuccess(AnalysisEngineController aController) {
+		System.out.println("------- Controller:"+aController.getName()+" Initialized");
+		latch.countDown();
+	}
+	// The remaining callbacks have empty bodies.
+	public void notifyOnInitializationFailure(Exception e) {
+	}
+
+	public void notifyOnInitializationSuccess() {
+	}
+
+	public void notifyOnReconnecting(String aMessage) {
+	}
+
+	public void notifyOnReconnectionSuccess() {
+	}
+
+}

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/DelegateEndpoint.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/DelegateEndpoint.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/DelegateEndpoint.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/DelegateEndpoint.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,148 @@
+package org.apache.uima.aae.controller;
+
+import org.apache.uima.cas.SerialFormat;
+import org.apache.uima.resource.ResourceSpecifier;
+
+/**
+ * Holder for {@link Builder}, a fluent builder that configures an {@link Endpoint_impl}
+ * describing a delegate.
+ */
+public class DelegateEndpoint {
+	/**
+	 * Collects delegate settings through chainable setters and applies them in
+	 * {@link #build()}. Builder is a non-static inner class, so it has to be created
+	 * through a DelegateEndpoint instance. Each Builder owns one Endpoint_impl; calling
+	 * build() more than once reconfigures and returns that same object.
+	 */
+	public class Builder {
+		private Endpoint_impl e1 = new Endpoint_impl();
+		private String key;
+		private String uri;
+		private String endpointName;
+		private String replyDestinationName;
+		private int getMetaTimeout=0;
+		private int processTimeout=0;
+		private boolean isTempReplyDestination;
+		private int collectionProcessCompleteTimeout=0;
+		private String serializer=SerialFormat.XMI.getDefaultFileExtension();
+		private int scaleout=1;
+		private String descriptor;
+		private ResourceSpecifier rs;
+		private boolean isRemote;
+		// NOTE(review): the fields below are recorded by their setters but build() does
+		// not copy them to the endpoint - confirm whether they should be applied.
+		private Object replyTo;
+		private String getMetaActionOnError;
+		private String processActionOnError;
+		private int getMetaMaxRetries=0;
+		private int processMaxRetries=0;
+
+		/** Marks the delegate as remote or not; a non-remote delegate gets server URI "java". */
+		public Builder setRemote(boolean isRemote) {
+			this.isRemote = isRemote;
+			if ( !isRemote ) {
+				setServerURI("java");
+			}
+			return this;
+		}
+
+		/** Stores the specifier and uses its source URL string as the descriptor. */
+		public Builder withResourceSpecifier(ResourceSpecifier rs) {
+			this.rs = rs;
+			setDescriptor(rs.getSourceUrlString());
+			return this;
+		}
+
+		public Builder setTempDestination(boolean isTempReplyDestination) {
+			this.isTempReplyDestination = isTempReplyDestination;
+			return this;
+		}
+
+		public Builder setDescriptor(String descriptor) {
+			this.descriptor = descriptor;
+			return this;
+		}
+
+		public Builder setScaleout(int scaleout) {
+			this.scaleout = scaleout;
+			return this;
+		}
+
+		public Builder withDelegateKey(String key) {
+			this.key = key;
+			return this;
+		}
+
+		public Builder setServerURI(String uri) {
+			this.uri = uri;
+			return this;
+		}
+
+		public Builder withEndpointName(String endpointName) {
+			this.endpointName = endpointName;
+			return this;
+		}
+
+		public Builder setReplyToDestination(Object replyTo) {
+			this.replyTo = replyTo;
+			return this;
+		}
+
+		public Builder setReplyDestinationName(String replyDestinationName) {
+			this.replyDestinationName = replyDestinationName;
+			return this;
+		}
+
+		/** Records GetMeta settings; only the timeout is applied by build(). */
+		public Builder setGetMetaErrorHandlingParams(int timeout, int maxRetries, String action) {
+			this.getMetaTimeout = timeout;
+			this.getMetaActionOnError = action;
+			this.getMetaMaxRetries = maxRetries;
+			return this;
+		}
+
+		/** Records Process settings; only the timeout is applied by build(). */
+		public Builder setProcessErrorHandlingParams(int timeout, int maxRetries, String action) {
+			this.processTimeout = timeout;
+			this.processActionOnError = action;
+			this.processMaxRetries = maxRetries;
+			return this;
+		}
+
+		public Builder setCollectionProcessCompleteTimeout(int timeout) {
+			this.collectionProcessCompleteTimeout = timeout;
+			return this;
+		}
+
+		/** Sets the serializer name given to the endpoint; null falls back to the XMI default. */
+		public Builder setSerializer(String serializer) {
+			this.serializer = serializer;
+			return this;
+		}
+
+		/**
+		 * Applies the collected settings to this builder's endpoint and returns it. The
+		 * endpoint's destination is always reset to null.
+		 */
+		public Endpoint_impl build() {
+			e1.setDelegateKey(key);
+			e1.setServerURI(uri);
+			e1.setEndpoint(endpointName);
+			e1.setDescriptor(descriptor);
+			e1.setConcurrentRequestConsumers(scaleout);
+			e1.setDestination(null);
+			e1.setReplyToEndpoint(replyDestinationName);
+			e1.setMetadataRequestTimeout(getMetaTimeout);
+			e1.setProcessRequestTimeout(processTimeout);
+			e1.setTempReplyDestination(isTempReplyDestination);
+			// Apply the configured values rather than fixed defaults so that
+			// setCollectionProcessCompleteTimeout() and setSerializer() take effect.
+			e1.setCollectionProcessCompleteTimeout(collectionProcessCompleteTimeout);
+			e1.setSerializer(serializer != null
+					? serializer : SerialFormat.XMI.getDefaultFileExtension());
+			e1.setRemote(isRemote);
+			e1.setResourceSpecifier(rs);
+			return e1;
+		}
+	}
+}

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java Mon Feb 26 18:54:11 2018
@@ -31,8 +31,16 @@ public interface Endpoint {
 
   public static final int DISABLED = 3;
 
+  public boolean isJavaRemote();
+  
+  public void setJavaRemote();
+  
   public int getMetadataRequestTimeout();
 
+  public void setDisableJCasCache(boolean disableOrEnable);
+  
+  public void setCollectionProcessCompleteTimeout(int cpcTimeout);
+  
   public void setController(AnalysisEngineController aController);
 
   public void startCheckpointTimer();
@@ -47,6 +55,10 @@ public interface Endpoint {
 
   public void setReplyEndpoint(boolean tORf);
 
+  public void setReplyDestination(Object replyDestination);
+
+  public Object getReplyDestination();
+
   public boolean isReplyEndpoint();
 
   public void setProcessRequestTimeout(int processRequestTimeout);
@@ -129,6 +141,10 @@ public interface Endpoint {
 
   public void setDestination(Object aDestination);
 
+  public void setGetMetaDestination(Object aDestination);
+
+  public Object getMetaDestination();
+
   public void setCommand(int aCommand);
 
   public int getCommand();

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java Mon Feb 26 18:54:11 2018
@@ -27,14 +27,21 @@ import org.apache.uima.aae.jmx.ServiceIn
 import org.apache.uima.aae.message.AsynchAEMessage;
 import org.apache.uima.cas.SerialFormat;
 import org.apache.uima.cas.impl.TypeSystemImpl;
+import org.apache.uima.resource.ResourceSpecifier;
 
 public class Endpoint_impl implements Endpoint, Cloneable {
-  private static final Class CLASS_NAME = Endpoint_impl.class;
+  private static final Class<?> CLASS_NAME = Endpoint_impl.class;
 
+  private volatile boolean javaRemote=false;
+  
   private volatile Object destination = null;
 
   private String endpoint;  // is the queue name (only)
 
+  private volatile Object getMetaDestination = null;
+  
+  private volatile Object replyDestination = null;
+
   private String serverURI;
 
   private volatile boolean initialized;
@@ -45,11 +52,11 @@ public class Endpoint_impl implements En
 
   private volatile boolean waitingForResponse;
 
-  private int metadataRequestTimeout;
+  private int metadataRequestTimeout=0;
 
-  private int processRequestTimeout;
+  private int processRequestTimeout=0;
 
-  private int collectionProcessCompleteTimeout;
+  private int collectionProcessCompleteTimeout=0;
 
   private volatile boolean isRemote;
 
@@ -57,7 +64,7 @@ public class Endpoint_impl implements En
 
   private SerialFormat serialFormat = null;
   
-  private String serializer = "xmi";  // spring bean interface
+  private String serializer = SerialFormat.XMI.getDefaultFileExtension(); 
 
   private volatile boolean finalEndpoint;
 
@@ -121,6 +128,23 @@ public class Endpoint_impl implements En
   
   private volatile boolean disableJCasCache;
   
+ private ResourceSpecifier resourceSpecifier;
+  
+  public void setJavaRemote() {
+	 javaRemote = true;
+  }
+  public boolean isJavaRemote() {
+	  return javaRemote;
+  }
+  public void setReplyDestination(Object replyDestination) {
+	  this.replyDestination = replyDestination;
+  }
+  public Object getReplyDestination() {
+	  return replyDestination;
+  }
+  public void setResourceSpecifier(ResourceSpecifier rs ) {
+	  this.resourceSpecifier = rs;
+  }
   public boolean isDisableJCasCache() {
     return disableJCasCache;
   }
@@ -319,11 +343,6 @@ public class Endpoint_impl implements En
 
   public void setServerURI(String aServerURI) {
     this.serverURI = aServerURI;
-    if ( aServerURI != null && ( aServerURI.startsWith("vm:") == true && !aServerURI.equals("vm://localhost?broker.persistent=false")  ) ){
-      setRemote(false);
-    } else {
-      setRemote(true);
-    }
   }
 
   public void setWaitingForResponse(boolean isWaiting) {
@@ -436,7 +455,12 @@ public class Endpoint_impl implements En
     isRemote = aRemote;
 
   }
-
+  public void setGetMetaDestination(Object aDestination) {
+	  getMetaDestination = aDestination;
+  }
+  public Object getMetaDestination() {
+	  return getMetaDestination;
+  }
   public String getDescriptor() {
     return descriptor;
   }

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java Mon Feb 26 18:54:11 2018
@@ -71,7 +71,7 @@ public class LocalCache extends Concurre
       if (entry != null && entry.isSubordinate()) {
         // recursively call each parent until we get to the top of the
         // Cas hierarchy
-        parentCasReferenceId = lookupInputCasReferenceId(entry.getInputCasReferenceId());
+        parentCasReferenceId = lookupInputCasReferenceId(entry.getParentCasReferenceId());
       } else {
         return aCasReferenceId;
       }
@@ -85,14 +85,14 @@ public class LocalCache extends Concurre
       // recursively call each parent until we get to the top of the
       // Cas hierarchy
       parentCasReferenceId = lookupInputCasReferenceId((CasStateEntry) get(entry
-              .getInputCasReferenceId()));
+              .getParentCasReferenceId()));
     } else {
       return entry.getCasReferenceId();
     }
     return parentCasReferenceId;
   }
   public void dumpContents() {
-    dumpContents(false);
+    dumpContents(true);
   }
   public synchronized void dumpContents(boolean dump2Stdout) {
     int count = 0;
@@ -109,7 +109,7 @@ public class LocalCache extends Concurre
         if (casStateEntry.isSubordinate()) {
           sb.append(entry.getKey() + " Number Of Child CASes In Play:"
                   + casStateEntry.getSubordinateCasInPlayCount() + " Parent CAS id:"
-                  + casStateEntry.getInputCasReferenceId());
+                  + casStateEntry.getParentCasReferenceId());
         } else {
           sb.append(entry.getKey() + " *** Input CAS. Number Of Child CASes In Play:"
                   + casStateEntry.getSubordinateCasInPlayCount());
@@ -179,7 +179,7 @@ public class LocalCache extends Concurre
     CasStateEntry casStateEntry = lookupEntry(casReferenceId);
     if (casStateEntry.isSubordinate()) {
       // Recurse until the top CAS reference Id is found
-      return getTopCasAncestor(casStateEntry.getInputCasReferenceId());
+      return getTopCasAncestor(casStateEntry.getParentCasReferenceId());
     }
     // Return the top ancestor CAS id
     return casStateEntry;
@@ -187,7 +187,12 @@ public class LocalCache extends Concurre
 
   public static class CasStateEntry {
 	private String casReferenceId;
-
+	// id of a parent CAS 
+    private String parentCasReferenceId;
+    // stores the id of an input CAS sent by the client. This is used
+    // to identify client endpoint if the service is a CM.
+    private String inputCasReferenceId;
+    
     private volatile boolean waitingForChildren; // true if in FinalState and still has children in play
     
     private volatile boolean waitingForRealease;
@@ -212,8 +217,8 @@ public class LocalCache extends Concurre
 
     private Object childCountMux = new Object();
 
-    private String inputCasReferenceId;
-
+    private long seqNo;
+    
     private int numberOfParallelDelegates = 1;
 
     private Delegate lastDelegate = null;
@@ -222,6 +227,9 @@ public class LocalCache extends Concurre
 
     private Endpoint freeCasNotificationEndpoint;
 
+    // client endpoint where the input CAS will be returned
+    private Endpoint clientEndpoint;
+
     private volatile boolean deliveryToClientFailed;
     
     private String hostIpProcessingCAS;
@@ -249,6 +257,12 @@ public class LocalCache extends Concurre
     
     protected Map<String, AEMetrics > casMetrics = new TreeMap<String, AEMetrics>();
     
+    public Endpoint getClientEndpoint() {
+    	return clientEndpoint;
+    }
+    public void setClientEndpoint(Endpoint ce) {
+    	clientEndpoint = ce;
+    }
     public boolean waitingForChildrenToFinish() {
     	return waitingForChildren;
     }
@@ -297,7 +311,12 @@ public class LocalCache extends Concurre
 	public void setDeliveryToClientFailed() {
 		this.deliveryToClientFailed = true;
 	}
-
+	public void setSequenceNumber(long seq) {
+		this.seqNo = seq;
+	}
+	public long getSequenceNumber() {
+		return this.seqNo;
+	}
 	public boolean isDropped() {
       return dropped;
     }
@@ -330,15 +349,20 @@ public class LocalCache extends Concurre
       return casReferenceId;
     }
 
-    public String getInputCasReferenceId() {
-      return inputCasReferenceId;
+    public String getParentCasReferenceId() {
+      return parentCasReferenceId;
     }
 
-    public void setInputCasReferenceId(String anInputCasReferenceId) {
-      inputCasReferenceId = anInputCasReferenceId;
+    public void setParentCasReferenceId(String parentCasReferenceId) {
+      this.parentCasReferenceId = parentCasReferenceId;
       subordinateCAS = true;
     }
-
+    public String getInputCasReferenceId() {
+    	return inputCasReferenceId;
+    }
+    public void setInputCasReferenceId(String inputCasId) {
+    	inputCasReferenceId = inputCasId;
+    }
     public void setWaitingForRelease(boolean flag) {
       waitingForRealease = flag;
     }

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java Mon Feb 26 18:54:11 2018
@@ -63,6 +63,7 @@ import org.apache.uima.analysis_engine.A
 import org.apache.uima.analysis_engine.AnalysisEngineManagement;
 import org.apache.uima.analysis_engine.CasIterator;
 import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
+import org.apache.uima.as.client.DirectMessageContext;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.cas.impl.CASImpl;
 import org.apache.uima.collection.CollectionReaderDescription;
@@ -263,7 +264,7 @@ public class PrimitiveAnalysisEngineCont
     	    }
       }
       AnalysisEngine ae = UIMAFramework.produceAnalysisEngine(rSpecifier, paramsMap);
-   
+      System.out.println("------------ initializeAnalysisEngine()-Created instance of AE:"+getComponentName()+" Thread iD:"+Thread.currentThread().getId());
       super.addUimaObject(ae.getManagementInterface().getUniqueMBeanName());
       //  Call to produceAnalysisEngine() may take a long time to complete. While this
         //  method was executing, the service may have been stopped. Before continuing 
@@ -476,7 +477,7 @@ public class PrimitiveAnalysisEngineCont
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
             UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "postInitialize",
                   UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_initialized_controller__INFO",
-                  new Object[] { getComponentName() });
+                  new Object[] { getComponentName(), super.getBrokerURL() });
           }
           super.serviceInitialized = true;
         }
@@ -549,6 +550,7 @@ public class PrimitiveAnalysisEngineCont
                 "UIMAEE_cpc_all_cases_processed__FINEST", new Object[] { getComponentName() });
       }
       getServicePerformance().incrementAnalysisTime(super.getCpuTime() - start);
+      /*
       if (!anEndpoint.isRemote()) {
         UimaTransport transport = getTransport(anEndpoint.getEndpoint());
         UimaMessage message = transport.produceMessage(AsynchAEMessage.CollectionProcessComplete,
@@ -558,6 +560,8 @@ public class PrimitiveAnalysisEngineCont
       } else {
         getOutputChannel().sendReply(AsynchAEMessage.CollectionProcessComplete, anEndpoint, null, false);
       }
+*/
+      getOutputChannel(anEndpoint).sendReply(AsynchAEMessage.CollectionProcessComplete, anEndpoint, null, false);
 
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(),
@@ -741,7 +745,14 @@ public class PrimitiveAnalysisEngineCont
             aem.getNumberOfCASesProcessed());
     
   }
-  
+  /** Cancels the given stack-dump timer if there is one; a null timer is ignored. */
+  private void cancelStackDumpTimer(StackDumpTimer stackDumpTimer) {
+    // Assigning null to the parameter would not clear the caller's variable (Java
+    // passes references by value), so no such store is made; cancel() is all we do.
+    if (stackDumpTimer != null) {
+      stackDumpTimer.cancel();
+    }
+  }
   /**
    * This is called when a Stop request is received from a client. Add the provided Cas id to the
    * list of aborted CASes. The process() method checks this list to determine if it should continue
@@ -758,9 +769,9 @@ public class PrimitiveAnalysisEngineCont
     if (stopped) {
       return;
     }
-    
-    List<AnalysisEnginePerformanceMetrics> beforeAnalysisManagementObjects = new ArrayList<AnalysisEnginePerformanceMetrics>();
-    List<AnalysisEnginePerformanceMetrics> afterAnalysisManagementObjects = new ArrayList<AnalysisEnginePerformanceMetrics>();
+    System.out.println("Service:"+getComponentName()+" CAS:"+aCasReferenceId+" CAS Hashcode:"+aCAS.hashCode());
+    List<AnalysisEnginePerformanceMetrics> beforeAnalysisManagementObjects = new ArrayList<>();
+    List<AnalysisEnginePerformanceMetrics> afterAnalysisManagementObjects = new ArrayList<>();
     CasStateEntry parentCasStateEntry = null;
     //	If enabled, keep a reference to a timer which
     //  when it expires, will cause a JVM to dump a stack
@@ -810,19 +821,15 @@ public class PrimitiveAnalysisEngineCont
       }
       
       CasIterator casIterator = ae.processAndOutputNewCASes(aCAS);
-      if ( stackDumpTimer != null ) {
-    	  stackDumpTimer.cancel();
-    	  stackDumpTimer = null;   // nullify timer instance so that we dont have to worry about
-          // it in case an exception happens below
-      }
-      
+      cancelStackDumpTimer(stackDumpTimer);
       // Store how long it took to call processAndOutputNewCASes()
       totalProcessTime = (super.getCpuTime() - time);
       long sequence = 1;
       long hasNextTime = 0; // stores time in hasNext()
-      long getNextTime = 0; // stores time in next();
+      long getNextTime = 0; // stores time in next()
       boolean moreCASesToProcess = true;
       boolean casAbortedDueToExternalRequest = false;
+      
       while (moreCASesToProcess) {
         long timeToProcessCAS = 0; // stores time in hasNext() and next() for each CAS
         hasNextTime = super.getCpuTime();
@@ -831,23 +838,25 @@ public class PrimitiveAnalysisEngineCont
         //	method is allowed to complete. If the method is not complete in allowed window
         //	the heap and stack trace dump of all threads will be produced.
         stackDumpTimer = ifEnabledStartHeapDumpTimer();
+        
+        /* ********************************************** */
+        /* CHECK IF THERE ARE MORE CHILD CASes TO PROCESS */
+        /* ********************************************** */
         if (!casIterator.hasNext()) {
           moreCASesToProcess = false;
           // Measure how long it took to call hasNext()
           timeToProcessCAS = (super.getCpuTime() - hasNextTime);
           totalProcessTime += timeToProcessCAS;
-          if ( stackDumpTimer != null ) {
-        	  stackDumpTimer.cancel();
-        	  stackDumpTimer = null;   // nullify timer instance so that we dont have to worry about
-              // it in case an exception happens below
-          }
+          cancelStackDumpTimer(stackDumpTimer);
+          /* ************************************* */
+          /* WE ARE DONE PROCESSING INPUT CAS HERE */
+          /* ************************************* */
           break; // from while
         }
-        if ( stackDumpTimer != null ) {
-        	stackDumpTimer.cancel();
-        	stackDumpTimer = null;   // nullify timer instance so that we dont have to worry about
-                                    // it in case an exception happens below
-        }
+
+        
+        cancelStackDumpTimer(stackDumpTimer);
+        
         // Measure how long it took to call hasNext()
         timeToProcessCAS = (super.getCpuTime() - hasNextTime);
         getNextTime = super.getCpuTime();
@@ -856,17 +865,20 @@ public class PrimitiveAnalysisEngineCont
         //	method is allowed to complete. If the method is not complete in allowed window
         //	the heap and stack trace dump of all threads will be produced.
         stackDumpTimer = ifEnabledStartHeapDumpTimer();
-        CAS casProduced = casIterator.next();
-        if ( stackDumpTimer != null ) {
-        	stackDumpTimer.cancel();
-        	stackDumpTimer = null;   // nullify timer instance so that we dont have to worry about
-            // it in case an exception happens below
-        }
+
+        /* ****************************** */
+        /* GET THE NEXT CHILD CAS         */
+        /* ****************************** */
+        CAS childCAS = casIterator.next();
+        
+        cancelStackDumpTimer(stackDumpTimer);
+
         // Add how long it took to call next()
         timeToProcessCAS += (super.getCpuTime() - getNextTime);
         // Add time to call hasNext() and next() to the running total
         totalProcessTime += timeToProcessCAS;
         casAbortedDueToExternalRequest = abortGeneratingCASes(aCasReferenceId);
+
         // If the service is stopped or aborted, stop generating new CASes and just return the input
         // CAS
         if (stopped || casAbortedDueToExternalRequest) {
@@ -891,8 +903,8 @@ public class PrimitiveAnalysisEngineCont
                 // We are either stopping the service or aborting input CAS due to explicit STOP
                 // request
                 // from a client. If a new CAS was produced, release it back to the pool.
-                if (casProduced != null) {
-                  casProduced.release();
+                if (childCAS != null) {
+                	childCAS.release();
                 }
               } catch (Exception e) {
             	  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
@@ -935,24 +947,22 @@ public class PrimitiveAnalysisEngineCont
 //        OutOfTypeSystemData otsd = getInProcessCache().getOutOfTypeSystemData(aCasReferenceId);
         MessageContext mContext = getInProcessCache()
                 .getMessageAccessorByReference(aCasReferenceId);
-        CacheEntry newEntry = getInProcessCache().register(casProduced, mContext /*, otsd*/);
+        CacheEntry newEntry = getInProcessCache().register(childCAS, mContext /*, otsd*/);
         // if this Cas Multiplier is not Top Level service, add new Cas Id to the private
         // cache of the parent aggregate controller. The Aggregate needs to know about
         // all CASes it has in play that were generated from the input CAS.
         CasStateEntry childCasStateEntry = null;
         if (!isTopLevelComponent()) {
-          newEntry.setNewCas(true, parentController.getComponentName());
-          // Create CAS state entry in the aggregate's local cache
-          childCasStateEntry = parentController.getLocalCache().createCasStateEntry(
-                  newEntry.getCasReferenceId());
-          // Fetch the parent CAS state entry from the aggregate's local cache. We need to increment
-          // number of child CASes associated with it.
-          parentCasStateEntry = parentController.getLocalCache().lookupEntry(aCasReferenceId);
-        } else {
-          childCasStateEntry = getLocalCache().createCasStateEntry(newEntry.getCasReferenceId());
-        }
+            newEntry.setNewCas(true, parentController.getComponentName());
+            // Fetch the parent CAS state entry from the aggregate's local cache. We need to increment
+            // number of child CASes associated with it.
+            parentCasStateEntry = parentController.getLocalCache().lookupEntry(aCasReferenceId);
+          } 
+        childCasStateEntry = getLocalCache().createCasStateEntry(newEntry.getCasReferenceId());
+
         // Associate parent CAS (input CAS) with the new CAS.
-        childCasStateEntry.setInputCasReferenceId(aCasReferenceId);
+        childCasStateEntry.setParentCasReferenceId(aCasReferenceId);
+        childCasStateEntry.setInputCasReferenceId(parentCasStateEntry.getInputCasReferenceId());
         // Increment number of child CASes generated from the input CAS
         parentCasStateEntry.incrementSubordinateCasInPlayCount();
         parentCasStateEntry.incrementOutstandingFlowCounter();
@@ -960,6 +970,8 @@ public class PrimitiveAnalysisEngineCont
         // Associate input CAS with the new CAS
         newEntry.setInputCasReferenceId(aCasReferenceId);
         newEntry.setCasSequence(sequence);
+        childCasStateEntry.setSequenceNumber(sequence);
+        
         // Add to the cache how long it took to process the generated (subordinate) CAS
         getCasStatistics(newEntry.getCasReferenceId()).incrementAnalysisTime(timeToProcessCAS);
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
@@ -1007,48 +1019,26 @@ public class PrimitiveAnalysisEngineCont
                     "UIMAEE_exception__WARNING", exx);
         }
 
-        if (!anEndpoint.isRemote()) {
-          UimaTransport transport = getTransport(anEndpoint.getEndpoint());
-          UimaMessage message = transport.produceMessage(AsynchAEMessage.Process,
-                  AsynchAEMessage.Request, getName());
-          message.addStringProperty(AsynchAEMessage.CasReference, newEntry.getCasReferenceId());
-          message.addStringProperty(AsynchAEMessage.InputCasReference, aCasReferenceId);
-          message.addLongProperty(AsynchAEMessage.CasSequence, sequence);
-          ServicePerformance casStats = getCasStatistics(aCasReferenceId);
-
-          message.addLongProperty(AsynchAEMessage.TimeToSerializeCAS, casStats
-                  .getRawCasSerializationTime());
-          message.addLongProperty(AsynchAEMessage.TimeToDeserializeCAS, casStats
-                  .getRawCasDeserializationTime());
-          message.addLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime());
-          long iT = getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process);
-          message.addLongProperty(AsynchAEMessage.IdleTime, iT);
-          if (!stopped) {
-            transport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message);
-            dropStats(newEntry.getCasReferenceId(), getName());
-          }
-        } else {
-          // Send generated CAS to the remote client
-          if (!stopped) {
-              getOutputChannel().sendReply(newEntry, anEndpoint);
-            
-              //	Check for delivery failure. The client may have terminated while an input CAS was being processed
-            if ( childCasStateEntry.deliveryToClientFailed() ) {
-              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-                    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "process",
-                            UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_delivery_to_client_failed_INFO",
-                            new Object[] { getComponentName(), aCasReferenceId });
-              }
-              clientUnreachable = true;
-              if ( cmOutstandingCASes.containsKey(childCasStateEntry.getCasReferenceId())) {
-              	  cmOutstandingCASes.remove(childCasStateEntry.getCasReferenceId());
-          	  }
-              //	Stop generating new CASes. We failed to send a CAS to a client. Most likely
-              //	the client has terminated. 
-          	  moreCASesToProcess = false; // exit the while loop
-          	  
-          	  dropCAS(childCasStateEntry.getCasReferenceId(), true);
+        // Send generated CAS to the client
+        if (!stopped) {
+            getOutputChannel(anEndpoint).sendReply(childCasStateEntry, anEndpoint);
+          
+            //	Check for delivery failure. The client may have terminated while an input CAS was being processed
+          if ( childCasStateEntry.deliveryToClientFailed() ) {
+            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "process",
+                          UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_delivery_to_client_failed_INFO",
+                          new Object[] { getComponentName(), aCasReferenceId });
             }
+            clientUnreachable = true;
+            if ( cmOutstandingCASes.containsKey(childCasStateEntry.getCasReferenceId())) {
+            	  cmOutstandingCASes.remove(childCasStateEntry.getCasReferenceId());
+        	  }
+            //	Stop generating new CASes. We failed to send a CAS to a client. Most likely
+            //	the client has terminated. 
+        	  moreCASesToProcess = false; // exit the while loop
+        	  
+        	  dropCAS(childCasStateEntry.getCasReferenceId(), true);
           }
         }
         // Remove new CAS state entry from the local cache if this is a top level primitive.
@@ -1117,6 +1107,8 @@ public class PrimitiveAnalysisEngineCont
       //  Create a List to hold per CAS analysisTime and total number of CASes processed
       //  by each AE. This list will be serialized and sent to the client
       List<AnalysisEnginePerformanceMetrics> performanceList = 
+    		  getCasMetricList(parentCasStateEntry, afterAnalysisManagementObjects, beforeAnalysisManagementObjects);
+      /*
         new ArrayList<AnalysisEnginePerformanceMetrics>();
       //  Diff the before process() performance metrics with post process performance
       //  metrics
@@ -1158,70 +1150,14 @@ public class PrimitiveAnalysisEngineCont
           }
         }
       }
+      */
       parentCasStateEntry.getAEPerformanceList().addAll(performanceList);
-      if (!anEndpoint.isRemote()) {
-        inputCASReturned = true;
-        UimaTransport transport = getTransport(anEndpoint.getEndpoint());
-
-        if (getInProcessCache() != null && getInProcessCache().getSize() > 0
-                && getInProcessCache().entryExists(aCasReferenceId)) {
-          try {
-            CacheEntry ancestor = 
-                      getInProcessCache().
-                        getTopAncestorCasEntry(getInProcessCache().getCacheEntryForCAS(aCasReferenceId));
-            if ( ancestor != null ) {
-               ancestor.addDelegateMetrics(getKey(), performanceList);
-            }
-          } catch (Exception e) {
-            // An exception be be thrown here if the service is being stopped.
-            // The top level controller may have already cleaned up the cache
-            // and the getCacheEntryForCAS() will throw an exception. Ignore it
-            // here, we are shutting down.
-          }
-        }          
-        
-        UimaMessage message = transport.produceMessage(AsynchAEMessage.Process,
-                AsynchAEMessage.Response, getName());
-        message.addStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
-        ServicePerformance casStats = getCasStatistics(aCasReferenceId);
 
-        message.addLongProperty(AsynchAEMessage.TimeToSerializeCAS, casStats
-                .getRawCasSerializationTime());
-        message.addLongProperty(AsynchAEMessage.TimeToDeserializeCAS, casStats
-                .getRawCasDeserializationTime());
-        message.addLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime());
-        long iT = getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process);
-        message.addLongProperty(AsynchAEMessage.IdleTime, iT);
-        // Send reply back to the client. Use internal (non-jms) transport
-        if (!stopped) {
-          transport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message);
-          dropStats(aCasReferenceId, getName());
-        }
-      } else {
         try {
-        	List<AnalysisEnginePerformanceMetrics> perfMetrics =
-					new ArrayList<AnalysisEnginePerformanceMetrics>();
-          String aeName = getMetaData().getName();
-         
+          
           CacheEntry entry =
                   getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
-          for( AnalysisEnginePerformanceMetrics m : performanceList ) {
- //       	  System.out.println("...............BEFORE:  Name:"+m.getName()+" UniqueName:"+m.getUniqueName()+" How Many="+m.getNumProcessed());
-				boolean aggregate = m.getUniqueName().startsWith("/"+aeName);
-				int pos = m.getUniqueName().indexOf("/",1);
-				String uName = m.getUniqueName();
-				if ( pos > -1 && aeInstancePool.size() > 1 && aeName != null && aggregate) {
-					String st = m.getUniqueName().substring(pos);
-					uName = "/"+aeName+st;
-				} 
-				AnalysisEnginePerformanceMetrics newMetrics = 
-						new AnalysisEnginePerformanceMetrics(m.getName(),uName,m.getAnalysisTime(), m.getNumProcessed());
-			//	System.out.println("... Metrics - AE:"+metrics.getUniqueName()+" AE Analysis Time:"+metrics.getAnalysisTime());
-				perfMetrics.add(newMetrics);
-//	        	  System.out.println("...............AFTER:  Name:"+newMetrics.getName()+" UniqueName:"+newMetrics.getUniqueName()+" How Many="+newMetrics.getNumProcessed());
-
-          }
-          entry.addDelegateMetrics(getKey(), perfMetrics); //performanceList);
+          entry.addDelegateMetrics(getKey(), performanceList);
         } catch (Exception e) {
           // An exception be be thrown here if the service is being stopped.
           // The top level controller may have already cleaned up the cache
@@ -1230,12 +1166,11 @@ public class PrimitiveAnalysisEngineCont
         }
 
         if (!stopped && !clientUnreachable ) {
-            getOutputChannel().sendReply(getInProcessCache().getCacheEntryForCAS(aCasReferenceId), anEndpoint);
+//            getOutputChannel(anEndpoint).sendReply(getInProcessCache().getCacheEntryForCAS(aCasReferenceId), anEndpoint);
+            getOutputChannel(anEndpoint).sendReply(getLocalCache().lookupEntry(aCasReferenceId), anEndpoint);
         }
 
         inputCASReturned = true;
-      }
-      
       // Remove input CAS state entry from the local cache
       if (!isTopLevelComponent()) {
         localCache.lookupEntry(aCasReferenceId).setDropped(true);
@@ -1297,13 +1232,67 @@ public class PrimitiveAnalysisEngineCont
         		((CASImpl) aCAS).enableReset(true);
         	}
           // Remove input CAS cache entry if the CAS has been sent to the client
-          dropCAS(aCasReferenceId, true);
+            MessageContext mContext = getInProcessCache()
+                    .getMessageAccessorByReference(aCasReferenceId);
+            if ( mContext != null && mContext instanceof DirectMessageContext) {
+            	dropCASFromLocal(aCasReferenceId);
+            } else {
+                // Remove input CAS cache entry if the CAS has been sent to the client
+                dropCAS(aCasReferenceId, true);
+            }
+        	//dropCAS(aCasReferenceId, true);
+ 
           localCache.dumpContents();
         }
       }
     }
   }
 
+  /**
+   * Builds the per-CAS metric list by diffing the metrics captured after process() against
+   * those captured before it, pairing entries by unique name. When the parent CAS state entry
+   * already holds a metric with the same normalized name, that entry is removed from the
+   * parent's list and its analysis time is added to the newly created metric.
+   *
+   * @param parentCasStateEntry state entry whose accumulated AE performance list is consulted
+   * @param afterAnalysisList metrics captured after process()
+   * @param beforeAnalysisList metrics captured before process()
+   * @return one metric per after-entry whose unique name also appears in the before list
+   */
+  private List<AnalysisEnginePerformanceMetrics>  getCasMetricList(CasStateEntry parentCasStateEntry, List<AnalysisEnginePerformanceMetrics> afterAnalysisList, List<AnalysisEnginePerformanceMetrics> beforeAnalysisList) {
+    List<AnalysisEnginePerformanceMetrics> performanceList =
+            new ArrayList<AnalysisEnginePerformanceMetrics>();
+    for (AnalysisEnginePerformanceMetrics after : afterAnalysisList) {
+      for (AnalysisEnginePerformanceMetrics before : beforeAnalysisList) {
+        if (!before.getUniqueName().equals(after.getUniqueName())) {
+          continue;
+        }
+        // Normalize the name by dropping whatever precedes the first '/'. The earlier test,
+        // indexOf("Components") >= -1, was always true, so the strip ran unconditionally and
+        // substring(-1) threw StringIndexOutOfBoundsException for a name without any '/'.
+        // Such names are now kept unchanged; all other inputs behave as before.
+        String un = after.getUniqueName();
+        int slash = un.indexOf("/");
+        if (slash > -1) {
+          un = un.substring(slash);
+        }
+        long analysisTime = after.getAnalysisTime() - before.getAnalysisTime();
+        for (AnalysisEnginePerformanceMetrics met : parentCasStateEntry.getAEPerformanceList()) {
+          if (met.getUniqueName().equals(un)) {
+            // Fold in the time already recorded for this AE. Leaving the loop right after
+            // remove() means the iterator is never advanced over the modified list.
+            analysisTime += met.getAnalysisTime();
+            parentCasStateEntry.getAEPerformanceList().remove(met);
+            break;
+          }
+        }
+        performanceList.add(new AnalysisEnginePerformanceMetrics(after.getName(), un,
+                analysisTime, after.getNumProcessed()));
+        break; // only the first matching 'before' entry is used
+      }
+    }
+    return performanceList;
+  }
   private void addConfigIntParameter(String aParamName, int aParamValue) {
     ConfigurationParameter cp = new ConfigurationParameter_impl();
     cp.setMandatory(false);
@@ -1414,6 +1403,13 @@ public class PrimitiveAnalysisEngineCont
 
   public void stop() {
     super.stop(true);  // shutdown now
+    if ( getLocalCache().size() > 0 ) {
+    	for( Entry<String, LocalCache.CasStateEntry> entry : getLocalCache().entrySet()) {
+    		System.out.println("........... Controller:"+getComponentName()+" - stop() - Releasing CAS:"+entry.getKey());
+    		releaseNextCas(entry.getKey());
+    	}
+    }
+
     if (aeInstancePool != null) {
       try {
         aeInstancePool.destroy();
@@ -1517,7 +1513,7 @@ public class PrimitiveAnalysisEngineCont
   
   /**
    * The HeapDumpTimer is optionally used to dump the heap if a task takes too much time to finish.
-   * It is enabled from the System property -DheapDumpThreshold=x where x is a number of seconds 
+   * It is enabled from the System property {@code -DheapDumpThreshold=<x>} where x is a number of seconds 
    * the task is allowed to complete. If the task is not completed, the heap dump will be created. 
    * 
    *

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java Mon Feb 26 18:54:11 2018
@@ -822,6 +822,7 @@ public abstract class Delegate {
    *          - command for which the timer is started
    */
   private void startDelegateGetMetaTimer(final String aCasReferenceId, final int aCommand) {
+	  Thread.dumpStack();
     synchronized( getMetaTimerLock ) {
       final long timeToWait = getTimeoutValueForCommand(aCommand);
       Date timeToRun = new Date(System.currentTimeMillis() + timeToWait);

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandler.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandler.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandler.java Mon Feb 26 18:54:11 2018
@@ -43,6 +43,6 @@ public interface ErrorHandler {
   public boolean handleError(Throwable t, ErrorContext anErrorContext,
           AnalysisEngineController aController);
 
-  public Map getEndpointThresholdMap();
+  public Map<String,Threshold> getEndpointThresholdMap();
 
 }

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandlerBase.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandlerBase.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandlerBase.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandlerBase.java Mon Feb 26 18:54:11 2018
@@ -225,7 +225,7 @@ public abstract class ErrorHandlerBase {
 
                 if (aController instanceof AggregateAnalysisEngineController && (masterEndpoint != null && masterEndpoint.getStatus() == Endpoint.FAILED)) {
                   // Fetch an InputChannel that handles messages for a given delegate
-                  InputChannel iC = aController.getReplyInputChannel(masterEndpoint.getDelegateKey());
+                  InputChannel iC = aController.getInputChannel();
                   // Create a new Listener, new Temp Queue and associate the listener with the Input Channel
                   iC.createListener(masterEndpoint.getDelegateKey(), null);
                   iC.removeDelegateFromFailedList(masterEndpoint.getDelegateKey());

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandlerChain.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandlerChain.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandlerChain.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandlerChain.java Mon Feb 26 18:54:11 2018
@@ -27,11 +27,21 @@ import java.util.Map;
 
 import org.apache.uima.aae.controller.AnalysisEngineController;
 
-public class ErrorHandlerChain extends LinkedList {
-  public ErrorHandlerChain(List aChainofHandlers) {
+public class ErrorHandlerChain extends LinkedList<ErrorHandler>  {
+  /**
+	 * 
+	 */
+	private static final long serialVersionUID = 1L;
+
+  public ErrorHandlerChain(List<ErrorHandler> aChainofHandlers) {
     this.addAll(aChainofHandlers);
   }
+  public ErrorHandlerChain() {
+  }
 
+  public void setErrorHandler(ErrorHandler eh ) {
+	    this.add(eh);
+  }
   public Map getThresholds() {
     Map thresholds = new HashMap();
     Iterator iterator = this.iterator();
@@ -51,7 +61,7 @@ public class ErrorHandlerChain extends L
       if (t instanceof AsynchAEException && t.getCause() != null) {
         cause = t.getCause();
       }
-      Iterator iterator = this.iterator();
+      Iterator<ErrorHandler> iterator = this.iterator();
       while (errorHandled == false && iterator.hasNext()) {
         ErrorHandler handler = ((ErrorHandler) iterator.next());
         errorHandled = handler.handleError(cause, anErrorContext, aController);

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/Thresholds.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/Thresholds.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/Thresholds.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/Thresholds.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,89 @@
+package org.apache.uima.aae.error;
+
+import java.util.Map;
+
+import org.apache.uima.aae.service.delegate.AnalysisEngineDelegate;
+import org.apache.uima.resourceSpecifier.CollectionProcessCompleteErrorsType;
+import org.apache.uima.resourceSpecifier.GetMetadataErrorsType;
+import org.apache.uima.resourceSpecifier.ProcessCasErrorsType;
+
+public class Thresholds {
+  /** Builds the default threshold: "terminate" action, zero retries, threshold 1, window 0. */
+  public static Threshold newThreshold() {
+    Threshold defaults = new Threshold();
+    defaults.setAction("terminate");
+    defaults.setContinueOnRetryFailure(false);
+    defaults.setMaxRetries(0);
+    defaults.setThreshold(1);
+    defaults.setWindow(0);
+    return defaults;
+  }
+
+  /** Starts from {@link #newThreshold()} and applies the given action and retry limit. */
+  public static Threshold getThreshold(String action, int maxRetries) {
+    Threshold custom = newThreshold();
+    custom.setAction(action);
+    custom.setContinueOnRetryFailure(false);
+    custom.setMaxRetries(maxRetries);
+    custom.setThreshold(1);
+    return custom;
+  }
+
+  /** Converts process-CAS error settings into a threshold; {@code null} gives the default. */
+  public static Threshold getThresholdFor(ProcessCasErrorsType processCasErrors) {
+    if (processCasErrors == null) {
+      return newThreshold();
+    }
+    Threshold result = getThreshold(processCasErrors.getThresholdAction(), processCasErrors.getMaxRetries());
+    result.setThreshold(processCasErrors.getThresholdCount());
+    result.setContinueOnRetryFailure(Boolean.valueOf(processCasErrors.getContinueOnRetryFailure()));
+    result.setWindow(processCasErrors.getThresholdWindow());
+    return result;
+  }
+
+  /** Converts getMeta error settings into a threshold; {@code null} gives the default. */
+  public static Threshold getThresholdFor(GetMetadataErrorsType metaErrors) {
+    return metaErrors == null
+        ? newThreshold()
+        : getThreshold(metaErrors.getErrorAction(), metaErrors.getMaxRetries());
+  }
+
+  /** Converts CPC error settings into a zero-retry threshold; {@code null} gives the default. */
+  public static Threshold getThresholdFor(CollectionProcessCompleteErrorsType cpcErrors) {
+    return cpcErrors == null ? newThreshold() : getThreshold(cpcErrors.getAdditionalErrorAction(), 0);
+  }
+
+  /** Registers the getMeta threshold under the delegate's key and copies the getMeta timeout. */
+  public static void addDelegateErrorThreshold(AnalysisEngineDelegate delegate, GetMetadataErrorsType errorHandler, Map<String, Threshold> thresholdMap) {
+    if (errorHandler == null) {
+      thresholdMap.put(delegate.getKey(), newThreshold());
+      return;
+    }
+    Threshold configured = getThresholdFor(errorHandler);
+    delegate.setGetMetaTimeout(errorHandler.getTimeout());
+    thresholdMap.put(delegate.getKey(), configured);
+  }
+
+  /** Registers the process-CAS threshold under the delegate's key and copies the process timeout. */
+  public static void addDelegateErrorThreshold(AnalysisEngineDelegate delegate, ProcessCasErrorsType errorHandler, Map<String, Threshold> thresholdMap) {
+    if (errorHandler == null) {
+      thresholdMap.put(delegate.getKey(), newThreshold());
+      return;
+    }
+    Threshold configured = getThresholdFor(errorHandler);
+    delegate.setProcessTimeout(errorHandler.getTimeout());
+    thresholdMap.put(delegate.getKey(), configured);
+  }
+
+  /** Registers the CPC threshold under the delegate's key and applies the CPC timeout value. */
+  public static void addDelegateErrorThreshold(AnalysisEngineDelegate delegate, CollectionProcessCompleteErrorsType  errorHandler, Map<String, Threshold> thresholdMap) {
+    if (errorHandler == null) {
+      thresholdMap.put(delegate.getKey(), newThreshold());
+      return;
+    }
+    Threshold configured = getThresholdFor(errorHandler);
+    // NOTE(review): the CPC timeout is written to the delegate's *process* timeout - confirm intended.
+    delegate.setProcessTimeout(errorHandler.getTimeout());
+    thresholdMap.put(delegate.getKey(), configured);
+  }
+}

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/CpcErrorHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/CpcErrorHandler.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/CpcErrorHandler.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/CpcErrorHandler.java Mon Feb 26 18:54:11 2018
@@ -76,7 +76,7 @@ public class CpcErrorHandler extends Err
         if (aController instanceof AggregateAnalysisEngineController) {
           endpoint = ((AggregateAnalysisEngineController) aController).getClientEndpoint();
         }
-        aController.getOutputChannel().sendReply(t, null, null, endpoint,
+        aController.getOutputChannel(endpoint).sendReply(t, null, null, endpoint,
                 AsynchAEMessage.CollectionProcessComplete);
       } catch (Exception e) {
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java Mon Feb 26 18:54:11 2018
@@ -113,13 +113,14 @@ public class ProcessCasErrorHandler exte
     if (anEndpoint != null && aCasReferenceId != null && !anEndpoint.isCasMultiplier()) {
       try {
         if (!anEndpoint.isRemote()) {
+        	/*
           anEndpoint.setReplyEndpoint(true);
           UimaTransport vmTransport = aController.getTransport(anEndpoint.getEndpoint());
           UimaMessage message = vmTransport.produceMessage(AsynchAEMessage.Process,
                   AsynchAEMessage.Response, aController.getName());
           message.addIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.Exception);
           message.addStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
-
+*/
           Throwable wrapper = null;
           if (!(t instanceof UimaEEServiceException)) {
             // Strip off AsyncAEException and replace with UimaEEServiceException
@@ -129,6 +130,7 @@ public class ProcessCasErrorHandler exte
               wrapper = new UimaEEServiceException(t);
             }
           }
+          /*
           if (wrapper == null) {
             message.addObjectProperty(AsynchAEMessage.Cargo, t);
           } else {
@@ -138,6 +140,16 @@ public class ProcessCasErrorHandler exte
             vmTransport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message);
             aController.dropStats(aCasReferenceId, aController.getName());
           }
+          */
+          if (!aController.isStopped()) {
+              aController.dropStats(aCasReferenceId, aController.getName());
+            }
+        if (wrapper == null) {
+            aController.getOutputChannel(anEndpoint).sendReply(t, aCasReferenceId, null, anEndpoint,  AsynchAEMessage.Process);
+          } else {
+            aController.getOutputChannel(anEndpoint).sendReply(wrapper, aCasReferenceId, null, anEndpoint,  AsynchAEMessage.Process);
+          }
+
         } else {
           CasStateEntry stateEntry = null;
           String parentCasReferenceId = null;
@@ -152,7 +164,7 @@ public class ProcessCasErrorHandler exte
           }
 
           if (!aController.isStopped()) {
-            aController.getOutputChannel().sendReply(t, aCasReferenceId, parentCasReferenceId,
+            aController.getOutputChannel(anEndpoint).sendReply(t, aCasReferenceId, parentCasReferenceId,
                     anEndpoint, AsynchAEMessage.Process);
           }
         }
@@ -516,7 +528,7 @@ public class ProcessCasErrorHandler exte
         doSendReplyToClient = false;
         // Check if the CAS is a subordinate (has parent CAS).
         if (casStateEntry != null && casStateEntry.isSubordinate()) {
-          String parentCasReferenceId = casStateEntry.getInputCasReferenceId();
+          String parentCasReferenceId = casStateEntry.getParentCasReferenceId();
           if (parentCasReferenceId != null) {
             try {
               CacheEntry parentCasCacheEntry = aController.getInProcessCache().getCacheEntryForCAS(
@@ -542,7 +554,7 @@ public class ProcessCasErrorHandler exte
                   parentCasStateEntry.setFailed();
                   while (predecessorCas != null && predecessorCas.isSubordinate()) {
                     predecessorCas = aController.getLocalCache().lookupEntry(
-                            predecessorCas.getInputCasReferenceId());
+                            predecessorCas.getParentCasReferenceId());
                     predecessorCas.setFailed();
                   }
                   predecessorCas.addThrowable(t);
@@ -648,7 +660,7 @@ public class ProcessCasErrorHandler exte
               cmEndpoint.setReplyEndpoint(true);
               cmEndpoint.setIsCasMultiplier(true);
               cmEndpoint.setFreeCasEndpoint(true);
-              aController.getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS,
+              aController.getOutputChannel(cmEndpoint).sendRequest(AsynchAEMessage.ReleaseCAS,
                       cacheEntry.getCasReferenceId(), cmEndpoint);
             }
           }

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataRequestHandler_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataRequestHandler_impl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataRequestHandler_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataRequestHandler_impl.java Mon Feb 26 18:54:11 2018
@@ -31,7 +31,7 @@ import org.apache.uima.aae.message.Messa
 import org.apache.uima.util.Level;
 
 public class MetadataRequestHandler_impl extends HandlerBase {
-  private static final Class CLASS_NAME = MetadataRequestHandler_impl.class;
+  private static final Class<?> CLASS_NAME = MetadataRequestHandler_impl.class;
 
   public MetadataRequestHandler_impl(String aName) {
     super(aName);