You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/09/02 17:22:15 UTC

svn commit: r810557 [4/7] - in /incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae: controller/ delegate/ deploymentDescriptor/

Modified: incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=810557&r1=810556&r2=810557&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Wed Sep  2 15:22:13 2009
@@ -96,1167 +96,1092 @@
 import org.apache.uima.resource.Resource_ImplBase;
 import org.apache.uima.util.Level;
 
-public abstract class BaseAnalysisEngineController extends Resource_ImplBase 
-implements AnalysisEngineController, EventSubscriber
-{
-	private static final Class CLASS_NAME = BaseAnalysisEngineController.class;
-
-	private static final long DoNotProcessTTL = 30*60*1000;  // 30 minute time to live 
-	
-	protected volatile ControllerLatch latch = new ControllerLatch(this);
-
-	protected ConcurrentHashMap statsMap = new ConcurrentHashMap();
-
-	protected Monitor monitor = new MonitorBaseImpl();
-
-	protected Endpoint clientEndpoint;
-
-	private CountDownLatch inputChannelLatch = new CountDownLatch(1);
-	
-	private OutputChannel outputChannel;
-
-	private AsynchAECasManager casManager;
-
-	private InProcessCache inProcessCache;
-
-	protected AnalysisEngineController parentController;
-
-	private String endpointName;
-
-	protected ResourceSpecifier resourceSpecifier;
-
-	protected HashMap paramsMap;
-
-	protected InputChannel inputChannel;
-
-	protected ErrorHandlerChain errorHandlerChain;
-	
-	protected long errorCount = 0;
-
-	protected List inputChannelList = new ArrayList();
-	
-	protected ConcurrentHashMap inputChannelMap = new ConcurrentHashMap();
-
-	protected ConcurrentHashMap idleTimeMap = new ConcurrentHashMap();
-
-	private UimaEEAdminContext adminContext;
-
-	protected int componentCasPoolSize = 0;
-
-	protected long replyTime = 0;
-	
-	protected long idleTime = 0;
-
-	protected ConcurrentHashMap serviceErrorMap = new ConcurrentHashMap();
-
-	private boolean registeredWithJMXServer = false; 
-	protected String jmxContext = "";
-	
-	protected ConcurrentHashMap mBeanMap = new ConcurrentHashMap();
-	
-	protected ServicePerformance servicePerformance = null;
-	
-	protected ServiceErrors serviceErrors = null;
-	
-	protected ConcurrentHashMap timeSnapshotMap = new ConcurrentHashMap();
-
-	private String deploymentDescriptor = "";
-	
-	private JmxManagement jmxManagement = null;
-	
-	protected volatile boolean stopped = false;
-	
-	protected String delegateKey = null;
-	
-	protected List unregisteredDelegateList = new ArrayList();
-	
-	protected volatile boolean allDelegatesAreRemote = false;
-	
-	protected List controllerListeners = new ArrayList();
-	
-	protected volatile boolean serviceInitialized = false;
-	
-	protected ConcurrentHashMap perCasStatistics = new ConcurrentHashMap();
-
-	private volatile boolean casMultiplier = false;
-	
-	protected Object syncObject = new Object();
-	
-	//	Map holding outstanding CASes produced by Cas Multiplier that have to be acked
-	protected ConcurrentHashMap cmOutstandingCASes = new ConcurrentHashMap();
-	
-	private Object mux = new Object();
-	
-	private Object waitmux = new Object();
-	
-	private volatile boolean waitingForCAS = false;
-	
-	private long startTime = System.nanoTime();
-	
-	private long totalWaitTimeForCAS = 0;
-	
-	private long lastCASWaitTimeUpdate = 0;
-
-	private Map<Long, AnalysisThreadState> threadStateMap =
-		new HashMap<Long,AnalysisThreadState>();
-	
-	protected final Object finalStepMux = new Object();
-	
-	protected ConcurrentHashMap<String, UimaTransport> transports
-	  = new ConcurrentHashMap<String, UimaTransport>();
-	
-  protected ConcurrentHashMap<String, UimaMessageListener> messageListeners
-  = new ConcurrentHashMap<String, UimaMessageListener>();
-  
+public abstract class BaseAnalysisEngineController extends Resource_ImplBase implements
+        AnalysisEngineController, EventSubscriber {
+  private static final Class CLASS_NAME = BaseAnalysisEngineController.class;
+
+  private static final long DoNotProcessTTL = 30 * 60 * 1000; // 30 minute time to live
+
+  protected volatile ControllerLatch latch = new ControllerLatch(this);
+
+  protected ConcurrentHashMap statsMap = new ConcurrentHashMap();
+
+  protected Monitor monitor = new MonitorBaseImpl();
+
+  protected Endpoint clientEndpoint;
+
+  private CountDownLatch inputChannelLatch = new CountDownLatch(1);
+
+  private OutputChannel outputChannel;
+
+  private AsynchAECasManager casManager;
+
+  private InProcessCache inProcessCache;
+
+  protected AnalysisEngineController parentController;
+
+  private String endpointName;
+
+  protected ResourceSpecifier resourceSpecifier;
+
+  protected HashMap paramsMap;
+
+  protected InputChannel inputChannel;
+
+  protected ErrorHandlerChain errorHandlerChain;
+
+  protected long errorCount = 0;
+
+  protected List inputChannelList = new ArrayList();
+
+  protected ConcurrentHashMap inputChannelMap = new ConcurrentHashMap();
+
+  protected ConcurrentHashMap idleTimeMap = new ConcurrentHashMap();
+
+  private UimaEEAdminContext adminContext;
+
+  protected int componentCasPoolSize = 0;
+
+  protected long replyTime = 0;
+
+  protected long idleTime = 0;
+
+  protected ConcurrentHashMap serviceErrorMap = new ConcurrentHashMap();
+
+  private boolean registeredWithJMXServer = false;
+
+  protected String jmxContext = "";
+
+  protected ConcurrentHashMap mBeanMap = new ConcurrentHashMap();
+
+  protected ServicePerformance servicePerformance = null;
+
+  protected ServiceErrors serviceErrors = null;
+
+  protected ConcurrentHashMap timeSnapshotMap = new ConcurrentHashMap();
+
+  private String deploymentDescriptor = "";
+
+  private JmxManagement jmxManagement = null;
+
+  protected volatile boolean stopped = false;
+
+  protected String delegateKey = null;
+
+  protected List unregisteredDelegateList = new ArrayList();
+
+  protected volatile boolean allDelegatesAreRemote = false;
+
+  protected List controllerListeners = new ArrayList();
+
+  protected volatile boolean serviceInitialized = false;
+
+  protected ConcurrentHashMap perCasStatistics = new ConcurrentHashMap();
+
+  private volatile boolean casMultiplier = false;
+
+  protected Object syncObject = new Object();
+
+  // Map holding outstanding CASes produced by Cas Multiplier that have to be acked
+  protected ConcurrentHashMap cmOutstandingCASes = new ConcurrentHashMap();
+
+  private Object mux = new Object();
+
+  private Object waitmux = new Object();
+
+  private volatile boolean waitingForCAS = false;
+
+  private long startTime = System.nanoTime();
+
+  private long totalWaitTimeForCAS = 0;
+
+  private long lastCASWaitTimeUpdate = 0;
+
+  private Map<Long, AnalysisThreadState> threadStateMap = new HashMap<Long, AnalysisThreadState>();
+
+  protected final Object finalStepMux = new Object();
+
+  protected ConcurrentHashMap<String, UimaTransport> transports = new ConcurrentHashMap<String, UimaTransport>();
+
+  protected ConcurrentHashMap<String, UimaMessageListener> messageListeners = new ConcurrentHashMap<String, UimaMessageListener>();
+
   private Exception initException = null;
-	
-  // Local cache for this controller only. This cache stores state of 
+
+  // Local cache for this controller only. This cache stores state of
   // each CAS. The actual CAS is still stored in the global cache. The
   // local cache is used to determine when each CAS can be removed as
   // it reaches the final step. Global cache is not a good place to
   // store this information if there are collocated (delegate) controllers
-  // A CAS state change made by one controller may effect another controller. 
+  // A CAS state change made by one controller may affect another controller.
   protected LocalCache localCache;
 
-  protected String aeDescriptor; 
-  //	List of Delegates
+  protected String aeDescriptor;
+
+  // List of Delegates
   protected List<Delegate> delegates = new ArrayList<Delegate>();
-  //  indicates whether or not we received a callback from the InProcessCache when
-  //  it becomes empty
-  protected volatile boolean  callbackReceived = false;
-  //  Monitor used in stop() to await a callback from InProcessCache
+
+  // indicates whether or not we received a callback from the InProcessCache when
+  // it becomes empty
+  protected volatile boolean callbackReceived = false;
+
+  // Monitor used in stop() to await a callback from InProcessCache
   protected Object callbackMonitor = new Object();
-  
+
   protected volatile boolean awaitingCacheCallbackNotification = false;
-  protected ConcurrentHashMap<String, String> abortedCasesMap = 
-    new ConcurrentHashMap<String, String>();
+
+  protected ConcurrentHashMap<String, String> abortedCasesMap = new ConcurrentHashMap<String, String>();
 
   protected String processPid = "";
+
   private CountDownLatch stopLatch = new CountDownLatch(1);
-  
-  //  Set to true when stopping the service
+
+  // Set to true when stopping the service
   private volatile boolean releasedAllCASes;
-  
-  protected List<DoNotProcessEntry> doNotProcessList = 
-    new ArrayList<DoNotProcessEntry>();
-  
-  private ScheduledExecutorService daemonServiceExecutor=null;
-  
-  private static final UimaAsVersion uimaAsVersion = 
-    new UimaAsVersion();
-  
+
+  protected List<DoNotProcessEntry> doNotProcessList = new ArrayList<DoNotProcessEntry>();
+
+  private ScheduledExecutorService daemonServiceExecutor = null;
+
+  private static final UimaAsVersion uimaAsVersion = new UimaAsVersion();
+
   public BaseAnalysisEngineController() {
-    
+
+  }
+
+  public BaseAnalysisEngineController(AnalysisEngineController aParentController,
+          int aComponentCasPoolSize, String anEndpointName, String aDescriptor,
+          AsynchAECasManager aCasManager, InProcessCache anInProcessCache) throws Exception {
+    this(aParentController, aComponentCasPoolSize, 0, anEndpointName, aDescriptor, aCasManager,
+            anInProcessCache, null, null);
+  }
+
+  public BaseAnalysisEngineController(AnalysisEngineController aParentController,
+          int aComponentCasPoolSize, String anEndpointName, String aDescriptor,
+          AsynchAECasManager aCasManager, InProcessCache anInProcessCache, Map aDestinationMap)
+          throws Exception {
+    this(aParentController, aComponentCasPoolSize, 0, anEndpointName, aDescriptor, aCasManager,
+            anInProcessCache, aDestinationMap, null);
   }
-	public BaseAnalysisEngineController(AnalysisEngineController aParentController, int aComponentCasPoolSize, String anEndpointName, String aDescriptor, AsynchAECasManager aCasManager, InProcessCache anInProcessCache) throws Exception
-	{
-		this(aParentController, aComponentCasPoolSize, 0, anEndpointName, aDescriptor, aCasManager, anInProcessCache, null, null);
-	}
-	public BaseAnalysisEngineController(AnalysisEngineController aParentController, int aComponentCasPoolSize, String anEndpointName, String aDescriptor, AsynchAECasManager aCasManager, InProcessCache anInProcessCache, Map aDestinationMap) throws Exception
-	{
-		this(aParentController, aComponentCasPoolSize, 0, anEndpointName, aDescriptor, aCasManager, anInProcessCache, aDestinationMap, null);
-	}	
-	public BaseAnalysisEngineController(AnalysisEngineController aParentController, int aComponentCasPoolSize, String anEndpointName, String aDescriptor, AsynchAECasManager aCasManager, InProcessCache anInProcessCache, Map aDestinationMap, JmxManagement aJmxManagement) throws Exception
-	{
-		this(aParentController, aComponentCasPoolSize, 0, anEndpointName, aDescriptor, aCasManager, anInProcessCache, aDestinationMap, aJmxManagement);
-	}
-	
-	
-	public BaseAnalysisEngineController(AnalysisEngineController aParentController, int aComponentCasPoolSize, long anInitialCasHeapSize, String anEndpointName, String aDescriptor, AsynchAECasManager aCasManager, InProcessCache anInProcessCache, Map aDestinationMap, JmxManagement aJmxManagement) throws Exception
-	{
-	  casManager = aCasManager;
-		inProcessCache = anInProcessCache;
+
+  public BaseAnalysisEngineController(AnalysisEngineController aParentController,
+          int aComponentCasPoolSize, String anEndpointName, String aDescriptor,
+          AsynchAECasManager aCasManager, InProcessCache anInProcessCache, Map aDestinationMap,
+          JmxManagement aJmxManagement) throws Exception {
+    this(aParentController, aComponentCasPoolSize, 0, anEndpointName, aDescriptor, aCasManager,
+            anInProcessCache, aDestinationMap, aJmxManagement);
+  }
+
+  public BaseAnalysisEngineController(AnalysisEngineController aParentController,
+          int aComponentCasPoolSize, long anInitialCasHeapSize, String anEndpointName,
+          String aDescriptor, AsynchAECasManager aCasManager, InProcessCache anInProcessCache,
+          Map aDestinationMap, JmxManagement aJmxManagement) throws Exception {
+    casManager = aCasManager;
+    inProcessCache = anInProcessCache;
     localCache = new LocalCache(this);
     aeDescriptor = aDescriptor;
-		parentController = aParentController;
-		componentCasPoolSize = aComponentCasPoolSize;
-		
-		if ( this instanceof AggregateAnalysisEngineController )
-		{
-			//	Populate a list of un-registered co-located delegates. A delegate will be taken off the un-registered list
-			//	when it calls its parent registerChildController() method.
-			Set set = aDestinationMap.entrySet();
+    parentController = aParentController;
+    componentCasPoolSize = aComponentCasPoolSize;
+
+    if (this instanceof AggregateAnalysisEngineController) {
+      // Populate a list of un-registered co-located delegates. A delegate will be taken off the
+      // un-registered list
+      // when it calls its parent registerChildController() method.
+      Set set = aDestinationMap.entrySet();
       synchronized (unregisteredDelegateList) {
-        for( Iterator it = set.iterator(); it.hasNext();)
-        {
-          Map.Entry entry = (Map.Entry)it.next();
-          Endpoint endpoint = (Endpoint)entry.getValue();
-          if ( endpoint != null && !endpoint.isRemote() )
-          {
+        for (Iterator it = set.iterator(); it.hasNext();) {
+          Map.Entry entry = (Map.Entry) it.next();
+          Endpoint endpoint = (Endpoint) entry.getValue();
+          if (endpoint != null && !endpoint.isRemote()) {
             unregisteredDelegateList.add(entry.getKey());
           }
         }
-        if ( unregisteredDelegateList.size() == 0 ) // All delegates are remote
+        if (unregisteredDelegateList.size() == 0) // All delegates are remote
         {
           allDelegatesAreRemote = true;
         }
       }
-		}
+    }
 
-		
-		endpointName = anEndpointName;
-		delegateKey = anEndpointName;
-		
-		if (this instanceof AggregateAnalysisEngineController)
-		{
-			ConcurrentHashMap endpoints = new ConcurrentHashMap();
-			endpoints.putAll(aDestinationMap);
-			//	Create a map containing: Endpoint-DelegateKey pairs, to enable look-up
-			//	of a delegate key based on delegate's endpoint
-			((AggregateAnalysisEngineController) this).mapEndpointsToKeys(endpoints);
-		
-		}
-		//	If not the top level, retrieve the name of the endpoint from the parent 
-		if ( !isTopLevelComponent() )
-		{
-			Endpoint endpoint = ((AggregateAnalysisEngineController)parentController).lookUpEndpoint(endpointName, false);
-			endpointName = endpoint.getEndpoint();
-		} 
-		resourceSpecifier = UimaClassFactory.produceResourceSpecifier(aDescriptor);
-
-		
-		if ( isTopLevelComponent())
-		{
-		  //  Check UIMA AS version againg the UIMA Core version. If not the same throw Exception
-	      if ( !uimaAsVersion.getVersionString().equals(UIMAFramework.getVersionString())) {
-	        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-                  "BaseAnalysisEngineController", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_incompatible_version_WARNING",
-                  new Object[] { getComponentName(), uimaAsVersion.getVersionString(), UIMAFramework.getVersionString() });
-	        throw new ResourceInitializationException(new AsynchAEException("Version of UIMA-AS is Incompatible with a Version of UIMA Core. UIMA-AS Version:"+uimaAsVersion.getVersionString()+" Core UIMA Version:"+UIMAFramework.getVersionString()));
-	      }
-	      logPlatformInfo(getComponentName());
-		}
-		else
-		{
-	    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-	      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-	                "BaseAnalysisEngineController", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_service_id_INFO",
-	                new Object[] { endpointName });
-	    }
-		}
-		
-		//	Is this service a CAS Multiplier?
-		if ( (resourceSpecifier instanceof AnalysisEngineDescription &&
-				((AnalysisEngineDescription) resourceSpecifier).getAnalysisEngineMetaData().getOperationalProperties().getOutputsNewCASes()) 
-				|| resourceSpecifier instanceof CollectionReaderDescription)
-		{
-			casMultiplier = true;
-		}
-		
-		paramsMap = new HashMap();
-		if ( aJmxManagement == null )
-		{
-			jmxManagement = new JmxManager(getJMXDomain());
-		}
-		else
-		{
-			jmxManagement = aJmxManagement;
-			if ( jmxManagement.getMBeanServer() != null )
-			{
-				paramsMap.put(AnalysisEngine.PARAM_MBEAN_SERVER, jmxManagement.getMBeanServer()); 
-			}
-		}
-		paramsMap.put(AnalysisEngine.PARAM_MBEAN_NAME_PREFIX, jmxManagement.getJmxDomain()); 
-		if ( isTopLevelComponent() && this instanceof AggregateAnalysisEngineController)
-		{
+    endpointName = anEndpointName;
+    delegateKey = anEndpointName;
+
+    if (this instanceof AggregateAnalysisEngineController) {
+      ConcurrentHashMap endpoints = new ConcurrentHashMap();
+      endpoints.putAll(aDestinationMap);
+      // Create a map containing: Endpoint-DelegateKey pairs, to enable look-up
+      // of a delegate key based on delegate's endpoint
+      ((AggregateAnalysisEngineController) this).mapEndpointsToKeys(endpoints);
+
+    }
+    // If not the top level, retrieve the name of the endpoint from the parent
+    if (!isTopLevelComponent()) {
+      Endpoint endpoint = ((AggregateAnalysisEngineController) parentController).lookUpEndpoint(
+              endpointName, false);
+      endpointName = endpoint.getEndpoint();
+    }
+    resourceSpecifier = UimaClassFactory.produceResourceSpecifier(aDescriptor);
+
+    if (isTopLevelComponent()) {
+      // Check UIMA AS version against the UIMA Core version. If not the same throw Exception
+      if (!uimaAsVersion.getVersionString().equals(UIMAFramework.getVersionString())) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(
+                Level.INFO,
+                CLASS_NAME.getName(),
+                "BaseAnalysisEngineController",
+                UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAEE_incompatible_version_WARNING",
+                new Object[] { getComponentName(), uimaAsVersion.getVersionString(),
+                    UIMAFramework.getVersionString() });
+        throw new ResourceInitializationException(new AsynchAEException(
+                "Version of UIMA-AS is Incompatible with a Version of UIMA Core. UIMA-AS Version:"
+                        + uimaAsVersion.getVersionString() + " Core UIMA Version:"
+                        + UIMAFramework.getVersionString()));
+      }
+      logPlatformInfo(getComponentName());
+    } else {
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-              "C'tor", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_using_vm_transport_INFO",
-              new Object[] { getComponentName() });
+                "BaseAnalysisEngineController", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAEE_service_id_INFO", new Object[] { endpointName });
       }
-		}
+    }
 
