You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2011/02/16 19:06:24 UTC

svn commit: r1071337 - /uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java

Author: cwiklik
Date: Wed Feb 16 18:06:24 2011
New Revision: 1071337

URL: http://svn.apache.org/viewvc?rev=1071337&view=rev
Log:
UIMA-2055 Modified to work with new/refactored JmsOutputChannel

Modified:
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=1071337&r1=1071336&r2=1071337&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Wed Feb 16 18:06:24 2011
@@ -481,7 +481,7 @@ public class AggregateAnalysisEngineCont
       // Send reply back to the client. Use internal (non-jms) transport
       transport.getUimaMessageDispatcher(aClientEndpoint.getEndpoint()).dispatch(message);
     } else {
-      getOutputChannel().sendReply(AsynchAEMessage.CollectionProcessComplete, aClientEndpoint);
+      getOutputChannel().sendReply(AsynchAEMessage.CollectionProcessComplete, aClientEndpoint, null, false);
     }
 
     clearStats();
@@ -610,7 +610,7 @@ public class AggregateAnalysisEngineCont
               }
             }
           } else {
-            getOutputChannel().sendRequest(AsynchAEMessage.CollectionProcessComplete, endpoint);
+            getOutputChannel().sendRequest(AsynchAEMessage.CollectionProcessComplete, null, endpoint);
             endpoint.startCollectionProcessCompleteTimer();
           }
         }
@@ -1008,8 +1008,12 @@ public class AggregateAnalysisEngineCont
             // If the delegate CM is a remote, send a Free CAS notification
             if (delegateCM.getEndpoint().isRemote()) {
               parentCasStateEntry.getFreeCasNotificationEndpoint().setCommand(AsynchAEMessage.Stop);
+              Endpoint fcEndpoint = parentCasStateEntry.getFreeCasNotificationEndpoint();
+              fcEndpoint.setReplyEndpoint(true);
+              fcEndpoint.setIsCasMultiplier(true);
+              fcEndpoint.setFreeCasEndpoint(true);
               getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, entry.getCasReferenceId(),
-                      parentCasStateEntry.getFreeCasNotificationEndpoint());
+                      fcEndpoint);
             }
             // Check if a request to stop generation of new CASes from the parent of
             // this CAS has been sent to the CM. The Delegate object keeps track of
@@ -1136,7 +1140,7 @@ public class AggregateAnalysisEngineCont
           // Remove a delegate endpoint from the single step list cached in the CAS entry
           Endpoint endpoint = (Endpoint_impl) entry.getDelayedSingleStepList().remove(0);
           // send the CAS to a collocated delegate from the delayed single step list.
-          dispatchProcessRequest(aCasReferenceId, endpoint, true);
+          dispatchProcessRequest(entry, endpoint, true);
         }
       } catch (Exception e) {
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
@@ -1244,13 +1248,13 @@ public class AggregateAnalysisEngineCont
       String analysisEngineKey = aStep.getAnalysisEngineKey();
       // Find the endpoint for the delegate
       endpoint = lookUpEndpoint(analysisEngineKey, true);
+      CacheEntry cacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
       if (endpoint != null) {
         endpoint.setController(this);
         CasStateEntry casStateEntry = getLocalCache().lookupEntry(aCasReferenceId);
         casStateEntry.resetReplyReceived();
         if (enableCasLogMap!=null && enableCasLogMap.containsKey(analysisEngineKey)) {
           //  Get a CAS
-          CacheEntry cacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
           CAS cas = cacheEntry.getCas();
           logCasForEndpoint(analysisEngineKey, cas);
         }
@@ -1287,7 +1291,7 @@ public class AggregateAnalysisEngineCont
                     "UIMAEE_next_step_dispatch__FINEST",
                     new Object[] { getComponentName(), analysisEngineKey, aCasReferenceId  });
           }
-          dispatchProcessRequest(aCasReferenceId, endpoint, true);
+          dispatchProcessRequest(cacheEntry, endpoint, true);
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
             UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                     "simpleStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
@@ -1431,7 +1435,7 @@ public class AggregateAnalysisEngineCont
           casStateEntry.setNumberOfParallelDelegates(endpoints.length);
         }
         // Dispatch CAS to remote parallel delegates
-        dispatchProcessRequest(aCasReferenceId, endpoints, true);
+        dispatchProcessRequest(cacheEntry, endpoints, true);
       } else {
         // All delegates in a parallel step are co-located. Send the CAS
         // to the first delegate in the single step list.
@@ -1836,8 +1840,10 @@ public class AggregateAnalysisEngineCont
       }
       // freeCasEndpoint is a special endpoint for sending Free CAS Notification.
       if (casDropped && freeCasEndpoint != null) {
-        freeCasEndpoint.setReplyEndpoint(true);
         try {
+          freeCasEndpoint.setReplyEndpoint(true);
+          freeCasEndpoint.setIsCasMultiplier(true);
+          freeCasEndpoint.setFreeCasEndpoint(true);
           // send Free CAS Notification to a Cas Multiplier
           getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, aCasReferenceId,
                   freeCasEndpoint);
@@ -2343,13 +2349,13 @@ public class AggregateAnalysisEngineCont
     }
   }
 
