You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/06/24 20:58:35 UTC

svn commit: r788125 - /incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java

Author: cwiklik
Date: Wed Jun 24 18:58:35 2009
New Revision: 788125

URL: http://svn.apache.org/viewvc?rev=788125&view=rev
Log:
UIMA-1358 Modified to stop generation of CASes

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=788125&r1=788124&r2=788125&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java Wed Jun 24 18:58:35 2009
@@ -20,6 +20,7 @@
 package org.apache.uima.aae.controller;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -50,6 +51,7 @@
 import org.apache.uima.cas.impl.CASImpl;
 import org.apache.uima.cas.impl.OutOfTypeSystemData;
 import org.apache.uima.resource.ResourceInitializationException;
+import org.apache.uima.resource.ResourceProcessException;
 import org.apache.uima.resource.ResourceSpecifier;
 import org.apache.uima.resource.metadata.ConfigurationParameter;
 import org.apache.uima.resource.metadata.impl.ConfigurationParameter_impl;
@@ -363,10 +365,17 @@
 		
 		if ( stopped )
 		{
-			return;
+		  return;
 		}
-		CasStateEntry parentCasStateEntry = getLocalCache().lookupEntry(aCasReferenceId);
-    long totalProcessTime = 0;  // stored total time spent producing ALL CASes
+		CasStateEntry parentCasStateEntry = null;
+		try {
+	    parentCasStateEntry = getLocalCache().lookupEntry(aCasReferenceId);
+		} catch ( Exception e) {
+		  e.printStackTrace();
+		  return;
+		}
+
+		long totalProcessTime = 0;  // stored total time spent producing ALL CASes
 		
 		boolean inputCASReturned = false;
 		boolean processingFailed = false;
@@ -377,19 +386,16 @@
 		{
 			// Checkout an instance of AE from the pool
 			ae = aeInstancePool.checkout();
-			
 			//	Get input CAS entry from the InProcess cache
-			CacheEntry inputCASEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
 			long time = super.getCpuTime();
-			
 			CasIterator casIterator = ae.processAndOutputNewCASes(aCAS);
-			
 			//	Store how long it took to call processAndOutputNewCASes()
 			totalProcessTime = ( super.getCpuTime() - time);
 			long sequence = 1;
 			long hasNextTime = 0;         // stores time in hasNext()
 			long getNextTime = 0;         // stores time in next();   
 			boolean moreCASesToProcess = true;
+			boolean casAbortedDueToExternalRequest = false;
 			while (moreCASesToProcess)
 			{
 				long timeToProcessCAS = 0;    // stores time in hasNext() and next() for each CAS
@@ -408,11 +414,12 @@
 				CAS casProduced = casIterator.next();
 				//	Add how long it took to call next()
 				timeToProcessCAS += (super.getCpuTime()- getNextTime);
-                //	Add time to call hasNext() and next() to the running total
+        //	Add time to call hasNext() and next() to the running total
 				totalProcessTime += timeToProcessCAS;
-				
+				casAbortedDueToExternalRequest = 
+				  abortGeneratingCASes(aCasReferenceId);
 				//	If the service is stopped or aborted, stop generating new CASes and just return the input CAS
-				if ( stopped || abortGeneratingCASes(aCasReferenceId))
+				if ( stopped || casAbortedDueToExternalRequest)
 				{
 					if ( getInProcessCache() != null && getInProcessCache().getSize() > 0 && getInProcessCache().entryExists(aCasReferenceId))
 					{
@@ -423,7 +430,7 @@
 						}
 						catch( Exception e )
 						{
-							//	An exception be be thrown here if the service is being stopped.
+						  //	An exception may be thrown here if the service is being stopped.
 							//	The top level controller may have already cleaned up the cache
 							//	and the getCacheEntryForCAS() will throw an exception. Ignore it
 							//	here, we are shutting down.
@@ -436,13 +443,27 @@
 							//	as there may potentially be a problem with a Class Loader.
 							//	!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
 							((CASImpl)aCAS).enableReset(true);
+							try {
+							  // We are either stopping the service or aborting input CAS due to explicit STOP request
+							  // from a client. If a new CAS was produced, release it back to the pool.
+							  if ( casProduced != null ) {
+	                casProduced.release();
+	              }
+							} catch( Exception e) {}
 			        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
 			          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_stopped_producing_new_cases__INFO", new Object[] { Thread.currentThread().getId(),getComponentName(),aCasReferenceId });
 			        }
 							System.out.println(">>>> Cas Multiplier:"+getComponentName()+" Stopped Generating CASes from Input CAS:"+aCasReferenceId);
 						}
 					}
-					return;
+					if ( casAbortedDueToExternalRequest ) {
+					  // The controller was told to stop generating new CASes. Just return the input CAS to the 
+					  // client
+					  throw new ResourceProcessException(new InterruptedException("Cas Multiplier:"+getComponentName()+" Aborted CAS:"+aCasReferenceId));
+					} else {
+					  // The controller is stopping
+	          return;
+					}
 				}
 				OutOfTypeSystemData otsd = getInProcessCache().getOutOfTypeSystemData(aCasReferenceId);
 				MessageContext mContext = getInProcessCache().getMessageAccessorByReference(aCasReferenceId);
