You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/05/07 23:14:58 UTC

svn commit: r772775 - /incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java

Author: cwiklik
Date: Thu May  7 21:14:57 2009
New Revision: 772775

URL: http://svn.apache.org/viewvc?rev=772775&view=rev
Log:
UIMA-1343 Synchronizes call to the Flow Controller's collectionProcessComplete()

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=772775&r1=772774&r2=772775&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Thu May  7 21:14:57 2009
@@ -132,7 +132,9 @@
 	
 	public final Object parallelStepMux = new Object();
 	
-	
+	// prevents more than one thread to call collectionProcessComplete on the FC
+	private volatile boolean doSendCpcReply = false;
+
 	/**
 	 * 
 	 * @param anEndpointName
@@ -376,6 +378,19 @@
 
 	}
 	/**
+	 * Change the CPC status for each delegate in the destination Map.
+	 */
+	private void resetEndpointsCpcStatus() {
+    Set set = destinationMap.entrySet();
+    for( Iterator it = set.iterator(); it.hasNext();) {
+      Map.Entry entry = (Map.Entry)it.next();
+      Endpoint endpoint = (Endpoint)entry.getValue();
+      if ( endpoint != null && endpoint.getStatus() == Endpoint.OK ) {
+        endpoint.setCompletedProcessingCollection(false);
+      }
+    }
+	}
+	/**
 	 * 
 	 * @return
 	 */
@@ -392,7 +407,9 @@
 				return false;
 			}
 		}
-
+		//  All delegates replied to CPC, change the status of each delegate
+		//  to handle next CPC request.
+		resetEndpointsCpcStatus();
 		return true;
 	}
 	public Map getDelegateStats()
@@ -425,12 +442,24 @@
 					UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_recvd_cpc_reply__FINE", new Object[] { key });
       }
 			Endpoint cEndpoint = null;
-			if (sendReply && allDelegatesCompletedCollection() && (( cEndpoint = getClientEndpoint()) != null) )
-			{
-			  if ( flowControllerContainer != null  ) {
-			    flowControllerContainer.collectionProcessComplete();
+			// synchronized to prevent more than one thread to call collectionProcessComplete() on
+			// the Flow Controller. 
+			synchronized(flowControllerContainer) {
+			  if ( doSendCpcReply == false &&
+			          sendReply && 
+			          allDelegatesCompletedCollection() && 
+			          (( cEndpoint = getClientEndpoint()) != null) ) {
+			    doSendCpcReply = true;
+			    if ( flowControllerContainer != null  ) {
+			      flowControllerContainer.collectionProcessComplete();
+			    }
 			  }
-				sendCpcReply(cEndpoint);
+			}
+			// Reply to a client once for each CPC request. doSendCpcReply is volatile thus
+			// no need to synchronize it
+			if ( doSendCpcReply) {
+        sendCpcReply(cEndpoint);
+        doSendCpcReply = false; //reset for the next CPC
 			}
 		}
 		catch ( Exception e)
@@ -644,6 +673,7 @@
 	}
 	
 	public void handleInitializationError(Exception ex) {
+	  ex.printStackTrace();
     // Any problems in completeInitialization() is a reason to stop
     notifyListenersWithInitializationStatus(ex);
     super.stop();