-		//	Top level component?
-		if (parentController == null)
-		{
-		    paramsMap.put(Resource.PARAM_RESOURCE_MANAGER, casManager.getResourceManager());
-			initialize(resourceSpecifier, paramsMap);
-			AnalysisEngineManagementImpl mbean = (AnalysisEngineManagementImpl)
-				getUimaContextAdmin().getManagementInterface();
-			//	Override uima core jmx domain setting
-			mbean.setName(getComponentName(), getUimaContextAdmin(),jmxManagement.getJmxDomain());
-			if ( resourceSpecifier instanceof AnalysisEngineDescription )
-			{
-				//	Is this service a CAS Multiplier?
-				if ( ((AnalysisEngineDescription) resourceSpecifier).getAnalysisEngineMetaData().getOperationalProperties().getOutputsNewCASes() )
-				{
-					System.out.println(getName()+"-Initializing CAS Pool for Context:"+getUimaContextAdmin().getQualifiedContextName());
-					System.out.println(getComponentName()+"-CasMultiplier Cas Pool Size="+aComponentCasPoolSize+" Cas Initialial Heap Size:"+anInitialCasHeapSize);
-	         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-	           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-		                    "C'tor", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_multiplier_cas_pool_config_INFO",
-		                    new Object[] { getComponentName(), aComponentCasPoolSize, anInitialCasHeapSize });
-	         }
-					initializeComponentCasPool(aComponentCasPoolSize, anInitialCasHeapSize);
-				}
-			}
-		}
-		else
-		{
-			UimaContext childContext = parentController.getChildUimaContext(endpointName);
-			paramsMap.put(Resource.PARAM_UIMA_CONTEXT, childContext);
-			initialize(resourceSpecifier, paramsMap);
-			initializeComponentCasPool(aComponentCasPoolSize, anInitialCasHeapSize );
-			if (parentController instanceof AggregateAnalysisEngineController )
-			{
-
-			  //	Register self with the parent controller 
-				((AggregateAnalysisEngineController) parentController).registerChildController(this,delegateKey);
-			}
-		}
-
-		//	Each component in the service hierarchy has its own index. The index is used
-		//	to construct jmx context path to which every object belongs. 
-		int index = getIndex();
-		
-		//	Get uima ee jmx base context path 
-		jmxContext = getJmxContext();
-		if ( !isTopLevelComponent() && this instanceof PrimitiveAnalysisEngineController )
-		{
-			String thisComponentName = ((AggregateAnalysisEngineController)parentController).lookUpDelegateKey(endpointName);
-			jmxContext += ( thisComponentName + " Uima EE Service");
-		}
-		
-		//	Register InProcessCache with JMX under the top level component
-		if ( inProcessCache != null && isTopLevelComponent() )
-		{
-			inProcessCache.setName(jmxManagement.getJmxDomain()+jmxContext+",name="+inProcessCache.getName());
-			ObjectName on = new ObjectName(inProcessCache.getName() );
-			jmxManagement.registerMBean(inProcessCache, on);
-		}
-		initializeServiceStats();
-
-		
-		//  Show Serialization Strategy of each remote delegate
-    if ( this instanceof AggregateAnalysisEngineController )
-    {
+    // Is this service a CAS Multiplier?
+    if ((resourceSpecifier instanceof AnalysisEngineDescription && ((AnalysisEngineDescription) resourceSpecifier)
+            .getAnalysisEngineMetaData().getOperationalProperties().getOutputsNewCASes())
+            || resourceSpecifier instanceof CollectionReaderDescription) {
+      casMultiplier = true;
+    }
+
+    paramsMap = new HashMap();
+    if (aJmxManagement == null) {
+      jmxManagement = new JmxManager(getJMXDomain());
+    } else {
+      jmxManagement = aJmxManagement;
+      if (jmxManagement.getMBeanServer() != null) {
+        paramsMap.put(AnalysisEngine.PARAM_MBEAN_SERVER, jmxManagement.getMBeanServer());
+      }
+    }
+    paramsMap.put(AnalysisEngine.PARAM_MBEAN_NAME_PREFIX, jmxManagement.getJmxDomain());
+    if (isTopLevelComponent() && this instanceof AggregateAnalysisEngineController) {
+      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "C'tor",
+                UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_using_vm_transport_INFO",
+                new Object[] { getComponentName() });
+      }
+    }
+
+    // Top level component?
+    if (parentController == null) {
+      paramsMap.put(Resource.PARAM_RESOURCE_MANAGER, casManager.getResourceManager());
+      initialize(resourceSpecifier, paramsMap);
+      AnalysisEngineManagementImpl mbean = (AnalysisEngineManagementImpl) getUimaContextAdmin()
+              .getManagementInterface();
+      // Override uima core jmx domain setting
+      mbean.setName(getComponentName(), getUimaContextAdmin(), jmxManagement.getJmxDomain());
+      if (resourceSpecifier instanceof AnalysisEngineDescription) {
+        // Is this service a CAS Multiplier?
+        if (((AnalysisEngineDescription) resourceSpecifier).getAnalysisEngineMetaData()
+                .getOperationalProperties().getOutputsNewCASes()) {
+          System.out.println(getName() + "-Initializing CAS Pool for Context:"
+                  + getUimaContextAdmin().getQualifiedContextName());
+          System.out.println(getComponentName() + "-CasMultiplier Cas Pool Size="
+                  + aComponentCasPoolSize + " Cas Initialial Heap Size:" + anInitialCasHeapSize);
+          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+            UIMAFramework.getLogger(CLASS_NAME)
+                    .logrb(
+                            Level.INFO,
+                            CLASS_NAME.getName(),
+                            "C'tor",
+                            UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                            "UIMAEE_multiplier_cas_pool_config_INFO",
+                            new Object[] { getComponentName(), aComponentCasPoolSize,
+                                anInitialCasHeapSize });
+          }
+          initializeComponentCasPool(aComponentCasPoolSize, anInitialCasHeapSize);
+        }
+      }
+    } else {
+      UimaContext childContext = parentController.getChildUimaContext(endpointName);
+      paramsMap.put(Resource.PARAM_UIMA_CONTEXT, childContext);
+      initialize(resourceSpecifier, paramsMap);
+      initializeComponentCasPool(aComponentCasPoolSize, anInitialCasHeapSize);
+      if (parentController instanceof AggregateAnalysisEngineController) {
+
+        // Register self with the parent controller
+        ((AggregateAnalysisEngineController) parentController).registerChildController(this,
+                delegateKey);
+      }
+    }
+
+    // Each component in the service hierarchy has its own index. The index is used
+    // to construct jmx context path to which every object belongs.
+    int index = getIndex();
+
+    // Get uima ee jmx base context path
+    jmxContext = getJmxContext();
+    if (!isTopLevelComponent() && this instanceof PrimitiveAnalysisEngineController) {
+      String thisComponentName = ((AggregateAnalysisEngineController) parentController)
+              .lookUpDelegateKey(endpointName);
+      jmxContext += (thisComponentName + " Uima EE Service");
+    }
+
+    // Register InProcessCache with JMX under the top level component
+    if (inProcessCache != null && isTopLevelComponent()) {
+      inProcessCache.setName(jmxManagement.getJmxDomain() + jmxContext + ",name="
+              + inProcessCache.getName());
+      ObjectName on = new ObjectName(inProcessCache.getName());
+      jmxManagement.registerMBean(inProcessCache, on);
+    }
+    initializeServiceStats();
+
+    // Show Serialization Strategy of each remote delegate
+    if (this instanceof AggregateAnalysisEngineController) {
       Set set = aDestinationMap.entrySet();
-      for( Iterator it = set.iterator(); it.hasNext();)
-      {
-        Map.Entry entry = (Map.Entry)it.next();
-        Endpoint endpoint = (Endpoint)entry.getValue();
-        if ( endpoint != null && endpoint.isRemote() )
-        {
-          String key = ((AggregateAnalysisEngineController)this).lookUpDelegateKey(endpoint.getEndpoint());
-          System.out.println("Service:"+getComponentName()+" Configured To Serialize CASes To Remote Delegate:"+key+" Using "+endpoint.getSerializer()+" Serialization");
-          if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-                    "C'tor", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_remote_delegate_serialization_INFO",
-                    new Object[] { getComponentName(), key, endpoint.getSerializer()});
+      for (Iterator it = set.iterator(); it.hasNext();) {
+        Map.Entry entry = (Map.Entry) it.next();
+        Endpoint endpoint = (Endpoint) entry.getValue();
+        if (endpoint != null && endpoint.isRemote()) {
+          String key = ((AggregateAnalysisEngineController) this).lookUpDelegateKey(endpoint
+                  .getEndpoint());
+          System.out.println("Service:" + getComponentName()
+                  + " Configured To Serialize CASes To Remote Delegate:" + key + " Using "
+                  + endpoint.getSerializer() + " Serialization");
+          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "C'tor",
+                    UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                    "UIMAEE_show_remote_delegate_serialization_INFO",
+                    new Object[] { getComponentName(), key, endpoint.getSerializer() });
           }
         }
       }
     }
