You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2008/10/23 17:53:21 UTC

svn commit: r707401 - in /incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main: java/org/apache/uima/aae/ java/org/apache/uima/aae/controller/ resources/

Author: eae
Date: Thu Oct 23 08:53:21 2008
New Revision: 707401

URL: http://svn.apache.org/viewvc?rev=707401&view=rev
Log:
UIMA-1199 commit uimaj-as-core-UIMA-1199-patch-03.txt and uimaj-as-activemq-UIMA-1199-patch-03.txt

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java?rev=707401&r1=707400&r2=707401&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java Thu Oct 23 08:53:21 2008
@@ -655,6 +655,14 @@
 		
 		private boolean sentDeltaCas = false;
 
+		//  list containing delegates that must be called sequentially. This list
+		//  is added to the cache if there are collocated delegates in a parallel
+		//  step. Only remote delegates can be part of the parallel step. Any 
+		//  collocated delegates are removed from the parallel step and added to the
+		//  list. The delegates in this list will be called sequentially when
+		//  all delegates in parallel step respond.
+		private List delayedSingleStepList = null;
+		
 		protected CacheEntry(CAS aCas, String aCasReferenceId, MessageContext aMessageAccessor, OutOfTypeSystemData aotsd)
 		{
 			this(aCas, aCasReferenceId, aMessageAccessor);
@@ -1037,6 +1045,13 @@
 			return this.marker;
 		}
 
+		public void setDelayedSingleStepList( List aList) {
+		  delayedSingleStepList = aList;
+		}
+		
+		public List getDelayedSingleStepList() {
+		  return delayedSingleStepList;
+		}
 	}	
 
 

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=707401&r1=707400&r2=707401&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Thu Oct 23 08:53:21 2008
@@ -62,6 +62,7 @@
 import org.apache.uima.flow.ParallelStep;
 import org.apache.uima.flow.SimpleStep;
 import org.apache.uima.flow.Step;
+import org.apache.uima.resource.ResourceInitializationException;
 import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
 import org.apache.uima.resource.metadata.ResourceMetaData;
 import org.apache.uima.util.Level;
@@ -607,6 +608,12 @@
 		}
 
 	}
+	
+	public void handleInitializationError(Exception ex) {
+    // Any problem in completeInitialization() is a reason to stop
+    notifyListenersWithInitializationStatus(ex);
+    super.stop();
+	}
 
 	public void disableDelegates(List aDelegateList) throws AsynchAEException
 	{
@@ -663,7 +670,12 @@
 			}
 			if (!initialized && allTypeSystemsMerged() )
 			{
-				completeInitialization();
+			  try {
+	        completeInitialization();
+			  } catch ( ResourceInitializationException ex) {
+			    handleInitializationError(ex);
+			    return;
+			  }
 			}
 		}
 		catch ( Exception e)
@@ -810,8 +822,51 @@
 	 * This is a process method that is executed for CASes not created by a Multiplier in this aggregate.
 	 * 
 	 */
-	public void process(CAS aCAS, String aCasReferenceId)// throws AnalysisEngineProcessException, AsynchAEException
+	public void process(CAS aCAS, String aCasReferenceId)
 	{
+	  boolean handlingDelayedStep = false;
+	  // First check if there are outstanding steps to be called before consulting the Flow Controller.
+	  // This could be the case if a previous step was a parallel step and it contained collocated
+	  // delegates.
+    if ( !isStopped() ) {
+      try {
+        CacheEntry entry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+        // if we are here entry is not null. The above throws an exception if an entry is not
+        //  found in the cache. First check if there is a delayedSingleStepList in the cache.
+        //  If there is one, it means that a parallel step contained collocated delegate(s).
+        //  The parallel step may only contain remote delegates. All collocated delegates
+        //  were removed from the parallel step and added to the delayedSingleStepList in
+        //  parallelStep() method.
+        List delayedSingleStepList = entry.getDelayedSingleStepList(); 
+        if ( delayedSingleStepList != null && delayedSingleStepList.size() > 0)
+        {
+          handlingDelayedStep = true;
+          //  Reset number of parallel delegates back to one. This is done only if the previous step 
+          //  was a parallel step.
+          synchronized(parallelStepMux)
+          {
+            if ( entry.getNumberOfParallelDelegates() > 1)
+            {
+              entry.setNumberOfParallelDelegates(1);
+            }
+          }
+          //  Remove a delegate endpoint from the single step list cached in the CAS entry
+          Endpoint endpoint = (Endpoint_impl) entry.getDelayedSingleStepList().remove(0);
+          //  send the CAS to a collocated delegate from the delayed single step list.
+          dispatchProcessRequest(aCasReferenceId, endpoint, true);
+        }
+      } catch ( Exception e) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });        
+        e.printStackTrace();
+      }
+      finally {
+        //  If we just handled the delayed step, return as there is nothing else to do
+        if ( handlingDelayedStep ) {
+          return; 
+        }
+      }
+    }
+	  
 		FlowContainer flow = null;
 		try
 		{
@@ -935,24 +990,69 @@
 		{
 			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
 	                "parallelStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_parallel_step__FINE",
-	                new Object[] {getName(), aCasReferenceId });
+	                new Object[] {getComponentName(), aCasReferenceId });
 			Collection keyList = aStep.getAnalysisEngineKeys();
 			String[] analysisEngineKeys = new String[keyList.size()];
