You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/06/24 20:58:35 UTC
svn commit: r788125 -
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
Author: cwiklik
Date: Wed Jun 24 18:58:35 2009
New Revision: 788125
URL: http://svn.apache.org/viewvc?rev=788125&view=rev
Log:
UIMA-1358 Modified to stop generation of CASes
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=788125&r1=788124&r2=788125&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java Wed Jun 24 18:58:35 2009
@@ -20,6 +20,7 @@
package org.apache.uima.aae.controller;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
@@ -50,6 +51,7 @@
import org.apache.uima.cas.impl.CASImpl;
import org.apache.uima.cas.impl.OutOfTypeSystemData;
import org.apache.uima.resource.ResourceInitializationException;
+import org.apache.uima.resource.ResourceProcessException;
import org.apache.uima.resource.ResourceSpecifier;
import org.apache.uima.resource.metadata.ConfigurationParameter;
import org.apache.uima.resource.metadata.impl.ConfigurationParameter_impl;
@@ -363,10 +365,17 @@
if ( stopped )
{
- return;
+ return;
}
- CasStateEntry parentCasStateEntry = getLocalCache().lookupEntry(aCasReferenceId);
- long totalProcessTime = 0; // stored total time spent producing ALL CASes
+ CasStateEntry parentCasStateEntry = null;
+ try {
+ parentCasStateEntry = getLocalCache().lookupEntry(aCasReferenceId);
+ } catch ( Exception e) {
+ e.printStackTrace();
+ return;
+ }
+
+ long totalProcessTime = 0; // stored total time spent producing ALL CASes
boolean inputCASReturned = false;
boolean processingFailed = false;
@@ -377,19 +386,16 @@
{
// Checkout an instance of AE from the pool
ae = aeInstancePool.checkout();
-
// Get input CAS entry from the InProcess cache
- CacheEntry inputCASEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
long time = super.getCpuTime();
-
CasIterator casIterator = ae.processAndOutputNewCASes(aCAS);
-
// Store how long it took to call processAndOutputNewCASes()
totalProcessTime = ( super.getCpuTime() - time);
long sequence = 1;
long hasNextTime = 0; // stores time in hasNext()
long getNextTime = 0; // stores time in next();
boolean moreCASesToProcess = true;
+ boolean casAbortedDueToExternalRequest = false;
while (moreCASesToProcess)
{
long timeToProcessCAS = 0; // stores time in hasNext() and next() for each CAS
@@ -408,11 +414,12 @@
CAS casProduced = casIterator.next();
// Add how long it took to call next()
timeToProcessCAS += (super.getCpuTime()- getNextTime);
- // Add time to call hasNext() and next() to the running total
+ // Add time to call hasNext() and next() to the running total
totalProcessTime += timeToProcessCAS;
-
+ casAbortedDueToExternalRequest =
+ abortGeneratingCASes(aCasReferenceId);
// If the service is stopped or aborted, stop generating new CASes and just return the input CAS
- if ( stopped || abortGeneratingCASes(aCasReferenceId))
+ if ( stopped || casAbortedDueToExternalRequest)
{
if ( getInProcessCache() != null && getInProcessCache().getSize() > 0 && getInProcessCache().entryExists(aCasReferenceId))
{
@@ -423,7 +430,7 @@
}
catch( Exception e )
{
- // An exception be be thrown here if the service is being stopped.
+ // An exception may be thrown here if the service is being stopped.
// The top level controller may have already cleaned up the cache
// and the getCacheEntryForCAS() will throw an exception. Ignore it
// here, we are shutting down.
@@ -436,13 +443,27 @@
// as there may potentially be a problem with a Class Loader.
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
((CASImpl)aCAS).enableReset(true);
+ try {
+ // We are either stopping the service or aborting input CAS due to explicit STOP request
+ // from a client. If a new CAS was produced, release it back to the pool.
+ if ( casProduced != null ) {
+ casProduced.release();
+ }
+ } catch( Exception e) {}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_stopped_producing_new_cases__INFO", new Object[] { Thread.currentThread().getId(),getComponentName(),aCasReferenceId });
}
System.out.println(">>>> Cas Multiplier:"+getComponentName()+" Stopped Generating CASes from Input CAS:"+aCasReferenceId);
}
}
- return;
+ if ( casAbortedDueToExternalRequest ) {
+ // The controller was told to stop generating new CASes. Just return the input CAS to the
+ // client
+ throw new ResourceProcessException(new InterruptedException("Cas Multiplier:"+getComponentName()+" Aborted CAS:"+aCasReferenceId));
+ } else {
+ // The controller is stopping
+ return;
+ }
}
OutOfTypeSystemData otsd = getInProcessCache().getOutOfTypeSystemData(aCasReferenceId);
MessageContext mContext = getInProcessCache().getMessageAccessorByReference(aCasReferenceId);
@@ -452,6 +473,7 @@
// all CASes it has in play that were generated from the input CAS.
CasStateEntry childCasStateEntry = null;
if ( !isTopLevelComponent() ) {
+ newEntry.setNewCas(true, parentController.getComponentName());
// Create CAS state entry in the aggregate's local cache
childCasStateEntry = parentController.getLocalCache().createCasStateEntry(newEntry.getCasReferenceId());
// Fetch the parent CAS state entry from the aggregate's local cache. We need to increment
@@ -471,7 +493,7 @@
// Add to the cache how long it took to process the generated (subordinate) CAS
getCasStatistics(newEntry.getCasReferenceId()).incrementAnalysisTime(timeToProcessCAS);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_produced_new_cas__FINE", new Object[] { Thread.currentThread().getName(),getComponentName(),newEntry.getCasReferenceId(), aCasReferenceId });
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_produced_new_cas__FINE", new Object[] { Thread.currentThread().getName(),getUimaContextAdmin().getQualifiedContextName(),newEntry.getCasReferenceId(), aCasReferenceId });
}
// Add the generated CAS to the outstanding CAS Map. Client notification will release
// this CAS back to its pool
@@ -504,17 +526,24 @@
message.addLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime());
long iT = getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process);
message.addLongProperty(AsynchAEMessage.IdleTime, iT );
- transport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message);
+ if ( !stopped ) {
+ transport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message);
+ }
}
else
{
// Send generated CAS to the client
- getOutputChannel().sendReply(newEntry, anEndpoint);
+ if ( !stopped ) {
+ getOutputChannel().sendReply(newEntry, anEndpoint);
+ }
}
// Remove the new CAS state entry from the local cache if this is a top level primitive.
// If not top level, the client (an Aggregate) will remove this entry when this new
// generated CAS reaches Final State.
if ( isTopLevelComponent() ) {
+ try {
+ localCache.lookupEntry(newEntry.getCasReferenceId()).setDropped(true);
+ } catch( Exception e) {}
localCache.remove(newEntry.getCasReferenceId());
}
@@ -531,7 +560,7 @@
// Set total number of children generated from this CAS
// Store total time spent processing this input CAS
getCasStatistics(aCasReferenceId).incrementAnalysisTime(totalProcessTime);
- if ( !anEndpoint.isRemote())
+ if ( !anEndpoint.isRemote() )
{
inputCASReturned = true;
UimaTransport transport = getTransport(anEndpoint.getEndpoint());
@@ -548,15 +577,20 @@
long iT = getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process);
message.addLongProperty(AsynchAEMessage.IdleTime, iT );
// Send reply back to the client. Use internal (non-jms) transport
- transport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message);
+ if ( !stopped ) {
+ transport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message);
+ }
}
else
{
+ if ( !stopped ) {
getOutputChannel().sendReply(aCasReferenceId, anEndpoint);
+ }
inputCASReturned = true;
}
// Remove input CAS state entry from the local cache
if ( !isTopLevelComponent() ) {
+ localCache.lookupEntry(aCasReferenceId).setDropped(true);
localCache.remove(aCasReferenceId);
}
}
@@ -736,6 +770,26 @@
}
if ( cmOutstandingCASes != null )
{
+ if ( !cmOutstandingCASes.isEmpty()) {
+ // If there are outstanding CASes, force them to be released
+ // If the CM is blocking on getCAS() this will unblock it and
+ // enable termination. Otherwise, a hang may occur
+ Iterator<String> it = cmOutstandingCASes.keySet().iterator();
+ while( it.hasNext()) {
+ String casId = it.next();
+ try {
+ CacheEntry entry = getInProcessCache().getCacheEntryForCAS(casId);
+ if ( entry != null && entry.getCas() != null ) {
+ System.out.println("Primitive:"+getComponentName()+" Forcing Release of CAS:"+casId+" in stop()");
+ // Force CAS release to unblock CM thread
+ entry.getCas().release();
+ }
+ } catch ( Exception e) {
+
+ }
+ }
+
+ }
cmOutstandingCASes.clear();
}
if ( aeList != null )