-  private void dispatch(String aCasReferenceId, Endpoint anEndpoint) throws AsynchAEException {
+  private void dispatch(CacheEntry entry, Endpoint anEndpoint) throws AsynchAEException {
     if (!anEndpoint.isRemote()) {
       try {
         UimaTransport transport = getTransport(anEndpoint.getEndpoint());
         UimaMessage message = transport.produceMessage(AsynchAEMessage.Process,
                 AsynchAEMessage.Request, getName());
-        message.addStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
+        message.addStringProperty(AsynchAEMessage.CasReference, entry.getCasReferenceId());
         transport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message);
 
       } catch (Exception e) {
@@ -2370,9 +2376,9 @@ public class AggregateAnalysisEngineCont
       // is in this state, delay CASes by placing them on a list of
       // CASes pending dispatch. Once the ping reply is received all
       // delayed CASes will be dispatched to the delegate.
-      if (!delayCasIfDelegateInTimedOutState(aCasReferenceId, anEndpoint.getDelegateKey())) {
+      if (!delayCasIfDelegateInTimedOutState(entry.getCasReferenceId(), anEndpoint.getDelegateKey())) {
         // The delegate is in the normal state so send it this CAS
-        getOutputChannel().sendRequest(aCasReferenceId, anEndpoint);
+        getOutputChannel().sendRequest(AsynchAEMessage.Process, entry.getCasReferenceId(), anEndpoint);
       }
     }
   }
@@ -2406,13 +2412,13 @@ public class AggregateAnalysisEngineCont
     return false; // Cas Not Delayed
   }
 
-  private void dispatchProcessRequest(String aCasReferenceId, Endpoint anEndpoint,
+  private void dispatchProcessRequest(CacheEntry entry, Endpoint anEndpoint,
           boolean addEndpointToCache) throws AsynchAEException {
     if (addEndpointToCache) {
-      getInProcessCache().addEndpoint(anEndpoint, aCasReferenceId);
+      getInProcessCache().addEndpoint(anEndpoint, entry.getCasReferenceId());
     }
     anEndpoint.setController(this);
-    dispatch(aCasReferenceId, anEndpoint);
+    dispatch(entry, anEndpoint);
 
   }
 
@@ -2439,29 +2445,38 @@ public class AggregateAnalysisEngineCont
       endpoint = lookUpEndpoint(key, true);
       getInProcessCache().addEndpoint(endpoint, aCasReferenceId);
     }
-    dispatchProcessRequest(aCasReferenceId, endpoint, addEndpointToCache);
+    if ( getInProcessCache().entryExists(aCasReferenceId) ) {
+      dispatchProcessRequest(getInProcessCache().getCacheEntryForCAS(aCasReferenceId), endpoint, addEndpointToCache);
+    }
   }
 
-  private void dispatchProcessRequest(String aCasReferenceId, Endpoint[] anEndpointList,
+  private void dispatchProcessRequest(CacheEntry entry, Endpoint[] anEndpointList,
           boolean addEndpointToCache) throws AsynchAEException {
     List<Endpoint> endpointList = new ArrayList<Endpoint>();
     for (int i = 0; i < anEndpointList.length; i++) {
       // Check if the delegate previously timed out. If so, add the CAS
       // Id to the list pending dispatch. This list holds CASes that are
       // delayed until the service responds to a Ping.
-      if (delayCasIfDelegateInTimedOutState(aCasReferenceId, anEndpointList[i].getEndpoint())) {
+      if (delayCasIfDelegateInTimedOutState(entry.getCasReferenceId(), anEndpointList[i].getEndpoint())) {
         // The CAS was delayed until the delegate responds to a Ping
         continue;
       } else {
         endpointList.add(anEndpointList[i]);
       }
       if (addEndpointToCache) {
-        getInProcessCache().addEndpoint(anEndpointList[i], aCasReferenceId);
+        getInProcessCache().addEndpoint(anEndpointList[i], entry.getCasReferenceId());
       }
     }
     Endpoint[] endpoints = new Endpoint[endpointList.size()];
     endpointList.toArray(endpoints);
-    getOutputChannel().sendRequest(aCasReferenceId, endpoints);
+    for (Endpoint endpoint : endpoints) {
+      if (endpoint.isRemote()) {
+        //  In a parallel step, serialization must be XMI. Binary serialization doesn't support merge.
+        endpoint.setSerializer("xmi");
+        getOutputChannel().sendRequest(AsynchAEMessage.Process, entry.getCasReferenceId(), endpoint);
+      }    
+    }
+
   }
 
   public boolean isDelegateKeyValid(String aDelegateKey) {
@@ -2813,7 +2828,6 @@ public class AggregateAnalysisEngineCont
       throw new AsynchAEException("Controller:" + getComponentName()
               + " Unable To Dispatch GetMeta Request. Provided Endpoint is Invalid (NULL)");
     }
-    anEndpoint.startMetadataRequestTimer();
     anEndpoint.setController(this);
     anEndpoint.setWaitingForResponse(true);
     String key = lookUpDelegateKey(anEndpoint.getEndpoint());
@@ -2839,7 +2853,7 @@ public class AggregateAnalysisEngineCont
 
       }
     }
-    getOutputChannel().sendRequest(AsynchAEMessage.GetMeta, anEndpoint);
+    getOutputChannel().sendRequest(AsynchAEMessage.GetMeta, null, anEndpoint);
   }
 
   public void retryMetadataRequest(Endpoint anEndpoint) throws AsynchAEException {
@@ -2915,11 +2929,7 @@ public class AggregateAnalysisEngineCont
 
   public void retryLastCommand(int aCommand, Endpoint anEndpoint, String aCasReferenceId) {
     try {
-      if (AsynchAEMessage.Process == aCommand) {
-        getOutputChannel().sendRequest(aCasReferenceId, anEndpoint);
-      } else {
-        getOutputChannel().sendRequest(aCommand, anEndpoint);
-      }
+      getOutputChannel().sendRequest(aCommand, aCasReferenceId, anEndpoint);
     } catch (AsynchAEException e) {
 
     }