@@ -452,6 +473,7 @@
 				//  all CASes it has in play that were generated from the input CAS.
 				CasStateEntry childCasStateEntry = null;
 				if ( !isTopLevelComponent() ) {
+	        newEntry.setNewCas(true, parentController.getComponentName());
 				  //  Create CAS state entry in the aggregate's local cache
 				  childCasStateEntry = parentController.getLocalCache().createCasStateEntry(newEntry.getCasReferenceId());
 				  //  Fetch the parent CAS state entry from the aggregate's local cache. We need to increment
@@ -471,7 +493,7 @@
 				//	Add to the cache how long it took to process the generated (subordinate) CAS
 				getCasStatistics(newEntry.getCasReferenceId()).incrementAnalysisTime(timeToProcessCAS);
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_produced_new_cas__FINE", new Object[] { Thread.currentThread().getName(),getComponentName(),newEntry.getCasReferenceId(), aCasReferenceId });
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_produced_new_cas__FINE", new Object[] { Thread.currentThread().getName(),getUimaContextAdmin().getQualifiedContextName(),newEntry.getCasReferenceId(), aCasReferenceId });
         }
 				//	Add the generated CAS to the outstanding CAS Map. Client notification will release
 				//	this CAS back to its pool
@@ -504,17 +526,24 @@
               message.addLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime());
               long iT = getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process); 
               message.addLongProperty(AsynchAEMessage.IdleTime, iT );
-		          transport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message);
+              if ( !stopped ) {
+                transport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message);
+              }
 	      }
 	      else
 	      {
 	          //  Send generated CAS to the client
-	          getOutputChannel().sendReply(newEntry, anEndpoint);
+          if ( !stopped ) {
+            getOutputChannel().sendReply(newEntry, anEndpoint);
+          }
 	      }
         // Remove the new CAS state entry from the local cache if this a top level primitive.
 	      // If not top level, the client (an Aggregate) will remove this entry when this new
 	      // generated CAS reaches Final State.
 	      if ( isTopLevelComponent() ) {
+	        try {
+	          localCache.lookupEntry(newEntry.getCasReferenceId()).setDropped(true);
+	        } catch( Exception e) {}
 	        localCache.remove(newEntry.getCasReferenceId());
 	      }
 				
@@ -531,7 +560,7 @@
 			// Set total number of children generated from this CAS
 			// Store total time spent processing this input CAS
 			getCasStatistics(aCasReferenceId).incrementAnalysisTime(totalProcessTime);
-      if ( !anEndpoint.isRemote())
+      if ( !anEndpoint.isRemote() )
       {
           inputCASReturned = true;
           UimaTransport transport = getTransport(anEndpoint.getEndpoint());
@@ -548,15 +577,20 @@
           long iT = getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process); 
           message.addLongProperty(AsynchAEMessage.IdleTime, iT );
           //  Send reply back to the client. Use internal (non-jms) transport
-          transport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message);
+          if ( !stopped ) {
+            transport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message);
+          }
       }
       else
       {
+        if ( !stopped ) {
           getOutputChannel().sendReply(aCasReferenceId, anEndpoint);
+        }
         inputCASReturned = true;
 			}
       //  Remove input CAS state entry from the local cache
       if ( !isTopLevelComponent() ) {
+        localCache.lookupEntry(aCasReferenceId).setDropped(true);
         localCache.remove(aCasReferenceId);
       }
 		}
@@ -736,6 +770,26 @@
 		}
 		if ( cmOutstandingCASes != null )
 		{
+		  if ( !cmOutstandingCASes.isEmpty()) {
+		    //  If there are outstanding CASes, force them to be released.
+		    //  If the CM is blocking on getCAS() this will unblock it and
+		    //  enable termination. Otherwise, a hang may occur
+		    Iterator<String> it = cmOutstandingCASes.keySet().iterator();
+		    while( it.hasNext()) {
+		      String casId = it.next();
+		      try {
+		        CacheEntry entry = getInProcessCache().getCacheEntryForCAS(casId);
+		        if ( entry != null && entry.getCas() != null ) {
+		          System.out.println("Primitive:"+getComponentName()+" Forcing Release of CAS:"+casId+" in stop()");
+		          //  Force CAS release to unblock CM thread
+		          entry.getCas().release();
+		        }
+		      } catch ( Exception e) {
+		        
+		      }
+		    }
+		    
+		  }
 			cmOutstandingCASes.clear();
 		}
 		if ( aeList != null )