-			keyList.toArray(analysisEngineKeys);
-			CacheEntry cacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
-			synchronized(parallelStepMux)
-			{
-				cacheEntry.resetDelegateResponded();
-				cacheEntry.setNumberOfParallelDelegates(analysisEngineKeys.length);
-			}
-
-			Endpoint[] endpoints = new Endpoint_impl[analysisEngineKeys.length];
-			for (int i = 0; i < analysisEngineKeys.length; i++)
-			{
-				endpoints[i] = lookUpEndpoint(analysisEngineKeys[i], true);
-				endpoints[i].setController(this);
-			}
-			dispatchProcessRequest(aCasReferenceId, endpoints, true);
+      keyList.toArray(analysisEngineKeys);
+			List parallelDelegateList = new ArrayList();
+      List singleStepDelegateList = null;
+      //  Only remote delegates can be in a parallel step. Iterate over the 
+      //  delegates in parallel step and assign each to a different list based on location.
+      //  Remote delegates are assigned to parallelDelegateList, whereas co-located
+      //  delegates are assigned to singleStepDelegateList. Those delegates
+      //  assigned to the singleStepDelegateList will be executed sequentially
+      //  once all parallel delegates respond.
+      for (int i = 0; i < analysisEngineKeys.length; i++)
+      {
+        //  Fetch an endpoint corresponding to a given delegate key
+        Endpoint endpoint = lookUpEndpoint(analysisEngineKeys[i], true);
+        endpoint.setController(this);
+        //  Assign delegate to appropriate list
+        if ( endpoint.isRemote() ) {
+          parallelDelegateList.add(endpoint);
+        } else {
+          if ( singleStepDelegateList == null ) {
+            singleStepDelegateList = new ArrayList();
+          }
+          singleStepDelegateList.add(endpoint);
+          if ( UIMAFramework.getLogger().isLoggable(Level.FINE)) {
+            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+                    "parallelStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_move_to_single_step_list__FINE",
+                    new Object[] {getComponentName(), analysisEngineKeys[i], aCasReferenceId });
+          }
+        }
+      }
+      //  Fetch cache entry for a given CAS id
+      CacheEntry cacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+      //  Add all co-located delegates to the cache. These delegates will be called 
+      //  sequentially once all parallel delegates respond
+      if ( singleStepDelegateList != null ) {
+        //  Add a list containing single step delegates to the cache
+        //  These delegates will be called sequentially when all parallel
+        //  delegates respond.
+        cacheEntry.setDelayedSingleStepList(singleStepDelegateList);
+      }
+      //  Check if there are any delegates in the parallel step. It is possible that
+      //  all of the delegates were co-located and thus the parallel delegate list
+      //  is empty.
+      if ( parallelDelegateList.size() > 0 ) {
+        //  Create endpoint array to contain as many slots as there are parallel delegates
+        Endpoint[] endpoints = new Endpoint_impl[parallelDelegateList.size()];
+        //  Copy parallel delegate endpoints to the array
+        parallelDelegateList.toArray(endpoints);
+        synchronized(parallelStepMux)
+        {
+          cacheEntry.resetDelegateResponded();
+          //  Set number of delegates in the parallel step
+          cacheEntry.setNumberOfParallelDelegates(endpoints.length);
+        }
+        //  Dispatch CAS to remote parallel delegates
+        dispatchProcessRequest(aCasReferenceId, endpoints, true);
+      } else {
+        //  All delegates in a parallel step are co-located. Send the CAS 
+        //  to the first delegate in the single step list.
+        process( null, aCasReferenceId);
+      }
 		}
 		catch ( Exception e)
 		{
@@ -961,7 +1061,6 @@
 			map.put(AsynchAEMessage.CasReference, aCasReferenceId);
 			handleError(map, e);
 		}
