You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/09/02 17:22:15 UTC
svn commit: r810557 [4/7] - in
/incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae:
controller/ delegate/ deploymentDescriptor/
Modified: incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=810557&r1=810556&r2=810557&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Wed Sep 2 15:22:13 2009
@@ -96,1167 +96,1092 @@
import org.apache.uima.resource.Resource_ImplBase;
import org.apache.uima.util.Level;
-public abstract class BaseAnalysisEngineController extends Resource_ImplBase
-implements AnalysisEngineController, EventSubscriber
-{
- private static final Class CLASS_NAME = BaseAnalysisEngineController.class;
-
- private static final long DoNotProcessTTL = 30*60*1000; // 30 minute time to live
-
- protected volatile ControllerLatch latch = new ControllerLatch(this);
-
- protected ConcurrentHashMap statsMap = new ConcurrentHashMap();
-
- protected Monitor monitor = new MonitorBaseImpl();
-
- protected Endpoint clientEndpoint;
-
- private CountDownLatch inputChannelLatch = new CountDownLatch(1);
-
- private OutputChannel outputChannel;
-
- private AsynchAECasManager casManager;
-
- private InProcessCache inProcessCache;
-
- protected AnalysisEngineController parentController;
-
- private String endpointName;
-
- protected ResourceSpecifier resourceSpecifier;
-
- protected HashMap paramsMap;
-
- protected InputChannel inputChannel;
-
- protected ErrorHandlerChain errorHandlerChain;
-
- protected long errorCount = 0;
-
- protected List inputChannelList = new ArrayList();
-
- protected ConcurrentHashMap inputChannelMap = new ConcurrentHashMap();
-
- protected ConcurrentHashMap idleTimeMap = new ConcurrentHashMap();
-
- private UimaEEAdminContext adminContext;
-
- protected int componentCasPoolSize = 0;
-
- protected long replyTime = 0;
-
- protected long idleTime = 0;
-
- protected ConcurrentHashMap serviceErrorMap = new ConcurrentHashMap();
-
- private boolean registeredWithJMXServer = false;
- protected String jmxContext = "";
-
- protected ConcurrentHashMap mBeanMap = new ConcurrentHashMap();
-
- protected ServicePerformance servicePerformance = null;
-
- protected ServiceErrors serviceErrors = null;
-
- protected ConcurrentHashMap timeSnapshotMap = new ConcurrentHashMap();
-
- private String deploymentDescriptor = "";
-
- private JmxManagement jmxManagement = null;
-
- protected volatile boolean stopped = false;
-
- protected String delegateKey = null;
-
- protected List unregisteredDelegateList = new ArrayList();
-
- protected volatile boolean allDelegatesAreRemote = false;
-
- protected List controllerListeners = new ArrayList();
-
- protected volatile boolean serviceInitialized = false;
-
- protected ConcurrentHashMap perCasStatistics = new ConcurrentHashMap();
-
- private volatile boolean casMultiplier = false;
-
- protected Object syncObject = new Object();
-
- // Map holding outstanding CASes produced by Cas Multiplier that have to be acked
- protected ConcurrentHashMap cmOutstandingCASes = new ConcurrentHashMap();
-
- private Object mux = new Object();
-
- private Object waitmux = new Object();
-
- private volatile boolean waitingForCAS = false;
-
- private long startTime = System.nanoTime();
-
- private long totalWaitTimeForCAS = 0;
-
- private long lastCASWaitTimeUpdate = 0;
-
- private Map<Long, AnalysisThreadState> threadStateMap =
- new HashMap<Long,AnalysisThreadState>();
-
- protected final Object finalStepMux = new Object();
-
- protected ConcurrentHashMap<String, UimaTransport> transports
- = new ConcurrentHashMap<String, UimaTransport>();
-
- protected ConcurrentHashMap<String, UimaMessageListener> messageListeners
- = new ConcurrentHashMap<String, UimaMessageListener>();
-
+public abstract class BaseAnalysisEngineController extends Resource_ImplBase implements
+ AnalysisEngineController, EventSubscriber {
+ private static final Class CLASS_NAME = BaseAnalysisEngineController.class;
+
+ private static final long DoNotProcessTTL = 30 * 60 * 1000; // 30 minute time to live
+
+ protected volatile ControllerLatch latch = new ControllerLatch(this);
+
+ protected ConcurrentHashMap statsMap = new ConcurrentHashMap();
+
+ protected Monitor monitor = new MonitorBaseImpl();
+
+ protected Endpoint clientEndpoint;
+
+ private CountDownLatch inputChannelLatch = new CountDownLatch(1);
+
+ private OutputChannel outputChannel;
+
+ private AsynchAECasManager casManager;
+
+ private InProcessCache inProcessCache;
+
+ protected AnalysisEngineController parentController;
+
+ private String endpointName;
+
+ protected ResourceSpecifier resourceSpecifier;
+
+ protected HashMap paramsMap;
+
+ protected InputChannel inputChannel;
+
+ protected ErrorHandlerChain errorHandlerChain;
+
+ protected long errorCount = 0;
+
+ protected List inputChannelList = new ArrayList();
+
+ protected ConcurrentHashMap inputChannelMap = new ConcurrentHashMap();
+
+ protected ConcurrentHashMap idleTimeMap = new ConcurrentHashMap();
+
+ private UimaEEAdminContext adminContext;
+
+ protected int componentCasPoolSize = 0;
+
+ protected long replyTime = 0;
+
+ protected long idleTime = 0;
+
+ protected ConcurrentHashMap serviceErrorMap = new ConcurrentHashMap();
+
+ private boolean registeredWithJMXServer = false;
+
+ protected String jmxContext = "";
+
+ protected ConcurrentHashMap mBeanMap = new ConcurrentHashMap();
+
+ protected ServicePerformance servicePerformance = null;
+
+ protected ServiceErrors serviceErrors = null;
+
+ protected ConcurrentHashMap timeSnapshotMap = new ConcurrentHashMap();
+
+ private String deploymentDescriptor = "";
+
+ private JmxManagement jmxManagement = null;
+
+ protected volatile boolean stopped = false;
+
+ protected String delegateKey = null;
+
+ protected List unregisteredDelegateList = new ArrayList();
+
+ protected volatile boolean allDelegatesAreRemote = false;
+
+ protected List controllerListeners = new ArrayList();
+
+ protected volatile boolean serviceInitialized = false;
+
+ protected ConcurrentHashMap perCasStatistics = new ConcurrentHashMap();
+
+ private volatile boolean casMultiplier = false;
+
+ protected Object syncObject = new Object();
+
+ // Map holding outstanding CASes produced by Cas Multiplier that have to be acked
+ protected ConcurrentHashMap cmOutstandingCASes = new ConcurrentHashMap();
+
+ private Object mux = new Object();
+
+ private Object waitmux = new Object();
+
+ private volatile boolean waitingForCAS = false;
+
+ private long startTime = System.nanoTime();
+
+ private long totalWaitTimeForCAS = 0;
+
+ private long lastCASWaitTimeUpdate = 0;
+
+ private Map<Long, AnalysisThreadState> threadStateMap = new HashMap<Long, AnalysisThreadState>();
+
+ protected final Object finalStepMux = new Object();
+
+ protected ConcurrentHashMap<String, UimaTransport> transports = new ConcurrentHashMap<String, UimaTransport>();
+
+ protected ConcurrentHashMap<String, UimaMessageListener> messageListeners = new ConcurrentHashMap<String, UimaMessageListener>();
+
private Exception initException = null;
-
- // Local cache for this controller only. This cache stores state of
+
+ // Local cache for this controller only. This cache stores state of
// each CAS. The actual CAS is still stored in the global cache. The
// local cache is used to determine when each CAS can be removed as
// it reaches the final step. Global cache is not a good place to
// store this information if there are collocated (delegate) controllers
- // A CAS state change made by one controller may effect another controller.
+ // A CAS state change made by one controller may affect another controller.
protected LocalCache localCache;
- protected String aeDescriptor;
- // List of Delegates
+ protected String aeDescriptor;
+
+ // List of Delegates
protected List<Delegate> delegates = new ArrayList<Delegate>();
- // indicates whether or not we received a callback from the InProcessCache when
- // it becomes empty
- protected volatile boolean callbackReceived = false;
- // Monitor used in stop() to await a callback from InProcessCache
+
+ // indicates whether or not we received a callback from the InProcessCache when
+ // it becomes empty
+ protected volatile boolean callbackReceived = false;
+
+ // Monitor used in stop() to await a callback from InProcessCache
protected Object callbackMonitor = new Object();
-
+
protected volatile boolean awaitingCacheCallbackNotification = false;
- protected ConcurrentHashMap<String, String> abortedCasesMap =
- new ConcurrentHashMap<String, String>();
+
+ protected ConcurrentHashMap<String, String> abortedCasesMap = new ConcurrentHashMap<String, String>();
protected String processPid = "";
+
private CountDownLatch stopLatch = new CountDownLatch(1);
-
- // Set to true when stopping the service
+
+ // Set to true when stopping the service
private volatile boolean releasedAllCASes;
-
- protected List<DoNotProcessEntry> doNotProcessList =
- new ArrayList<DoNotProcessEntry>();
-
- private ScheduledExecutorService daemonServiceExecutor=null;
-
- private static final UimaAsVersion uimaAsVersion =
- new UimaAsVersion();
-
+
+ protected List<DoNotProcessEntry> doNotProcessList = new ArrayList<DoNotProcessEntry>();
+
+ private ScheduledExecutorService daemonServiceExecutor = null;
+
+ private static final UimaAsVersion uimaAsVersion = new UimaAsVersion();
+
public BaseAnalysisEngineController() {
-
+
+ }
+
+ public BaseAnalysisEngineController(AnalysisEngineController aParentController,
+ int aComponentCasPoolSize, String anEndpointName, String aDescriptor,
+ AsynchAECasManager aCasManager, InProcessCache anInProcessCache) throws Exception {
+ this(aParentController, aComponentCasPoolSize, 0, anEndpointName, aDescriptor, aCasManager,
+ anInProcessCache, null, null);
+ }
+
+ public BaseAnalysisEngineController(AnalysisEngineController aParentController,
+ int aComponentCasPoolSize, String anEndpointName, String aDescriptor,
+ AsynchAECasManager aCasManager, InProcessCache anInProcessCache, Map aDestinationMap)
+ throws Exception {
+ this(aParentController, aComponentCasPoolSize, 0, anEndpointName, aDescriptor, aCasManager,
+ anInProcessCache, aDestinationMap, null);
}
- public BaseAnalysisEngineController(AnalysisEngineController aParentController, int aComponentCasPoolSize, String anEndpointName, String aDescriptor, AsynchAECasManager aCasManager, InProcessCache anInProcessCache) throws Exception
- {
- this(aParentController, aComponentCasPoolSize, 0, anEndpointName, aDescriptor, aCasManager, anInProcessCache, null, null);
- }
- public BaseAnalysisEngineController(AnalysisEngineController aParentController, int aComponentCasPoolSize, String anEndpointName, String aDescriptor, AsynchAECasManager aCasManager, InProcessCache anInProcessCache, Map aDestinationMap) throws Exception
- {
- this(aParentController, aComponentCasPoolSize, 0, anEndpointName, aDescriptor, aCasManager, anInProcessCache, aDestinationMap, null);
- }
- public BaseAnalysisEngineController(AnalysisEngineController aParentController, int aComponentCasPoolSize, String anEndpointName, String aDescriptor, AsynchAECasManager aCasManager, InProcessCache anInProcessCache, Map aDestinationMap, JmxManagement aJmxManagement) throws Exception
- {
- this(aParentController, aComponentCasPoolSize, 0, anEndpointName, aDescriptor, aCasManager, anInProcessCache, aDestinationMap, aJmxManagement);
- }
-
-
- public BaseAnalysisEngineController(AnalysisEngineController aParentController, int aComponentCasPoolSize, long anInitialCasHeapSize, String anEndpointName, String aDescriptor, AsynchAECasManager aCasManager, InProcessCache anInProcessCache, Map aDestinationMap, JmxManagement aJmxManagement) throws Exception
- {
- casManager = aCasManager;
- inProcessCache = anInProcessCache;
+
+ public BaseAnalysisEngineController(AnalysisEngineController aParentController,
+ int aComponentCasPoolSize, String anEndpointName, String aDescriptor,
+ AsynchAECasManager aCasManager, InProcessCache anInProcessCache, Map aDestinationMap,
+ JmxManagement aJmxManagement) throws Exception {
+ this(aParentController, aComponentCasPoolSize, 0, anEndpointName, aDescriptor, aCasManager,
+ anInProcessCache, aDestinationMap, aJmxManagement);
+ }
+
+ public BaseAnalysisEngineController(AnalysisEngineController aParentController,
+ int aComponentCasPoolSize, long anInitialCasHeapSize, String anEndpointName,
+ String aDescriptor, AsynchAECasManager aCasManager, InProcessCache anInProcessCache,
+ Map aDestinationMap, JmxManagement aJmxManagement) throws Exception {
+ casManager = aCasManager;
+ inProcessCache = anInProcessCache;
localCache = new LocalCache(this);
aeDescriptor = aDescriptor;
- parentController = aParentController;
- componentCasPoolSize = aComponentCasPoolSize;
-
- if ( this instanceof AggregateAnalysisEngineController )
- {
- // Populate a list of un-registered co-located delegates. A delegate will be taken off the un-registered list
- // when it calls its parent registerChildController() method.
- Set set = aDestinationMap.entrySet();
+ parentController = aParentController;
+ componentCasPoolSize = aComponentCasPoolSize;
+
+ if (this instanceof AggregateAnalysisEngineController) {
+ // Populate a list of un-registered co-located delegates. A delegate will be taken off the
+ // un-registered list
+ // when it calls its parent registerChildController() method.
+ Set set = aDestinationMap.entrySet();
synchronized (unregisteredDelegateList) {
- for( Iterator it = set.iterator(); it.hasNext();)
- {
- Map.Entry entry = (Map.Entry)it.next();
- Endpoint endpoint = (Endpoint)entry.getValue();
- if ( endpoint != null && !endpoint.isRemote() )
- {
+ for (Iterator it = set.iterator(); it.hasNext();) {
+ Map.Entry entry = (Map.Entry) it.next();
+ Endpoint endpoint = (Endpoint) entry.getValue();
+ if (endpoint != null && !endpoint.isRemote()) {
unregisteredDelegateList.add(entry.getKey());
}
}
- if ( unregisteredDelegateList.size() == 0 ) // All delegates are remote
+ if (unregisteredDelegateList.size() == 0) // All delegates are remote
{
allDelegatesAreRemote = true;
}
}
- }
+ }
-
- endpointName = anEndpointName;
- delegateKey = anEndpointName;
-
- if (this instanceof AggregateAnalysisEngineController)
- {
- ConcurrentHashMap endpoints = new ConcurrentHashMap();
- endpoints.putAll(aDestinationMap);
- // Create a map containing: Endpoint-DelegateKey pairs, to enable look-up
- // of a delegate key based on delegate's endpoint
- ((AggregateAnalysisEngineController) this).mapEndpointsToKeys(endpoints);
-
- }
- // If not the top level, retrieve the name of the endpoint from the parent
- if ( !isTopLevelComponent() )
- {
- Endpoint endpoint = ((AggregateAnalysisEngineController)parentController).lookUpEndpoint(endpointName, false);
- endpointName = endpoint.getEndpoint();
- }
- resourceSpecifier = UimaClassFactory.produceResourceSpecifier(aDescriptor);
-
-
- if ( isTopLevelComponent())
- {
- // Check UIMA AS version againg the UIMA Core version. If not the same throw Exception
- if ( !uimaAsVersion.getVersionString().equals(UIMAFramework.getVersionString())) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "BaseAnalysisEngineController", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_incompatible_version_WARNING",
- new Object[] { getComponentName(), uimaAsVersion.getVersionString(), UIMAFramework.getVersionString() });
- throw new ResourceInitializationException(new AsynchAEException("Version of UIMA-AS is Incompatible with a Version of UIMA Core. UIMA-AS Version:"+uimaAsVersion.getVersionString()+" Core UIMA Version:"+UIMAFramework.getVersionString()));
- }
- logPlatformInfo(getComponentName());
- }
- else
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "BaseAnalysisEngineController", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_service_id_INFO",
- new Object[] { endpointName });
- }
- }
-
- // Is this service a CAS Multiplier?
- if ( (resourceSpecifier instanceof AnalysisEngineDescription &&
- ((AnalysisEngineDescription) resourceSpecifier).getAnalysisEngineMetaData().getOperationalProperties().getOutputsNewCASes())
- || resourceSpecifier instanceof CollectionReaderDescription)
- {
- casMultiplier = true;
- }
-
- paramsMap = new HashMap();
- if ( aJmxManagement == null )
- {
- jmxManagement = new JmxManager(getJMXDomain());
- }
- else
- {
- jmxManagement = aJmxManagement;
- if ( jmxManagement.getMBeanServer() != null )
- {
- paramsMap.put(AnalysisEngine.PARAM_MBEAN_SERVER, jmxManagement.getMBeanServer());
- }
- }
- paramsMap.put(AnalysisEngine.PARAM_MBEAN_NAME_PREFIX, jmxManagement.getJmxDomain());
- if ( isTopLevelComponent() && this instanceof AggregateAnalysisEngineController)
- {
+ endpointName = anEndpointName;
+ delegateKey = anEndpointName;
+
+ if (this instanceof AggregateAnalysisEngineController) {
+ ConcurrentHashMap endpoints = new ConcurrentHashMap();
+ endpoints.putAll(aDestinationMap);
+ // Create a map containing: Endpoint-DelegateKey pairs, to enable look-up
+ // of a delegate key based on delegate's endpoint
+ ((AggregateAnalysisEngineController) this).mapEndpointsToKeys(endpoints);
+
+ }
+ // If not the top level, retrieve the name of the endpoint from the parent
+ if (!isTopLevelComponent()) {
+ Endpoint endpoint = ((AggregateAnalysisEngineController) parentController).lookUpEndpoint(
+ endpointName, false);
+ endpointName = endpoint.getEndpoint();
+ }
+ resourceSpecifier = UimaClassFactory.produceResourceSpecifier(aDescriptor);
+
+ if (isTopLevelComponent()) {
+ // Check UIMA AS version against the UIMA Core version. If not the same, throw Exception
+ if (!uimaAsVersion.getVersionString().equals(UIMAFramework.getVersionString())) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.INFO,
+ CLASS_NAME.getName(),
+ "BaseAnalysisEngineController",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_incompatible_version_WARNING",
+ new Object[] { getComponentName(), uimaAsVersion.getVersionString(),
+ UIMAFramework.getVersionString() });
+ throw new ResourceInitializationException(new AsynchAEException(
+ "Version of UIMA-AS is Incompatible with a Version of UIMA Core. UIMA-AS Version:"
+ + uimaAsVersion.getVersionString() + " Core UIMA Version:"
+ + UIMAFramework.getVersionString()));
+ }
+ logPlatformInfo(getComponentName());
+ } else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "C'tor", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_using_vm_transport_INFO",
- new Object[] { getComponentName() });
+ "BaseAnalysisEngineController", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_service_id_INFO", new Object[] { endpointName });
}
- }
+ }
- // Top level component?
- if (parentController == null)
- {
- paramsMap.put(Resource.PARAM_RESOURCE_MANAGER, casManager.getResourceManager());
- initialize(resourceSpecifier, paramsMap);
- AnalysisEngineManagementImpl mbean = (AnalysisEngineManagementImpl)
- getUimaContextAdmin().getManagementInterface();
- // Override uima core jmx domain setting
- mbean.setName(getComponentName(), getUimaContextAdmin(),jmxManagement.getJmxDomain());
- if ( resourceSpecifier instanceof AnalysisEngineDescription )
- {
- // Is this service a CAS Multiplier?
- if ( ((AnalysisEngineDescription) resourceSpecifier).getAnalysisEngineMetaData().getOperationalProperties().getOutputsNewCASes() )
- {
- System.out.println(getName()+"-Initializing CAS Pool for Context:"+getUimaContextAdmin().getQualifiedContextName());
- System.out.println(getComponentName()+"-CasMultiplier Cas Pool Size="+aComponentCasPoolSize+" Cas Initialial Heap Size:"+anInitialCasHeapSize);
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "C'tor", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_multiplier_cas_pool_config_INFO",
- new Object[] { getComponentName(), aComponentCasPoolSize, anInitialCasHeapSize });
- }
- initializeComponentCasPool(aComponentCasPoolSize, anInitialCasHeapSize);
- }
- }
- }
- else
- {
- UimaContext childContext = parentController.getChildUimaContext(endpointName);
- paramsMap.put(Resource.PARAM_UIMA_CONTEXT, childContext);
- initialize(resourceSpecifier, paramsMap);
- initializeComponentCasPool(aComponentCasPoolSize, anInitialCasHeapSize );
- if (parentController instanceof AggregateAnalysisEngineController )
- {
-
- // Register self with the parent controller
- ((AggregateAnalysisEngineController) parentController).registerChildController(this,delegateKey);
- }
- }
-
- // Each component in the service hierarchy has its own index. The index is used
- // to construct jmx context path to which every object belongs.
- int index = getIndex();
-
- // Get uima ee jmx base context path
- jmxContext = getJmxContext();
- if ( !isTopLevelComponent() && this instanceof PrimitiveAnalysisEngineController )
- {
- String thisComponentName = ((AggregateAnalysisEngineController)parentController).lookUpDelegateKey(endpointName);
- jmxContext += ( thisComponentName + " Uima EE Service");
- }
-
- // Register InProcessCache with JMX under the top level component
- if ( inProcessCache != null && isTopLevelComponent() )
- {
- inProcessCache.setName(jmxManagement.getJmxDomain()+jmxContext+",name="+inProcessCache.getName());
- ObjectName on = new ObjectName(inProcessCache.getName() );
- jmxManagement.registerMBean(inProcessCache, on);
- }
- initializeServiceStats();
-
-
- // Show Serialization Strategy of each remote delegate
- if ( this instanceof AggregateAnalysisEngineController )
- {
+ // Is this service a CAS Multiplier?
+ if ((resourceSpecifier instanceof AnalysisEngineDescription && ((AnalysisEngineDescription) resourceSpecifier)
+ .getAnalysisEngineMetaData().getOperationalProperties().getOutputsNewCASes())
+ || resourceSpecifier instanceof CollectionReaderDescription) {
+ casMultiplier = true;
+ }
+
+ paramsMap = new HashMap();
+ if (aJmxManagement == null) {
+ jmxManagement = new JmxManager(getJMXDomain());
+ } else {
+ jmxManagement = aJmxManagement;
+ if (jmxManagement.getMBeanServer() != null) {
+ paramsMap.put(AnalysisEngine.PARAM_MBEAN_SERVER, jmxManagement.getMBeanServer());
+ }
+ }
+ paramsMap.put(AnalysisEngine.PARAM_MBEAN_NAME_PREFIX, jmxManagement.getJmxDomain());
+ if (isTopLevelComponent() && this instanceof AggregateAnalysisEngineController) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "C'tor",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_using_vm_transport_INFO",
+ new Object[] { getComponentName() });
+ }
+ }
+
+ // Top level component?
+ if (parentController == null) {
+ paramsMap.put(Resource.PARAM_RESOURCE_MANAGER, casManager.getResourceManager());
+ initialize(resourceSpecifier, paramsMap);
+ AnalysisEngineManagementImpl mbean = (AnalysisEngineManagementImpl) getUimaContextAdmin()
+ .getManagementInterface();
+ // Override uima core jmx domain setting
+ mbean.setName(getComponentName(), getUimaContextAdmin(), jmxManagement.getJmxDomain());
+ if (resourceSpecifier instanceof AnalysisEngineDescription) {
+ // Is this service a CAS Multiplier?
+ if (((AnalysisEngineDescription) resourceSpecifier).getAnalysisEngineMetaData()
+ .getOperationalProperties().getOutputsNewCASes()) {
+ System.out.println(getName() + "-Initializing CAS Pool for Context:"
+ + getUimaContextAdmin().getQualifiedContextName());
+ System.out.println(getComponentName() + "-CasMultiplier Cas Pool Size="
+ + aComponentCasPoolSize + " Cas Initialial Heap Size:" + anInitialCasHeapSize);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME)
+ .logrb(
+ Level.INFO,
+ CLASS_NAME.getName(),
+ "C'tor",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_multiplier_cas_pool_config_INFO",
+ new Object[] { getComponentName(), aComponentCasPoolSize,
+ anInitialCasHeapSize });
+ }
+ initializeComponentCasPool(aComponentCasPoolSize, anInitialCasHeapSize);
+ }
+ }
+ } else {
+ UimaContext childContext = parentController.getChildUimaContext(endpointName);
+ paramsMap.put(Resource.PARAM_UIMA_CONTEXT, childContext);
+ initialize(resourceSpecifier, paramsMap);
+ initializeComponentCasPool(aComponentCasPoolSize, anInitialCasHeapSize);
+ if (parentController instanceof AggregateAnalysisEngineController) {
+
+ // Register self with the parent controller
+ ((AggregateAnalysisEngineController) parentController).registerChildController(this,
+ delegateKey);
+ }
+ }
+
+ // Each component in the service hierarchy has its own index. The index is used
+ // to construct jmx context path to which every object belongs.
+ int index = getIndex();
+
+ // Get uima ee jmx base context path
+ jmxContext = getJmxContext();
+ if (!isTopLevelComponent() && this instanceof PrimitiveAnalysisEngineController) {
+ String thisComponentName = ((AggregateAnalysisEngineController) parentController)
+ .lookUpDelegateKey(endpointName);
+ jmxContext += (thisComponentName + " Uima EE Service");
+ }
+
+ // Register InProcessCache with JMX under the top level component
+ if (inProcessCache != null && isTopLevelComponent()) {
+ inProcessCache.setName(jmxManagement.getJmxDomain() + jmxContext + ",name="
+ + inProcessCache.getName());
+ ObjectName on = new ObjectName(inProcessCache.getName());
+ jmxManagement.registerMBean(inProcessCache, on);
+ }
+ initializeServiceStats();
+
+ // Show Serialization Strategy of each remote delegate
+ if (this instanceof AggregateAnalysisEngineController) {
Set set = aDestinationMap.entrySet();
- for( Iterator it = set.iterator(); it.hasNext();)
- {
- Map.Entry entry = (Map.Entry)it.next();
- Endpoint endpoint = (Endpoint)entry.getValue();
- if ( endpoint != null && endpoint.isRemote() )
- {
- String key = ((AggregateAnalysisEngineController)this).lookUpDelegateKey(endpoint.getEndpoint());
- System.out.println("Service:"+getComponentName()+" Configured To Serialize CASes To Remote Delegate:"+key+" Using "+endpoint.getSerializer()+" Serialization");
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "C'tor", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_remote_delegate_serialization_INFO",
- new Object[] { getComponentName(), key, endpoint.getSerializer()});
+ for (Iterator it = set.iterator(); it.hasNext();) {
+ Map.Entry entry = (Map.Entry) it.next();
+ Endpoint endpoint = (Endpoint) entry.getValue();
+ if (endpoint != null && endpoint.isRemote()) {
+ String key = ((AggregateAnalysisEngineController) this).lookUpDelegateKey(endpoint
+ .getEndpoint());
+ System.out.println("Service:" + getComponentName()
+ + " Configured To Serialize CASes To Remote Delegate:" + key + " Using "
+ + endpoint.getSerializer() + " Serialization");
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "C'tor",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_show_remote_delegate_serialization_INFO",
+ new Object[] { getComponentName(), key, endpoint.getSerializer() });
}
}
}
}
-
- // Create an instance of ControllerMBean and register it with JMX Server.
- // This bean exposes service lifecycle APIs to enable remote stop
- if ( isTopLevelComponent() ) {
+
+ // Create an instance of ControllerMBean and register it with JMX Server.
+ // This bean exposes service lifecycle APIs to enable remote stop
+ if (isTopLevelComponent()) {
Controller controller = new Controller(this);
- String jmxName = getManagementInterface().getJmxDomain()+"name="+"Controller";
- registerWithAgent( controller, jmxName);
+ String jmxName = getManagementInterface().getJmxDomain() + "name=" + "Controller";
+ registerWithAgent(controller, jmxName);
}
- }
-
- private void logPlatformInfo(String serviceName ) {
- if ( ManagementFactory.getPlatformMBeanServer() != null )
- {
+ }
+
+ private void logPlatformInfo(String serviceName) {
+ if (ManagementFactory.getPlatformMBeanServer() != null) {
RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
-// MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
+ // MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
- // bean.getName() should return a string that looks like this: PID@HOSTNAME
- // where PID is the process id and the HOSTNAME is the name of the machine
+ // bean.getName() should return a string that looks like this: PID@HOSTNAME
+ // where PID is the process id and the HOSTNAME is the name of the machine
processPid = bean.getName();
- if ( processPid != null && processPid.trim().length() > 0 ) {
+ if (processPid != null && processPid.trim().length() > 0) {
int endPos = processPid.indexOf("@"); // find the position where the PID ends
- if ( endPos > -1 ) {
- processPid = processPid.substring(0, endPos);
- }
+ if (endPos > -1) {
+ processPid = processPid.substring(0, endPos);
+ }
}
DateFormat df = new SimpleDateFormat("dd MMM yyyy HH:mm:ss ");
-
+
StringBuffer platformInfo = new StringBuffer();
platformInfo.append("\n+------------------------------------------------------------------");
- platformInfo.append("\n Starting UIMA AS Service - PID:"+processPid);
+ platformInfo.append("\n Starting UIMA AS Service - PID:" + processPid);
platformInfo.append("\n+------------------------------------------------------------------");
- platformInfo.append("\n+ Service Name:"+serviceName);
- platformInfo.append("\n+ Service Queue Name:"+endpointName);
- platformInfo.append("\n+ Service Start Time:"+df.format(bean.getStartTime()));
- platformInfo.append("\n+ UIMA AS Version:"+uimaAsVersion.getVersionString());
- platformInfo.append("\n+ UIMA Core Version:"+UIMAFramework.getVersionString());
- platformInfo.append("\n+ OS Name:"+osBean.getName());
- platformInfo.append("\n+ OS Version:"+osBean.getVersion());
- platformInfo.append("\n+ OS Architecture:"+osBean.getArch());
- platformInfo.append("\n+ OS CPU Count:"+osBean.getAvailableProcessors());
- platformInfo.append("\n+ JVM Vendor:"+bean.getVmVendor());
- platformInfo.append("\n+ JVM Name:"+bean.getVmName());
- platformInfo.append("\n+ JVM Version:"+bean.getVmVersion());
- platformInfo.append("\n+ JVM Input Args:"+bean.getInputArguments());
- platformInfo.append("\n+ JVM Classpath:"+bean.getClassPath());
- platformInfo.append("\n+ JVM LIB_PATH:"+bean.getLibraryPath());
+ platformInfo.append("\n+ Service Name:" + serviceName);
+ platformInfo.append("\n+ Service Queue Name:" + endpointName);
+ platformInfo.append("\n+ Service Start Time:" + df.format(bean.getStartTime()));
+ platformInfo.append("\n+ UIMA AS Version:" + uimaAsVersion.getVersionString());
+ platformInfo.append("\n+ UIMA Core Version:" + UIMAFramework.getVersionString());
+ platformInfo.append("\n+ OS Name:" + osBean.getName());
+ platformInfo.append("\n+ OS Version:" + osBean.getVersion());
+ platformInfo.append("\n+ OS Architecture:" + osBean.getArch());
+ platformInfo.append("\n+ OS CPU Count:" + osBean.getAvailableProcessors());
+ platformInfo.append("\n+ JVM Vendor:" + bean.getVmVendor());
+ platformInfo.append("\n+ JVM Name:" + bean.getVmName());
+ platformInfo.append("\n+ JVM Version:" + bean.getVmVersion());
+ platformInfo.append("\n+ JVM Input Args:" + bean.getInputArguments());
+ platformInfo.append("\n+ JVM Classpath:" + bean.getClassPath());
+ platformInfo.append("\n+ JVM LIB_PATH:" + bean.getLibraryPath());
platformInfo.append("\n+------------------------------------------------------------------");
System.out.println(platformInfo.toString());
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "logPlatformInfo", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_platform_info__INFO",
- new Object[] { platformInfo.toString() });
+ "logPlatformInfo", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_show_platform_info__INFO", new Object[] { platformInfo.toString() });
}
- }
- public AnalysisEngineController getParentController() {
+ }
+
+ public AnalysisEngineController getParentController() {
return parentController;
- }
- public UimaTransport getTransport(String aKey) throws Exception
- {
- return getTransport( null, aKey);
}
- public UimaTransport getTransport(UimaAsContext asContext)
- throws Exception
- {
- String endpointName = getName();
- if ( !isTopLevelComponent() ) {
- endpointName = parentController.getName();
- }
- return getTransport(asContext, endpointName);
- }
-
- public UimaTransport getTransport(UimaAsContext asContext, String aKey)
- throws Exception
- {
- UimaTransport transport = null;
- if ( !transports.containsKey(aKey)) {
- transport = new VmTransport(asContext, this);
- if ( isStopped() ) {
- throw new ServiceShutdownException();
- }
- transports.put( aKey, transport);
- }
- else {
- transport = (UimaTransport) transports.get(aKey);
- }
-
- return transport;
- }
-
- /**
- * Initializes transport used for internal messaging between collocated Uima AS services.
- */
- public void initializeVMTransport(int parentControllerReplyConsumerCount) throws Exception
- {
- // If this controller is an Aggregate Controller, force delegates to initialize
- // their internal transports.
- if ( this instanceof AggregateAnalysisEngineController )
- {
- // Get a list of all colocated delegate controllers.
- List childControllers = ((AggregateAnalysisEngineController_impl)this).childControllerList;
-
- for( int i=0; i < childControllers.size(); i++)
- {
- AnalysisEngineController ctrl = (AnalysisEngineController)childControllers.get(i);
- // Force initialization
- ctrl.initializeVMTransport(parentControllerReplyConsumerCount);
- }
- }
-
- // Only delegate controllers execute the logic below
- if ( parentController != null )
- {
- UimaAsContext uimaAsContext = new UimaAsContext();
- if ( !registeredWithJMXServer )
- {
- registeredWithJMXServer = true;
- registerServiceWithJMX(jmxContext, false);
- }
- // Determine how many consumer threads to create. First though use the parent Aggregate Controller
- // to lookup this delegate key. Next fetch the delegate endpoint which contains
- // concurrentConsumers property.
- String key = ((AggregateAnalysisEngineController)parentController).lookUpDelegateKey(getName());
- int concurrentRequestConsumers = 1;
- int concurrentReplyConsumers = 1;
- if ( key != null )
- {
- Endpoint e = ((AggregateAnalysisEngineController)parentController).lookUpEndpoint(key, false);
- concurrentRequestConsumers = e.getConcurrentRequestConsumers();
- concurrentReplyConsumers = e.getConcurrentReplyConsumers();
- }
-
- System.out.println("Controller:"+getComponentName()+" Starting Request Listener With "+concurrentRequestConsumers+" Concurrent Consumers. Reply Listener Configured With "+concurrentReplyConsumers+" Concurrent Consumers");
-
- uimaAsContext.setConcurrentConsumerCount(concurrentRequestConsumers);
- uimaAsContext.put("EndpointName", endpointName);
-
- UimaTransport vmTransport = getTransport(uimaAsContext);
- // Creates delegate Listener for receiving requests from the parent
- UimaMessageListener messageListener = vmTransport.produceUimaMessageListener();
- // Plug in message handlers
- messageListener.initialize(uimaAsContext);
- // Store the listener
- messageListeners.put(getName(), messageListener);
- // Creates parent controller dispatcher for this delegate. The dispatcher is wired
- // with this delegate's listener.
- UimaAsContext uimaAsContext2 = new UimaAsContext();
- // Set up as many reply threads as there are threads to process requests
- uimaAsContext2.setConcurrentConsumerCount(concurrentReplyConsumers);
- uimaAsContext2.put("EndpointName", endpointName);
- UimaTransport parentVmTransport = parentController.getTransport(uimaAsContext2, endpointName);
-
- parentVmTransport.produceUimaMessageDispatcher(vmTransport);
- // Creates parent listener for receiving replies from this delegate.
- UimaMessageListener parentListener = parentVmTransport.produceUimaMessageListener();
- // Plug in message handlers
- parentListener.initialize(uimaAsContext2);
- // Creates delegate's dispatcher. It is wired to send replies to the parent's listener.
- vmTransport.produceUimaMessageDispatcher(parentVmTransport);
- // Register input queue with JMX. This is an internal (non-jms) queue where clients
- // send requests to this service.
- vmTransport.registerWithJMX(this, "VmInputQueue");
- parentVmTransport.registerWithJMX( this, "VmReplyQueue");
- }
-
- }
-
- public synchronized UimaMessageListener getUimaMessageListener(String aDelegateKey)
- {
- return messageListeners.get(aDelegateKey);
- }
-
-
- /**
- * Get the domain for Uima JMX. The domain includes a fixed string plus the name of the
- * top level component. All uima ee objects are rooted at this domain.
- */
-
- public String getJMXDomain()
- {
- // Keep calling controllers until the top level component is reached
- if ( !isTopLevelComponent() )
- {
- return parentController.getJMXDomain();
- }
- else
- {
- // The domain includes the name of the top level component
- return "org.apache.uima:type=ee.jms.services,s="+getComponentName()+" Uima EE Service,";
- }
- }
- public JmxManagement getManagementInterface()
- {
- return jmxManagement;
- }
-
- /**
- * Returns a unique id for each component in the service hierarchy.
- * The top level component's id is always = 0
- *
- */
- public int getIndex()
- {
- if ( isTopLevelComponent() )
- {
- return 0;
- }
- return parentController.getIndex()+1;
- }
-
-
- private void initializeServiceStats()
- {
- Statistic statistic = null;
- if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.TotalDeserializeTime)) == null )
- {
- statistic = new LongNumericStatistic(Monitor.TotalDeserializeTime);
- getMonitor().addStatistic("", statistic);
- }
- if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.TotalSerializeTime)) == null )
- {
- statistic = new LongNumericStatistic(Monitor.TotalSerializeTime);
- getMonitor().addStatistic("", statistic);
- }
- if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.IdleTime)) == null )
- {
- statistic = new LongNumericStatistic(Monitor.IdleTime);
- getMonitor().addStatistic("", statistic);
- }
- if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.ProcessCount)) == null )
- {
- statistic = new LongNumericStatistic(Monitor.ProcessCount);
- getMonitor().addStatistic("", statistic);
- }
- if ( this instanceof PrimitiveAnalysisEngineController )
- {
- if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.ProcessErrorCount)) == null )
- {
- statistic = new LongNumericStatistic(Monitor.ProcessErrorCount);
- getMonitor().addStatistic("", statistic);
- }
- if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.TotalProcessErrorCount)) == null )
- {
- statistic = new LongNumericStatistic(Monitor.TotalProcessErrorCount);
- getMonitor().addStatistic("", statistic);
- }
- if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.TotalAEProcessTime)) == null )
- {
- statistic = new LongNumericStatistic(Monitor.TotalAEProcessTime);
- getMonitor().addStatistic("", statistic);
- }
- }
- }
- private void removeFromJmxServer( ObjectName anMBean) throws Exception
- {
- jmxManagement.unregisterMBean(anMBean);
- }
-
- /**
- * This is called once during initialization to compute the position of the
- * component in the JMX hierarchy and create a context path that will be used
- * to register the component in the JMX registry.
- */
- public String getJmxContext()
- {
- if ( isTopLevelComponent() )
- {
- if ( this instanceof AggregateAnalysisEngineController )
- {
- return "p0="+getComponentName()+" Components";
- }
- else if ( this instanceof PrimitiveAnalysisEngineController )
- {
- return "p0="+getComponentName()+" Uima EE";
- }
-
- }
- // Get the position of the component in the hierarchy. Each component
- // is registered with a unique context string that is composed of
- // the domain+<key,value> pair, where the key=p+<index>. The index is
- // incremented for every component. An example of a hierarchy would be
- // something like:
- // <domain>,s=<service name>,p0=<service name>,p1=<aggregate service>,p2=<delegate service>
-
- int index = getIndex();
- String parentContext = parentController.getJmxContext();
- if ( parentController.isTopLevelComponent())
- {
- index=1;
- }
- if ( this instanceof AggregateAnalysisEngineController )
- {
- String thisComponentName = getComponentName();
- if ( !isTopLevelComponent() && endpointName != null)
- {
- thisComponentName = ((AggregateAnalysisEngineController)parentController).lookUpDelegateKey(endpointName);
- }
- return parentContext+",p"+index+"="+thisComponentName+" Components";
- }
- else
- {
- return parentContext+",p"+index+"=";
- }
- }
- /**
- * Register a component with a given name with JMX MBeanServer
- *
- * @param o - component to register with JMX
- * @param aName - full jmx context name for the component
- */
- protected void registerWithAgent( Object o, String aName)
- {
- try
- {
- ObjectName on = new ObjectName( aName );
- jmxManagement.registerMBean(o, on);
- }
- catch( Exception e)
- {
- // Log and move on
+ public UimaTransport getTransport(String aKey) throws Exception {
+ return getTransport(null, aKey);
+ }
+
+ public UimaTransport getTransport(UimaAsContext asContext) throws Exception {
+ String endpointName = getName();
+ if (!isTopLevelComponent()) {
+ endpointName = parentController.getName();
+ }
+ return getTransport(asContext, endpointName);
+ }
+
+ public UimaTransport getTransport(UimaAsContext asContext, String aKey) throws Exception {
+ UimaTransport transport = null;
+ if (!transports.containsKey(aKey)) {
+ transport = new VmTransport(asContext, this);
+ if (isStopped()) {
+ throw new ServiceShutdownException();
+ }
+ transports.put(aKey, transport);
+ } else {
+ transport = (UimaTransport) transports.get(aKey);
+ }
+
+ return transport;
+ }
+
+ /**
+ * Initializes transport used for internal messaging between collocated Uima AS services.
+ */
+ public void initializeVMTransport(int parentControllerReplyConsumerCount) throws Exception {
+ // If this controller is an Aggregate Controller, force delegates to initialize
+ // their internal transports.
+ if (this instanceof AggregateAnalysisEngineController) {
+ // Get a list of all colocated delegate controllers.
+ List childControllers = ((AggregateAnalysisEngineController_impl) this).childControllerList;
+
+ for (int i = 0; i < childControllers.size(); i++) {
+ AnalysisEngineController ctrl = (AnalysisEngineController) childControllers.get(i);
+ // Force initialization
+ ctrl.initializeVMTransport(parentControllerReplyConsumerCount);
+ }
+ }
+
+ // Only delegate controllers execute the logic below
+ if (parentController != null) {
+ UimaAsContext uimaAsContext = new UimaAsContext();
+ if (!registeredWithJMXServer) {
+ registeredWithJMXServer = true;
+ registerServiceWithJMX(jmxContext, false);
+ }
+ // Determine how many consumer threads to create. First though use the parent Aggregate
+ // Controller
+ // to lookup this delegate key. Next fetch the delegate endpoint which contains
+ // concurrentConsumers property.
+ String key = ((AggregateAnalysisEngineController) parentController)
+ .lookUpDelegateKey(getName());
+ int concurrentRequestConsumers = 1;
+ int concurrentReplyConsumers = 1;
+ if (key != null) {
+ Endpoint e = ((AggregateAnalysisEngineController) parentController).lookUpEndpoint(key,
+ false);
+ concurrentRequestConsumers = e.getConcurrentRequestConsumers();
+ concurrentReplyConsumers = e.getConcurrentReplyConsumers();
+ }
+
+ System.out.println("Controller:" + getComponentName() + " Starting Request Listener With "
+ + concurrentRequestConsumers
+ + " Concurrent Consumers. Reply Listener Configured With " + concurrentReplyConsumers
+ + " Concurrent Consumers");
+
+ uimaAsContext.setConcurrentConsumerCount(concurrentRequestConsumers);
+ uimaAsContext.put("EndpointName", endpointName);
+
+ UimaTransport vmTransport = getTransport(uimaAsContext);
+ // Creates delegate Listener for receiving requests from the parent
+ UimaMessageListener messageListener = vmTransport.produceUimaMessageListener();
+ // Plug in message handlers
+ messageListener.initialize(uimaAsContext);
+ // Store the listener
+ messageListeners.put(getName(), messageListener);
+ // Creates parent controller dispatcher for this delegate. The dispatcher is wired
+ // with this delegate's listener.
+ UimaAsContext uimaAsContext2 = new UimaAsContext();
+ // Set up as many reply threads as there are threads to process requests
+ uimaAsContext2.setConcurrentConsumerCount(concurrentReplyConsumers);
+ uimaAsContext2.put("EndpointName", endpointName);
+ UimaTransport parentVmTransport = parentController.getTransport(uimaAsContext2, endpointName);
+
+ parentVmTransport.produceUimaMessageDispatcher(vmTransport);
+ // Creates parent listener for receiving replies from this delegate.
+ UimaMessageListener parentListener = parentVmTransport.produceUimaMessageListener();
+ // Plug in message handlers
+ parentListener.initialize(uimaAsContext2);
+ // Creates delegate's dispatcher. It is wired to send replies to the parent's listener.
+ vmTransport.produceUimaMessageDispatcher(parentVmTransport);
+ // Register input queue with JMX. This is an internal (non-jms) queue where clients
+ // send requests to this service.
+ vmTransport.registerWithJMX(this, "VmInputQueue");
+ parentVmTransport.registerWithJMX(this, "VmReplyQueue");
+ }
+
+ }
+
+ public synchronized UimaMessageListener getUimaMessageListener(String aDelegateKey) {
+ return messageListeners.get(aDelegateKey);
+ }
+
+ /**
+ * Get the domain for Uima JMX. The domain includes a fixed string plus the name of the top level
+ * component. All uima ee objects are rooted at this domain.
+ */
+
+ public String getJMXDomain() {
+ // Keep calling controllers until the top level component is reached
+ if (!isTopLevelComponent()) {
+ return parentController.getJMXDomain();
+ } else {
+ // The domain includes the name of the top level component
+ return "org.apache.uima:type=ee.jms.services,s=" + getComponentName() + " Uima EE Service,";
+ }
+ }
+
+ public JmxManagement getManagementInterface() {
+ return jmxManagement;
+ }
+
+ /**
+ * Returns a unique id for each component in the service hierarchy. The top level component's id
+ * is always = 0
+ *
+ */
+ public int getIndex() {
+ if (isTopLevelComponent()) {
+ return 0;
+ }
+ return parentController.getIndex() + 1;
+ }
+
+ private void initializeServiceStats() {
+ Statistic statistic = null;
+ if ((statistic = getMonitor().getLongNumericStatistic("", Monitor.TotalDeserializeTime)) == null) {
+ statistic = new LongNumericStatistic(Monitor.TotalDeserializeTime);
+ getMonitor().addStatistic("", statistic);
+ }
+ if ((statistic = getMonitor().getLongNumericStatistic("", Monitor.TotalSerializeTime)) == null) {
+ statistic = new LongNumericStatistic(Monitor.TotalSerializeTime);
+ getMonitor().addStatistic("", statistic);
+ }
+ if ((statistic = getMonitor().getLongNumericStatistic("", Monitor.IdleTime)) == null) {
+ statistic = new LongNumericStatistic(Monitor.IdleTime);
+ getMonitor().addStatistic("", statistic);
+ }
+ if ((statistic = getMonitor().getLongNumericStatistic("", Monitor.ProcessCount)) == null) {
+ statistic = new LongNumericStatistic(Monitor.ProcessCount);
+ getMonitor().addStatistic("", statistic);
+ }
+ if (this instanceof PrimitiveAnalysisEngineController) {
+ if ((statistic = getMonitor().getLongNumericStatistic("", Monitor.ProcessErrorCount)) == null) {
+ statistic = new LongNumericStatistic(Monitor.ProcessErrorCount);
+ getMonitor().addStatistic("", statistic);
+ }
+ if ((statistic = getMonitor().getLongNumericStatistic("", Monitor.TotalProcessErrorCount)) == null) {
+ statistic = new LongNumericStatistic(Monitor.TotalProcessErrorCount);
+ getMonitor().addStatistic("", statistic);
+ }
+ if ((statistic = getMonitor().getLongNumericStatistic("", Monitor.TotalAEProcessTime)) == null) {
+ statistic = new LongNumericStatistic(Monitor.TotalAEProcessTime);
+ getMonitor().addStatistic("", statistic);
+ }
+ }
+ }
+
+ private void removeFromJmxServer(ObjectName anMBean) throws Exception {
+ jmxManagement.unregisterMBean(anMBean);
+ }
+
+ /**
+ * This is called once during initialization to compute the position of the component in the JMX
+ * hierarchy and create a context path that will be used to register the component in the JMX
+ * registry.
+ */
+ public String getJmxContext() {
+ if (isTopLevelComponent()) {
+ if (this instanceof AggregateAnalysisEngineController) {
+ return "p0=" + getComponentName() + " Components";
+ } else if (this instanceof PrimitiveAnalysisEngineController) {
+ return "p0=" + getComponentName() + " Uima EE";
+ }
+
+ }
+ // Get the position of the component in the hierarchy. Each component
+ // is registered with a unique context string that is composed of
+ // the domain+<key,value> pair, where the key=p+<index>. The index is
+ // incremented for every component. An example of a hierarchy would be
+ // something like:
+ // <domain>,s=<service name>,p0=<service name>,p1=<aggregate service>,p2=<delegate service>
+
+ int index = getIndex();
+ String parentContext = parentController.getJmxContext();
+ if (parentController.isTopLevelComponent()) {
+ index = 1;
+ }
+ if (this instanceof AggregateAnalysisEngineController) {
+ String thisComponentName = getComponentName();
+ if (!isTopLevelComponent() && endpointName != null) {
+ thisComponentName = ((AggregateAnalysisEngineController) parentController)
+ .lookUpDelegateKey(endpointName);
+ }
+ return parentContext + ",p" + index + "=" + thisComponentName + " Components";
+ } else {
+ return parentContext + ",p" + index + "=";
+ }
+ }
+
+ /**
+ * Register a component with a given name with JMX MBeanServer
+ *
+ * @param o
+ * - component to register with JMX
+ * @param aName
+ * - full jmx context name for the component
+ */
+ protected void registerWithAgent(Object o, String aName) {
+ try {
+ ObjectName on = new ObjectName(aName);
+ jmxManagement.registerMBean(o, on);
+ } catch (Exception e) {
+ // Log and move on
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "setListenerContainer", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
- new Object[] { e });
+ "setListenerContainer", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_exception__WARNING", new Object[] { e });
}
- }
- }
- public void registerVmQueueWithJMX( Object o, String aName ) throws Exception {
- String jmxName = getManagementInterface().getJmxDomain()
- +jmxContext+",name="+getComponentName()+"_"+aName;
- registerWithAgent( o, jmxName);
-
- if ( "VmReplyQueue".equals( aName )) {
- getServiceInfo().setReplyQueueName(jmxName);
- } else {
- getServiceInfo().setInputQueueName(jmxName);
- }
- }
- protected void registerServiceWithJMX(String key_value_list, boolean remote)
- {
- String thisComponentName = getComponentName();
-
- String name = "";
- getIndex();
- servicePerformance = new ServicePerformance(this);
- name = jmxManagement.getJmxDomain()+key_value_list+",name="+thisComponentName+"_"+servicePerformance.getLabel();
-
- registerWithAgent(servicePerformance, name );
- servicePerformance.setIdleTime(System.nanoTime());
+ }
+ }
+
+ public void registerVmQueueWithJMX(Object o, String aName) throws Exception {
+ String jmxName = getManagementInterface().getJmxDomain() + jmxContext + ",name="
+ + getComponentName() + "_" + aName;
+ registerWithAgent(o, jmxName);
+
+ if ("VmReplyQueue".equals(aName)) {
+ getServiceInfo().setReplyQueueName(jmxName);
+ } else {
+ getServiceInfo().setInputQueueName(jmxName);
+ }
+ }
+
+ protected void registerServiceWithJMX(String key_value_list, boolean remote) {
+ String thisComponentName = getComponentName();
+
+ String name = "";
+ getIndex();
+ servicePerformance = new ServicePerformance(this);
+ name = jmxManagement.getJmxDomain() + key_value_list + ",name=" + thisComponentName + "_"
+ + servicePerformance.getLabel();
+
+ registerWithAgent(servicePerformance, name);
+ servicePerformance.setIdleTime(System.nanoTime());
ServiceInfo serviceInfo = null;
- if ( remote )
- {
- serviceInfo = getInputChannel().getServiceInfo();
- }
- else
- {
+ if (remote) {
+ serviceInfo = getInputChannel().getServiceInfo();
+ } else {
serviceInfo = new ServiceInfo();
serviceInfo.setBrokerURL(getBrokerURL());
serviceInfo.setInputQueueName(getName());
serviceInfo.setState("Active");
- }
- ServiceInfo pServiceInfo = null;
+ }
+ ServiceInfo pServiceInfo = null;
- if ( this instanceof PrimitiveAnalysisEngineController )
- {
- pServiceInfo = ((PrimitiveAnalysisEngineController)this).getServiceInfo();
- servicePerformance.setProcessThreadCount(((PrimitiveAnalysisEngineController)this).getServiceInfo().getAnalysisEngineInstanceCount());
- }
- else
- {
- pServiceInfo =
- ((AggregateAnalysisEngineController)this).getServiceInfo();
- pServiceInfo.setAggregate(true);
- }
- // If this is a Cas Multiplier, add the key to the JMX MBean.
- // This will help the JMX Monitor to fetch the CM Cas Pool MBean
- if ( isCasMultiplier() )
- {
+ if (this instanceof PrimitiveAnalysisEngineController) {
+ pServiceInfo = ((PrimitiveAnalysisEngineController) this).getServiceInfo();
+ servicePerformance.setProcessThreadCount(((PrimitiveAnalysisEngineController) this)
+ .getServiceInfo().getAnalysisEngineInstanceCount());
+ } else {
+ pServiceInfo = ((AggregateAnalysisEngineController) this).getServiceInfo();
+ pServiceInfo.setAggregate(true);
+ }
+ // If this is a Cas Multiplier, add the key to the JMX MBean.
+ // This will help the JMX Monitor to fetch the CM Cas Pool MBean
+ if (isCasMultiplier()) {
pServiceInfo.setServiceKey(getUimaContextAdmin().getQualifiedContextName());
}
-
- if ( pServiceInfo != null )
- {
- name = jmxManagement.getJmxDomain()+key_value_list+",name="+thisComponentName+"_"+serviceInfo.getLabel();
- if ( !isTopLevelComponent() )
- {
- pServiceInfo.setBrokerURL("Embedded Broker");
- }
- else
- {
- pServiceInfo.setTopLevel();
- }
- if ( isCasMultiplier())
- {
- pServiceInfo.setCASMultiplier();
- }
- registerWithAgent(pServiceInfo, name );
- }
-
- serviceErrors = new ServiceErrors();
- name = jmxManagement.getJmxDomain()+key_value_list+",name="+thisComponentName+"_"+serviceErrors.getLabel();
- registerWithAgent(serviceErrors, name );
- }
-
- protected void cleanUp() throws Exception
- {
- if ( inProcessCache != null && isTopLevelComponent() )
- {
- ObjectName on = new ObjectName(inProcessCache.getName() );
- removeFromJmxServer(on);
- }
- }
-
- /**
- * Override the default JmxManager
- */
- public void setJmxManagement(JmxManagement aJmxManagement)
- {
- jmxManagement = aJmxManagement;
- }
-
-
- private void initializeComponentCasPool(int aComponentCasPoolSize, long anInitialCasHeapSize )
- {
- if (aComponentCasPoolSize > 0)
- {
- EECasManager_impl cm = (EECasManager_impl) getResourceManager().getCasManager();
- cm.setInitialCasHeapSize(anInitialCasHeapSize);
+ if (pServiceInfo != null) {
+ name = jmxManagement.getJmxDomain() + key_value_list + ",name=" + thisComponentName + "_"
+ + serviceInfo.getLabel();
+ if (!isTopLevelComponent()) {
+ pServiceInfo.setBrokerURL("Embedded Broker");
+ } else {
+ pServiceInfo.setTopLevel();
+ }
+ if (isCasMultiplier()) {
+ pServiceInfo.setCASMultiplier();
+ }
+ registerWithAgent(pServiceInfo, name);
+ }
+
+ serviceErrors = new ServiceErrors();
+ name = jmxManagement.getJmxDomain() + key_value_list + ",name=" + thisComponentName + "_"
+ + serviceErrors.getLabel();
+ registerWithAgent(serviceErrors, name);
+ }
+
+ protected void cleanUp() throws Exception {
+ if (inProcessCache != null && isTopLevelComponent()) {
+ ObjectName on = new ObjectName(inProcessCache.getName());
+ removeFromJmxServer(on);
+ }
+ }
+
+ /**
+ * Override the default JmxManager
+ */
+ public void setJmxManagement(JmxManagement aJmxManagement) {
+ jmxManagement = aJmxManagement;
+ }
+
+ private void initializeComponentCasPool(int aComponentCasPoolSize, long anInitialCasHeapSize) {
+ if (aComponentCasPoolSize > 0) {
+ EECasManager_impl cm = (EECasManager_impl) getResourceManager().getCasManager();
+ cm.setInitialCasHeapSize(anInitialCasHeapSize);
cm.setPoolSize(getUimaContextAdmin().getUniqueName(), aComponentCasPoolSize);
- System.out.println("Component:"+getComponentName()+" Cas Pool:"+getUimaContextAdmin().getQualifiedContextName()+" Size:"+aComponentCasPoolSize+" Cas Heap Size:"+anInitialCasHeapSize/4 +" cells");
+ System.out.println("Component:" + getComponentName() + " Cas Pool:"
+ + getUimaContextAdmin().getQualifiedContextName() + " Size:" + aComponentCasPoolSize
+ + " Cas Heap Size:" + anInitialCasHeapSize / 4 + " cells");
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "initializeComponentCasPool", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cas_pool_config_INFO",
- new Object[] { getComponentName(), getUimaContextAdmin().getQualifiedContextName(), aComponentCasPoolSize, anInitialCasHeapSize/4});
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.INFO,
+ CLASS_NAME.getName(),
+ "initializeComponentCasPool",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_cas_pool_config_INFO",
+ new Object[] { getComponentName(), getUimaContextAdmin().getQualifiedContextName(),
+ aComponentCasPoolSize, anInitialCasHeapSize / 4 });
+ }
+ }
+
+ }
+
+ public boolean isTopLevelComponent() {
+ return (parentController == null);
+ }
+
+ /**
+ * Returns the name of the component. The name comes from the analysis engine descriptor
+ */
+ public String getComponentName() {
+ return ((ResourceCreationSpecifier) resourceSpecifier).getMetaData().getName();
+ }
+
+ /**
+ * Print the component name rather than the class name
+ */
+ public String toString() {
+ return getComponentName();
+ }
+
+ public void addTimeSnapshot(long snapshot, String aKey) {
+ if (timeSnapshotMap.containsKey(aKey)) {
+ timeSnapshotMap.remove(aKey);
+ }
+ timeSnapshotMap.put(aKey, snapshot);
+
+ }
+
+ public void addServiceInfo(ServiceInfo aServiceInfo) {
+ ServiceInfo sInfo = null;
+
+ if (this instanceof PrimitiveAnalysisEngineController) {
+ sInfo = ((PrimitiveAnalysisEngineController) this).getServiceInfo();
+ } else if (this instanceof AggregateAnalysisEngineController) {
+ sInfo = ((AggregateAnalysisEngineController) this).getServiceInfo();
+ }
+ if (sInfo != null) {
+ sInfo.setBrokerURL(aServiceInfo.getBrokerURL());
+ sInfo.setInputQueueName(aServiceInfo.getInputQueueName());
+ sInfo.setState(aServiceInfo.getState());
+ sInfo.setDeploymentDescriptor(deploymentDescriptor);
+ if (isCasMultiplier()) {
+ sInfo.setCASMultiplier();
}
- }
+ } else {
+ System.out.println("!!!!!!!!!!!!!!! ServiceInfo instance is NULL");
+ }
+
+ }
+
+ public long getTimeSnapshot(String aKey) {
+ if (timeSnapshotMap.containsKey(aKey)) {
+ Object value = timeSnapshotMap.get(aKey);
+ if (value != null) {
+ return ((Long) value).longValue();
+ }
+ }
+ return 0;
- }
+ }
+
+ public ServicePerformance getServicePerformance() {
+ return servicePerformance;
+ }
- public boolean isTopLevelComponent()
- {
- return (parentController == null);
- }
-
- /**
- * Returns the name of the component. The name comes from the analysis engine descriptor
- */
- public String getComponentName()
- {
- return ((ResourceCreationSpecifier)resourceSpecifier).getMetaData().getName();
- }
-
- /**
- * Print the component name rather than the class name
- */
- public String toString()
- {
- return getComponentName();
- }
-
- public void addTimeSnapshot( long snapshot, String aKey )
- {
- if ( timeSnapshotMap.containsKey(aKey) )
- {
- timeSnapshotMap.remove(aKey);
- }
- timeSnapshotMap.put(aKey, snapshot);
-
- }
- public void addServiceInfo( ServiceInfo aServiceInfo )
- {
- ServiceInfo sInfo = null;
-
- if ( this instanceof PrimitiveAnalysisEngineController )
- {
- sInfo =
- ((PrimitiveAnalysisEngineController)this).getServiceInfo();
- }
- else if ( this instanceof AggregateAnalysisEngineController )
- {
- sInfo =
- ((AggregateAnalysisEngineController)this).getServiceInfo();
- }
- if ( sInfo != null )
- {
- sInfo.setBrokerURL(aServiceInfo.getBrokerURL());
- sInfo.setInputQueueName(aServiceInfo.getInputQueueName());
- sInfo.setState(aServiceInfo.getState());
- sInfo.setDeploymentDescriptor(deploymentDescriptor);
- if ( isCasMultiplier())
- {
- sInfo.setCASMultiplier();
- }
- }
- else
- {
- System.out.println("!!!!!!!!!!!!!!! ServiceInfo instance is NULL");
- }
-
- }
- public long getTimeSnapshot( String aKey )
- {
- if ( timeSnapshotMap.containsKey(aKey) )
- {
- Object value = timeSnapshotMap.get(aKey);
- if (value != null) {
- return ((Long)value).longValue();
- }
- }
- return 0;
-
- }
- public ServicePerformance getServicePerformance()
- {
- return servicePerformance;
- }
- public ServiceErrors getServiceErrors()
- {
- return serviceErrors;
- }
-
- public UimaContext getChildUimaContext(String aDelegateEndpointName) throws Exception
- {
- if (this instanceof AggregateAnalysisEngineController)
- {
- String key = ((AggregateAnalysisEngineController) this).lookUpDelegateKey(aDelegateEndpointName);
- if (key == null )
- {
- if ( ((AggregateAnalysisEngineController) this).isDelegateKeyValid(aDelegateEndpointName) )
- {
- key = aDelegateEndpointName;
- }
- }
-
- if ( key == null )
- {
- throw new AsynchAEException(getName()+"-Unable to look up delegate "+aDelegateEndpointName+" in internal map");
- }
- UimaContextAdmin uctx = getUimaContextAdmin();
-
- // retrieve the sofa mappings for input/output sofas of this analysis engine
- HashMap sofamap = new HashMap();
- if (resourceSpecifier instanceof AnalysisEngineDescription)
- {
- AnalysisEngineDescription desc = (AnalysisEngineDescription) resourceSpecifier;
- SofaMapping[] sofaMappings = desc.getSofaMappings();
- if (sofaMappings != null && sofaMappings.length > 0)
- {
- for (int s = 0; s < sofaMappings.length; s++)
- {
- // the mapping is for this analysis engine
- if (sofaMappings[s].getComponentKey().equals(key))
- {
- // if component sofa name is null, replace it with
- // the default for TCAS sofa name
- // This is to support old style TCAS
- if (sofaMappings[s].getComponentSofaName() == null)
- sofaMappings[s].setComponentSofaName(CAS.NAME_DEFAULT_SOFA);
- sofamap.put(sofaMappings[s].getComponentSofaName(), sofaMappings[s].getAggregateSofaName());
- }
- }
- }
- }
- // create child UimaContext and insert into mInitParams map
- return uctx.createChild(key, sofamap);
- }
- return null;
- }
-
- public void setInputChannel(InputChannel anInputChannel) throws Exception
- {
- inputChannel = anInputChannel;
- inputChannelList.add(anInputChannel);
-
- inputChannelLatch.countDown();
- if ( !registeredWithJMXServer )
- {
- registeredWithJMXServer = true;
- registerServiceWithJMX(jmxContext, false);
- }
- }
- public void addInputChannel( InputChannel anInputChannel )
- {
- if ( !inputChannelMap.containsKey(anInputChannel.getInputQueueName()))
- {
+ /**
+  * @return the ServiceErrors object held by this controller
+  */
+ public ServiceErrors getServiceErrors() {
+   return this.serviceErrors;
+ }
+
+ /**
+  * Creates a child UimaContext for one delegate of this controller.
+  *
+  * The delegate key is resolved with lookUpDelegateKey(); when that yields null the supplied
+  * name itself is used as the key, provided isDelegateKeyValid() accepts it. Sofa mappings
+  * declared for that key in the AnalysisEngineDescription are passed to the child context.
+  *
+  * @param aDelegateEndpointName endpoint name, or delegate key, identifying the delegate
+  * @return the child context, or null when this controller is not an
+  *         AggregateAnalysisEngineController
+  * @throws Exception an AsynchAEException when no delegate key can be resolved
+  */
+ public UimaContext getChildUimaContext(String aDelegateEndpointName) throws Exception {
+   if (!(this instanceof AggregateAnalysisEngineController)) {
+     return null;
+   }
+   AggregateAnalysisEngineController aggregate = (AggregateAnalysisEngineController) this;
+   String delegateKey = aggregate.lookUpDelegateKey(aDelegateEndpointName);
+   // Fall back to the supplied name when it is itself a valid delegate key
+   if (delegateKey == null && aggregate.isDelegateKeyValid(aDelegateEndpointName)) {
+     delegateKey = aDelegateEndpointName;
+   }
+   if (delegateKey == null) {
+     throw new AsynchAEException(getName() + "-Unable to look up delegate "
+             + aDelegateEndpointName + " in internal map");
+   }
+   UimaContextAdmin parentContext = getUimaContextAdmin();
+
+   // Collect the component-sofa to aggregate-sofa mappings declared for this delegate
+   HashMap sofaNames = new HashMap();
+   if (resourceSpecifier instanceof AnalysisEngineDescription) {
+     SofaMapping[] mappings = ((AnalysisEngineDescription) resourceSpecifier).getSofaMappings();
+     if (mappings != null) {
+       for (SofaMapping mapping : mappings) {
+         // Only mappings that belong to this delegate are of interest
+         if (!mapping.getComponentKey().equals(delegateKey)) {
+           continue;
+         }
+         // A null component sofa name is replaced with the default sofa name
+         // (support for old style TCAS); note this updates the mapping object itself
+         if (mapping.getComponentSofaName() == null) {
+           mapping.setComponentSofaName(CAS.NAME_DEFAULT_SOFA);
+         }
+         sofaNames.put(mapping.getComponentSofaName(), mapping.getAggregateSofaName());
+       }
+     }
+   }
+   // Create the child UimaContext for the delegate using the collected sofa map
+   return parentContext.createChild(delegateKey, sofaNames);
+ }
+
+ /**
+  * Stores the given channel as this controller's input channel, appends it to
+  * inputChannelList and counts down inputChannelLatch, which getInputChannel() waits on.
+  * On the first call that finds registeredWithJMXServer unset, the service is also
+  * registered with JMX.
+  *
+  * @param anInputChannel the channel to install
+  * @throws Exception propagated from registerServiceWithJMX()
+  */
+ public void setInputChannel(InputChannel anInputChannel) throws Exception {
+   this.inputChannel = anInputChannel;
+   this.inputChannelList.add(anInputChannel);
+
+   // Release any caller blocked in getInputChannel()
+   this.inputChannelLatch.countDown();
+   if (registeredWithJMXServer) {
+     return;
+   }
+   // Flag is raised before the registration call, exactly once per controller
+   registeredWithJMXServer = true;
+   registerServiceWithJMX(jmxContext, false);
+ }
+
+ public void addInputChannel(InputChannel anInputChannel) {
+ if (!inputChannelMap.containsKey(anInputChannel.getInputQueueName())) {
inputChannelMap.put(anInputChannel.getInputQueueName(), anInputChannel);
- if ( inputChannelList.contains(anInputChannel)) {
+ if (inputChannelList.contains(anInputChannel)) {
inputChannelList.add(anInputChannel);
}
- }
- }
- public InputChannel getInputChannel()
- {
- try
- {
- inputChannelLatch.await();
-
- }
- catch( Exception e){}
-
- return inputChannel;
- }
-
- public void dropCAS(CAS aCAS)
- {
- if (aCAS != null)
- {
- // Check if this method was called while another thread is stopping the service.
- // This is a special case. Normally the releasedAllCASes is false. It is only
- // true if another thread forcefully released CASes.
- if ( releasedAllCASes ) {
- // All CASes could have been forcefully released in the stop() method. If another
- // thread was operating on a CAS while the stop() was releasing the CAS we may
- // get an exception which will be ignored. We are shutting down. The forceful
- // CAS release is not part of the graceful. In that case, stop() is only called
- // when ALL CASes are fully processed. Only than stop() is called and since ALL
- // CASes are released at that point we would not see any exceptions.
- try {
- aCAS.release();
- } catch( Exception e) {}
- } else {
- aCAS.release();
- }
-
- }
-
- }
-
-
- public synchronized void saveReplyTime( long snapshot, String aKey )
- {
- replyTime = snapshot;
- }
-
- public synchronized long getReplyTime()
- {
- return replyTime;
- }
-
- protected void handleAction( String anAction, String anEndpoint, ErrorContext anErrorContext )
- throws Exception
- {
-
-
- String casReferenceId = null;
- if ( anErrorContext != null )
- {
- casReferenceId = (String)anErrorContext.get( AsynchAEMessage.CasReference);
- }
-
- if ( ErrorHandler.TERMINATE.equalsIgnoreCase(anAction))
- {
- // Propagate terminate event to the top controller and begin shutdown of this service along
- // with all collocated delegates (if any)
- if ( anErrorContext != null && anErrorContext.containsKey(ErrorContext.THROWABLE_ERROR) && anErrorContext.containsKey(AsynchAEMessage.CasReference)) {
- terminate((Throwable) anErrorContext.get(ErrorContext.THROWABLE_ERROR), (String) anErrorContext.get(AsynchAEMessage.CasReference));
- } else {
- terminate();
- }
- }
- else if ( ErrorHandler.DISABLE.equalsIgnoreCase(anAction) )
- {
-
- if ( anEndpoint != null )
- {
- Endpoint endpoint = null;
- List list = new ArrayList();
- String key = "";
- if ( (endpoint = ((AggregateAnalysisEngineController)this).lookUpEndpoint(anEndpoint, false)) == null )
- {
- key = ((AggregateAnalysisEngineController)this).lookUpDelegateKey(anEndpoint);
- endpoint = ((AggregateAnalysisEngineController)this).lookUpEndpoint(key, false);
- list.add(key);
- }
- else
- {
- key = anEndpoint;
- list.add(anEndpoint);
- }
- ((AggregateAnalysisEngineController_impl)this).disableDelegates(list,casReferenceId);
-
- if ( key != null && key.trim().length() > 0) {
- // Delegate has been disabled. Cleanup Delegate's lists. Each Delegate
- // maintains a list of CASes pending reply and a different list of CASes
- // pending dispatch. The first list contains CASes sent to the delegate.
- // When a reply is received from the delegate, the CAS is removed from
- // the list. The second list contains CASes that have been delayed
- // while the service was in the TIMEDOUT state. These CASes were to
- // be dispatched to the delegate once its state is reset to OK. It is
- // reset to OK state when the delegate responds to the client PING
- // request. Since we have disabled the delegate, remove ALL CASes from
- // both lists and send them through the ErrorHandler one at a time
- // as if these CASes timed out.
-
- Delegate delegate = ((AggregateAnalysisEngineController)this).lookupDelegate(key);
- // Cancel the delegate timer. No more responses are expected
- delegate.cancelDelegateTimer();
- // Check if we should force timeout on all CASes in a pending state. If this
- // method is called from ProcessCasErrorHandler we will skip this since we
- // want to first completely handle the CAS exception. Once that CAS exception
- // is handled, the ProcessCasErrorHandler will call forceTimeoutOnPendingCases
- // to time out CASes in pending lists
- if ( anErrorContext.containsKey( AsynchAEMessage.SkipPendingLists) == false ) {
- // If the delegate has CASes pending reply still, send each CAS
- // from the pending list through the error handler with
- // MessageTimeoutException as a cause of error
- forceTimeoutOnPendingCases(key);
+ }
+ }
+
+ /**
+  * Returns the input channel after waiting on inputChannelLatch, which setInputChannel()
+  * counts down.
+  *
+  * If the waiting thread is interrupted the wait is abandoned, the thread's interrupt
+  * status is restored so callers can still observe it, and the current value of the
+  * inputChannel field is returned (it may not have been set yet).
+  *
+  * @return the current input channel
+  */
+ public InputChannel getInputChannel() {
+   try {
+     inputChannelLatch.await();
+   } catch (InterruptedException e) {
+     // await() clears the interrupt flag when it throws; set it again instead of
+     // silently discarding the interruption request
+     Thread.currentThread().interrupt();
+   }
+
+   return inputChannel;
+ }
+
+ /**
+  * Releases the given CAS; a null argument is ignored.
+  *
+  * When releasedAllCASes is set, any exception thrown by the release is ignored;
+  * otherwise it propagates to the caller.
+  *
+  * @param aCAS the CAS to release, may be null
+  */
+ public void dropCAS(CAS aCAS) {
+   if (aCAS == null) {
+     return;
+   }
+   // Normally releasedAllCASes is false and the CAS is simply released.
+   if (!releasedAllCASes) {
+     aCAS.release();
+     return;
+   }
+   // Special case: another thread is stopping the service and has forcefully released
+   // all CASes in stop(). If this thread was still operating on the CAS, the release
+   // may fail; since we are shutting down that failure is deliberately ignored.
+   // A graceful shutdown does not hit this path: stop() is then only called after ALL
+   // CASes have been fully processed, so no exceptions are expected.
+   try {
+     aCAS.release();
+   } catch (Exception ignored) {
+     // intentionally ignored, see above
+   }
+ }
+
+ /**
+  * Records the given snapshot as the reply time.
+  *
+  * @param snapshot the value to store
+  * @param aKey not used by this implementation
+  */
+ public synchronized void saveReplyTime(long snapshot, String aKey) {
+   this.replyTime = snapshot;
+ }
+
+ /**
+  * @return the value most recently stored by saveReplyTime()
+  */
+ public synchronized long getReplyTime() {
+   return this.replyTime;
+ }
+
+ protected void handleAction(String anAction, String anEndpoint, ErrorContext anErrorContext)
+ throws Exception {
+ // Dispatches on anAction (case-insensitive): TERMINATE, DISABLE, CONTINUE or DROPCAS; any other value does nothing.
+ String casReferenceId = null;
+ if (anErrorContext != null) {
+ casReferenceId = (String) anErrorContext.get(AsynchAEMessage.CasReference);
+ }
+
+ if (ErrorHandler.TERMINATE.equalsIgnoreCase(anAction)) {
+ // Propagate terminate event to the top controller and begin shutdown of this service along
+ // with all collocated delegates (if any). The cause and CAS id are passed on when both are present.
+ if (anErrorContext != null && anErrorContext.containsKey(ErrorContext.THROWABLE_ERROR)
+ && anErrorContext.containsKey(AsynchAEMessage.CasReference)) {
+ terminate((Throwable) anErrorContext.get(ErrorContext.THROWABLE_ERROR),
+ (String) anErrorContext.get(AsynchAEMessage.CasReference));
+ } else {
+ terminate();
+ }
+ } else if (ErrorHandler.DISABLE.equalsIgnoreCase(anAction)) {
+ // NOTE(review): this branch casts 'this' to the aggregate controller types without an instanceof check.
+ if (anEndpoint != null) {
+ Endpoint endpoint = null;
+ List list = new ArrayList();
+ String key = "";
+ if ((endpoint = ((AggregateAnalysisEngineController) this)
+ .lookUpEndpoint(anEndpoint, false)) == null) {
+ key = ((AggregateAnalysisEngineController) this).lookUpDelegateKey(anEndpoint);
+ endpoint = ((AggregateAnalysisEngineController) this).lookUpEndpoint(key, false);
+ list.add(key);
+ } else {
+ key = anEndpoint;
+ list.add(anEndpoint);
+ }
+ ((AggregateAnalysisEngineController_impl) this).disableDelegates(list, casReferenceId);
+
+ if (key != null && key.trim().length() > 0) {
+ // Delegate has been disabled. Cleanup Delegate's lists. Each Delegate
+ // maintains a list of CASes pending reply and a different list of CASes
+ // pending dispatch. The first list contains CASes sent to the delegate.
+ // When a reply is received from the delegate, the CAS is removed from
+ // the list. The second list contains CASes that have been delayed
+ // while the service was in the TIMEDOUT state. These CASes were to
+ // be dispatched to the delegate once its state is reset to OK. It is
+ // reset to OK state when the delegate responds to the client PING
+ // request. Since we have disabled the delegate, remove ALL CASes from
+ // both lists and send them through the ErrorHandler one at a time
+ // as if these CASes timed out.
+ // NOTE(review): the lookupDelegate() result is used without a null check - confirm it cannot be null here.
+ Delegate delegate = ((AggregateAnalysisEngineController) this).lookupDelegate(key);
+ // Cancel the delegate timer. No more responses are expected
+ delegate.cancelDelegateTimer();
+ // Check if we should force timeout on all CASes in a pending state. If this
+ // method is called from ProcessCasErrorHandler we will skip this since we
+ // want to first completely handle the CAS exception. Once that CAS exception
+ // is handled, the ProcessCasErrorHandler will call forceTimeoutOnPendingCases
+ // to time out CASes in pending lists. NOTE(review): anErrorContext is not null-checked here.
+ if (anErrorContext.containsKey(AsynchAEMessage.SkipPendingLists) == false) {
+ // If the delegate still has CASes pending reply, send each CAS
+ // from the pending list through the error handler with
+ // MessageTimeoutException as a cause of error
+ forceTimeoutOnPendingCases(key);
+ }
+ }
+
+ if (endpoint != null) {
+ try {
+ // Fetch all Cas entries currently awaiting reply from the delegate that was just
+ // disabled. In case the delegate was in a parallel step, we need to adjust
+ // the number of responses received to account for the failed delegate. This
+ // enables the processing to continue.
+ CacheEntry[] entries = getInProcessCache().getCacheEntriesForEndpoint(
+ endpoint.getEndpoint());
+ if (entries != null) {
+ for (int i = 0; i < entries.length; i++) {
+ CasStateEntry cse = localCache.lookupEntry(entries[i].getCasReferenceId());
+ // Check if this is a parallel step
+ int parallelDelegateCount = cse.getNumberOfParallelDelegates();
+ // Check if all delegates responded
+ if (parallelDelegateCount > 1
+ && cse.howManyDelegatesResponded() < parallelDelegateCount) {
+ // Count the disabled delegate as having responded
+ cse.incrementHowManyDelegatesResponded();
+ }
+ }
}
+ } catch (Exception e) { // NOTE(review): any failure in the adjustment above is silently discarded
+ }
+ }
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "handleAction", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_disabled_delegate_INFO", new Object[] { getComponentName(), key });
+ }
+ }
+ } else if (ErrorHandler.CONTINUE.equalsIgnoreCase(anAction)) {
+ if (anEndpoint != null) { // NOTE(review): anErrorContext is dereferenced below without a null check
+ String key = ((AggregateAnalysisEngineController) this).lookUpDelegateKey(anEndpoint);
+ Exception ex = (Exception) anErrorContext.get(ErrorContext.THROWABLE_ERROR);
+ boolean continueOnError = ((AggregateAnalysisEngineController) this).continueOnError(
+ casReferenceId, key, ex);
+ if (continueOnError) {
+ CacheEntry entry = null;
+ try {
+ entry = getInProcessCache().getCacheEntryForCAS(casReferenceId);
+ } catch (AsynchAEException e) {
+ System.out.println("Controller:" + getComponentName() + " CAS:" + casReferenceId
+ + " Not Found In Cache");
}
+ CAS cas = null;
+ // Make sure that the ErrorHandler did not drop the cache entry and the CAS before resuming processing
+ if (entry != null && ((cas = entry.getCas()) != null)) {
+ ((AggregateAnalysisEngineController) this).process(cas, casReferenceId);
+ }
+ }
+ }
+ } else if (ErrorHandler.DROPCAS.equalsIgnoreCase(anAction)) {
+ if (casReferenceId != null) {
+ dropCAS(casReferenceId, true);
+ }
+ }
+
+ }
- if ( endpoint != null ) {
- try {
- // Fetch all Cas entries currently awaiting reply from the delegate that was just
- // disabled. In case the delegate was in a parallel step, we need to adjust the
- // the number of responses received to account for the failed delegate. This
- // enables the processing to continue.
- CacheEntry[] entries = getInProcessCache().getCacheEntriesForEndpoint(endpoint.getEndpoint());
- if ( entries != null ) {
- for ( int i=0; i < entries.length; i++ ) {
- CasStateEntry cse = localCache.lookupEntry(entries[i].getCasReferenceId());
- // Check if this is a parallel step
- int parallelDelegateCount = cse.getNumberOfParallelDelegates();
- // Check if all delegated responded
- if ( parallelDelegateCount > 1 && cse.howManyDelegatesResponded() < parallelDelegateCount) {
- // increment responders
- cse.incrementHowManyDelegatesResponded();
- }
- }
- }
- } catch( Exception e) {
- }
- }
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "handleAction", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_disabled_delegate_INFO",
- new Object[] { getComponentName(), key });
- }
- }
- }
- else if ( ErrorHandler.CONTINUE.equalsIgnoreCase(anAction) )
- {
- if ( anEndpoint != null )
- {
- String key = ((AggregateAnalysisEngineController)this).lookUpDelegateKey(anEndpoint);
- Exception ex = (Exception)anErrorContext.get(ErrorContext.THROWABLE_ERROR);
- boolean continueOnError =
- ((AggregateAnalysisEngineController)this).continueOnError(casReferenceId, key, ex);
- if ( continueOnError )
- {
- CacheEntry entry = null;
- try
- {
- entry = getInProcessCache().getCacheEntryForCAS(casReferenceId);
- }
- catch( AsynchAEException e) {
- System.out.println("Controller:"+getComponentName()+" CAS:"+casReferenceId+" Not Found In Cache");
- }
- CAS cas = null;
- // Make sure that the ErrorHandler did not drop the cache entry and the CAS
- if ( entry != null && (( cas = entry.getCas()) != null ) )
- {
- ((AggregateAnalysisEngineController)this).process(cas, casReferenceId);
- }
- }
- }
- }
- else if(ErrorHandler.DROPCAS.equalsIgnoreCase( anAction ))
- {
- if ( casReferenceId != null)
- {
- dropCAS(casReferenceId, true);
- }
- }
-
- }
-
- public void forceTimeoutOnPendingCases(String key ) {
- Delegate delegate = ((AggregateAnalysisEngineController)this).lookupDelegate(key);
- // Cancel the delegate timer. No more responses are expected
+ public void forceTimeoutOnPendingCases(String key) {
+ Delegate delegate = ((AggregateAnalysisEngineController) this).lookupDelegate(key);
+ // Cancel the delegate timer. No more responses are expected
delegate.cancelDelegateTimer();
Endpoint endpoint = delegate.getEndpoint();
- // If the delegate has CASes pending reply still, send each CAS
- // from the pending list through the error handler with
- // MessageTimeoutException as a cause of error
- while ( delegate.getCasPendingReplyListSize() > 0 ) {
+ // If the delegate has CASes pending reply still, send each CAS
+ // from the pending list through the error handler with
+ // MessageTimeoutException as a cause of error
+ while (delegate.getCasPendingReplyListSize() > 0) {
String timedOutCasId = delegate.removeOldestCasFromOutstandingList();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "handleAction", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_force_cas_timeout__INFO",
- new Object[] { getComponentName(), key, timedOutCasId, " Pending Reply List" });
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "handleAction",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_force_cas_timeout__INFO",
+ new Object[] { getComponentName(), key, timedOutCasId, " Pending Reply List" });
}
-
+
ErrorContext errorContext = new ErrorContext();
errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
errorContext.add(AsynchAEMessage.CasReference, timedOutCasId);
errorContext.add(AsynchAEMessage.Endpoint, endpoint);
getErrorHandlerChain().handle(new ForcedMessageTimeoutException(), errorContext, this);
}
- // If the delegate has CASes pending dispatch, send each CAS
- // from the pending dispatch list through the error handler with
- // MessageTimeoutException as a cause of error
- while ( delegate.getCasPendingDispatchListSize() > 0 ) {
+ // If the delegate has CASes pending dispatch, send each CAS
+ // from the pending dispatch list through the error handler with
+ // MessageTimeoutException as a cause of error
+ while (delegate.getCasPendingDispatchListSize() > 0) {
String timedOutCasId = delegate.removeOldestFromPendingDispatchList();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
[... 3056 lines stripped ...]