-    
-    //  Create an instance of ControllerMBean and register it with JMX Server.
-    //  This bean exposes service lifecycle APIs to enable remote stop
-    if ( isTopLevelComponent() ) {
+
+    // Create an instance of ControllerMBean and register it with JMX Server.
+    // This bean exposes service lifecycle APIs to enable remote stop
+    if (isTopLevelComponent()) {
       Controller controller = new Controller(this);
-      String jmxName = getManagementInterface().getJmxDomain()+"name="+"Controller";
-      registerWithAgent( controller, jmxName);
+      String jmxName = getManagementInterface().getJmxDomain() + "name=" + "Controller";
+      registerWithAgent(controller, jmxName);
     }
-	}
-	
-	private void logPlatformInfo(String serviceName ) {
-    if ( ManagementFactory.getPlatformMBeanServer() != null )
-    {
+  }
+
+  private void logPlatformInfo(String serviceName) {
+    if (ManagementFactory.getPlatformMBeanServer() != null) {
       RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
-//      MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
+      // MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
       OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
-      //  bean.getName() should return a string that looks like this: PID@HOSTNAME
-      //  where PID is the process id and the HOSTNAME is the name of the machine
+      // bean.getName() should return a string that looks like this: PID@HOSTNAME
+      // where PID is the process id and the HOSTNAME is the name of the machine
       processPid = bean.getName();
-      if ( processPid != null && processPid.trim().length() > 0 ) {
+      if (processPid != null && processPid.trim().length() > 0) {
         int endPos = processPid.indexOf("@"); // find the position where the PID ends
-        if ( endPos > -1 ) {
-          processPid = processPid.substring(0, endPos);  
-       }
+        if (endPos > -1) {
+          processPid = processPid.substring(0, endPos);
+        }
       }
       DateFormat df = new SimpleDateFormat("dd MMM yyyy HH:mm:ss ");
-      
+
       StringBuffer platformInfo = new StringBuffer();
       platformInfo.append("\n+------------------------------------------------------------------");
-      platformInfo.append("\n                   Starting UIMA AS Service - PID:"+processPid);
+      platformInfo.append("\n                   Starting UIMA AS Service - PID:" + processPid);
       platformInfo.append("\n+------------------------------------------------------------------");
-      platformInfo.append("\n+ Service Name:"+serviceName);
-      platformInfo.append("\n+ Service Queue Name:"+endpointName);
-      platformInfo.append("\n+ Service Start Time:"+df.format(bean.getStartTime()));
-      platformInfo.append("\n+ UIMA AS Version:"+uimaAsVersion.getVersionString());
-      platformInfo.append("\n+ UIMA Core Version:"+UIMAFramework.getVersionString());
-      platformInfo.append("\n+ OS Name:"+osBean.getName());
-      platformInfo.append("\n+ OS Version:"+osBean.getVersion());
-      platformInfo.append("\n+ OS Architecture:"+osBean.getArch());
-      platformInfo.append("\n+ OS CPU Count:"+osBean.getAvailableProcessors());
-      platformInfo.append("\n+ JVM Vendor:"+bean.getVmVendor());
-      platformInfo.append("\n+ JVM Name:"+bean.getVmName());
-      platformInfo.append("\n+ JVM Version:"+bean.getVmVersion());
-      platformInfo.append("\n+ JVM Input Args:"+bean.getInputArguments());
-      platformInfo.append("\n+ JVM Classpath:"+bean.getClassPath());
-      platformInfo.append("\n+ JVM LIB_PATH:"+bean.getLibraryPath());
+      platformInfo.append("\n+ Service Name:" + serviceName);
+      platformInfo.append("\n+ Service Queue Name:" + endpointName);
+      platformInfo.append("\n+ Service Start Time:" + df.format(bean.getStartTime()));
+      platformInfo.append("\n+ UIMA AS Version:" + uimaAsVersion.getVersionString());
+      platformInfo.append("\n+ UIMA Core Version:" + UIMAFramework.getVersionString());
+      platformInfo.append("\n+ OS Name:" + osBean.getName());
+      platformInfo.append("\n+ OS Version:" + osBean.getVersion());
+      platformInfo.append("\n+ OS Architecture:" + osBean.getArch());
+      platformInfo.append("\n+ OS CPU Count:" + osBean.getAvailableProcessors());
+      platformInfo.append("\n+ JVM Vendor:" + bean.getVmVendor());
+      platformInfo.append("\n+ JVM Name:" + bean.getVmName());
+      platformInfo.append("\n+ JVM Version:" + bean.getVmVersion());
+      platformInfo.append("\n+ JVM Input Args:" + bean.getInputArguments());
+      platformInfo.append("\n+ JVM Classpath:" + bean.getClassPath());
+      platformInfo.append("\n+ JVM LIB_PATH:" + bean.getLibraryPath());
       platformInfo.append("\n+------------------------------------------------------------------");
       System.out.println(platformInfo.toString());
       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-              "logPlatformInfo", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_platform_info__INFO",
-              new Object[] { platformInfo.toString() });
+              "logPlatformInfo", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+              "UIMAEE_show_platform_info__INFO", new Object[] { platformInfo.toString() });
     }
-	}
-	public AnalysisEngineController getParentController() {
+  }
+
+  public AnalysisEngineController getParentController() {
     return parentController;
-	}
-  public UimaTransport getTransport(String aKey) throws Exception
-  {
-    return getTransport( null, aKey);
   }
 