-			
 	}
 
 	public void sendRequestForMetadataToRemoteDelegates() throws AsynchAEException
@@ -1409,13 +1508,16 @@
 		catch( Exception e)
 		{
 			//	Any error here is automatic termination
-			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "executeFlowStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
 			try
 			{
 				getInProcessCache().destroy();
 				handleAction(ErrorHandler.TERMINATE, null, null);
 			}
-			catch( Exception ex){ex.printStackTrace();}
+			catch( Exception ex) {
+	      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "executeFlowStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
+			  ex.printStackTrace();
+			}
 			return;
 		}
 
@@ -1428,19 +1530,6 @@
 			}
 			else if (step instanceof ParallelStep)
 			{
-
-				Collection keyList = ((ParallelStep) step).getAnalysisEngineKeys();
-				String[] analysisEngineKeys = new String[keyList.size()];
-				keyList.toArray(analysisEngineKeys);
-
-				String aeKeys = "";
-				Iterator it = keyList.iterator();
-				while (it.hasNext())
-				{
-					aeKeys += "::" + (String) it.next();
-
-				}
-
 				parallelStep((ParallelStep) step, aCasReferenceId);
 			}
 			else if (step instanceof FinalStep)
@@ -1775,7 +1864,12 @@
 						}
 						if ( !isStopped() )
 						{
-							completeInitialization();
+						  try {
+	              completeInitialization();
+						  } catch ( ResourceInitializationException ex) {
+			          handleInitializationError(ex);
+			          return;
+						  }
 						}
 					}
 				}

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=707401&r1=707400&r2=707401&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Thu Oct 23 08:53:21 2008
@@ -190,6 +190,7 @@
   protected ConcurrentHashMap<String, UimaMessageListener> messageListeners
   = new ConcurrentHashMap<String, UimaMessageListener>();
   
+  private Exception initException = null;
 //	protected UimaTransport transport = new VmTransport();
 	
 	public BaseAnalysisEngineController(AnalysisEngineController aParentController, int aComponentCasPoolSize, String anEndpointName, String aDescriptor, AsynchAECasManager aCasManager, InProcessCache anInProcessCache) throws Exception
@@ -1833,9 +1834,11 @@
 	   */
 	  public void addControllerCallbackListener(ControllerCallbackListener aListener)
 	  {
+	      
 		    controllerListeners.add(aListener); 
-		    if ( serviceInitialized )
-		    {
+		    if ( initException != null ) {
+		      notifyListenersWithInitializationStatus(initException);
+		    } else if ( serviceInitialized )  {
           notifyListenersWithInitializationStatus(null);
 		    }
 	  }
@@ -1853,7 +1856,8 @@
 
 	  public void notifyListenersWithInitializationStatus(Exception e)
 	  {
-      if ( controllerListeners.isEmpty())
+	    initException = e;
+	    if ( controllerListeners.isEmpty())
       {
         return;
       }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties?rev=707401&r1=707400&r2=707401&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties Thu Oct 23 08:53:21 2008
@@ -168,3 +168,4 @@
 UIMAEE_input_cas_invalid__INFO = Controller: {0} Received Invalid Request from Cas Multiplier {1} Containing Cas Id: {2}. The Parent Cas Id Is Missing. 
 UIMAEE_show_remote_delegate_serialization_INFO = >>> Controller: {0} Configured To Serialize CASes To Remote Delegate: {1} Using {2} Serialization
 UIMAEE_delegate_in_parallel_step_not_remote_WARNING = >>> Controller: {0} Delegate: {1} Not Remote But Defined In Parallel Step. Only Remote Delegates Can Be In Parallel Step.
+UIMAEE_move_to_single_step_list__FINE = Controller: {0} Moved Delegate: {1} To Single Step List From Parallel Step List. The Delegate is Co-located. Only Remote Delegates Can Be in Parallel Step. CAS id: {2}