You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2008/10/23 17:53:21 UTC
svn commit: r707401 - in
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main:
java/org/apache/uima/aae/ java/org/apache/uima/aae/controller/ resources/
Author: eae
Date: Thu Oct 23 08:53:21 2008
New Revision: 707401
URL: http://svn.apache.org/viewvc?rev=707401&view=rev
Log:
UIMA-1199 commit uimaj-as-core-UIMA-1199-patch-03.txt and uimaj-as-activemq-UIMA-1199-patch-03.txt
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java?rev=707401&r1=707400&r2=707401&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java Thu Oct 23 08:53:21 2008
@@ -655,6 +655,14 @@
private boolean sentDeltaCas = false;
+ // list containing delegates that must be called sequentially. This list
+ // is added to the cache if there are collocated delegates in a parallel
+ // step. Only remote delegates can be part of the parallel step. Any
+ // collocated delegates are removed from the parallel step and added to the
+ // list. The delegates in this list will be called sequentially when
+ // all delegates in parallel step respond.
+ private List delayedSingleStepList = null;
+
protected CacheEntry(CAS aCas, String aCasReferenceId, MessageContext aMessageAccessor, OutOfTypeSystemData aotsd)
{
this(aCas, aCasReferenceId, aMessageAccessor);
@@ -1037,6 +1045,13 @@
return this.marker;
}
+ public void setDelayedSingleStepList( List aList) {
+ delayedSingleStepList = aList;
+ }
+
+ public List getDelayedSingleStepList() {
+ return delayedSingleStepList;
+ }
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=707401&r1=707400&r2=707401&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Thu Oct 23 08:53:21 2008
@@ -62,6 +62,7 @@
import org.apache.uima.flow.ParallelStep;
import org.apache.uima.flow.SimpleStep;
import org.apache.uima.flow.Step;
+import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
import org.apache.uima.resource.metadata.ResourceMetaData;
import org.apache.uima.util.Level;
@@ -607,6 +608,12 @@
}
}
+
+ public void handleInitializationError(Exception ex) {
+ // Any problem in completeInitialization() is a reason to stop
+ notifyListenersWithInitializationStatus(ex);
+ super.stop();
+ }
public void disableDelegates(List aDelegateList) throws AsynchAEException
{
@@ -663,7 +670,12 @@
}
if (!initialized && allTypeSystemsMerged() )
{
- completeInitialization();
+ try {
+ completeInitialization();
+ } catch ( ResourceInitializationException ex) {
+ handleInitializationError(ex);
+ return;
+ }
}
}
catch ( Exception e)
@@ -810,8 +822,51 @@
* This is a process method that is executed for CASes not created by a Multiplier in this aggregate.
*
*/
- public void process(CAS aCAS, String aCasReferenceId)// throws AnalysisEngineProcessException, AsynchAEException
+ public void process(CAS aCAS, String aCasReferenceId)
{
+ boolean handlingDelayedStep = false;
+ // First check if there are outstanding steps to be called before consulting the Flow Controller.
+ // This could be the case if a previous step was a parallel step and it contained collocated
+ // delegates.
+ if ( !isStopped() ) {
+ try {
+ CacheEntry entry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+ // If we are here, entry is not null. The above throws an exception if an entry is not
+ // found in the cache. First check if there is a delayedSingleStepList in the cache.
+ // If there is one, it means that a parallel step contained collocated delegate(s).
+ // The parallel step may only contain remote delegates. All collocated delegates
+ // were removed from the parallel step and added to the delayedSingleStepList in
+ // parallelStep() method.
+ List delayedSingleStepList = entry.getDelayedSingleStepList();
+ if ( delayedSingleStepList != null && delayedSingleStepList.size() > 0)
+ {
+ handlingDelayedStep = true;
+ // Reset number of parallel delegates back to one. This is done only if the previous step
+ // was a parallel step.
+ synchronized(parallelStepMux)
+ {
+ if ( entry.getNumberOfParallelDelegates() > 1)
+ {
+ entry.setNumberOfParallelDelegates(1);
+ }
+ }
+ // Remove a delegate endpoint from the single step list cached in the CAS entry
+ Endpoint endpoint = (Endpoint_impl) entry.getDelayedSingleStepList().remove(0);
+ // send the CAS to a collocated delegate from the delayed single step list.
+ dispatchProcessRequest(aCasReferenceId, endpoint, true);
+ }
+ } catch ( Exception e) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
+ e.printStackTrace();
+ }
+ finally {
+ // If we just handled the delayed step, return as there is nothing else to do
+ if ( handlingDelayedStep ) {
+ return;
+ }
+ }
+ }
+
FlowContainer flow = null;
try
{
@@ -935,24 +990,69 @@
{
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"parallelStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_parallel_step__FINE",
- new Object[] {getName(), aCasReferenceId });
+ new Object[] {getComponentName(), aCasReferenceId });
Collection keyList = aStep.getAnalysisEngineKeys();
String[] analysisEngineKeys = new String[keyList.size()];
- keyList.toArray(analysisEngineKeys);
- CacheEntry cacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
- synchronized(parallelStepMux)
- {
- cacheEntry.resetDelegateResponded();
- cacheEntry.setNumberOfParallelDelegates(analysisEngineKeys.length);
- }
-
- Endpoint[] endpoints = new Endpoint_impl[analysisEngineKeys.length];
- for (int i = 0; i < analysisEngineKeys.length; i++)
- {
- endpoints[i] = lookUpEndpoint(analysisEngineKeys[i], true);
- endpoints[i].setController(this);
- }
- dispatchProcessRequest(aCasReferenceId, endpoints, true);
+ keyList.toArray(analysisEngineKeys);
+ List parallelDelegateList = new ArrayList();
+ List singleStepDelegateList = null;
+ // Only remote delegates can be in a parallel step. Iterate over the
+ // delegates in parallel step and assign each to a different list based on location.
+ // Remote delegates are assigned to parallelDelegateList, whereas co-located
+ // delegates are assigned to singleStepDelegateList. Those delegates
+ // assigned to the singleStepDelegateList will be executed sequentially
+ // once all parallel delegates respond.
+ for (int i = 0; i < analysisEngineKeys.length; i++)
+ {
+ // Fetch an endpoint corresponding to a given delegate key
+ Endpoint endpoint = lookUpEndpoint(analysisEngineKeys[i], true);
+ endpoint.setController(this);
+ // Assign delegate to appropriate list
+ if ( endpoint.isRemote() ) {
+ parallelDelegateList.add(endpoint);
+ } else {
+ if ( singleStepDelegateList == null ) {
+ singleStepDelegateList = new ArrayList();
+ }
+ singleStepDelegateList.add(endpoint);
+ if ( UIMAFramework.getLogger().isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "parallelStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_move_to_single_step_list__FINE",
+ new Object[] {getComponentName(), analysisEngineKeys[i], aCasReferenceId });
+ }
+ }
+ }
+ // Fetch cache entry for a given CAS id
+ CacheEntry cacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+ // Add all co-located delegates to the cache. These delegates will be called
+ // sequentially once all parallel delegates respond
+ if ( singleStepDelegateList != null ) {
+ // Add a list containing single step delegates to the cache
+ // These delegates will be called sequentially when all parallel
+ // delegates respond.
+ cacheEntry.setDelayedSingleStepList(singleStepDelegateList);
+ }
+ // Check if there are any delegates in the parallel step. It is possible that
+ // all of the delegates were co-located and thus the parallel delegate list
+ // is empty.
+ if ( parallelDelegateList.size() > 0 ) {
+ // Create endpoint array to contain as many slots as there are parallel delegates
+ Endpoint[] endpoints = new Endpoint_impl[parallelDelegateList.size()];
+ // Copy parallel delegate endpoints to the array
+ parallelDelegateList.toArray(endpoints);
+ synchronized(parallelStepMux)
+ {
+ cacheEntry.resetDelegateResponded();
+ // Set number of delegates in the parallel step
+ cacheEntry.setNumberOfParallelDelegates(endpoints.length);
+ }
+ // Dispatch CAS to remote parallel delegates
+ dispatchProcessRequest(aCasReferenceId, endpoints, true);
+ } else {
+ // All delegates in a parallel step are co-located. Send the CAS
+ // to the first delegate in the single step list.
+ process( null, aCasReferenceId);
+ }
}
catch ( Exception e)
{
@@ -961,7 +1061,6 @@
map.put(AsynchAEMessage.CasReference, aCasReferenceId);
handleError(map, e);
}
-
}
public void sendRequestForMetadataToRemoteDelegates() throws AsynchAEException
@@ -1409,13 +1508,16 @@
catch( Exception e)
{
// Any error here is automatic termination
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "executeFlowStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
try
{
getInProcessCache().destroy();
handleAction(ErrorHandler.TERMINATE, null, null);
}
- catch( Exception ex){ex.printStackTrace();}
+ catch( Exception ex) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "executeFlowStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
+ ex.printStackTrace();
+ }
return;
}
@@ -1428,19 +1530,6 @@
}
else if (step instanceof ParallelStep)
{
-
- Collection keyList = ((ParallelStep) step).getAnalysisEngineKeys();
- String[] analysisEngineKeys = new String[keyList.size()];
- keyList.toArray(analysisEngineKeys);
-
- String aeKeys = "";
- Iterator it = keyList.iterator();
- while (it.hasNext())
- {
- aeKeys += "::" + (String) it.next();
-
- }
-
parallelStep((ParallelStep) step, aCasReferenceId);
}
else if (step instanceof FinalStep)
@@ -1775,7 +1864,12 @@
}
if ( !isStopped() )
{
- completeInitialization();
+ try {
+ completeInitialization();
+ } catch ( ResourceInitializationException ex) {
+ handleInitializationError(ex);
+ return;
+ }
}
}
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=707401&r1=707400&r2=707401&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Thu Oct 23 08:53:21 2008
@@ -190,6 +190,7 @@
protected ConcurrentHashMap<String, UimaMessageListener> messageListeners
= new ConcurrentHashMap<String, UimaMessageListener>();
+ private Exception initException = null;
// protected UimaTransport transport = new VmTransport();
public BaseAnalysisEngineController(AnalysisEngineController aParentController, int aComponentCasPoolSize, String anEndpointName, String aDescriptor, AsynchAECasManager aCasManager, InProcessCache anInProcessCache) throws Exception
@@ -1833,9 +1834,11 @@
*/
public void addControllerCallbackListener(ControllerCallbackListener aListener)
{
+
controllerListeners.add(aListener);
- if ( serviceInitialized )
- {
+ if ( initException != null ) {
+ notifyListenersWithInitializationStatus(initException);
+ } else if ( serviceInitialized ) {
notifyListenersWithInitializationStatus(null);
}
}
@@ -1853,7 +1856,8 @@
public void notifyListenersWithInitializationStatus(Exception e)
{
- if ( controllerListeners.isEmpty())
+ initException = e;
+ if ( controllerListeners.isEmpty())
{
return;
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties?rev=707401&r1=707400&r2=707401&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties Thu Oct 23 08:53:21 2008
@@ -168,3 +168,4 @@
UIMAEE_input_cas_invalid__INFO = Controller: {0} Received Invalid Request from Cas Multiplier {1} Containing Cas Id: {2}. The Parent Cas Id Is Missing.
UIMAEE_show_remote_delegate_serialization_INFO = >>> Controller: {0} Configured To Serialize CASes To Remote Delegate: {1} Using {2} Serialization
UIMAEE_delegate_in_parallel_step_not_remote_WARNING = >>> Controller: {0} Delegate: {1} Not Remote But Defined In Parallel Step. Only Remote Delegates Can Be In Parallel Step.
+UIMAEE_move_to_single_step_list__FINE = Controller: {0} Moved Delegate: {1} To Single Step List From Parallel Step List. The Delegate is Co-located. Only Remote Delegates Can Be in Parallel Step. CAS id: {2}