You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/05/07 23:14:58 UTC
svn commit: r772775 -
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
Author: cwiklik
Date: Thu May 7 21:14:57 2009
New Revision: 772775
URL: http://svn.apache.org/viewvc?rev=772775&view=rev
Log:
UIMA-1343 Synchronizes call to the Flow Controller's collectionProcessComplete()
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=772775&r1=772774&r2=772775&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Thu May 7 21:14:57 2009
@@ -132,7 +132,9 @@
public final Object parallelStepMux = new Object();
-
+ // prevents more than one thread from calling collectionProcessComplete on the FC
+ private volatile boolean doSendCpcReply = false;
+
/**
*
* @param anEndpointName
@@ -376,6 +378,19 @@
}
/**
+ * Resets the completed-CPC flag to false for every delegate endpoint in the destination Map whose status is Endpoint.OK.
+ */
+ private void resetEndpointsCpcStatus() {
+ Set set = destinationMap.entrySet();
+ for( Iterator it = set.iterator(); it.hasNext();) {
+ Map.Entry entry = (Map.Entry)it.next();
+ Endpoint endpoint = (Endpoint)entry.getValue();
+ if ( endpoint != null && endpoint.getStatus() == Endpoint.OK ) {
+ endpoint.setCompletedProcessingCollection(false);
+ }
+ }
+ }
+ /**
*
* @return
*/
@@ -392,7 +407,9 @@
return false;
}
}
-
+ // All delegates replied to CPC; reset the CPC status of each delegate
+ // so that the next CPC request can be handled.
+ resetEndpointsCpcStatus();
return true;
}
public Map getDelegateStats()
@@ -425,12 +442,24 @@
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_recvd_cpc_reply__FINE", new Object[] { key });
}
Endpoint cEndpoint = null;
- if (sendReply && allDelegatesCompletedCollection() && (( cEndpoint = getClientEndpoint()) != null) )
- {
- if ( flowControllerContainer != null ) {
- flowControllerContainer.collectionProcessComplete();
+ // synchronized to prevent more than one thread from calling collectionProcessComplete() on
+ // the Flow Controller.
+ synchronized(flowControllerContainer) {
+ if ( doSendCpcReply == false &&
+ sendReply &&
+ allDelegatesCompletedCollection() &&
+ (( cEndpoint = getClientEndpoint()) != null) ) {
+ doSendCpcReply = true;
+ if ( flowControllerContainer != null ) {
+ flowControllerContainer.collectionProcessComplete();
+ }
}
- sendCpcReply(cEndpoint);
+ }
+ // Reply to the client once for each CPC request. doSendCpcReply is volatile, so the read below sees
+ // the latest value without synchronization (NOTE(review): the check-then-reset itself is not atomic)
+ if ( doSendCpcReply) {
+ sendCpcReply(cEndpoint);
+ doSendCpcReply = false; //reset for the next CPC
}
}
catch ( Exception e)
@@ -644,6 +673,7 @@
}
public void handleInitializationError(Exception ex) {
+ ex.printStackTrace();
// Any problems in completeInitialization() is a reason to stop
notifyListenersWithInitializationStatus(ex);
super.stop();