-	 public UimaTransport getTransport(UimaAsContext asContext)
-	 throws Exception
-	 {
-	   String endpointName = getName();
-	   if ( !isTopLevelComponent() ) {
-	     endpointName = parentController.getName();
-	   }
-     return getTransport(asContext, endpointName);
-	 }
-
-	 public UimaTransport getTransport(UimaAsContext asContext, String aKey)
-   throws Exception
-   {
-     UimaTransport transport = null;
-     if ( !transports.containsKey(aKey)) {
-       transport = new VmTransport(asContext, this);
-       if ( isStopped() ) {
-         throw new ServiceShutdownException();
-       }
-       transports.put( aKey, transport);
-     }
-     else {
-       transport = (UimaTransport) transports.get(aKey);
-     }
-       
-     return transport;
-   }
-
-	 /**
-	  * Initializes transport used for internal messaging between collocated Uima AS services.
-	  */
-	 public void initializeVMTransport(int parentControllerReplyConsumerCount) throws Exception
-	 {
-	   //  If this controller is an Aggregate Controller, force delegates to initialize
-	   //  their internal transports.
-	   if ( this instanceof AggregateAnalysisEngineController )
-	   {
-	     //  Get a list of all colocated delegate controllers.
-	     List childControllers = ((AggregateAnalysisEngineController_impl)this).childControllerList;
-	     
-	     for( int i=0; i < childControllers.size(); i++) 
-	     {
-	       AnalysisEngineController ctrl = (AnalysisEngineController)childControllers.get(i);
-	       //  Force initialization
-	       ctrl.initializeVMTransport(parentControllerReplyConsumerCount);
-	     }
-	   }
-	   
-	   //  Only delegate controllers execute the logic below
-	   if ( parentController != null )
-     {
-       UimaAsContext uimaAsContext = new UimaAsContext();
-       if ( !registeredWithJMXServer )
-       {
-         registeredWithJMXServer = true;
-         registerServiceWithJMX(jmxContext, false);
-       }
-       // Determine how many consumer threads to create. First though use the parent Aggregate Controller
-       // to lookup this delegate key. Next fetch the delegate endpoint which contains 
-       // concurrentConsumers property.
-       String key = ((AggregateAnalysisEngineController)parentController).lookUpDelegateKey(getName());
-       int concurrentRequestConsumers = 1;
-       int concurrentReplyConsumers = 1;
-       if ( key != null )
-       {
-         Endpoint e = ((AggregateAnalysisEngineController)parentController).lookUpEndpoint(key, false);
-         concurrentRequestConsumers = e.getConcurrentRequestConsumers();
-         concurrentReplyConsumers = e.getConcurrentReplyConsumers();
-       }
-       
-       System.out.println("Controller:"+getComponentName()+" Starting Request Listener With "+concurrentRequestConsumers+" Concurrent Consumers. Reply Listener Configured With "+concurrentReplyConsumers+" Concurrent Consumers");
-       
-       uimaAsContext.setConcurrentConsumerCount(concurrentRequestConsumers);
-       uimaAsContext.put("EndpointName", endpointName);
-       
-       UimaTransport vmTransport = getTransport(uimaAsContext);
-       // Creates delegate Listener for receiving requests from the parent
-       UimaMessageListener messageListener = vmTransport.produceUimaMessageListener();
-       // Plug in message handlers
-       messageListener.initialize(uimaAsContext);
-       // Store the listener
-       messageListeners.put(getName(), messageListener);
-       // Creates parent controller dispatcher for this delegate. The dispatcher is wired
-       // with this delegate's listener.
-       UimaAsContext uimaAsContext2 = new UimaAsContext();
-       // Set up as many reply threads as there are threads to process requests
-       uimaAsContext2.setConcurrentConsumerCount(concurrentReplyConsumers);
-       uimaAsContext2.put("EndpointName", endpointName);
-       UimaTransport parentVmTransport = parentController.getTransport(uimaAsContext2, endpointName);
-
-       parentVmTransport.produceUimaMessageDispatcher(vmTransport);
-       // Creates parent listener for receiving replies from this delegate. 
-       UimaMessageListener parentListener = parentVmTransport.produceUimaMessageListener();
-       // Plug in message handlers
-       parentListener.initialize(uimaAsContext2);
-       // Creates delegate's dispatcher. It is wired to send replies to the parent's listener.
-       vmTransport.produceUimaMessageDispatcher(parentVmTransport);
-       // Register input queue with JMX. This is an internal (non-jms) queue where clients
-       // send requests to this service.
-       vmTransport.registerWithJMX(this, "VmInputQueue");
-       parentVmTransport.registerWithJMX( this, "VmReplyQueue");
-     }
-
-	 }
-	 
-	 public synchronized UimaMessageListener getUimaMessageListener(String aDelegateKey)
-	 {
-	   return messageListeners.get(aDelegateKey);
-	 }
-
-	
-	/**
-	 * Get the domain for Uima JMX. The domain includes a fixed string plus the name of the 
-	 * top level component. All uima ee objects are rooted at this domain. 
-	 */
-
-	public String getJMXDomain()
-	{
-		//	Keep calling controllers until the top level component is reached
-		if ( !isTopLevelComponent() )
-		{
-			return parentController.getJMXDomain();
-		}
-		else
-		{
-			//	The domain includes the name of the top level component
-			return "org.apache.uima:type=ee.jms.services,s="+getComponentName()+" Uima EE Service,";
-		}
-	}
-	public JmxManagement getManagementInterface()
-	{
-		return jmxManagement;
-	}
-
-	/**
-	 * Returns a unique id for each component in the service hierarchy.
-	 * The top level component's id is always = 0
-	 * 
-	 */
-	public int getIndex()
-	{
-		if ( isTopLevelComponent() )
-		{
-			return 0;
-		}
-		return parentController.getIndex()+1;
-	}
-	
-	
-	private void initializeServiceStats()
-	{
-		Statistic statistic = null;
-		if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.TotalDeserializeTime)) == null )
-		{
-			statistic = new LongNumericStatistic(Monitor.TotalDeserializeTime);
-			getMonitor().addStatistic("", statistic);
-		}
-		if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.TotalSerializeTime)) == null )
-		{
-			statistic = new LongNumericStatistic(Monitor.TotalSerializeTime);
-			getMonitor().addStatistic("", statistic);
-		}
-		if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.IdleTime)) == null )
-		{
-			statistic = new LongNumericStatistic(Monitor.IdleTime);
-			getMonitor().addStatistic("", statistic);
-		}
-		if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.ProcessCount)) == null )
-		{
-			statistic = new LongNumericStatistic(Monitor.ProcessCount);
-			getMonitor().addStatistic("", statistic);
-		}
-		if ( this instanceof PrimitiveAnalysisEngineController )
-		{
-			if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.ProcessErrorCount)) == null )
-			{
-				statistic = new LongNumericStatistic(Monitor.ProcessErrorCount);
-				getMonitor().addStatistic("", statistic);
-			}
-			if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.TotalProcessErrorCount)) == null )
-			{
-				statistic = new LongNumericStatistic(Monitor.TotalProcessErrorCount);
-				getMonitor().addStatistic("", statistic);
-			}
-			if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.TotalAEProcessTime)) == null )
-			{
-				statistic = new LongNumericStatistic(Monitor.TotalAEProcessTime);
-				getMonitor().addStatistic("", statistic);
-			}
-		}
-	}
-	private void removeFromJmxServer( ObjectName anMBean) throws Exception
-	{
-		jmxManagement.unregisterMBean(anMBean);
-	}
-	
-	/**
-	 * This is called once during initialization to compute the position of the 
-	 * component in the JMX hierarchy and create a context path that will be used
-	 * to register the component in the JMX registry.
-	 */
-	public String getJmxContext()
-	{
-		if ( isTopLevelComponent() )
-		{
-			if ( this instanceof AggregateAnalysisEngineController )
-			{
-				return "p0="+getComponentName()+" Components";
-			}
-			else if ( this instanceof PrimitiveAnalysisEngineController )
-			{
-				return "p0="+getComponentName()+" Uima EE";
-			}
-				
-		}
-		//	Get the position of the component in the hierarchy. Each component
-		//	is registered with a unique context string that is composed of
-		//	the domain+<key,value> pair, where the key=p+<index>. The index is
-		//	incremented for every component. An example of a hierarchy would be
-		//	something like:
-		//	<domain>,s=<service name>,p0=<service name>,p1=<aggregate service>,p2=<delegate service>
-		
-		int index = getIndex();
-		String parentContext = parentController.getJmxContext();
-		if ( parentController.isTopLevelComponent())
-		{
-			index=1;
-		}
-		if ( this instanceof AggregateAnalysisEngineController )
-		{
-			String thisComponentName = getComponentName();
-			if ( !isTopLevelComponent() && endpointName != null)
-			{
-				thisComponentName = ((AggregateAnalysisEngineController)parentController).lookUpDelegateKey(endpointName);
-			}
-			return parentContext+",p"+index+"="+thisComponentName+" Components";
-		}
-		else
-		{
-			return parentContext+",p"+index+"=";
-		}
-	}
-	/**
-	 * Register a component with a given name with JMX MBeanServer
-	 * 
-	 * @param o - component to register with JMX
-	 * @param aName - full jmx context name for the component 
-	 */
-	protected void registerWithAgent( Object o, String aName)
-	{
-		try
-		{
-			ObjectName on = new ObjectName( aName );
-			jmxManagement.registerMBean(o, on);
-		}
-		catch( Exception e)
-		{
-			//	Log and move on
+  public UimaTransport getTransport(String aKey) throws Exception {
+    return getTransport(null, aKey);
+  }
+
+  public UimaTransport getTransport(UimaAsContext asContext) throws Exception {
+    String endpointName = getName();
+    if (!isTopLevelComponent()) {
+      endpointName = parentController.getName();
+    }
+    return getTransport(asContext, endpointName);
+  }
+
+  public UimaTransport getTransport(UimaAsContext asContext, String aKey) throws Exception {
+    UimaTransport transport = null;
+    if (!transports.containsKey(aKey)) {
+      transport = new VmTransport(asContext, this);
+      if (isStopped()) {
+        throw new ServiceShutdownException();
+      }
+      transports.put(aKey, transport);
+    } else {
+      transport = (UimaTransport) transports.get(aKey);
+    }
+
+    return transport;
+  }
+
+  /**
+   * Initializes the transport used for internal messaging between collocated Uima AS services.
+   */
+  public void initializeVMTransport(int parentControllerReplyConsumerCount) throws Exception {
+    // If this controller is an Aggregate Controller, first make its child controllers
+    // initialize their internal transports through this same method.
+    if (this instanceof AggregateAnalysisEngineController) {
+      // Get the list of all collocated delegate controllers.
+      List childControllers = ((AggregateAnalysisEngineController_impl) this).childControllerList;
+
+      for (int i = 0; i < childControllers.size(); i++) {
+        AnalysisEngineController ctrl = (AnalysisEngineController) childControllers.get(i);
+        // Force initialization, passing the consumer count through unchanged
+        ctrl.initializeVMTransport(parentControllerReplyConsumerCount);
+      }
+    }
+
+    // Only delegate controllers (those with a parent) execute the logic below
+    if (parentController != null) {
+      UimaAsContext uimaAsContext = new UimaAsContext();
+      if (!registeredWithJMXServer) {
+        registeredWithJMXServer = true;
+        registerServiceWithJMX(jmxContext, false);
+      }
+      // Determine how many consumer threads to create. First use the parent Aggregate Controller
+      // to look up this delegate's key. Then fetch the delegate endpoint, which holds the
+      // concurrent request and reply consumer counts. Both counts stay at 1 when no key
+      // is found.
+      String key = ((AggregateAnalysisEngineController) parentController)
+              .lookUpDelegateKey(getName());
+      int concurrentRequestConsumers = 1;
+      int concurrentReplyConsumers = 1;
+      if (key != null) {
+        Endpoint e = ((AggregateAnalysisEngineController) parentController).lookUpEndpoint(key,
+                false);
+        concurrentRequestConsumers = e.getConcurrentRequestConsumers();
+        concurrentReplyConsumers = e.getConcurrentReplyConsumers();
+      }
+
+      System.out.println("Controller:" + getComponentName() + " Starting Request Listener With "
+              + concurrentRequestConsumers
+              + " Concurrent Consumers. Reply Listener Configured With " + concurrentReplyConsumers
+              + " Concurrent Consumers");
+
+      uimaAsContext.setConcurrentConsumerCount(concurrentRequestConsumers);
+      uimaAsContext.put("EndpointName", endpointName);
+
+      UimaTransport vmTransport = getTransport(uimaAsContext);
+      // Creates delegate Listener for receiving requests from the parent
+      UimaMessageListener messageListener = vmTransport.produceUimaMessageListener();
+      // Plug in message handlers
+      messageListener.initialize(uimaAsContext);
+      // Store the listener under this controller's name
+      messageListeners.put(getName(), messageListener);
+      // Creates parent controller dispatcher for this delegate. The dispatcher is wired
+      // with this delegate's listener.
+      UimaAsContext uimaAsContext2 = new UimaAsContext();
+      // Size the reply side using the endpoint's concurrent reply consumer count
+      uimaAsContext2.setConcurrentConsumerCount(concurrentReplyConsumers);
+      uimaAsContext2.put("EndpointName", endpointName);
+      UimaTransport parentVmTransport = parentController.getTransport(uimaAsContext2, endpointName);
+
+      parentVmTransport.produceUimaMessageDispatcher(vmTransport);
+      // Creates parent listener for receiving replies from this delegate.
+      UimaMessageListener parentListener = parentVmTransport.produceUimaMessageListener();
+      // Plug in message handlers
+      parentListener.initialize(uimaAsContext2);
+      // Creates delegate's dispatcher. It is wired to send replies to the parent's listener.
+      vmTransport.produceUimaMessageDispatcher(parentVmTransport);
+      // Register the internal (non-jms) queues with JMX: this delegate's transport as
+      // "VmInputQueue" and the parent's transport as "VmReplyQueue".
+      vmTransport.registerWithJMX(this, "VmInputQueue");
+      parentVmTransport.registerWithJMX(this, "VmReplyQueue");
+    }
+
+  }
+
+  public synchronized UimaMessageListener getUimaMessageListener(String aDelegateKey) {
+    return messageListeners.get(aDelegateKey);
+  }
+
+  /**
+   * Get the domain for Uima JMX: a fixed prefix plus the name of the top level component.
+   * A component with a parent returns its parent's domain, so the hierarchy shares one domain.
+   */
+
+  public String getJMXDomain() {
+    // Walk up the parent chain until the top level component is reached
+    if (!isTopLevelComponent()) {
+      return parentController.getJMXDomain();
+    } else {
+      // The domain includes the name of the top level component
+      return "org.apache.uima:type=ee.jms.services,s=" + getComponentName() + " Uima EE Service,";
+    }
+  }
+
+  public JmxManagement getManagementInterface() {
+    return jmxManagement;
+  }
+
+  /**
+   * Returns this component's depth in the service hierarchy: 0 for the top level component,
+   * otherwise the parent's index plus one.
+   * NOTE(review): delegates of the same parent get the same value, so this is not a unique id.
+   */
+  public int getIndex() {
+    if (isTopLevelComponent()) {
+      return 0;
+    }
+    return parentController.getIndex() + 1;
+  }
+
+  private void initializeServiceStats() {
+    Statistic statistic = null;
+    if ((statistic = getMonitor().getLongNumericStatistic("", Monitor.TotalDeserializeTime)) == null) {
+      statistic = new LongNumericStatistic(Monitor.TotalDeserializeTime);
+      getMonitor().addStatistic("", statistic);
+    }
+    if ((statistic = getMonitor().getLongNumericStatistic("", Monitor.TotalSerializeTime)) == null) {
+      statistic = new LongNumericStatistic(Monitor.TotalSerializeTime);
+      getMonitor().addStatistic("", statistic);
+    }
+    if ((statistic = getMonitor().getLongNumericStatistic("", Monitor.IdleTime)) == null) {
+      statistic = new LongNumericStatistic(Monitor.IdleTime);
+      getMonitor().addStatistic("", statistic);
+    }
+    if ((statistic = getMonitor().getLongNumericStatistic("", Monitor.ProcessCount)) == null) {
+      statistic = new LongNumericStatistic(Monitor.ProcessCount);
+      getMonitor().addStatistic("", statistic);
+    }
+    if (this instanceof PrimitiveAnalysisEngineController) {
+      if ((statistic = getMonitor().getLongNumericStatistic("", Monitor.ProcessErrorCount)) == null) {
+        statistic = new LongNumericStatistic(Monitor.ProcessErrorCount);
+        getMonitor().addStatistic("", statistic);
+      }
+      if ((statistic = getMonitor().getLongNumericStatistic("", Monitor.TotalProcessErrorCount)) == null) {
+        statistic = new LongNumericStatistic(Monitor.TotalProcessErrorCount);
+        getMonitor().addStatistic("", statistic);
+      }
+      if ((statistic = getMonitor().getLongNumericStatistic("", Monitor.TotalAEProcessTime)) == null) {
+        statistic = new LongNumericStatistic(Monitor.TotalAEProcessTime);
+        getMonitor().addStatistic("", statistic);
+      }
+    }
+  }
+
+  private void removeFromJmxServer(ObjectName anMBean) throws Exception {
+    jmxManagement.unregisterMBean(anMBean);
+  }
+
+  /**
+   * Computes the position of the component in the JMX hierarchy and builds the context path used
+   * to register it in the JMX registry: "p0=..." for a top level component, otherwise the parent's
+   * context plus ",pN=...". Originally documented as being called once during initialization.
+   */
+  public String getJmxContext() {
+    if (isTopLevelComponent()) {
+      if (this instanceof AggregateAnalysisEngineController) {
+        return "p0=" + getComponentName() + " Components";
+      } else if (this instanceof PrimitiveAnalysisEngineController) {
+        return "p0=" + getComponentName() + " Uima EE";
+      }
+
+    }
+    // Get the position of the component in the hierarchy. Each component is registered with a
+    // context string composed of the domain plus <key,value> pairs, where key=p+<index> and the
+    // index grows by one per hierarchy level. An example of a hierarchy would be something like:
+    // <domain>,s=<service name>,p0=<service name>,p1=<aggregate service>,p2=<delegate service>
+    // NOTE(review): a top level component that is neither Aggregate nor Primitive also reaches
+    // this point, where parentController is null - confirm that cannot happen.
+
+    int index = getIndex();
+    String parentContext = parentController.getJmxContext();
+    if (parentController.isTopLevelComponent()) {
+      index = 1;
+    }
+    if (this instanceof AggregateAnalysisEngineController) {
+      String thisComponentName = getComponentName();
+      if (!isTopLevelComponent() && endpointName != null) {
+        thisComponentName = ((AggregateAnalysisEngineController) parentController)
+                .lookUpDelegateKey(endpointName);
+      }
+      return parentContext + ",p" + index + "=" + thisComponentName + " Components";
+    } else {
+      return parentContext + ",p" + index + "=";
+    }
+  }
+
+  /**
+   * Registers a component under the given name with the JMX MBeanServer; failures are caught and logged, not propagated.
+   * 
+   * @param o
+   *          - component to register with JMX
+   * @param aName
+   *          - full jmx context name for the component
+   */
+  protected void registerWithAgent(Object o, String aName) {
+    try {
+      ObjectName on = new ObjectName(aName);
+      jmxManagement.registerMBean(o, on);
+    } catch (Exception e) {
+      // Log and move on
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                    "setListenerContainer", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
-                    new Object[] { e });
+                "setListenerContainer", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAEE_exception__WARNING", new Object[] { e });
       }
-		}
-	}
-	public void registerVmQueueWithJMX( Object o, String aName ) throws Exception {
-	  String jmxName = getManagementInterface().getJmxDomain()
-            +jmxContext+",name="+getComponentName()+"_"+aName;
-	  registerWithAgent( o, jmxName);
-	  
-	  if ( "VmReplyQueue".equals( aName )) {
-	    getServiceInfo().setReplyQueueName(jmxName);
-	  } else {
-	    getServiceInfo().setInputQueueName(jmxName);
-	  }
-	}
-	protected void registerServiceWithJMX(String key_value_list, boolean remote) 
-	{
-		String thisComponentName = getComponentName();
-		
-		String name = "";
-		getIndex(); 
-		servicePerformance = new ServicePerformance(this);
-		name = jmxManagement.getJmxDomain()+key_value_list+",name="+thisComponentName+"_"+servicePerformance.getLabel();
-		
-		registerWithAgent(servicePerformance, name );
-		servicePerformance.setIdleTime(System.nanoTime());
+    }
+  }
+
+  public void registerVmQueueWithJMX(Object o, String aName) throws Exception {
+    String jmxName = getManagementInterface().getJmxDomain() + jmxContext + ",name="
+            + getComponentName() + "_" + aName;
+    registerWithAgent(o, jmxName);
+
+    if ("VmReplyQueue".equals(aName)) {
+      getServiceInfo().setReplyQueueName(jmxName);
+    } else {
+      getServiceInfo().setInputQueueName(jmxName);
+    }
+  }
+
+  protected void registerServiceWithJMX(String key_value_list, boolean remote) {
+    String thisComponentName = getComponentName();
+
+    String name = "";
+    getIndex();
+    servicePerformance = new ServicePerformance(this);
+    name = jmxManagement.getJmxDomain() + key_value_list + ",name=" + thisComponentName + "_"
+            + servicePerformance.getLabel();
+
+    registerWithAgent(servicePerformance, name);
+    servicePerformance.setIdleTime(System.nanoTime());
     ServiceInfo serviceInfo = null;
-		if ( remote )
-		{
-	    serviceInfo = getInputChannel().getServiceInfo();
-		}
-		else
-		{
+    if (remote) {
+      serviceInfo = getInputChannel().getServiceInfo();
+    } else {
       serviceInfo = new ServiceInfo();
       serviceInfo.setBrokerURL(getBrokerURL());
       serviceInfo.setInputQueueName(getName());
       serviceInfo.setState("Active");
-		}
-		ServiceInfo pServiceInfo = null;
+    }
+    ServiceInfo pServiceInfo = null;
 
-		if ( this instanceof PrimitiveAnalysisEngineController )
-		{
-			pServiceInfo = ((PrimitiveAnalysisEngineController)this).getServiceInfo();
-			servicePerformance.setProcessThreadCount(((PrimitiveAnalysisEngineController)this).getServiceInfo().getAnalysisEngineInstanceCount());
-		}
-		else
-		{
-			pServiceInfo = 
-				((AggregateAnalysisEngineController)this).getServiceInfo();
-			pServiceInfo.setAggregate(true);
-		}
-    //  If this is a Cas Multiplier, add the key to the JMX MBean.
-    //  This will help the JMX Monitor to fetch the CM Cas Pool MBean
-    if ( isCasMultiplier() )
-    {
+    if (this instanceof PrimitiveAnalysisEngineController) {
+      pServiceInfo = ((PrimitiveAnalysisEngineController) this).getServiceInfo();
+      servicePerformance.setProcessThreadCount(((PrimitiveAnalysisEngineController) this)
+              .getServiceInfo().getAnalysisEngineInstanceCount());
+    } else {
+      pServiceInfo = ((AggregateAnalysisEngineController) this).getServiceInfo();
+      pServiceInfo.setAggregate(true);
+    }
+    // If this is a Cas Multiplier, add the key to the JMX MBean.
+    // This will help the JMX Monitor to fetch the CM Cas Pool MBean
+    if (isCasMultiplier()) {
       pServiceInfo.setServiceKey(getUimaContextAdmin().getQualifiedContextName());
     }
 
-		
-		if ( pServiceInfo != null )
-		{
-			name = jmxManagement.getJmxDomain()+key_value_list+",name="+thisComponentName+"_"+serviceInfo.getLabel();
-			if ( !isTopLevelComponent() )
-			{
-				pServiceInfo.setBrokerURL("Embedded Broker");
-			}
-			else
-			{
-				pServiceInfo.setTopLevel();
-			}
-			if ( isCasMultiplier())
-			{
-				pServiceInfo.setCASMultiplier();
-			}
-			registerWithAgent(pServiceInfo, name );
-		}
-
-		serviceErrors = new ServiceErrors();
-		name = jmxManagement.getJmxDomain()+key_value_list+",name="+thisComponentName+"_"+serviceErrors.getLabel();
-		registerWithAgent(serviceErrors, name );
-	}
-
-	protected void cleanUp() throws Exception
-	{
-		if ( inProcessCache != null && isTopLevelComponent() )
-		{
-			ObjectName on = new ObjectName(inProcessCache.getName() );
-			removeFromJmxServer(on);
-		}
-	}
-
-	/**
-	 * Override the default JmxManager
-	 */
-	public void setJmxManagement(JmxManagement aJmxManagement)
-	{
-		jmxManagement = aJmxManagement;
-	}
-	
-	
-	private void initializeComponentCasPool(int aComponentCasPoolSize, long anInitialCasHeapSize )
-	{
-		if (aComponentCasPoolSize > 0)
-		{
-			EECasManager_impl cm = (EECasManager_impl) getResourceManager().getCasManager();
-			cm.setInitialCasHeapSize(anInitialCasHeapSize);
+    if (pServiceInfo != null) {
+      name = jmxManagement.getJmxDomain() + key_value_list + ",name=" + thisComponentName + "_"
+              + serviceInfo.getLabel();
+      if (!isTopLevelComponent()) {
+        pServiceInfo.setBrokerURL("Embedded Broker");
+      } else {
+        pServiceInfo.setTopLevel();
+      }
+      if (isCasMultiplier()) {
+        pServiceInfo.setCASMultiplier();
+      }
+      registerWithAgent(pServiceInfo, name);
+    }
+
+    serviceErrors = new ServiceErrors();
+    name = jmxManagement.getJmxDomain() + key_value_list + ",name=" + thisComponentName + "_"
+            + serviceErrors.getLabel();
+    registerWithAgent(serviceErrors, name);
+  }
+
+  protected void cleanUp() throws Exception {
+    if (inProcessCache != null && isTopLevelComponent()) {
+      ObjectName on = new ObjectName(inProcessCache.getName());
+      removeFromJmxServer(on);
+    }
+  }
+
+  /**
+   * Override the default JmxManager
+   */
+  public void setJmxManagement(JmxManagement aJmxManagement) {
+    jmxManagement = aJmxManagement;
+  }
+
+  private void initializeComponentCasPool(int aComponentCasPoolSize, long anInitialCasHeapSize) {
+    if (aComponentCasPoolSize > 0) {
+      EECasManager_impl cm = (EECasManager_impl) getResourceManager().getCasManager();
+      cm.setInitialCasHeapSize(anInitialCasHeapSize);
       cm.setPoolSize(getUimaContextAdmin().getUniqueName(), aComponentCasPoolSize);
-			System.out.println("Component:"+getComponentName()+" Cas Pool:"+getUimaContextAdmin().getQualifiedContextName()+" Size:"+aComponentCasPoolSize+" Cas Heap Size:"+anInitialCasHeapSize/4 +" cells");
+      System.out.println("Component:" + getComponentName() + " Cas Pool:"
+              + getUimaContextAdmin().getQualifiedContextName() + " Size:" + aComponentCasPoolSize
+              + " Cas Heap Size:" + anInitialCasHeapSize / 4 + " cells");
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-	                "initializeComponentCasPool", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cas_pool_config_INFO",
-	                new Object[] { getComponentName(), getUimaContextAdmin().getQualifiedContextName(), aComponentCasPoolSize, anInitialCasHeapSize/4});
+        UIMAFramework.getLogger(CLASS_NAME).logrb(
+                Level.INFO,
+                CLASS_NAME.getName(),
+                "initializeComponentCasPool",
+                UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAEE_cas_pool_config_INFO",
+                new Object[] { getComponentName(), getUimaContextAdmin().getQualifiedContextName(),
+                    aComponentCasPoolSize, anInitialCasHeapSize / 4 });
+      }
+    }
+
+  }
+
+  public boolean isTopLevelComponent() {
+    return (parentController == null);
+  }
+
+  /**
+   * Returns the name of the component. The name comes from the analysis engine descriptor
+   */
+  public String getComponentName() {
+    return ((ResourceCreationSpecifier) resourceSpecifier).getMetaData().getName();
+  }
+
+  /**
+   * Print the component name rather than the class name
+   */
+  public String toString() {
+    return getComponentName();
+  }
+
+  /**
+   * Stores a time snapshot under the given key, replacing any snapshot already stored for
+   * that key.
+   *
+   * @param snapshot
+   *          the time value to remember
+   * @param aKey
+   *          the key the snapshot is stored under in <code>timeSnapshotMap</code>
+   */
+  public void addTimeSnapshot(long snapshot, String aKey) {
+    // put() replaces an existing mapping on its own, so the former containsKey()/remove()
+    // pair was redundant. Dropping it also removes the moment in which the key was absent
+    // from the map between remove() and put().
+    // NOTE(review): assumes nothing relies on re-insertion order of timeSnapshotMap - confirm.
+    timeSnapshotMap.put(aKey, snapshot);
+  }
+
+  /**
+   * Copies broker URL, input queue name and state from the given ServiceInfo into this
+   * controller's own ServiceInfo, and also records the deployment descriptor and, for a CAS
+   * multiplier, the CAS multiplier flag.
+   * <p>
+   * The controller's own ServiceInfo is fetched through getServiceInfo() when this object is
+   * a PrimitiveAnalysisEngineController or an AggregateAnalysisEngineController. If no
+   * instance could be obtained, nothing is copied and a message is printed to stdout.
+   *
+   * @param aServiceInfo
+   *          source of the values to copy; it is dereferenced without a null check
+   */
+  public void addServiceInfo(ServiceInfo aServiceInfo) {
+    ServiceInfo sInfo = null;
+
+    // Pick up this controller's own ServiceInfo; only these two controller kinds expose one here
+    if (this instanceof PrimitiveAnalysisEngineController) {
+      sInfo = ((PrimitiveAnalysisEngineController) this).getServiceInfo();
+    } else if (this instanceof AggregateAnalysisEngineController) {
+      sInfo = ((AggregateAnalysisEngineController) this).getServiceInfo();
+    }
+    if (sInfo != null) {
+      sInfo.setBrokerURL(aServiceInfo.getBrokerURL());
+      sInfo.setInputQueueName(aServiceInfo.getInputQueueName());
+      sInfo.setState(aServiceInfo.getState());
+      // The deployment descriptor comes from this controller, not from aServiceInfo
+      sInfo.setDeploymentDescriptor(deploymentDescriptor);
+      if (isCasMultiplier()) {
+        sInfo.setCASMultiplier();
       }
-		}
+    } else {
+      System.out.println("!!!!!!!!!!!!!!! ServiceInfo instance is NULL");
+    }
+
+  }
+
+  /**
+   * Returns the time snapshot stored under the given key.
+   *
+   * @param aKey
+   *          key to look up in <code>timeSnapshotMap</code>
+   * @return the stored value as a long, or 0 when the key is absent or its value is null
+   */
+  public long getTimeSnapshot(String aKey) {
+    if (timeSnapshotMap.containsKey(aKey)) {
+      Object value = timeSnapshotMap.get(aKey);
+      // Guards the cast below against a null lookup result
+      if (value != null) {
+        return ((Long) value).longValue();
+      }
+    }
+    // No usable snapshot for this key
+    return 0;
 
-	}
+  }
+
+  /**
+   * Gives access to the performance object held by this controller.
+   *
+   * @return the <code>servicePerformance</code> field
+   */
+  public ServicePerformance getServicePerformance() {
+    return this.servicePerformance;
+  }
 
