You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2008/08/22 20:53:06 UTC
svn commit: r688174 [2/3] - in
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main:
java/org/apache/uima/aae/ java/org/apache/uima/aae/client/
java/org/apache/uima/aae/controller/
java/org/apache/uima/aae/error/handler/ java/org/apache/uima/aa...
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Fri Aug 22 11:53:05 2008
@@ -19,6 +19,8 @@
package org.apache.uima.aae.controller;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -64,6 +66,7 @@
import org.apache.uima.analysis_engine.impl.AnalysisEngineManagementImpl;
import org.apache.uima.analysis_engine.metadata.SofaMapping;
import org.apache.uima.cas.CAS;
+import org.apache.uima.collection.CollectionReaderDescription;
import org.apache.uima.resource.Resource;
import org.apache.uima.resource.ResourceCreationSpecifier;
import org.apache.uima.resource.ResourceSpecifier;
@@ -148,6 +151,30 @@
protected ConcurrentHashMap perCasStatistics = new ConcurrentHashMap();
+ private boolean casMultiplier = false;
+
+ protected Object syncObject = new Object();
+
+ // Map holding outstanding CASes produced by Cas Multiplier that have to be acked
+ protected ConcurrentHashMap cmOutstandingCASes = new ConcurrentHashMap();
+
+ private Object mux = new Object();
+
+ private Object waitmux = new Object();
+
+ private boolean waitingForCAS = false;
+
+ private long startTime = System.nanoTime();
+
+ private long totalWaitTimeForCAS = 0;
+
+ private long lastCASWaitTimeUpdate = 0;
+
+ private Map<Long, AnalysisThreadState> threadStateMap =
+ new HashMap<Long,AnalysisThreadState>();
+
+
+
public BaseAnalysisEngineController(AnalysisEngineController aParentController, int aComponentCasPoolSize, String anEndpointName, String aDescriptor, AsynchAECasManager aCasManager, InProcessCache anInProcessCache) throws Exception
{
this(aParentController, aComponentCasPoolSize, 0, anEndpointName, aDescriptor, aCasManager, anInProcessCache, null, null);
@@ -213,7 +240,14 @@
new Object[] { endpointName });
resourceSpecifier = UimaClassFactory.produceResourceSpecifier(aDescriptor);
-
+ // Is this service a CAS Multiplier?
+ if ( (resourceSpecifier instanceof AnalysisEngineDescription &&
+ ((AnalysisEngineDescription) resourceSpecifier).getAnalysisEngineMetaData().getOperationalProperties().getOutputsNewCASes())
+ || resourceSpecifier instanceof CollectionReaderDescription)
+ {
+ casMultiplier = true;
+ }
+
paramsMap = new HashMap();
if ( aJmxManagement == null )
{
@@ -239,8 +273,9 @@
getUimaContextAdmin().getManagementInterface();
// Override uima core jmx domain setting
mbean.setName(getComponentName(), getUimaContextAdmin(),jmxManagement.getJmxDomain());
- if ( this instanceof PrimitiveAnalysisEngineController && resourceSpecifier instanceof AnalysisEngineDescription )
+ if ( resourceSpecifier instanceof AnalysisEngineDescription )
{
+ // Is this service a CAS Multiplier?
if ( ((AnalysisEngineDescription) resourceSpecifier).getAnalysisEngineMetaData().getOperationalProperties().getOutputsNewCASes() )
{
System.out.println(getName()+"-Initializing CAS Pool for Context:"+getUimaContextAdmin().getQualifiedContextName());
@@ -350,11 +385,6 @@
}
if ( this instanceof PrimitiveAnalysisEngineController )
{
-// if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.ProcessCount)) == null )
-// {
-// statistic = new LongNumericStatistic(Monitor.ProcessCount);
-// getMonitor().addStatistic("", statistic);
-// }
if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.ProcessErrorCount)) == null )
{
statistic = new LongNumericStatistic(Monitor.ProcessErrorCount);
@@ -450,43 +480,52 @@
String name = "";
int index = getIndex();
- servicePerformance = new ServicePerformance();
-// name = getJMXDomain()+key_value_list+",name="+thisComponentName+"_"+servicePerformance.getLabel();
+ servicePerformance = new ServicePerformance(this);
name = jmxManagement.getJmxDomain()+key_value_list+",name="+thisComponentName+"_"+servicePerformance.getLabel();
-
registerWithAgent(servicePerformance, name );
-
+ servicePerformance.setIdleTime(System.nanoTime());
+
ServiceInfo serviceInfo = getInputChannel().getServiceInfo();
ServiceInfo pServiceInfo = null;
if ( this instanceof PrimitiveAnalysisEngineController )
{
pServiceInfo = ((PrimitiveAnalysisEngineController)this).getServiceInfo();
+ servicePerformance.setProcessThreadCount(((PrimitiveAnalysisEngineController)this).getServiceInfo().getAnalysisEngineInstanceCount());
+ // If this is a Cas Multiplier, add the key to the JMX MBean.
+ // This will help the JMX Monitor to fetch the CM Cas Pool MBean
+ if ( isCasMultiplier() )
+ {
+ pServiceInfo.setServiceKey(getUimaContextAdmin().getQualifiedContextName());
+ }
}
else
{
pServiceInfo =
((AggregateAnalysisEngineController)this).getServiceInfo();
+ pServiceInfo.setAggregate(true);
}
if ( pServiceInfo != null )
{
-// name = getJMXDomain()+key_value_list+",name="+thisComponentName+"_"+serviceInfo.getLabel();
name = jmxManagement.getJmxDomain()+key_value_list+",name="+thisComponentName+"_"+serviceInfo.getLabel();
-
-
if ( !isTopLevelComponent() )
{
pServiceInfo.setBrokerURL("Embedded Broker");
}
+ else
+ {
+ pServiceInfo.setTopLevel();
+ }
+ if ( isCasMultiplier())
+ {
+ pServiceInfo.setCASMultiplier();
+ }
registerWithAgent(pServiceInfo, name );
}
serviceErrors = new ServiceErrors();
-// name = getJMXDomain()+key_value_list+",name="+thisComponentName+"_"+serviceErrors.getLabel();
name = jmxManagement.getJmxDomain()+key_value_list+",name="+thisComponentName+"_"+serviceErrors.getLabel();
-
-
registerWithAgent(serviceErrors, name );
}
@@ -519,6 +558,7 @@
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"initializeComponentCasPool", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cas_pool_config_INFO",
new Object[] { getComponentName(), getUimaContextAdmin().getQualifiedContextName(), aComponentCasPoolSize, anInitialCasHeapSize/4});
+
}
}
@@ -573,6 +613,10 @@
sInfo.setInputQueueName(aServiceInfo.getInputQueueName());
sInfo.setState(aServiceInfo.getState());
sInfo.setDeploymentDescriptor(deploymentDescriptor);
+ if ( isCasMultiplier())
+ {
+ sInfo.setCASMultiplier();
+ }
}
else
{
@@ -661,23 +705,12 @@
registeredWithJMXServer = true;
registerServiceWithJMX(jmxContext, false);
}
-/*
- if ( this instanceof AggregateAnalysisEngineController )
- {
- AggregateAnalysisEngineController aC = (AggregateAnalysisEngineController)this;
- if ( aC.requestForMetaSentToRemotes() == false && allDelegatesAreRemote )
- {
- aC.setRequestForMetaSentToRemotes();
- aC.sendRequestForMetadataToRemoteDelegates();
- }
-*/
}
public void addInputChannel( InputChannel anInputChannel )
{
if ( !inputChannelMap.containsKey(anInputChannel.getInputQueueName()))
{
inputChannelMap.put(anInputChannel.getInputQueueName(), anInputChannel);
-
}
}
public InputChannel getInputChannel()
@@ -711,26 +744,7 @@
{
return replyTime;
}
-
- public long getIdleTime( String aKey )
- {
- return idleTime;
- }
-
- public synchronized void saveIdleTime( long snapshot, String aKey, boolean accumulate )
- {
- if ( accumulate )
- {
- LongNumericStatistic statistic;
- // Accumulate idle time across all processing threads
- if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.IdleTime)) != null )
- {
- statistic.increment(snapshot);
- }
- }
- getServicePerformance().incrementIdleTime(snapshot);
- idleTime += snapshot;
- }
+
protected void handleAction( String anAction, String anEndpoint, ErrorContext anErrorContext )
throws Exception
{
@@ -865,7 +879,7 @@
"dropCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_removed_cache_entry__FINE",
new Object[] {aCasReferenceId, getComponentName() });
}
- inProcessCache.dumpContents();
+ inProcessCache.dumpContents(getComponentName());
}
}
// Remove stats from the map maintaining CAS specific stats
@@ -1081,7 +1095,7 @@
}
else
{
- casStats = new ServicePerformance();
+ casStats = new ServicePerformance(this);
perCasStatistics.put( aCasReferenceId, casStats);
}
return casStats;
@@ -1111,53 +1125,6 @@
}
/**
- * Logs controller statistics in a uima log.
- *
- * @param aComponentName
- * @param aStatsMap
- */
-/*
- protected void logStats(String aComponentName, Map aStatsMap)
- {
- float totalIdleTime = 0;
- long numberCASesProcessed = 0;
- float totalDeserializeTime = 0;
- float totalSerializeTime = 0;
-
- if ( aStatsMap.containsKey(Monitor.IdleTime))
- {
- totalIdleTime = ((Float)aStatsMap.get(Monitor.IdleTime)).floatValue();
- }
- if ( aStatsMap.containsKey(Monitor.ProcessCount))
- {
- numberCASesProcessed = ((Long)aStatsMap.get(Monitor.ProcessCount)).longValue();
- }
- if ( aStatsMap.containsKey(Monitor.TotalDeserializeTime))
- {
- totalDeserializeTime = ((Float)aStatsMap.get(Monitor.TotalDeserializeTime)).floatValue();
- }
- if ( aStatsMap.containsKey(Monitor.TotalDeserializeTime))
- {
- totalSerializeTime = ((Float)aStatsMap.get(Monitor.TotalSerializeTime)).floatValue();
- }
- float totalAEProcessTime=0;
- if ( aStatsMap.containsKey(Monitor.TotalAEProcessTime))
- {
- totalAEProcessTime = ((Float)aStatsMap.get(Monitor.TotalAEProcessTime)).floatValue();
- }
-
- if ( totalAEProcessTime > 0 )
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "logStats", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_dump_primitive_stats__INFO", new Object[] { aComponentName, totalIdleTime, numberCASesProcessed, totalDeserializeTime, totalSerializeTime, totalAEProcessTime });
- }
- else
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "logStats", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_dump_aggregate_stats__INFO", new Object[] { aComponentName, totalIdleTime, numberCASesProcessed, totalDeserializeTime, totalSerializeTime });
- }
-
- }
-*/
- /**
* Clears controller statistics.
*
*/
@@ -1176,6 +1143,9 @@
}
// Clear CAS statistics
perCasStatistics.clear();
+
+
+
}
/**
* Returns a copy of the controller statistics.
@@ -1278,10 +1248,12 @@
{
return getInputChannel().getInputQueueName();
}
+/*
public long getIdleTime()
{
return 0;
}
+*/
public long getTotalTimeSpentSerializingCAS()
{
return 0;
@@ -1366,14 +1338,14 @@
((AggregateAnalysisEngineController_impl)this).stopTimers();
// Stops ALL input channels of this service including the reply channels
stopInputChannels();
- int childControllerListSize = ((AggregateAnalysisEngineController_impl)this).childControllerList.size();
+ int childControllerListSize = ((AggregateAnalysisEngineController_impl)this).getChildControllerList().size();
// send terminate event to all collocated child controllers
if ( childControllerListSize > 0 )
{
for( int i=0; i < childControllerListSize; i++ )
{
AnalysisEngineController childController =
- (AnalysisEngineController)((AggregateAnalysisEngineController_impl)this).childControllerList.get(i);
+ (AnalysisEngineController)((AggregateAnalysisEngineController_impl)this).getChildControllerList().get(i);
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "stop", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_stop_delegate__INFO", new Object[] { getComponentName(), childController.getComponentName() });
childController.stop();
@@ -1488,23 +1460,6 @@
// fully processed.
stopCasMultiplier();
stop();
-/*
- // If the InProcessCache is not empty ( CASes are still in play), register self
- // (the top level controller) to receive a callback when all CASes are fully
- // processed and the cache becomes empty.
- if ( !getInProcessCache().isEmpty() )
- {
- System.out.println("Controller:"+getComponentName()+" Cache Not Empty. Registering Self For Callback");
- getInProcessCache().dumpContents();
- getInProcessCache().registerCallbackWhenCacheEmpty(this.getEventListener());
- }
- else
- {
- // Cache is already empty - trigger shutdown. If this controller is an
- // aggregate, it will propagate stop() down the delegate hierarchy
- getEventListener().onCacheEmpty();
- }
-*/
}
}
@@ -1621,16 +1576,14 @@
public AnalysisEngineController getCasMultiplierController()
{
- int childControllerListSize = ((AggregateAnalysisEngineController_impl)this).childControllerList.size();
+ int childControllerListSize = ((AggregateAnalysisEngineController_impl)this).getChildControllerList().size();
if ( childControllerListSize > 0 )
{
for( int i=0; i < childControllerListSize; i++ )
{
AnalysisEngineController childController =
- (AnalysisEngineController)((AggregateAnalysisEngineController_impl)this).childControllerList.get(i);
- if ( childController instanceof PrimitiveAnalysisEngineController &&
- ((PrimitiveAnalysisEngineController)childController).isMultiplier()
- )
+ (AnalysisEngineController)((AggregateAnalysisEngineController_impl)this).getChildControllerList().get(i);
+ if ( childController.isCasMultiplier() )
{
return childController;
}
@@ -1651,7 +1604,6 @@
}
return null;
}
-
/**
* Callback method called the InProcessCache becomes empty meaning ALL CASes are processed.
@@ -1725,12 +1677,12 @@
if ( e != null )
{
((ControllerCallbackListener)controllerListeners.get(i)).
- notifyOnInitializationFailure(e);
+ notifyOnInitializationFailure(this, e);
}
else
{
((ControllerCallbackListener)controllerListeners.get(i)).
- notifyOnInitializationSuccess();
+ notifyOnInitializationSuccess(this);
}
}
}
@@ -1742,4 +1694,416 @@
perCasStatistics.remove(aCasReferenceId);
}
}
+
+ public boolean isCasMultiplier()
+ {
+ return casMultiplier;
+ }
+
+ public void releaseNextCas(String casReferenceId)
+ {
+ synchronized(syncObject)
+ {
+ // Check if the CAS is in the list of outstanding CASes and also exists in the cache
+ if ( cmOutstandingCASes.size() > 0 && cmOutstandingCASes.containsKey(casReferenceId) && getInProcessCache().entryExists(casReferenceId))
+ {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "releaseNextCas", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_release_cas_req__FINE",
+ new Object[] { getComponentName(), casReferenceId });
+ try
+ {
+ CacheEntry cacheEntry = getInProcessCache().getCacheEntryForCAS(casReferenceId);
+ String parentCasReferenceId = cacheEntry.getInputCasReferenceId();
+ Endpoint freeCasEndpoint = cacheEntry.getFreeCasEndpoint();
+ // If the CAS was created by a remote Cas Multiplier, send a Free CAS Notification
+ // to the CM.
+ if ( freeCasEndpoint != null )
+ {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "releaseNextCas", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_sending_fcq_req__FINE",
+ new Object[] { getComponentName(), casReferenceId, cacheEntry.getCasMultiplierKey(), freeCasEndpoint.getDestination() });
+ freeCasEndpoint.setReplyEndpoint(true);
+ getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, casReferenceId, freeCasEndpoint);
+ }
+ cacheEntry = null;
+ // Release the CAS and remove it from the InProcess cache
+ dropCAS(casReferenceId, true);
+ // Check if the CAS has a parent CAS
+ if ( parentCasReferenceId != null )
+ {
+ // Fetch the parent CAS from the InProcess Cache
+ cacheEntry = getInProcessCache().getCacheEntryForCAS(parentCasReferenceId);
+ if ( cacheEntry != null )
+ {
+ // Decrement number of child CASes in play
+ // Decrement has already happened in the final step before the CAS was sent to the client
+ //cacheEntry.decrementSubordinateCasInPlayCount();
+// if ( cacheEntry.isPendingReply() && cacheEntry.getSubordinateCasInPlayCount() == 0)
+ if ( cacheEntry.isPendingReply() && getInProcessCache().hasNoSubordinates(cacheEntry.getCasReferenceId()))
+ {
+ if ( this instanceof AggregateAnalysisEngineController )
+ {
+ ((AggregateAnalysisEngineController)this).finalStep( cacheEntry.getFinalStep(), parentCasReferenceId);
+ }
+ else // PrimitiveAnalysisEngineController
+ {
+ // Return an input CAS to the client. The input CAS is returned
+ // to the remote client only if all of the child CASes produced
+ // from the input CAS have been fully processed.
+ getOutputChannel().sendReply(cacheEntry.getCasReferenceId(), cacheEntry.getMessageOrigin());
+ dropCAS(cacheEntry.getCasReferenceId(), true);
+ }
+ }
+
+ }
+ }
+ // If debug level=FINEST dump the entire cache
+ getInProcessCache().dumpContents(getComponentName());
+
+ }
+ catch( Exception e)
+ {
+ e.printStackTrace();
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "releaseNextCas", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
+ new Object[] { e});
+ }
+ }
+ }
+ }
+
+ private boolean validMessageForSnapshot( int msgType )
+ {
+ return ( AsynchAEMessage.Process == msgType || AsynchAEMessage.CollectionProcessComplete == msgType);
+ }
+
+ // Called by ServicePerformance MBean on separate thread
+
+ // This is called every time a request comes
+ public void beginProcess(int msgType )
+ {
+ // Disregard GetMeta as it comes on a non-process thread
+ if ( validMessageForSnapshot( msgType ) )
+ {
+ synchronized( mux )
+ {
+ AnalysisThreadState threadState = null;
+ if ( threadStateMap.containsKey(Thread.currentThread().getId()))
+ {
+ threadState = threadStateMap.get(Thread.currentThread().getId());
+ if (threadState.isIdle) {
+ threadState.setIdle(false);
+ threadState.incrementIdleTime(System.nanoTime()-threadState.getLastUpdate());
+ threadState.computeIdleTimeBetweenProcessCalls();
+ }
+ }
+ else
+ {
+ threadStateMap.put(Thread.currentThread().getId(), new AnalysisThreadState(Thread.currentThread().getId()));
+
+ threadState = threadStateMap.get(Thread.currentThread().getId());
+ threadState.setIdle(false);
+ threadState.incrementIdleTime(System.nanoTime()-startTime);
+ threadState.setLastMessageDispatchTime(startTime);
+ threadState.computeIdleTimeBetweenProcessCalls();
+ }
+ }
+ }
+ }
+ // This is called every time a request is completed
+ public void endProcess( int msgType )
+ {
+ // Disregard GetMeta as it comes on a non-process thread
+ if ( validMessageForSnapshot( msgType ) )
+ {
+ synchronized( mux )
+ {
+ AnalysisThreadState threadState = getThreadState();
+ threadState.setLastUpdate(System.nanoTime());
+ threadState.setIdle(true);
+ threadState.setLastMessageDispatchTime();
+ }
+ }
+ }
+ public long getIdleTimeBetweenProcessCalls(int msgType)
+ {
+ if ( validMessageForSnapshot( msgType ) )
+ {
+ synchronized( mux )
+ {
+ AnalysisThreadState threadState = getThreadState();
+ return threadState.getIdleTimeBetweenProcessCalls();
+ }
+ }
+ return 0;
+ }
+ public long getIdleTime()
+ {
+ synchronized( mux )
+ {
+ long now = System.nanoTime();
+ long serviceIdleTime = 0;
+ Set<Long> set = threadStateMap.keySet();
+ int howManyThreads = threadStateMap.size();
+ // Iterate over all processing threads to calculate the total amount of idle time
+ for(Long key: set )
+ {
+ // Retrieve the current thread state information from the global map. The key is
+ // the thread id.
+ AnalysisThreadState threadState = threadStateMap.get(key);
+ // add this thread idle time
+ serviceIdleTime += threadState.getIdleTime() ;
+
+ // If this thread is currently idle, compute amount of time elapsed since the last
+ // update. The last update has been done at the last startProcess() or endProcess() call.
+ if ( threadState.isIdle())
+ {
+ // compute idle time since the last update
+ long delta = now - threadState.getLastUpdate();
+
+ threadState.setLastUpdate(System.nanoTime());
+
+ // increment total idle time
+ threadState.incrementIdleTime(delta);
+ // add the elapsed time since the last update to the total idle time
+ serviceIdleTime += delta;
+ }
+ }
+ // If process CAS request has not yet been received, there are not process threads
+ // created yet. Simply return the delta since the service started. This is a special
+ // case which is only executing if the client has not sent any CASes for processing.
+ if ( howManyThreads == 0)
+ {
+ return System.nanoTime()-startTime;
+ }
+ else
+ {
+ // Return accumulated idle time from all processing threads. Divide the total idle by the
+ // number of process threads.
+
+ if ( this instanceof PrimitiveAnalysisEngineController )
+ {
+ int aeInstanceCount = ((PrimitiveAnalysisEngineController)this).getAEInstanceCount();
+ serviceIdleTime += (aeInstanceCount - howManyThreads)*(System.nanoTime()-startTime);
+ return serviceIdleTime/aeInstanceCount;
+ }
+ else
+ {
+ return serviceIdleTime;
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns CPU Time with nanosecond precision (not nanosecond accuracy). If the OS/JVM
+ * does not support reporting the CPU Time, returns the wall clock time.
+ */
+ public synchronized long getCpuTime()
+ {
+ if ( ManagementFactory.getPlatformMBeanServer() != null )
+ {
+ ThreadMXBean bean = ManagementFactory.getThreadMXBean( );
+ return bean.isCurrentThreadCpuTimeSupported( ) ? bean.getCurrentThreadCpuTime( ) : System.nanoTime();
+ }
+ return System.nanoTime();
+ }
+ private synchronized long getCpuTime(long threadId)
+ {
+ if ( ManagementFactory.getPlatformMBeanServer() != null )
+ {
+ ThreadMXBean bean = ManagementFactory.getThreadMXBean( );
+ return bean.isCurrentThreadCpuTimeSupported( ) ? bean.getThreadCpuTime(threadId): System.nanoTime();
+ }
+ return System.nanoTime();
+ }
+
+ private AnalysisThreadState getFirstThreadState()
+ {
+ Set<Long> set = threadStateMap.keySet();
+ Iterator<Long> it = set.iterator();
+ return threadStateMap.get(it.next());
+
+ }
+ /**
+ * Returns the {@link AnalysisThreadState} object associated with the current thread.
+ *
+ * @return
+ */
+ private AnalysisThreadState getThreadState()
+ {
+ AnalysisThreadState threadState;
+ if ( this instanceof AggregateAnalysisEngineController )
+ {
+ threadState = getFirstThreadState();
+ }
+ else
+ {
+ threadState = threadStateMap.get(Thread.currentThread().getId());
+ if ( threadState == null )
+ {
+ // This may be the case if the thread processing
+ // FreeCASRequest is returning an input CAS to the client.
+ // This thread is different from the process thread, thus
+ // we just return the first thread's state.
+ threadState = getFirstThreadState();
+ }
+ }
+ return threadState;
+ }
+
+ /**
+ * Returns the total CPU time all processing threads spent in analysis.
+ * This method subtracts the serialization and de-serialization time from
+ * the total. If this service is an aggregate, the return time is a sum
+ * of CPU utilization in each colocated delegate.
+ */
+ public long getAnalysisTime()
+ {
+ Set<Long> set = threadStateMap.keySet();
+ Iterator<Long> it = set.iterator();
+ long totalCpuProcessTime = 0;
+ // Iterate over all processing threads
+ while( it.hasNext())
+ {
+ long threadId = it.next();
+ synchronized( mux )
+ {
+ // Fetch the next thread's stats
+ AnalysisThreadState threadState = threadStateMap.get(threadId);
+ // If an Aggregate service, sum up the CPU times of all collocated
+ // delegates.
+ if ( this instanceof AggregateAnalysisEngineController_impl )
+ {
+ // Get a list of all colocated delegate controllers from the Aggregate
+ List<AnalysisEngineController> delegateControllerList =
+ ((AggregateAnalysisEngineController_impl)this).childControllerList;
+ // Iterate over all colocated delegates
+ for( int i=0; i < delegateControllerList.size(); i++)
+ {
+ // Get the next delegate's controller
+ AnalysisEngineController delegateController =
+ (AnalysisEngineController)delegateControllerList.get(i);
+ if ( delegateController != null && !delegateController.isStopped())
+ {
+ // get the CPU time for all processing threads in the current controller
+ totalCpuProcessTime += delegateController.getAnalysisTime();
+ }
+ }
+ }
+ else // Primitive Controller
+ {
+ // Get the CPU time of a thread with a given ID
+ totalCpuProcessTime += getCpuTime(threadId);
+ }
+ // Subtract serialization and deserialization times from the total CPU used
+ if ( totalCpuProcessTime > 0 )
+ {
+ totalCpuProcessTime -= threadState.getDeserializationTime();
+ totalCpuProcessTime -= threadState.getSerializationTime();
+ }
+ }
+ }
+ return totalCpuProcessTime;
+ }
+ /**
+ * Increments the time this thread spent in serialization of a CAS
+ */
+ public void incrementSerializationTime(long cpuTime)
+ {
+ synchronized( mux )
+ {
+ AnalysisThreadState threadState = getThreadState();
+ threadState.incrementSerializationTime(cpuTime);
+ }
+ }
+ /**
+ * Increments the time this thread spent in deserialization of a CAS
+ */
+ public void incrementDeserializationTime(long cpuTime)
+ {
+ synchronized( mux )
+ {
+ AnalysisThreadState threadState = getThreadState();
+ threadState.incrementDeserializationTime(cpuTime);
+ }
+ }
+ private class AnalysisThreadState
+ {
+ private long threadId;
+
+ private boolean isIdle = false;
+ private long lastUpdate = 0;
+ private long totalIdleTime = 0;
+ // Measures idle time between process CAS calls
+ private long idleTimeSinceLastProcess = 0;
+ private long lastMessageDispatchTime = 0;
+
+ private long serializationTime = 0;
+ private long deserializationTime = 0;
+
+ public AnalysisThreadState( long aThreadId )
+ {
+ threadId = aThreadId;
+ }
+
+ public long getThreadId()
+ {
+ return threadId;
+ }
+ public long getSerializationTime() {
+ return serializationTime;
+ }
+ public void incrementSerializationTime(long serializationTime) {
+ this.serializationTime += serializationTime;
+ }
+ public long getDeserializationTime() {
+ return deserializationTime;
+ }
+ public void incrementDeserializationTime(long deserializationTime) {
+ this.deserializationTime += deserializationTime;
+ }
+ public boolean isIdle() {
+ return isIdle;
+ }
+ public void computeIdleTimeBetweenProcessCalls()
+ {
+ idleTimeSinceLastProcess = System.nanoTime() - lastMessageDispatchTime;
+ }
+ public void setLastMessageDispatchTime( long aTime )
+ {
+ lastMessageDispatchTime = aTime;
+ }
+ public void incrementIdleTime( long idleTime )
+ {
+ totalIdleTime += idleTime;
+ }
+ public void setIdle(boolean isIdle) {
+ this.isIdle = isIdle;
+ }
+ public long getIdleTime()
+ {
+ return totalIdleTime;
+ }
+ public void setLastMessageDispatchTime()
+ {
+ lastMessageDispatchTime = System.nanoTime();
+ }
+ public long getIdleTimeBetweenProcessCalls()
+ {
+ long val = idleTimeSinceLastProcess;
+ // Reset so that only one reply contains a non-zero value
+ idleTimeSinceLastProcess = 0;
+ return val;
+ }
+ public long getLastUpdate() {
+ return lastUpdate;
+ }
+ public void setLastUpdate(long lastUpdate) {
+ this.lastUpdate = lastUpdate;
+ }
+
+ }
+
+
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ControllerCallbackListener.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ControllerCallbackListener.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ControllerCallbackListener.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ControllerCallbackListener.java Fri Aug 22 11:53:05 2008
@@ -22,6 +22,8 @@
public interface ControllerCallbackListener
{
public void notifyOnTermination( String aMessage );
+ public void notifyOnInitializationFailure(AnalysisEngineController aController, Exception e);
+ public void notifyOnInitializationSuccess(AnalysisEngineController aController);
public void notifyOnInitializationFailure( Exception e);
public void notifyOnInitializationSuccess();
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java Fri Aug 22 11:53:05 2008
@@ -136,4 +136,8 @@
public void setIdleTime( long anIdleTime );
public long getIdleTime();
+
+ public void setEndpointServer( String anEndpointServer );
+
+ public String getEndpointServer();
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java Fri Aug 22 11:53:05 2008
@@ -100,9 +100,15 @@
private boolean tempReplyDestination;
private int initialHeapSize;
+
private volatile boolean replyDestinationFailed;
private long idleTime=0;
+
+ // This is supplied by the remote client. It needs to be
+ // echoed back to the client.
+ private String endpointServer = null;
+
public int getCommand()
{
return command;
@@ -375,7 +381,7 @@
{
if ( serviceInfo == null )
{
- serviceInfo = new ServiceInfo();
+ serviceInfo = new ServiceInfo(isCasMultiplier);
serviceInfo.setBrokerURL(serverURI);
serviceInfo.setInputQueueName(endpoint);
serviceInfo.setState("Active");
@@ -482,6 +488,7 @@
public void setIsCasMultiplier(boolean trueORfalse)
{
isCasMultiplier = trueORfalse;
+ getServiceInfo().setCASMultiplier();
}
public void setShadowCasPoolSize( int aPoolSize )
{
@@ -547,4 +554,13 @@
public String toString() {
return endpoint;
}
+
+ public void setEndpointServer( String anEndpointServer ){
+ endpointServer = anEndpointServer;
+ }
+
+ public String getEndpointServer() {
+ return endpointServer;
+ }
+
}
\ No newline at end of file
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController.java Fri Aug 22 11:53:05 2008
@@ -25,10 +25,8 @@
public interface PrimitiveAnalysisEngineController extends AnalysisEngineController
{
- public boolean isMultiplier();
- public void releaseNextCas();
public void setAnalysisEngineInstancePool( AnalysisEngineInstancePool aPool);
public PrimitiveServiceInfo getServiceInfo();
public void addAbortedCasReferenceId( String aCasReferenceId );
-
+ public int getAEInstanceCount();
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java Fri Aug 22 11:53:05 2008
@@ -19,9 +19,12 @@
package org.apache.uima.aae.controller;
+import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.management.ObjectName;
@@ -55,6 +58,8 @@
import org.apache.uima.resource.metadata.impl.ConfigurationParameter_impl;
import org.apache.uima.util.Level;
+import sun.management.snmp.jvminstr.JvmThreadInstanceEntryImpl.ThreadStateMap;
+
public class PrimitiveAnalysisEngineController_impl
extends BaseAnalysisEngineController implements PrimitiveAnalysisEngineController
{
@@ -70,17 +75,13 @@
private List aeList = new ArrayList();
- private List cmOutstandingCASes = new ArrayList();
-
private int throttleWindow = 0;
private Object gater = new Object();
private long howManyBeforeReplySeen = 0;
- private boolean isMultiplier = false;
- private Object syncObject = new Object();
private PrimitiveServiceInfo serviceInfo = null;
@@ -88,6 +89,7 @@
private String abortedCASReferenceId = null;
+
public PrimitiveAnalysisEngineController_impl(String anEndpointName, String anAnalysisEngineDescriptor, AsynchAECasManager aCasManager, InProcessCache anInProcessCache, int aWorkQueueSize, int anAnalysisEnginePoolSize) throws Exception
{
this(null, anEndpointName, anAnalysisEngineDescriptor, aCasManager, anInProcessCache, aWorkQueueSize, anAnalysisEnginePoolSize, 0);
@@ -122,6 +124,11 @@
this(aParentController, anEndpointName, anAnalysisEngineDescriptor, aCasManager, anInProcessCache, aWorkQueueSize, anAnalysisEnginePoolSize, 0, aJmxManagement);
}
+
+ public int getAEInstanceCount()
+ {
+ return analysisEnginePoolSize;
+ }
public void initialize() throws AsynchAEException
{
try
@@ -138,7 +145,7 @@
}
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, getClass().getName(), "initialize", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_primitive_ctrl_init_info__CONFIG", new Object[] { analysisEnginePoolSize });
-
+ // Instantiate and initialize UIMA analytics
for (int i = 0; i < analysisEnginePoolSize; i++)
{
AnalysisEngine ae = UIMAFramework.produceAnalysisEngine(resourceSpecifier, paramsMap);
@@ -148,16 +155,11 @@
if (i == 0)
{
analysisEngineMetadata = ae.getAnalysisEngineMetaData();
- if ( analysisEngineMetadata.getOperationalProperties().getOutputsNewCASes())
- {
- isMultiplier = true;
- }
}
}
-
if ( serviceInfo == null )
{
- serviceInfo = new PrimitiveServiceInfo();
+ serviceInfo = new PrimitiveServiceInfo(isCasMultiplier());
}
serviceInfo.setAnalysisEngineInstanceCount(analysisEnginePoolSize);
@@ -176,12 +178,16 @@
if (isTopLevelComponent())
{
getCasManagerWrapper().initialize("PrimitiveAEService");
+ CAS cas = getCasManagerWrapper().getNewCas("PrimitiveAEService");
+ cas.release();
}
}
+
// All internal components of this Primitive have been initialized. Open the latch
// so that this service can start processing requests.
latch.openLatch(getName(), isTopLevelComponent(), true);
+
}
catch ( Exception e)
{
@@ -197,20 +203,20 @@
}
catch ( AsynchAEException e)
{
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "initialize", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
+ e.printStackTrace();
throw e;
}
catch ( Exception e)
{
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "initialize", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
+ e.printStackTrace();
throw new AsynchAEException(e);
}
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "initialize", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_initialized_controller__INFO", new Object[] { getComponentName() });
super.serviceInitialized = true;
}
- public boolean isMultiplier()
- {
- return isMultiplier;
- }
/**
*
@@ -218,7 +224,8 @@
public void collectionProcessComplete(Endpoint anEndpoint)// throws AsynchAEException
{
AnalysisEngine ae = null;
-
+// long start = System.nanoTime();
+ long start = super.getCpuTime();
try
{
ae = aeInstancePool.checkout();
@@ -227,8 +234,12 @@
ae.collectionProcessComplete();
}
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, getClass().getName(), "collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cpc_all_cases_processed__FINEST", new Object[] { getComponentName() });
+ getServicePerformance().incrementAnalysisTime(super.getCpuTime()-start);
+// getServicePerformance().incrementAnalysisTime(System.nanoTime()-start);
getOutputChannel().sendReply(AsynchAEMessage.CollectionProcessComplete, anEndpoint);
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(), "collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cpc_completed__FINE", new Object[] { getComponentName()});
+ //getServicePerformance().reset();
+
}
catch ( Exception e)
{
@@ -289,24 +300,8 @@
{
return;
}
-/*
- try
- {
- // Test to see if the connection to the reply endpoint can be created
- // If the client has died, dont waste time analyzing the CAS.
- getOutputChannel().bindWithClientEndpoint(anEndpoint);
- }
- catch( Exception e)
- {
- if ( isTopLevelComponent() )
- {
-
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_client_drop_cas__INFO", new Object[] { getComponentName(), aCasReferenceId, anEndpoint.getEndpoint()});
- dropCAS(aCasReferenceId, true);
- }
- return;
- }
-*/
+
+ boolean inputCASReturned = false;
boolean processingFailed = false;
// This is a primitive controller. No more processing is to be done on the Cas. Mark the destination as final and return CAS in reply.
anEndpoint.setFinal(true);
@@ -333,54 +328,46 @@
return;
}
- long time = System.nanoTime();
+ // Get input CAS entry from the InProcess cache
+ CacheEntry inputCASEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+
+ long time = super.getCpuTime();
long totalProcessTime = 0; // stored total time spent producing ALL CASes
+
+// super.beginAnalysis();
CasIterator casIterator = ae.processAndOutputNewCASes(aCAS);
+// super.endAnalysis();
+
// Store how long it took to call processAndOutputNewCASes()
- totalProcessTime = ( System.nanoTime() - time);
+ totalProcessTime = ( super.getCpuTime() - time);
long sequence = 1;
- String newCasReferenceId = null;
long hasNextTime = 0; // stores time in hasNext()
long getNextTime = 0; // stores time in next();
- long timeToProcessCAS = 0; // stores time in hasNext() and next() for each CAS
boolean moreCASesToProcess = true;
-/*
-
- String parentCasReferenceId = null;
- CacheEntry inputCasCacheEntry = null;
- try
- {
- // Fetch cache entry for the input CAS
- inputCasCacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
- parentCasReferenceId = inputCasCacheEntry.getInputCasReferenceId();
- }
- catch( Exception e )
- {
- // An exception be be thrown here if the service is being stopped.
- // The top level controller may have already cleaned up the cache
- // and the getCacheEntryForCAS() will throw an exception. Ignore it
- // here, we are shutting down.
- }
-*/
while (moreCASesToProcess)
{
- hasNextTime = System.nanoTime();
+ long timeToProcessCAS = 0; // stores time in hasNext() and next() for each CAS
+ hasNextTime = super.getCpuTime();
+// super.beginAnalysis();
if ( !casIterator.hasNext() )
{
moreCASesToProcess = false;
// Measure how long it took to call hasNext()
- timeToProcessCAS = (System.nanoTime()-hasNextTime);
+ timeToProcessCAS = (super.getCpuTime()-hasNextTime);
totalProcessTime += timeToProcessCAS;
- break;
+// super.endAnalysis();
+ break; // from while
}
// Measure how long it took to call hasNext()
- timeToProcessCAS = (System.nanoTime()-hasNextTime);
- getNextTime = System.nanoTime();
+ timeToProcessCAS = (super.getCpuTime()-hasNextTime);
+ getNextTime = super.getCpuTime();
CAS casProduced = casIterator.next();
+// super.endAnalysis();
// Add how long it took to call next()
- timeToProcessCAS += (System.nanoTime()- getNextTime);
+ timeToProcessCAS += (super.getCpuTime()- getNextTime);
// Add time to call hasNext() and next() to the running total
totalProcessTime += timeToProcessCAS;
+
// If the service is stopped or aborted, stop generating new CASes and just return the input CAS
if ( stopped || abortGeneratingCASes(aCasReferenceId))
{
@@ -412,60 +399,61 @@
}
OutOfTypeSystemData otsd = getInProcessCache().getOutOfTypeSystemData(aCasReferenceId);
MessageContext mContext = getInProcessCache().getMessageAccessorByReference(aCasReferenceId);
- sequence++;
- newCasReferenceId = getInProcessCache().register( casProduced, mContext, otsd);
- CacheEntry newEntry = getInProcessCache().getCacheEntryForCAS(newCasReferenceId);
-/*
- if ( parentCasReferenceId == null )
- {
- newEntry.setInputCasReferenceId(aCasReferenceId);
- }
- else
- {
- newEntry.setInputCasReferenceId(parentCasReferenceId);
- }
-*/
+ CacheEntry newEntry = getInProcessCache().register( casProduced, mContext, otsd);
+ // Associate input CAS with the new CAS
newEntry.setInputCasReferenceId(aCasReferenceId);
+ newEntry.setCasSequence(sequence);
// Add to the cache how long it took to process the generated (subordinate) CAS
- getCasStatistics(newCasReferenceId).incrementAnalysisTime(timeToProcessCAS);
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_produced_new_cas__FINE", new Object[] { Thread.currentThread().getName(),getComponentName(),newCasReferenceId, aCasReferenceId });
+ getCasStatistics(newEntry.getCasReferenceId()).incrementAnalysisTime(timeToProcessCAS);
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_produced_new_cas__FINE", new Object[] { Thread.currentThread().getName(),getComponentName(),newEntry.getCasReferenceId(), aCasReferenceId });
+ // Add the generated CAS to the outstanding CAS Map. Client notification will release
+ // this CAS back to its pool
synchronized(syncObject)
{
- cmOutstandingCASes.add(newCasReferenceId);
- getOutputChannel().sendReply(casProduced, aCasReferenceId, newCasReferenceId, anEndpoint, sequence);
+ if ( isTopLevelComponent() )
+ {
+ inputCASEntry.incrementSubordinateCasInPlayCount();
+ // Add the id of the generated CAS to the map holding outstanding CASes. This
+ // map will be referenced when a client sends Free CAS Notification. The map
+ // stores the id of the CAS both as a key and a value. Map is used to facilitate
+ // quick lookup
+ cmOutstandingCASes.put(newEntry.getCasReferenceId(),newEntry.getCasReferenceId());
+ }
+ // Send generated CAS to the client
+ getOutputChannel().sendReply(newEntry, anEndpoint);
}
// Remove Stats from the global Map associated with the new CAS
- // This stats for this CAS were added to the response message
+ // These stats for this CAS were added to the response message
// and are no longer needed
- dropCasStatistics(newCasReferenceId);
+ dropCasStatistics(newEntry.getCasReferenceId());
// Increment number of CASes processed by this service
- getServicePerformance().incrementNumberOfCASesProcessed();
- getServicePerformance().incrementAnalysisTime(timeToProcessCAS);
- }
-
- LongNumericStatistic statistic = null;
- if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.TotalAEProcessTime)) != null )
- {
- // Increment how long it took to process the input CAS. This timer is exposed via JMX
- statistic.increment(totalProcessTime);
- }
-/*
- if (newCasReferenceId != null)
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_completed_analysis__FINEST", new Object[] { Thread.currentThread().getName(), getComponentName(), newCasReferenceId, (double) (System.nanoTime() - time) / (double) 1000000 });
- }
- else
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_completed_analysis__FINEST", new Object[] { Thread.currentThread().getName(), getComponentName(), aCasReferenceId, (double) (System.nanoTime() - time) / (double) 1000000 });
-
+ sequence++;
}
-*/
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_completed_analysis__FINEST", new Object[] { Thread.currentThread().getName(), getComponentName(), aCasReferenceId, (double) (System.nanoTime() - time) / (double) 1000000 });
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_completed_analysis__FINEST", new Object[] { Thread.currentThread().getName(), getComponentName(), aCasReferenceId, (double) (super.getCpuTime() - time) / (double) 1000000 });
getMonitor().resetCountingStatistic("", Monitor.ProcessErrorCount);
+
+ // Store total time spent processing this input CAS
getCasStatistics(aCasReferenceId).incrementAnalysisTime(totalProcessTime);
- // Aggregate total time spent processing the input CAS
- getServicePerformance().incrementAnalysisTime(totalProcessTime);
- getOutputChannel().sendReply(aCasReferenceId, anEndpoint);
+ // Aggregate total time spent processing in this service. This is separate from per CAS stats above
+// getServicePerformance().incrementAnalysisTime(totalProcessTime);
+ synchronized( cmOutstandingCASes )
+ {
+ if ( cmOutstandingCASes.size() == 0)
+ {
+ inputCASReturned = true;
+
+ // Return an input CAS to the client if there are no outstanding child CASes in play
+ getOutputChannel().sendReply(aCasReferenceId, anEndpoint);
+ }
+ else
+ {
+ // Change the state of the input CAS. Since the input CAS is not returned to the client
+ // until all children of this CAS has been fully processed we keep the input in the cache.
+ // The client will send Free CAS Notifications to release CASes produced here. When the
+ // last child CAS is freed, the input CAS is allowed to be returned to the client.
+ inputCASEntry.setPendingReply(true);
+ }
+ }
}
catch ( Throwable e)
{
@@ -507,8 +495,9 @@
getInProcessCache().releaseCASesProducedFromInputCAS(aCasReferenceId);
}
- else
+ else if ( inputCASReturned )
{
+ // Remove input CAS cache entry if the CAS has been sent to the client
dropCAS(aCasReferenceId, true);
}
}
@@ -620,32 +609,6 @@
{
return super.getMetaData().getName();
}
- public void releaseNextCas()
- {
- synchronized(syncObject)
- {
- if ( cmOutstandingCASes.size() > 0 )
- {
- try
- {
- String casReferenceId = (String)cmOutstandingCASes.remove(0);
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "releaseNextCas", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_release_cas_req__FINE",
- new Object[] { getName(), casReferenceId });
- if ( casReferenceId != null && getInProcessCache().entryExists(casReferenceId))
- {
- dropCAS(casReferenceId, true);
- }
- }
- catch( Exception e)
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "releaseNextCas", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
- new Object[] { e});
- }
- }
- }
- }
public void setAnalysisEngineInstancePool( AnalysisEngineInstancePool aPool)
{
@@ -665,6 +628,10 @@
}
serviceInfo.setState("Running");
+ if ( isCasMultiplier() )
+ {
+ serviceInfo.setCASMultiplier();
+ }
return serviceInfo;
}
@@ -683,7 +650,6 @@
if ( cmOutstandingCASes != null )
{
cmOutstandingCASes.clear();
- cmOutstandingCASes = null;
}
if ( aeList != null )
{
@@ -691,4 +657,5 @@
aeList = null;
}
}
+
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/UimacppServiceController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/UimacppServiceController.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/UimacppServiceController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/UimacppServiceController.java Fri Aug 22 11:53:05 2008
@@ -205,7 +205,7 @@
{
// If there is an exception, notify listener with failure
if ( e != null ) {
- (this.listeners.get(i)).notifyOnInitializationFailure(e);
+ (this.listeners.get(i)).notifyOnInitializationFailure( e);
}
// else, Success!
else {
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java Fri Aug 22 11:53:05 2008
@@ -138,6 +138,7 @@
}
public boolean handleError(Throwable t, ErrorContext anErrorContext, AnalysisEngineController aController)
{
+ CacheEntry parentCasCacheEntry = null;
if ( !isHandlerForError(anErrorContext, AsynchAEMessage.Process))
{
@@ -372,7 +373,6 @@
}
catch( Exception exc) {}
}
-
// Check if the caller has already decremented number of subordinates. This property is only
// set in the Aggregate's finalStep() method before the CAS is sent back to the client. If
// there was a problem sending the CAS to the client, we dont want to update the counter
@@ -388,12 +388,11 @@
{
try
{
- CacheEntry parentCasCacheEntry = aController.getInProcessCache().
+ parentCasCacheEntry = aController.getInProcessCache().
getCacheEntryForCAS(parentCasReferenceId);
synchronized( parentCasCacheEntry )
{
- ((AggregateAnalysisEngineController)aController).
- decrementCasSubordinateCount( parentCasCacheEntry);
+ parentCasCacheEntry.decrementSubordinateCasInPlayCount();
if ( parentCasCacheEntry.getSubordinateCasInPlayCount() == 0 &&
parentCasCacheEntry.isPendingReply())
{
@@ -499,6 +498,11 @@
}
if ( casReferenceId != null && aController instanceof AggregateAnalysisEngineController )
{
+ if ( parentCasCacheEntry != null && parentCasCacheEntry.getSubordinateCasInPlayCount() == 0 &&
+ parentCasCacheEntry.isPendingReply())
+ {
+ ((AggregateAnalysisEngineController)aController).finalStep(parentCasCacheEntry.getFinalStep(), parentCasCacheEntry.getCasReferenceId());
+ }
// Cleanup state information from local caches
((AggregateAnalysisEngineController)aController).dropFlow(casReferenceId, true);
((AggregateAnalysisEngineController)aController).removeMessageOrigin(casReferenceId);
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/HandlerBase.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/HandlerBase.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/HandlerBase.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/HandlerBase.java Fri Aug 22 11:53:05 2008
@@ -21,11 +21,14 @@
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.InProcessCache.CacheEntry;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.error.ErrorContext;
+import org.apache.uima.aae.jmx.ServicePerformance;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.MessageContext;
import org.apache.uima.aae.message.UIMAMessage;
@@ -165,6 +168,7 @@
public Handler getDelegate()
{
+// System.out.println("In getDelegate() - Returning:"+delegateHandler.getName());
return delegateHandler;
}
@@ -217,5 +221,156 @@
{
}
+ protected synchronized void aggregateDelegateStats(MessageContext aMessageContext, String aCasReferenceId) throws AsynchAEException
+ {
+ String delegateKey = "";
+ try
+ {
+
+ delegateKey = ((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
+ CacheEntry entry = getController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+ if ( entry == null )
+ {
+ throw new AsynchAEException("CasReferenceId:"+aCasReferenceId+" Not Found in the Cache.");
+ }
+ CacheEntry inputCasEntry = null;
+ String inputCasReferenceId = entry.getInputCasReferenceId();
+ ServicePerformance casStats =
+ ((AggregateAnalysisEngineController)getController()).getCasStatistics(aCasReferenceId);
+ if ( inputCasReferenceId != null &&
+ getController().getInProcessCache().entryExists(inputCasReferenceId) )
+ {
+ String casProducerKey = entry.getCasProducerKey();
+ if ( casProducerKey != null &&
+ ((AggregateAnalysisEngineController)getController()).
+ isDelegateKeyValid(casProducerKey) )
+ {
+ // Get entry for the input CAS
+ inputCasEntry = getController().
+ getInProcessCache().
+ getCacheEntryForCAS(inputCasReferenceId);
+ }
+
+ }
+ ServicePerformance delegateServicePerformance =
+ ((AggregateAnalysisEngineController)getController()).getServicePerformance(delegateKey);
+
+ if (aMessageContext.propertyExists(AsynchAEMessage.TimeToSerializeCAS))
+ {
+ long timeToSerializeCAS = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeToSerializeCAS)).longValue();
+ if ( timeToSerializeCAS > 0)
+ {
+ if ( delegateServicePerformance != null )
+ {
+ delegateServicePerformance.
+ incrementCasSerializationTime(timeToSerializeCAS);
+ }
+ }
+ }
+ if (aMessageContext.propertyExists(AsynchAEMessage.TimeToDeserializeCAS))
+ {
+ long timeToDeserializeCAS = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeToDeserializeCAS)).longValue();
+ if ( timeToDeserializeCAS > 0 )
+ {
+ if ( delegateServicePerformance != null )
+ {
+ delegateServicePerformance.
+ incrementCasDeserializationTime(timeToDeserializeCAS);
+ }
+ }
+ }
+
+ if (aMessageContext.propertyExists(AsynchAEMessage.IdleTime))
+ {
+ long idleTime = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.IdleTime)).longValue();
+ if ( idleTime > 0 && delegateServicePerformance != null )
+ {
+ Endpoint endp = aMessageContext.getEndpoint();
+ if ( endp != null && endp.isRemote())
+ {
+ delegateServicePerformance.incrementIdleTime(idleTime);
+ }
+ }
+ }
+
+ if (aMessageContext.propertyExists(AsynchAEMessage.TimeWaitingForCAS))
+ {
+ long timeWaitingForCAS = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeWaitingForCAS)).longValue();
+ if ( aMessageContext.getEndpoint().isRemote())
+ {
+ entry.incrementTimeWaitingForCAS(timeWaitingForCAS);
+ if ( inputCasEntry != null )
+ {
+ inputCasEntry.incrementTimeWaitingForCAS(timeWaitingForCAS);
+ }
+ }
+ }
+ if (aMessageContext.propertyExists(AsynchAEMessage.TimeInProcessCAS))
+ {
+ long timeInProcessCAS = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeInProcessCAS)).longValue();
+ Endpoint endp = aMessageContext.getEndpoint();
+ if ( endp != null && endp.isRemote())
+ {
+ if ( delegateServicePerformance != null )
+ {
+ delegateServicePerformance.incrementAnalysisTime(timeInProcessCAS);
+ }
+ }
+ else
+ {
+ getController().getServicePerformance().incrementAnalysisTime(timeInProcessCAS);
+ }
+ if ( inputCasReferenceId != null )
+ {
+ ServicePerformance inputCasStats =
+ ((AggregateAnalysisEngineController)getController()).
+ getCasStatistics(inputCasReferenceId);
+ // Update processing time for this CAS
+ if ( inputCasStats != null )
+ {
+ inputCasStats.incrementAnalysisTime(timeInProcessCAS);
+ }
+ }
+ }
+ }
+ catch( AsynchAEException e)
+ {
+ throw e;
+ }
+ catch( Exception e)
+ {
+ throw new AsynchAEException(e);
+ }
+ }
+ protected void computeStats(MessageContext aMessageContext, String aCasReferenceId) throws AsynchAEException
+ {
+ if (aMessageContext.propertyExists(AsynchAEMessage.TimeInService))
+ {
+ long departureTime = getController().getTime(aCasReferenceId, aMessageContext.getEndpoint().getEndpoint());
+ long currentTime = System.nanoTime();
+ long roundTrip = currentTime - departureTime;
+ long timeInService = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeInService);
+ long totalTimeInComms = currentTime - (departureTime - timeInService);
+
+
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "computeStats", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_roundtrip_time__FINE",
+ new Object[] { aCasReferenceId, aMessageContext.getEndpoint(),(double) roundTrip / (double) 1000000 });
+
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "computeStats", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_time_spent_in_delegate__FINE",
+ new Object[] { aCasReferenceId, (double) timeInService / (double) 1000000, aMessageContext.getEndpoint() });
+
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "computeStats", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_time_spent_in_comms__FINE",
+ new Object[] { aCasReferenceId, (double) totalTimeInComms / (double) 1000000, aMessageContext.getEndpoint() });
+ }
+
+ if ( getController() instanceof AggregateAnalysisEngineController )
+ {
+ aggregateDelegateStats( aMessageContext, aCasReferenceId );
+ }
+ }
+
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataRequestHandler_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataRequestHandler_impl.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataRequestHandler_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataRequestHandler_impl.java Fri Aug 22 11:53:05 2008
@@ -78,7 +78,6 @@
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
"handle", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_msg_for_next_handler__FINEST",
new Object[] { messageType });
-
super.getDelegate().handle(anObjectToHandle);
}
else
@@ -92,6 +91,7 @@
}
catch( Exception e)
{
+ e.printStackTrace();
getController().getErrorHandlerChain().handle(e, HandlerBase.populateErrorContext( (MessageContext)anObjectToHandle ), getController());
}
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java Fri Aug 22 11:53:05 2008
@@ -26,6 +26,7 @@
import org.apache.uima.aae.handler.HandlerBase;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.UIMAMessage;
import org.apache.uima.util.Level;
public class MetadataResponseHandler_impl extends HandlerBase
@@ -65,7 +66,18 @@
String analysisEngineMetadata = ((MessageContext)anObjectToHandle).getStringMessage();
String fromEndpoint = ((MessageContext)anObjectToHandle).getMessageStringProperty(AsynchAEMessage.MessageFrom);
- ((AggregateAnalysisEngineController) getController()).mergeTypeSystem(analysisEngineMetadata, fromEndpoint);
+ String fromServer = null;
+ if ( ((MessageContext)anObjectToHandle).propertyExists(AsynchAEMessage.EndpointServer))
+ {
+
+ fromServer =((MessageContext)anObjectToHandle).getMessageStringProperty(AsynchAEMessage.EndpointServer);
+
+ }
+ else if ( ((MessageContext)anObjectToHandle).propertyExists(UIMAMessage.ServerURI))
+ {
+ fromServer = ((MessageContext)anObjectToHandle).getMessageStringProperty(UIMAMessage.ServerURI);
+ }
+ ((AggregateAnalysisEngineController) getController()).mergeTypeSystem(analysisEngineMetadata, fromEndpoint, fromServer);
}
}
else