-	public boolean isTopLevelComponent()
-	{
-		return (parentController == null);
-	}
-	
-	/**
-	 * Returns the name of the component. The name comes from the analysis engine descriptor
-	 */
-	public String getComponentName()
-	{
-		return ((ResourceCreationSpecifier)resourceSpecifier).getMetaData().getName();
-	}
-	
-	/**
-	 * Print the component name rather than the class name
-	 */
-	public String toString()
-	{
-		return getComponentName();
-	}
-	
-	public void addTimeSnapshot( long snapshot, String aKey )
-	{
-		if ( timeSnapshotMap.containsKey(aKey) )
-		{
-			timeSnapshotMap.remove(aKey);
-		}
-		timeSnapshotMap.put(aKey, snapshot);
-		
-	}
-	public void addServiceInfo( ServiceInfo aServiceInfo )
-	{
-		ServiceInfo sInfo = null;
-
-		if ( this instanceof PrimitiveAnalysisEngineController )
-		{
-			sInfo = 
-				((PrimitiveAnalysisEngineController)this).getServiceInfo();
-		}
-		else if ( this instanceof AggregateAnalysisEngineController )
-		{
-			sInfo = 
-				((AggregateAnalysisEngineController)this).getServiceInfo();
-		}
-		if ( sInfo != null )
-		{
-			sInfo.setBrokerURL(aServiceInfo.getBrokerURL());
-			sInfo.setInputQueueName(aServiceInfo.getInputQueueName());
-			sInfo.setState(aServiceInfo.getState());
-			sInfo.setDeploymentDescriptor(deploymentDescriptor);
-			if ( isCasMultiplier())
-			{
-				sInfo.setCASMultiplier();
-			}
-		}
-		else
-		{
-			System.out.println("!!!!!!!!!!!!!!! ServiceInfo instance is NULL");
-		}
-
-	}
-	public long getTimeSnapshot( String aKey )
-	{
-		if ( timeSnapshotMap.containsKey(aKey) )
-		{
-		  Object value = timeSnapshotMap.get(aKey);
-		  if (value != null) {
-	      return ((Long)value).longValue();
-		  }
-		}
-		return 0;
-		
-	}
-	public ServicePerformance getServicePerformance()
-	{
-		return servicePerformance;
-	}
-	public ServiceErrors getServiceErrors()
-	{
-		return serviceErrors;
-	}
-
-	public UimaContext getChildUimaContext(String aDelegateEndpointName) throws Exception
-	{
-		if (this instanceof AggregateAnalysisEngineController)
-		{
-			String key = ((AggregateAnalysisEngineController) this).lookUpDelegateKey(aDelegateEndpointName);
-			if (key == null )
-			{
-				if ( ((AggregateAnalysisEngineController) this).isDelegateKeyValid(aDelegateEndpointName) )
-				{
-					key = aDelegateEndpointName;
-				}
-			}
-			
-			if ( key == null )
-			{
-				throw new AsynchAEException(getName()+"-Unable to look up delegate "+aDelegateEndpointName+" in internal map");
-			}
-			UimaContextAdmin uctx = getUimaContextAdmin();
-
-			// retrieve the sofa mappings for input/output sofas of this analysis engine
-			HashMap sofamap = new HashMap();
-			if (resourceSpecifier instanceof AnalysisEngineDescription)
-			{
-				AnalysisEngineDescription desc = (AnalysisEngineDescription) resourceSpecifier;
-				SofaMapping[] sofaMappings = desc.getSofaMappings();
-				if (sofaMappings != null && sofaMappings.length > 0)
-				{
-					for (int s = 0; s < sofaMappings.length; s++)
-					{
-						// the mapping is for this analysis engine
-						if (sofaMappings[s].getComponentKey().equals(key))
-						{
-							// if component sofa name is null, replace it with
-							// the default for TCAS sofa name
-							// This is to support old style TCAS
-							if (sofaMappings[s].getComponentSofaName() == null)
-								sofaMappings[s].setComponentSofaName(CAS.NAME_DEFAULT_SOFA);
-							sofamap.put(sofaMappings[s].getComponentSofaName(), sofaMappings[s].getAggregateSofaName());
-						}
-					}
-				}
-			}
-			// create child UimaContext and insert into mInitParams map
-			return uctx.createChild(key, sofamap);
-		}
-		return null;
-	}
-
-	public void setInputChannel(InputChannel anInputChannel) throws Exception
-	{
-		inputChannel = anInputChannel;
-		inputChannelList.add(anInputChannel);
-
-		inputChannelLatch.countDown();
-		if ( !registeredWithJMXServer )
-		{
-			registeredWithJMXServer = true;
-			registerServiceWithJMX(jmxContext, false);
-		}
-	}
-	public void addInputChannel( InputChannel anInputChannel )
-	{
-		if ( !inputChannelMap.containsKey(anInputChannel.getInputQueueName()))
-		{
+  /**
+   * Gives access to the error object held by this controller.
+   *
+   * @return the <code>serviceErrors</code> field
+   */
+  public ServiceErrors getServiceErrors() {
+    return this.serviceErrors;
+  }
+
+  /**
+   * Creates a child UimaContext for one delegate of this aggregate controller.
+   * <p>
+   * The delegate key is resolved from the given name with lookUpDelegateKey(); if that yields
+   * null, the name itself is used as the key provided isDelegateKeyValid() accepts it. The
+   * child context is created with a map from component sofa name to aggregate sofa name,
+   * built from the sofa mappings of the resource specifier that belong to that key.
+   *
+   * @param aDelegateEndpointName
+   *          endpoint name, or delegate key, identifying the delegate
+   * @return the child context, or null when this controller is not an
+   *         AggregateAnalysisEngineController
+   * @throws AsynchAEException
+   *           when no delegate key can be determined for the given name
+   * @throws Exception
+   *           declared by the signature; any failure of the invoked operations propagates
+   */
+  public UimaContext getChildUimaContext(String aDelegateEndpointName) throws Exception {
+    if (!(this instanceof AggregateAnalysisEngineController)) {
+      return null;
+    }
+    AggregateAnalysisEngineController aggregate = (AggregateAnalysisEngineController) this;
+
+    String delegateKey = aggregate.lookUpDelegateKey(aDelegateEndpointName);
+    if (delegateKey == null && aggregate.isDelegateKeyValid(aDelegateEndpointName)) {
+      // The caller passed the delegate key itself rather than an endpoint name
+      delegateKey = aDelegateEndpointName;
+    }
+    if (delegateKey == null) {
+      throw new AsynchAEException(getName() + "-Unable to look up delegate "
+              + aDelegateEndpointName + " in internal map");
+    }
+    UimaContextAdmin parentContext = getUimaContextAdmin();
+
+    // Collect the sofa mappings that apply to the selected delegate
+    HashMap sofaNames = new HashMap();
+    if (resourceSpecifier instanceof AnalysisEngineDescription) {
+      SofaMapping[] mappings = ((AnalysisEngineDescription) resourceSpecifier).getSofaMappings();
+      if (mappings != null) {
+        for (SofaMapping mapping : mappings) {
+          if (!mapping.getComponentKey().equals(delegateKey)) {
+            // Mapping belongs to a different component
+            continue;
+          }
+          // A missing component sofa name is replaced by the default sofa name
+          // (support for old style TCAS). Note this updates the mapping object itself.
+          if (mapping.getComponentSofaName() == null) {
+            mapping.setComponentSofaName(CAS.NAME_DEFAULT_SOFA);
+          }
+          sofaNames.put(mapping.getComponentSofaName(), mapping.getAggregateSofaName());
+        }
+      }
+    }
+    // Create the child UimaContext for the delegate with its sofa name map
+    return parentContext.createChild(delegateKey, sofaNames);
+  }
+
+  /**
+   * Installs the given input channel as this controller's input channel, appends it to
+   * <code>inputChannelList</code> and counts down <code>inputChannelLatch</code>. On the first
+   * call for which <code>registeredWithJMXServer</code> is still false, the service is also
+   * registered with JMX.
+   *
+   * @param anInputChannel
+   *          the channel to install
+   * @throws Exception
+   *           declared by the signature; a failure of the JMX registration propagates
+   */
+  public void setInputChannel(InputChannel anInputChannel) throws Exception {
+    this.inputChannel = anInputChannel;
+    this.inputChannelList.add(anInputChannel);
+    this.inputChannelLatch.countDown();
+
+    if (registeredWithJMXServer) {
+      // Already registered by an earlier call
+      return;
+    }
+    // The flag is raised before the registration call, exactly as before
+    registeredWithJMXServer = true;
+    registerServiceWithJMX(jmxContext, false);
+  }
+
+  /**
+   * Registers an additional input channel with this controller.
+   * <p>
+   * The channel is stored in <code>inputChannelMap</code> under its input queue name unless a
+   * channel is already stored for that name. A newly stored channel is also appended to
+   * <code>inputChannelList</code>, unless the list already contains it.
+   *
+   * @param anInputChannel
+   *          the channel to register; it is dereferenced without a null check
+   */
+  public void addInputChannel(InputChannel anInputChannel) {
+    if (!inputChannelMap.containsKey(anInputChannel.getInputQueueName())) {
       inputChannelMap.put(anInputChannel.getInputQueueName(), anInputChannel);
-      if ( inputChannelList.contains(anInputChannel)) {
+      // Fixed inverted test: append only when the channel is NOT in the list yet. The former
+      // check appended only channels that were already listed, so a new channel never
+      // reached the list and a known one was duplicated.
+      if (!inputChannelList.contains(anInputChannel)) {
         inputChannelList.add(anInputChannel);
       }
-		}
-	}
-	public InputChannel getInputChannel()
-	{
-		try
-		{
-			inputChannelLatch.await();
-			
-		}
-		catch( Exception e){}
-		
-		return inputChannel;
-	}
-
-	public void dropCAS(CAS aCAS)
-	{
-		if (aCAS != null)
-		{
-		  //  Check if this method was called while another thread is stopping the service.
-		  //  This is a special case. Normally the releasedAllCASes is false. It is only
-		  //  true if another thread forcefully released CASes. 
-		  if ( releasedAllCASes ) {
-		    //  All CASes could have been forcefully released in the stop() method. If another
-		    //  thread was operating on a CAS while the stop() was releasing the CAS we may
-		    //  get an exception which will be ignored. We are shutting down. The forceful 
-		    //  CAS release is not part of the graceful. In that case, stop() is only called
-		    //  when ALL CASes are fully processed. Only than stop() is called and since ALL
-		    //  CASes are released at that point we would not see any exceptions.
-		    try {
-		      aCAS.release();
-		    } catch( Exception e) {}
-		  } else {
-	      aCAS.release();
-		  }
-		  
-		}
-
-	}
-	
-	
-	public synchronized void saveReplyTime( long snapshot, String aKey )
-	{
-		replyTime = snapshot;
-	}
-	
-	public synchronized long getReplyTime()
-	{
-		return replyTime;
-	}
-
-	protected void handleAction( String anAction, String anEndpoint, ErrorContext anErrorContext )
-	throws Exception
-	{
-		
-		
-		String casReferenceId = null;
-		if ( anErrorContext != null )
-		{
-			casReferenceId = (String)anErrorContext.get( AsynchAEMessage.CasReference);
-		}
-		
-		if ( ErrorHandler.TERMINATE.equalsIgnoreCase(anAction))
-		{
-			//	Propagate terminate event to the top controller and begin shutdown of this service along
-			//	with all collocated delegates (if any)
-		  if ( anErrorContext != null && anErrorContext.containsKey(ErrorContext.THROWABLE_ERROR) && anErrorContext.containsKey(AsynchAEMessage.CasReference)) {
-        terminate((Throwable) anErrorContext.get(ErrorContext.THROWABLE_ERROR), (String) anErrorContext.get(AsynchAEMessage.CasReference));
-		  } else {
-	      terminate();
-		  }
-		}
-		else if ( ErrorHandler.DISABLE.equalsIgnoreCase(anAction)  )
-		{
-
-				if ( anEndpoint != null )
-				{
-				  Endpoint endpoint = null;
-					List list = new ArrayList();
-					String key = "";
-					if ( (endpoint = ((AggregateAnalysisEngineController)this).lookUpEndpoint(anEndpoint, false)) == null )
-					{
-						key = ((AggregateAnalysisEngineController)this).lookUpDelegateKey(anEndpoint);
-						endpoint = ((AggregateAnalysisEngineController)this).lookUpEndpoint(key, false);
-						list.add(key);
-					}
-					else
-					{
-						key = anEndpoint;
-						list.add(anEndpoint);
-					}
-          ((AggregateAnalysisEngineController_impl)this).disableDelegates(list,casReferenceId);
-
-          if ( key != null && key.trim().length() > 0) {
-            //  Delegate has been disabled. Cleanup Delegate's lists. Each Delegate
-            //  maintains a list of CASes pending reply and a different list of CASes
-            //  pending dispatch. The first list contains CASes sent to the delegate.
-            //  When a reply is received from the delegate, the CAS is removed from
-            //  the list. The second list contains CASes that have been delayed
-            //  while the service was in the TIMEDOUT state. These CASes were to 
-            //  be dispatched to the delegate once its state is reset to OK. It is
-            //  reset to OK state when the delegate responds to the client PING
-            //  request. Since we have disabled the delegate, remove ALL CASes from
-            //  both lists and send them through the ErrorHandler one at a time 
-            //  as if these CASes timed out.
-            
-            Delegate delegate = ((AggregateAnalysisEngineController)this).lookupDelegate(key);
-            //  Cancel the delegate timer. No more responses are expected 
-            delegate.cancelDelegateTimer();
-            //  Check if we should force timeout on all CASes in a pending state. If this
-            //  method is called from ProcessCasErrorHandler we will skip this since we
-            //  want to first completely handle the CAS exception. Once that CAS exception 
-            //  is handled, the ProcessCasErrorHandler will call forceTimeoutOnPendingCases
-            //  to time out CASes in pending lists
-            if ( anErrorContext.containsKey( AsynchAEMessage.SkipPendingLists) == false ) {
-              //  If the delegate has CASes pending reply still, send each CAS
-              //  from the pending list through the error handler with 
-              //  MessageTimeoutException as a cause of error
-              forceTimeoutOnPendingCases(key);
+    }
+  }
+
+  /**
+   * Returns this controller's input channel, waiting on <code>inputChannelLatch</code> first.
+   * <p>
+   * The latch is counted down by {@link #setInputChannel(InputChannel)}. If the waiting
+   * thread is interrupted, its interrupt status is restored and the current value of
+   * <code>inputChannel</code> is returned without further waiting.
+   *
+   * @return the value of <code>inputChannel</code>; NOTE(review): presumably null if the wait
+   *         was interrupted before a channel was set - confirm against callers
+   */
+  public InputChannel getInputChannel() {
+    try {
+      inputChannelLatch.await();
+    } catch (InterruptedException e) {
+      // Do not swallow the interrupt: re-assert it so that callers further up the stack
+      // can still observe that this thread was asked to stop.
+      Thread.currentThread().interrupt();
+    }
+
+    return inputChannel;
+  }
+
+  /**
+   * Releases the given CAS.
+   * <p>
+   * While <code>releasedAllCASes</code> is set, an exception thrown by the release is
+   * ignored; otherwise it propagates to the caller.
+   *
+   * @param aCAS
+   *          the CAS to release; null is ignored
+   */
+  public void dropCAS(CAS aCAS) {
+    if (aCAS == null) {
+      return;
+    }
+    if (!releasedAllCASes) {
+      // Normal case: no forced release has happened, so a failure here is reported
+      aCAS.release();
+      return;
+    }
+    // Special case: another thread forcefully released the CASes, which stop() may do
+    // during a non-graceful shutdown. If this thread was still working on the CAS, the
+    // release may fail; since the service is shutting down the failure is ignored. A
+    // graceful shutdown calls stop() only after ALL CASes are fully processed, so no
+    // exception is expected then.
+    try {
+      aCAS.release();
+    } catch (Exception ignored) {
+      // deliberately ignored, see above
+    }
+  }
+
+  /**
+   * Remembers the given value as the reply time of this controller.
+   *
+   * @param snapshot
+   *          the value to store in <code>replyTime</code>
+   * @param aKey
+   *          not used by this implementation
+   */
+  public synchronized void saveReplyTime(long snapshot, String aKey) {
+    this.replyTime = snapshot;
+  }
+
+  /**
+   * Returns the value last stored by {@link #saveReplyTime(long, String)}.
+   *
+   * @return the <code>replyTime</code> field
+   */
+  public synchronized long getReplyTime() {
+    return this.replyTime;
+  }
+
+  /**
+   * Carries out the action selected by an error handler.
+   * <p>
+   * The action is compared, ignoring case, against the ErrorHandler constants:
+   * <ul>
+   * <li>TERMINATE - calls terminate(), passing on the Throwable and the CAS reference id
+   * when the error context holds both.</li>
+   * <li>DISABLE - disables the delegate identified by <code>anEndpoint</code>, cancels its
+   * timer, forces a timeout on its pending CASes unless the error context carries the
+   * SkipPendingLists key, and increments the responder count of CASes that still wait for
+   * this delegate in a parallel step.</li>
+   * <li>CONTINUE - asks the aggregate through continueOnError() whether processing may go on
+   * and, if so, resumes processing of the CAS when it is still in the cache.</li>
+   * <li>DROPCAS - drops the CAS named in the error context.</li>
+   * </ul>
+   * Any other action is ignored. DISABLE and CONTINUE cast this object to the aggregate
+   * controller types without checking its type first.
+   *
+   * @param anAction
+   *          name of the action to perform
+   * @param anEndpoint
+   *          endpoint name or delegate key the action applies to; DISABLE and CONTINUE do
+   *          nothing when it is null
+   * @param anErrorContext
+   *          details of the error being handled; may be null, in which case CONTINUE does
+   *          nothing and DISABLE behaves as if SkipPendingLists were not set
+   * @throws Exception
+   *           declared by the signature; failures of the invoked operations propagate
+   */
+  protected void handleAction(String anAction, String anEndpoint, ErrorContext anErrorContext)
+          throws Exception {
+
+    String casReferenceId = null;
+    if (anErrorContext != null) {
+      casReferenceId = (String) anErrorContext.get(AsynchAEMessage.CasReference);
+    }
+
+    if (ErrorHandler.TERMINATE.equalsIgnoreCase(anAction)) {
+      // Propagate terminate event to the top controller and begin shutdown of this service along
+      // with all collocated delegates (if any)
+      if (anErrorContext != null && anErrorContext.containsKey(ErrorContext.THROWABLE_ERROR)
+              && anErrorContext.containsKey(AsynchAEMessage.CasReference)) {
+        terminate((Throwable) anErrorContext.get(ErrorContext.THROWABLE_ERROR),
+                (String) anErrorContext.get(AsynchAEMessage.CasReference));
+      } else {
+        terminate();
+      }
+    } else if (ErrorHandler.DISABLE.equalsIgnoreCase(anAction)) {
+
+      if (anEndpoint != null) {
+        Endpoint endpoint = null;
+        List list = new ArrayList();
+        String key = "";
+        // anEndpoint may be a delegate key or an endpoint name; try it as a key first
+        if ((endpoint = ((AggregateAnalysisEngineController) this)
+                .lookUpEndpoint(anEndpoint, false)) == null) {
+          key = ((AggregateAnalysisEngineController) this).lookUpDelegateKey(anEndpoint);
+          endpoint = ((AggregateAnalysisEngineController) this).lookUpEndpoint(key, false);
+          list.add(key);
+        } else {
+          key = anEndpoint;
+          list.add(anEndpoint);
+        }
+        ((AggregateAnalysisEngineController_impl) this).disableDelegates(list, casReferenceId);
+
+        if (key != null && key.trim().length() > 0) {
+          // Delegate has been disabled. Cleanup Delegate's lists. Each Delegate
+          // maintains a list of CASes pending reply and a different list of CASes
+          // pending dispatch. The first list contains CASes sent to the delegate.
+          // When a reply is received from the delegate, the CAS is removed from
+          // the list. The second list contains CASes that have been delayed
+          // while the service was in the TIMEDOUT state. These CASes were to
+          // be dispatched to the delegate once its state is reset to OK. It is
+          // reset to OK state when the delegate responds to the client PING
+          // request. Since we have disabled the delegate, remove ALL CASes from
+          // both lists and send them through the ErrorHandler one at a time
+          // as if these CASes timed out.
+
+          Delegate delegate = ((AggregateAnalysisEngineController) this).lookupDelegate(key);
+          // Cancel the delegate timer. No more responses are expected
+          delegate.cancelDelegateTimer();
+          // Check if we should force timeout on all CASes in a pending state. If this
+          // method is called from ProcessCasErrorHandler we will skip this since we
+          // want to first completely handle the CAS exception. Once that CAS exception
+          // is handled, the ProcessCasErrorHandler will call forceTimeoutOnPendingCases
+          // to time out CASes in pending lists.
+          // A null error context cannot carry the skip flag, so it is treated as "do not skip"
+          // instead of failing with a NullPointerException.
+          if (anErrorContext == null
+                  || !anErrorContext.containsKey(AsynchAEMessage.SkipPendingLists)) {
+            // If the delegate has CASes pending reply still, send each CAS
+            // from the pending list through the error handler with
+            // MessageTimeoutException as a cause of error
+            forceTimeoutOnPendingCases(key);
+          }
+        }
+
+        if (endpoint != null) {
+          try {
+            // Fetch all Cas entries currently awaiting reply from the delegate that was just
+            // disabled. In case the delegate was in a parallel step, we need to adjust
+            // the number of responses received to account for the failed delegate. This
+            // enables the processing to continue.
+            CacheEntry[] entries = getInProcessCache().getCacheEntriesForEndpoint(
+                    endpoint.getEndpoint());
+            if (entries != null) {
+              for (int i = 0; i < entries.length; i++) {
+                CasStateEntry cse = localCache.lookupEntry(entries[i].getCasReferenceId());
+                if (cse == null) {
+                  // No state entry for this CAS: skip it, so that one missing entry does
+                  // not abort the adjustment of all remaining entries
+                  continue;
+                }
+                // Check if this is a parallel step
+                int parallelDelegateCount = cse.getNumberOfParallelDelegates();
+                // Check if all delegates responded
+                if (parallelDelegateCount > 1
+                        && cse.howManyDelegatesResponded() < parallelDelegateCount) {
+                  // increment responders
+                  cse.incrementHowManyDelegatesResponded();
+                }
+              }
             }
+          } catch (Exception e) {
+            // Still best effort - the disable action must complete - but leave a trace
+            // of the failure instead of hiding it
+            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+              UIMAFramework.getLogger(CLASS_NAME).log(
+                      Level.WARNING,
+                      "Controller:" + getComponentName()
+                              + " Unable to adjust parallel step response count for disabled delegate:"
+                              + key, e);
+            }
+          }
+        }
+        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+                  "handleAction", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                  "UIMAEE_disabled_delegate_INFO", new Object[] { getComponentName(), key });
+        }
+      }
+    } else if (ErrorHandler.CONTINUE.equalsIgnoreCase(anAction)) {
+      // Without an error context there is neither an exception nor a CAS id to decide on,
+      // so the action is skipped instead of failing with a NullPointerException
+      if (anEndpoint != null && anErrorContext != null) {
+        String key = ((AggregateAnalysisEngineController) this).lookUpDelegateKey(anEndpoint);
+        // NOTE(review): this cast fails if the stored Throwable is not an Exception - confirm
+        // that error handlers only store Exceptions under THROWABLE_ERROR
+        Exception ex = (Exception) anErrorContext.get(ErrorContext.THROWABLE_ERROR);
+        boolean continueOnError = ((AggregateAnalysisEngineController) this).continueOnError(
+                casReferenceId, key, ex);
+        if (continueOnError) {
+          CacheEntry entry = null;
+          try {
+            entry = getInProcessCache().getCacheEntryForCAS(casReferenceId);
+          } catch (AsynchAEException e) {
+            System.out.println("Controller:" + getComponentName() + " CAS:" + casReferenceId
+                    + " Not Found In Cache");
           }
+          CAS cas = null;
+          // Make sure that the ErrorHandler did not drop the cache entry and the CAS
+          if (entry != null && ((cas = entry.getCas()) != null)) {
+            ((AggregateAnalysisEngineController) this).process(cas, casReferenceId);
+          }
+        }
+      }
+    } else if (ErrorHandler.DROPCAS.equalsIgnoreCase(anAction)) {
+      if (casReferenceId != null) {
+        dropCAS(casReferenceId, true);
+      }
+    }
+
+  }
 
-          if ( endpoint != null ) {
-	          try {
-	            // Fetch all Cas entries currently awaiting reply from the delegate that was just
-	            // disabled. In case the delegate was in a parallel step, we need to adjust the
-	            // the number of responses received to account for the failed delegate. This 
-	            // enables the processing to continue.
-	            CacheEntry[] entries = getInProcessCache().getCacheEntriesForEndpoint(endpoint.getEndpoint());
-	            if ( entries != null ) {
-	              for ( int i=0; i < entries.length; i++ ) {
-	                CasStateEntry cse = localCache.lookupEntry(entries[i].getCasReferenceId());
-	                // Check if this is a parallel step
-                  int parallelDelegateCount = cse.getNumberOfParallelDelegates();
-	                // Check if all delegated responded
-	                if ( parallelDelegateCount > 1 && cse.howManyDelegatesResponded() < parallelDelegateCount) {
-	                  // increment responders 
-	                  cse.incrementHowManyDelegatesResponded();
-	                }
-	              }
-	            }
-	          } catch( Exception e) {
-	          }
-					}
-	         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-	           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-		                "handleAction", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_disabled_delegate_INFO",
-		                new Object[] { getComponentName(), key });
-	         }
-				}
-		}
-		else if ( ErrorHandler.CONTINUE.equalsIgnoreCase(anAction) )
-		{
-			if ( anEndpoint != null )
-			{
-				String key = ((AggregateAnalysisEngineController)this).lookUpDelegateKey(anEndpoint);
-				Exception ex = (Exception)anErrorContext.get(ErrorContext.THROWABLE_ERROR);
-				boolean continueOnError = 
-					((AggregateAnalysisEngineController)this).continueOnError(casReferenceId, key, ex);
-				if ( continueOnError )
-				{
-					CacheEntry entry = null;
-					try
-					{
-						entry = getInProcessCache().getCacheEntryForCAS(casReferenceId);
-					}
-					catch( AsynchAEException e) {
-	           System.out.println("Controller:"+getComponentName()+" CAS:"+casReferenceId+" Not Found In Cache");
-					}
-                    CAS cas = null;
-					//	Make sure that the ErrorHandler did not drop the cache entry and the CAS
-					if ( entry != null && (( cas = entry.getCas()) != null ) )
-					{
-						((AggregateAnalysisEngineController)this).process(cas, casReferenceId);
-					}
-				}
-			}
-		}
-		else if(ErrorHandler.DROPCAS.equalsIgnoreCase( anAction )) 
-		{
-			if ( casReferenceId != null)
-			{
-				dropCAS(casReferenceId, true);
-			}
-		}
-
-	}
-
-	public void forceTimeoutOnPendingCases(String key ) {
-    Delegate delegate = ((AggregateAnalysisEngineController)this).lookupDelegate(key);
-    //  Cancel the delegate timer. No more responses are expected 
+  public void forceTimeoutOnPendingCases(String key) {
+    Delegate delegate = ((AggregateAnalysisEngineController) this).lookupDelegate(key);
+    // Cancel the delegate timer. No more responses are expected
     delegate.cancelDelegateTimer();
     Endpoint endpoint = delegate.getEndpoint();
-    //  If the delegate has CASes pending reply still, send each CAS
-    //  from the pending list through the error handler with 
-    //  MessageTimeoutException as a cause of error
-    while ( delegate.getCasPendingReplyListSize() > 0 ) {
+    // If the delegate has CASes pending reply still, send each CAS
+    // from the pending list through the error handler with
+    // MessageTimeoutException as a cause of error
+    while (delegate.getCasPendingReplyListSize() > 0) {
       String timedOutCasId = delegate.removeOldestCasFromOutstandingList();
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-               "handleAction", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_force_cas_timeout__INFO",
-               new Object[] { getComponentName(), key, timedOutCasId, " Pending Reply List" });
+        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "handleAction",
+                UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_force_cas_timeout__INFO",
+                new Object[] { getComponentName(), key, timedOutCasId, " Pending Reply List" });
       }
-      
+
       ErrorContext errorContext = new ErrorContext();
       errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
       errorContext.add(AsynchAEMessage.CasReference, timedOutCasId);
       errorContext.add(AsynchAEMessage.Endpoint, endpoint);
       getErrorHandlerChain().handle(new ForcedMessageTimeoutException(), errorContext, this);
     }
-    //  If the delegate has CASes pending dispatch, send each CAS
-    //  from the pending dispatch list through the error handler with 
-    //  MessageTimeoutException as a cause of error
-    while ( delegate.getCasPendingDispatchListSize() > 0 ) {
+    // If the delegate has CASes pending dispatch, send each CAS
+    // from the pending dispatch list through the error handler with
+    // MessageTimeoutException as a cause of error
+    while (delegate.getCasPendingDispatchListSize() > 0) {
       String timedOutCasId = delegate.removeOldestFromPendingDispatchList();
 
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),

[... 3056 lines stripped ...]