You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/01/22 18:02:22 UTC

svn commit: r736711 - in /incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main: java/org/apache/uima/aae/controller/ java/org/apache/uima/aae/delegate/ java/org/apache/uima/aae/error/ java/org/apache/uima/aae/error/handler/ java/org/apache/uima/...

Author: cwiklik
Date: Thu Jan 22 09:02:21 2009
New Revision: 736711

URL: http://svn.apache.org/viewvc?rev=736711&view=rev
Log:
UIMA-1127 Modified to use GetMeta as Ping message on service Timeout

Added:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/UimaASPingTimeout.java
Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/GetMetaErrorHandler.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java?rev=736711&r1=736710&r2=736711&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java Thu Jan 22 09:02:21 2009
@@ -114,4 +114,6 @@
 	
   public Delegate lookupDelegate( String aDelegateKey );
 
+  public boolean delayCasIfDelegateInTimedOutState( String aCasReferenceId, String aDelegateKey ) throws AsynchAEException;
+
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=736711&r1=736710&r2=736711&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Thu Jan 22 09:02:21 2009
@@ -1777,10 +1777,44 @@
     }
     else
     {
-      getOutputChannel().sendRequest(aCasReferenceId, anEndpoint);
+    	//	Check delegate's state before sending it a CAS. The delegate
+    	//	may have previously timed out and is in a process of pinging
+    	//	the delegate to check its availability. While the delegate
+    	//	is in this state, delay CASes by placing them on a list of
+    	//	CASes pending dispatch. Once the ping reply is received all
+    	//	delayed CASes will be dispatched to the delegate.
+      if ( !delayCasIfDelegateInTimedOutState( aCasReferenceId, anEndpoint.getEndpoint() ) ) {
+				//	The delegate is in the normal state so send it this CAS
+        getOutputChannel().sendRequest(aCasReferenceId, anEndpoint);
+      }
     }
 	}
-
+	/**
+	 * Checks the state of a delegate to see if it is in TIMEOUT State.
+   * If it is, push the CAS id onto a list of CASes pending dispatch.
+   * The delegate is in a questionable state and the aggregate sends
+   * a ping message to check delegate's availability. If the delegate
+   * responds to the ping, all CASes in the pending dispatch list will
+   * be immediately dispatched.
+  **/
+  public boolean delayCasIfDelegateInTimedOutState( String aCasReferenceId, String aDelegateKey ) throws AsynchAEException {
+    Delegate delegate = lookupDelegate(aDelegateKey);
+    if (delegate != null && delegate.getState() == Delegate.TIMEOUT_STATE ) {
+      // Add CAS id to the list of delayed CASes.
+      int listSize = delegate.addCasToPendingDispatchList(aCasReferenceId);
+      // If the list was empty (before the add), send the GetMeta request
+      // as a PING to see if the delegate service is alive.
+      if ( listSize == 1 ) {
+        delegate.setAwaitingPingReply();
+        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "delayCasIfDelegateInTimedOutState", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_aggregate_sending_ping__FINE", new Object[] { getComponentName(), delegate.getKey() });
+        }
+        retryMetadataRequest(delegate.getEndpoint());
+      }
+      return true;
+    }
+    return false;  // Cas Not Delayed
+  }
 	private void dispatchProcessRequest(String aCasReferenceId, Endpoint anEndpoint, boolean addEndpointToCache) throws AsynchAEException
 	{
 		if (addEndpointToCache)
@@ -1823,15 +1857,26 @@
 
 	private void dispatchProcessRequest(String aCasReferenceId, Endpoint[] anEndpointList, boolean addEndpointToCache) throws AsynchAEException
 	{
+	  List<Endpoint> endpointList = new ArrayList<Endpoint>();
 		for (int i = 0; i < anEndpointList.length; i++)
 		{
+		  //  Check if the delegate previously timed out. If so, add the CAS
+		  //  Id to the list pending dispatch. This list holds CASes that are
+		  //  delayed until the service responds to a Ping.
+		  if ( delayCasIfDelegateInTimedOutState(aCasReferenceId, anEndpointList[i].getEndpoint()))  {
+		    //  The CAS was delayed until the delegate responds to a Ping
+		    continue;
+		  } else {
+		    endpointList.add(anEndpointList[i]);
+		  }
 			if (addEndpointToCache)
 			{
 				getInProcessCache().addEndpoint(anEndpointList[i], aCasReferenceId);
 			}
 		}
-
-		getOutputChannel().sendRequest(aCasReferenceId, anEndpointList);
+		Endpoint[] endpoints = new Endpoint[endpointList.size()];
+		endpointList.toArray(endpoints);
+		getOutputChannel().sendRequest(aCasReferenceId, endpoints);
 	}
 
 

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=736711&r1=736710&r2=736711&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Thu Jan 22 09:02:21 2009
@@ -53,6 +53,7 @@
 import org.apache.uima.aae.error.ErrorContext;
 import org.apache.uima.aae.error.ErrorHandler;
 import org.apache.uima.aae.error.ErrorHandlerChain;
+import org.apache.uima.aae.error.MessageTimeoutException;
 import org.apache.uima.aae.error.ServiceShutdownException;
 import org.apache.uima.aae.error.handler.ProcessCasErrorHandler;
 import org.apache.uima.aae.jmx.JmxManagement;
@@ -1008,6 +1009,60 @@
 					}
           ((AggregateAnalysisEngineController)this).disableDelegates(list);
 
+          if ( key != null && key.trim().length() > 0) {
+            //  Delegate has been disabled. Cleanup Delegate's lists. Each Delegate
+            //  maintains a list of CASes pending reply and a different list of CASes
+            //  pending dispatch. The first list contains CASes sent to the delegate.
+            //  When a reply is received from the delegate, the CAS is removed from
+            //  the list. The second list contains CASes that have been delayed
+            //  while the service was in the TIMEDOUT state. These CASes were to 
+            //  be dispatched to the delegate once its state is reset to OK. It is
+            //  reset to OK state when the delegate responds to the client PING
+            //  request. Since we have disabled the delegate, remove ALL CASes from
+            //  both lists and send them through the ErrorHandler one at a time 
+            //  as if these CASes timed out.
+            
+            Delegate delegate = ((AggregateAnalysisEngineController)this).lookupDelegate(key);
+            //  Cancel the delegate timer. No more responses are expected 
+            delegate.cancelDelegateTimer();
+            
+            //  If the delegate has CASes pending reply still, send each CAS
+            //  from the pending list through the error handler with 
+            //  MessageTimeoutException as a cause of error
+            while ( delegate.getCasPendingReplyListSize() > 0 ) {
+              String timedOutCasId = delegate.removeOldestCasFromOutstandingList();
+              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+                       "handleAction", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_force_cas_timeout__INFO",
+                       new Object[] { getComponentName(), key, timedOutCasId, " Pending Reply List" });
+              }
+              
+              ErrorContext errorContext = new ErrorContext();
+              errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
+              errorContext.add(AsynchAEMessage.CasReference, timedOutCasId);
+              errorContext.add(AsynchAEMessage.Endpoint, endpoint);
+              getErrorHandlerChain().handle(new MessageTimeoutException(), errorContext, this);
+            }
+            //  If the delegate has CASes pending dispatch, send each CAS
+            //  from the pending dispatch list through the error handler with 
+            //  MessageTimeoutException as a cause of error
+            while ( delegate.getCasPendingDispatchListSize() > 0 ) {
+              String timedOutCasId = delegate.removeOldestFromPendingDispatchList();
+
+              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+                       "handleAction", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_force_cas_timeout__INFO",
+                       new Object[] { getComponentName(), key, timedOutCasId, " Pending Dispatch List" });
+              }
+              
+              ErrorContext errorContext = new ErrorContext();
+              errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
+              errorContext.add(AsynchAEMessage.CasReference, timedOutCasId);
+              errorContext.add(AsynchAEMessage.Endpoint, endpoint);
+              getErrorHandlerChain().handle(new MessageTimeoutException(), errorContext, this);
+            }
+          }
+
           if ( endpoint != null ) {
 	          try {
 	            // Fetch all Cas entries currently awaiting reply from the delegate that was just

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java?rev=736711&r1=736710&r2=736711&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java Thu Jan 22 09:02:21 2009
@@ -190,7 +190,9 @@
       return replyReceived;
     }
     public synchronized void incrementHowManyDelegatesResponded(){
-      howManyDelegatesResponded++;
+      if ( howManyDelegatesResponded < numberOfParallelDelegates) {
+        howManyDelegatesResponded++;
+      }
     }
     public synchronized int howManyDelegatesResponded(){
       return howManyDelegatesResponded;

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java?rev=736711&r1=736710&r2=736711&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java Thu Jan 22 09:02:21 2009
@@ -72,7 +72,28 @@
 
   // synchronizes access to the delegate's state
   private Object stateMux = new Object();
+
+  // List holding CASes that have been delayed due to a delegate timeout. These
+  // CASes should be sent to the delegate as soon as the getMeta (Ping) reply is received.
+  private List<DelegateEntry> pendingDispatchList = new ArrayList<DelegateEntry>();
+
+  // synchronizes access to the list of CASes pending dispatch
+  private Object pendingDispatchListMux = new Object();
   
+  //  Flag that is set when a getMeta (Ping) request is sent and its reply is awaited
+  private volatile boolean awaitingPingReply;
+
+  public boolean isAwaitingPingReply() {
+    return awaitingPingReply;
+  }
+
+  public void setAwaitingPingReply() {
+    this.awaitingPingReply = true;
+  }
+
+  public void resetAwaitingPingReply() {
+    this.awaitingPingReply = false;
+  }
   /**
    * Returns delegate key
    * 
@@ -85,8 +106,8 @@
   /**
    * Sets an {@link Endpoint} object
    * 
-   * @param anEndpoint -
-   *          an endpoint object
+   * @param anEndpoint
+   *          - an endpoint object
    */
   public void setEndpoint(Endpoint anEndpoint) {
     endpoint = anEndpoint;
@@ -100,59 +121,165 @@
   public Endpoint getEndpoint() {
     return endpoint;
   }
+  /**
+   * Forces Timer restart for the oldest CAS sitting in the list
+   * of CASes pending reply.  
+   */
+  public void restartTimerForOldestCasInOutstandingList() {
+    DelegateEntry entry = null;
+    synchronized (outstandingCasListMux) {
+      if (!outstandingCasList.isEmpty()) {
+        //  Get the oldest entry
+        entry = outstandingCasList.get(0);
+        if ( entry != null ) {
+          restartTimerForCas(entry);
+        }
+      }
+    }
+  }
+  /**
+   * Restarts timer for a given CAS
+   * 
+   * @param entry
+   */
+  private void restartTimerForCas( DelegateEntry entry ) {
+    if (getCasProcessTimeout() > 0) {
+      entry.incrementRetryCount();
+      // restart timer for retry
+      startDelegateTimer(entry.getCasReferenceId(), AsynchAEMessage.Process);
+      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(
+                Level.FINE,
+                this.getClass().getName(),
+                "restartTimerForCas",
+                UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAEE_restart_timer_FINE",
+                new Object[] { getComponentName(), delegateKey, entry.getCasReferenceId(),
+                    getCasProcessTimeout() });
+      }
+    }    
+  }
 
+  
   /**
-   * Adds a given Cas ID to the list of CASes pending reply. A new timer will be started
-   * to handle delegate's timeout if either:
-   * 1) the list of CASes pending reply is empty AND delegate timeout > 0
-   * 2) the list already contains the CAS ID AND delegate timeout > 0. This is a retry logic. 
-   *  
-   * @param aCasReferenceId - CAS ID to add to pending list if not already there
-   *          
+   * Adds a given Cas ID to the list of CASes pending reply. A new timer will be started to handle
+   * delegate's timeout if either: 1) the list of CASes pending reply is empty AND delegate timeout
+   * > 0 2) the list already contains the CAS ID AND delegate timeout > 0. This is a retry logic.
+   * 
+   * @param aCasReferenceId
+   *          - CAS ID to add to pending list if not already there
+   * 
    */
   public void addCasToOutstandingList(String aCasReferenceId) {
     synchronized (outstandingCasListMux) {
       DelegateEntry entry = null;
-      //  Check if the outstanding list already contains entry for the Cas Id. If it does, retry logic 
-      //  is calling this method. Increment number of retries and restart the timer.
-      if ( !outstandingCasList.isEmpty() && (entry = lookupEntry(aCasReferenceId)) != null) {
-        entry.incrementRetryCount();
-        if ( getCasProcessTimeout() > 0 ) {
-          //  restart timer for retry
-          startDelegateTimer(aCasReferenceId, AsynchAEMessage.Process);
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, this.getClass().getName(),
-                    "addCasToOutstandingList", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAEE_restart_timer_FINE",
-                    new Object[] { getComponentName(), delegateKey, aCasReferenceId, getCasProcessTimeout() });
-          }
-        }
-      } else { 
-        //  Create a new entry to be stored in the list of CASes pending reply
+      // Check if the outstanding list already contains entry for the Cas Id. If it does, retry
+      // logic
+      // is calling this method. Increment number of retries and restart the timer.
+      if (!outstandingCasList.isEmpty() && (entry = lookupEntry(aCasReferenceId, outstandingCasList)) != null) {
+        restartTimerForCas(entry);
+      } else {
+        // Create a new entry to be stored in the list of CASes pending reply
         entry = new DelegateEntry(aCasReferenceId);
-        //  Remember the command 
+        // Remember the command
         entry.setCommand(AsynchAEMessage.Process);
         // Start delegate timer if the pending list is empty
-        if (outstandingCasList.isEmpty() && getCasProcessTimeout() > 0 ) {
+        if (outstandingCasList.isEmpty() && getCasProcessTimeout() > 0) {
           startDelegateTimer(aCasReferenceId, AsynchAEMessage.Process);
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, this.getClass().getName(),
-                    "addCasToOutstandingList", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+            UIMAFramework.getLogger(CLASS_NAME).logrb(
+                    Level.FINE,
+                    this.getClass().getName(),
+                    "addCasToOutstandingList",
+                    UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                     "UIMAEE_start_timer_FINE",
-                    new Object[] { getComponentName(), delegateKey, aCasReferenceId, getCasProcessTimeout() });
+                    new Object[] { getComponentName(), delegateKey, aCasReferenceId,
+                        getCasProcessTimeout() });
           }
         }
-        //  Append Cas Entry to the end of the list
+        // Append Cas Entry to the end of the list
         outstandingCasList.add(entry);
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, this.getClass().getName(),
-                  "addCasToOutstandingList", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+          UIMAFramework.getLogger(CLASS_NAME).logrb(
+                  Level.FINE,
+                  this.getClass().getName(),
+                  "addCasToOutstandingList",
+                  UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                   "UIMAEE_add_cas_to_delegate_pending_reply_FINE",
-                  new Object[] { getComponentName(), delegateKey, aCasReferenceId,outstandingCasList.size() });
+                  new Object[] { getComponentName(), delegateKey, aCasReferenceId,
+                      outstandingCasList.size() });
         }
       }
     }
   }
+
+  /**
+   * Adds given CAS ID to the list of CASes pending dispatch. These CASes are delayed due to a
+   * questionable state of the delegate that most likely timed out on a previous CAS. When the
+   * timeout occurs, the subsequent CASes are queued (delayed) and a GetMeta request is sent to the
+   * delegate. When the delegate responds to GetMeta request, the state of the delegate is reset
+   * back to normal and the CASes queued (delayed) are immediately sent to the delegate.
+   * 
+   * @param aCasReferenceId - CAS ID to add to the delayed list
+   * @return number of CASes in the pending dispatch list after this CAS has been added
+   */
+  public int addCasToPendingDispatchList(String aCasReferenceId) {
+    synchronized (pendingDispatchListMux) {
+      DelegateEntry entry = null;
+      // Create a new entry to be stored in the list of CASes pending
+      // dispatch
+      entry = new DelegateEntry(aCasReferenceId);
+      // Remember the command
+      entry.setCommand(AsynchAEMessage.Process);
+      // Append Cas Entry to the end of the list
+      pendingDispatchList.add(entry);
+      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+        dumpDelayedList();
+        UIMAFramework.getLogger(CLASS_NAME).logrb(
+                Level.FINE,
+                this.getClass().getName(),
+                "addCasToPendingDispatchList",
+                UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAEE_add_cas_to_delegate_pending_dispatch_FINE",
+                new Object[] { getComponentName(), delegateKey, aCasReferenceId,
+                    pendingDispatchList.size() });
+      }
+      return pendingDispatchList.size();
+    }
+  }
+  /**
+   * Logs CASes sitting in the list of CASes pending dispatch. These CASes
+   * were delayed due to a bad state of the delegate. 
+   */
+  private void dumpDelayedList() {
+    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+      for( DelegateEntry entry: pendingDispatchList) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(
+                Level.FINE,
+                this.getClass().getName(),
+                "dumpDelayedList",
+                UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAEE_dump_cas_pending_dispatch__FINE",
+                new Object[] { getComponentName(),  entry.getCasReferenceId(), delegateKey });
+      }
+    }
+  }
+  /**
+   * Logs CASes sitting in the list of CASes pending reply. 
+   */
+  private void dumpPendingReplyList() {
+    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+      for( DelegateEntry entry: outstandingCasList) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(
+                Level.FINE,
+                this.getClass().getName(),
+                "dumpPendingReplyList",
+                UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAEE_dump_cas_pending_reply__FINE",
+                new Object[] { getComponentName(),  entry.getCasReferenceId(), delegateKey });
+      }
+    }
+  }
   /**
    * Increments retry count
    * 
@@ -160,46 +287,114 @@
    */
   public void incrementRetryCount(String aCasReferenceId) {
     synchronized (outstandingCasListMux) {
-      DelegateEntry entry = lookupEntry(aCasReferenceId);
-      if ( entry != null ) {
+      DelegateEntry entry = lookupEntry(aCasReferenceId, outstandingCasList);
+      if (entry != null) {
         entry.incrementRetryCount();
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, this.getClass().getName(),
-                  "incrementRetryCount", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+          UIMAFramework.getLogger(CLASS_NAME).logrb(
+                  Level.FINE,
+                  this.getClass().getName(),
+                  "incrementRetryCount",
+                  UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                   "UIMAEE_increment_retry_count_FINE",
-                  new Object[] { getComponentName(), delegateKey, aCasReferenceId, entry.getRetryCount() });
+                  new Object[] { getComponentName(), delegateKey, aCasReferenceId,
+                      entry.getRetryCount() });
         }
       }
     }
   }
- /**
-  * Returns {@link DelegateEntry} instance that matches given CAS ID 
-  * pending reply. 
-  *  
-  * @param aCasReferenceId - unique id of a CAS to be searched for
-  * @return
-  */
-  private DelegateEntry lookupEntry(String aCasReferenceId) {
-    for (DelegateEntry entry : outstandingCasList) {
+
+  /**
+   * Returns {@link DelegateEntry} instance that matches given CAS ID pending reply.
+   * 
+   * @param aCasReferenceId
+   *          - unique id of a CAS to be searched for
+   * @return
+   */
+  private DelegateEntry lookupEntry(String aCasReferenceId, List<DelegateEntry> list) {
+    for (DelegateEntry entry : list) {
       if (entry.getCasReferenceId().equals(aCasReferenceId)) {
         return entry;
       }
     }
     return null;
   }
+  /** 
+   * Removes the oldest entry from the list of CASes pending dispatch.
+   * A CAS is delayed and placed on this list when the delegate status
+   * changes to TIMED_OUT and a PING message is sent to test delegate 
+   * availability. Until the PING message is acked by the delegate OR
+   * the PING times out, all CASes are delayed. When the PING is acked
+   * by the delegate ALL delayed CASes are sent to the delegate one at
+   * a time.
+   * 
+   * @return - ID of the oldest CAS in the list
+   */
+  public String removeOldestFromPendingDispatchList() {
+    synchronized (pendingDispatchListMux) {
+      if ( pendingDispatchList.size() > 0 ) {
+        String casReferenceId = pendingDispatchList.remove(0).getCasReferenceId();
+        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+          UIMAFramework.getLogger(CLASS_NAME).logrb(
+                  Level.FINE,
+                  this.getClass().getName(),
+                  "removeOldestFromPendingDispatchList",
+                  UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                  "UIMAEE_removed_cas_from_delegate_pending_dispatch_list__FINE",
+                  new Object[] { getComponentName(), delegateKey, casReferenceId,
+                    pendingDispatchList.size() });
+        }
+        return casReferenceId;
+      }
+    }
+    return null;
+  }
+
+  
+  /** 
+   * Removes an entry from the list of CASes pending dispatch that
+   * matches a given CAS Id. A CAS is delayed and placed on this list when the delegate status
+   * changes to TIMED_OUT and a PING message is sent to test delegate 
+   * availability. Until the PING message is acked by the delegate OR
+   * the PING times out, all CASes are delayed. When the PING is acked
+   * by the delegate ALL delayed CASes are sent to the delegate one at
+   * a time.
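+   * @param aCasReferenceId - ID of the CAS to remove from the pending dispatch list
+   * @return - true if the CAS was found in the list and removed, false otherwise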
+   */
+  public boolean removeCasFromPendingDispatchList(String aCasReferenceId) {
+    synchronized (pendingDispatchListMux) {
+      DelegateEntry entry = lookupEntry(aCasReferenceId, pendingDispatchList);
+      if (entry != null) {
+        pendingDispatchList.remove(entry);
+        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+          UIMAFramework.getLogger(CLASS_NAME).logrb(
+                  Level.FINE,
+                  this.getClass().getName(),
+                  "removeCasFromPendingDispatchList",
+                  UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                  "UIMAEE_removed_cas_from_delegate_pending_dispatch_list__FINE",
+                  new Object[] { getComponentName(), delegateKey, entry.getCasReferenceId(),
+                    pendingDispatchList.size() });
+        }
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
-   * Removes {@link DelegateEntry} from the list of CASes pending reply. The entry is removed
-   * when either:
-   * 1) reply is received from the delegate before the timeout
-   * 2) the timeout occurs with no retry
-   * 3) an error occurs and the CAS is dropped as part of Error Handling
+   * Removes {@link DelegateEntry} from the list of CASes pending reply. The entry is removed when
+   * either: 1) reply is received from the delegate before the timeout 2) the timeout occurs with no
+   * retry 3) an error occurs and the CAS is dropped as part of Error Handling
    * 
-   * @param aCasReferenceId - id of the CAS to remove from the list
+   * @param aCasReferenceId
+   *          - id of the CAS to remove from the list
    */
   public boolean removeCasFromOutstandingList(String aCasReferenceId) {
     synchronized (outstandingCasListMux) {
-      DelegateEntry entry = lookupEntry(aCasReferenceId);
-      if ( entry != null ) {
+      DelegateEntry entry = lookupEntry(aCasReferenceId, outstandingCasList);
+      if (entry != null) {
         this.removeCasFromOutstandingList(entry);
         return true;
       }
@@ -208,9 +403,24 @@
   }
 
   /**
+   * Removes the oldest (first) {@link DelegateEntry} from the list of CASes pending reply and
+   * returns its CAS ID. The removal is unguarded, so callers are expected to check that the list
+   * is not empty before calling this method.
+   * 
+   * @return
+   *          - id of the oldest CAS that was removed from the list
+   */
+  public String removeOldestCasFromOutstandingList() {
+    synchronized (outstandingCasListMux) {
+      dumpPendingReplyList();
+      return outstandingCasList.remove(0).getCasReferenceId();
+    }
+  }
+  /**
    * Removes {@link DelegateEntry} from the list of CASes pending reply. If the CAS removed was the
    * oldest in the list (first in the list) AND there are other CASes in the list pending reply AND
-   * the delegate timeout is configured ( timeout > 0) , restart the timer for the next oldest CAS in the list.
+   * the delegate timeout is configured ( timeout > 0) , restart the timer for the next oldest CAS
+   * in the list.
    * 
    * @param aDelegateEntry
    */
@@ -220,46 +430,67 @@
     DelegateEntry oldestEntry = outstandingCasList.get(0);
     boolean doStartDelegateTimer = oldestEntry.equals(aDelegateEntry) && getCasProcessTimeout() > 0;
     outstandingCasList.remove(aDelegateEntry);
-    
+
     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, this.getClass().getName(),
-              "removeCasFromOutstandingList", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+      UIMAFramework.getLogger(CLASS_NAME).logrb(
+              Level.FINE,
+              this.getClass().getName(),
+              "removeCasFromOutstandingList",
+              UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
               "UIMAEE_removed_cas_from_delegate_list__FINE",
-              new Object[] { getComponentName(), delegateKey, aDelegateEntry.getCasReferenceId(), outstandingCasList.size() });
+              new Object[] { getComponentName(), delegateKey, aDelegateEntry.getCasReferenceId(),
+                  outstandingCasList.size() });
     }
     // Restart delegate Timer if the CAS removed was the oldest and the list is not empty
     if (doStartDelegateTimer) {
       // Cancel previous timer and restart it if there are still CASes in the outstanding list
       cancelDelegateTimer();
       if (!outstandingCasList.isEmpty()) {
-        //  get the oldest entry from the list of CASes pending reply
+        // get the oldest entry from the list of CASes pending reply
         DelegateEntry delegateEntry = outstandingCasList.get(0);
-        // Restart the timer for the oldest CAS in the list 
+
+        // Restart the timer for the oldest CAS in the list
         startDelegateTimer(delegateEntry.getCasReferenceId(), delegateEntry.getCommand());
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, this.getClass().getName(),
-                  "removeCasFromOutstandingList", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+          UIMAFramework.getLogger(CLASS_NAME).logrb(
+                  Level.FINE,
+                  this.getClass().getName(),
+                  "removeCasFromOutstandingList",
+                  UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                   "UIMAEE_restart_timer_FINE",
-                  new Object[] { getComponentName(), delegateKey, delegateEntry.getCasReferenceId(), getCasProcessTimeout() });
+                  new Object[] { getComponentName(), delegateKey,
+                      delegateEntry.getCasReferenceId(), getCasProcessTimeout() });
         }
       }
     }
-    
+
   }
+
   /**
    * Cancels timer and clears a list of CASes pending reply
    */
   public void cleanup() {
     cancelDelegateTimer();
-    synchronized( outstandingCasListMux ) {
+    synchronized (outstandingCasListMux) {
       outstandingCasList.clear();
     }
+    synchronized( pendingDispatchListMux) {
+      pendingDispatchList.clear();
+    }
   }
+
   public int getCasPendingReplyListSize() {
-    synchronized( outstandingCasListMux ) {
+    synchronized (outstandingCasListMux) {
       return outstandingCasList.size();
     }
   }
+  /** Returns the size of pendingDispatchList, read while holding pendingDispatchListMux. */
+  public int getCasPendingDispatchListSize() {
+    synchronized (pendingDispatchListMux) {
+      return pendingDispatchList.size();
+    }
+  }
+  
   /**
    * Cancels current timer
    */
@@ -271,9 +502,11 @@
   }
 
   /**
-   *  Returns a timeout value for a given command type. The values are defined
-   *  in the deployment descriptor
-   * @param aCommand - command for which a timeout value is saught
+   * Returns a timeout value for a given command type. The values are defined in the deployment
+   * descriptor
+   * 
+   * @param aCommand
+   *          - command for which a timeout value is sought
    * 
    * @return - long time out value
    */
@@ -289,28 +522,34 @@
         return -1;
     }
   }
+
   /**
    * Starts GetMeta Request timer
    */
   public void startGetMetaRequestTimer() {
     startDelegateTimer(null, AsynchAEMessage.GetMeta);
   }
+
   /**
    * Starts a timer for a given command
    * 
-   * @param aCasReferenceId - id of a CAS if command = Process, null otherwise
-   * @param aCommand - command for which the timer is started
+   * @param aCasReferenceId
+   *          - id of a CAS if command = Process, null otherwise
+   * @param aCommand
+   *          - command for which the timer is started
    */
   private void startDelegateTimer(final String aCasReferenceId, final int aCommand) {
     final long timeToWait = getTimeoutValueForCommand(aCommand);
     Date timeToRun = new Date(System.currentTimeMillis() + timeToWait);
-    timer = new Timer("Controller:" + getComponentName() + ":TimerThread-Endpoint_impl:"
-            + endpoint + ":" + System.nanoTime() + ":Cmd:" + aCommand);
+    timer = new Timer("Controller:" + getComponentName() + ":TimerThread-Endpoint_impl:" + endpoint
+            + ":" + System.nanoTime() + ":Cmd:" + aCommand);
     final Delegate delegate = this;
     timer.schedule(new TimerTask() {
       public void run() {
-
         cancelDelegateTimer();
+        delegate.setState(TIMEOUT_STATE);
+        ErrorContext errorContext = new ErrorContext();
+
         if (AsynchAEMessage.Process == aCommand) {
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
             UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
@@ -318,24 +557,23 @@
                     "UIMAEE_cas_timeout_no_reply__INFO",
                     new Object[] { delegate.getKey(), timeToWait, aCasReferenceId });
           }
+          errorContext.add(AsynchAEMessage.CasReference, aCasReferenceId);
         } else if (AsynchAEMessage.GetMeta == aCommand) {
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
             UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
                     "Delegate.TimerTask.run", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAEE_meta_timeout_no_reply__INFO", new Object[] { delegate.getKey(), timeToWait });
+                    "UIMAEE_meta_timeout_no_reply__INFO",
+                    new Object[] { delegate.getKey(), timeToWait });
           }
         } else if (AsynchAEMessage.CollectionProcessComplete == aCommand) {
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
             UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
                     "Delegate.TimerTask.run", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAEE_cpc_timeout_no_reply__INFO", new Object[] { delegate.getKey(), timeToWait });
+                    "UIMAEE_cpc_timeout_no_reply__INFO",
+                    new Object[] { delegate.getKey(), timeToWait });
           }
 
         }
-        ErrorContext errorContext = new ErrorContext();
-        if (aCasReferenceId != null) {
-          errorContext.add(AsynchAEMessage.CasReference, aCasReferenceId);
-        }
         errorContext.add(AsynchAEMessage.Command, aCommand);
         errorContext.add(AsynchAEMessage.Endpoint, getEndpoint());
         handleError(new MessageTimeoutException(), errorContext);
@@ -373,18 +611,24 @@
     }
   }
 
-  public void setState(int state) {
+  public void setState(int aState) {
     synchronized (stateMux) {
-      this.state = state;
+      //  Change the state to timeout, only if the current state = OK_STATE
+      //  This prevents overriding DISABLED state.
+      if ( aState == TIMEOUT_STATE && this.state != OK_STATE ) {
+        return;
+      } 
+      state = aState;
     }
   }
-  
+
   public abstract void handleError(Exception e, ErrorContext errorContext);
-  
-  public abstract String getComponentName(); 
+
+  public abstract String getComponentName();
+
   /**
-   * Entry in the list of CASes pending reply. It stores the {@link CacheEntry} containing information
-   * about a CAS that was sent to the delegate.
+   * Entry in the list of CASes pending reply. It stores the {@link CacheEntry} containing
+   * information about a CAS that was sent to the delegate.
    * 
    * 
    */
@@ -393,8 +637,8 @@
 
     private int command;
 
-    private int retryCount=0;
-    
+    private int retryCount = 0;
+
     public DelegateEntry(String aCasReferenceId) {
       casReferenceId = aCasReferenceId;
     }
@@ -414,12 +658,11 @@
     public void incrementRetryCount() {
       this.retryCount++;
     }
-    
+
     public void resetRetryCount() {
       this.retryCount = 0;
     }
 
-
     public String getCasReferenceId() {
       return casReferenceId;
     }

Added: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/UimaASPingTimeout.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/UimaASPingTimeout.java?rev=736711&view=auto
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/UimaASPingTimeout.java (added)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/UimaASPingTimeout.java Thu Jan 22 09:02:21 2009
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.uima.aae.error;
+
+/** Checked exception named for a ping that timed out; it adds nothing to {@link Exception}. */
+public class UimaASPingTimeout extends Exception {
+	private static final long serialVersionUID = 1L; // explicit id: Exception is Serializable
+
+	/** Creates the exception with neither a detail message nor a cause. */
+	public UimaASPingTimeout() {}
+
+	/** @param message detail message handed to {@link Exception#Exception(String)} */
+	public UimaASPingTimeout(String message) {
+		super(message);
+	}
+
+	/** @param cause underlying cause handed to {@link Exception#Exception(Throwable)} */
+	public UimaASPingTimeout(Throwable cause) {
+		super(cause);
+	}
+
+	/** Creates the exception with both a detail message and an underlying cause. */
+	public UimaASPingTimeout(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/GetMetaErrorHandler.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/GetMetaErrorHandler.java?rev=736711&r1=736710&r2=736711&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/GetMetaErrorHandler.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/GetMetaErrorHandler.java Thu Jan 22 09:02:21 2009
@@ -29,6 +29,7 @@
 import org.apache.uima.aae.controller.BaseAnalysisEngineController;
 import org.apache.uima.aae.controller.Endpoint;
 import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
+import org.apache.uima.aae.delegate.Delegate;
 import org.apache.uima.aae.error.ErrorContext;
 import org.apache.uima.aae.error.ErrorHandler;
 import org.apache.uima.aae.error.ErrorHandlerBase;
@@ -83,10 +84,10 @@
 
 		if ( endpoint != null && aController instanceof AggregateAnalysisEngineController )
 		{
-			Threshold threshold = super.getThreshold(endpoint, delegateMap, aController);
+			  Threshold threshold = super.getThreshold(endpoint, delegateMap, aController);
 	    	String key = ((AggregateAnalysisEngineController)aController).lookUpDelegateKey(endpoint.getEndpoint());
-	    	//	If threshold is not defined, assume action=terminate
-	    	if (  threshold == null || threshold.getMaxRetries() == 0 || 
+	    	Delegate delegate = ((AggregateAnalysisEngineController)aController).lookupDelegate(key);
+	    	if (  delegate.isAwaitingPingReply() || threshold == null || threshold.getMaxRetries() == 0 || 
 	    			  ( super.retryLastCommand(AsynchAEMessage.GetMeta, endpoint, aController, key, threshold, anErrorContext) == false )	
 	    	        )
 	    	{
@@ -99,7 +100,12 @@
 	    	                new Object[] {aController.getComponentName(), endpoint.getEndpoint()});
 	          }
 	    			aController.terminate();
-	    			aController.notifyListenersWithInitializationStatus((Exception)t);
+	    			//  Notify if the error occurred during initialization of the service.
+	    			//  If the ping times out, there is no need to notify the listener. We
+	    			//  use getMeta request as a ping to check if the service is running.
+	    			if ( !delegate.isAwaitingPingReply() ) {
+	            aController.notifyListenersWithInitializationStatus((Exception)t);
+	    			}
 	    		}
 	    		else
 	    		{

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java?rev=736711&r1=736710&r2=736711&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java Thu Jan 22 09:02:21 2009
@@ -290,8 +290,12 @@
 		           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, getClass().getName(), "handleError", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_threshold_for_endpoint__CONFIG", new Object[] { aController.getComponentName(), "Process",  key });
 		         }
 		    	}
-		    	Delegate delegate = ((AggregateAnalysisEngineController)aController).lookupDelegate(key);
-		    	delegate.removeCasFromOutstandingList(casReferenceId);
+		    	if ( key != null ) {
+		    		//	Received reply from the delegate. Remove the CAS from the 
+		    		//	delegate's list of CASes pending reply
+	          Delegate delegate = ((AggregateAnalysisEngineController)aController).lookupDelegate(key);
+	          delegate.removeCasFromOutstandingList(casReferenceId);
+		    	}
 			}
 			else
 			{
@@ -416,14 +420,22 @@
 			    flowControllerContinueFlag = 
 			      ((AggregateAnalysisEngineController) aController).continueOnError(casReferenceId, key, (Exception) t );
 			  }
-			  catch( Exception exc) {}
+			  catch( Exception exc) 
+			  {
+			    exc.printStackTrace();
+			    
+          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "handleError", 
+							UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", exc);
+          }
+			  }
 			}
 			//	Check if the caller has already decremented number of subordinates. This property is only
 			//	set in the Aggregate's finalStep() method before the CAS is sent back to the client. If
 			//	there was a problem sending the CAS to the client, we dont want to update the counter 
 			//	again. If an exception is reported elsewhere ( not in finalStep()), the default action is
 			//	to decrement the number of subordinates associated with the parent CAS.
-			if ( !anErrorContext.containsKey(AsynchAEMessage.SkipSubordinateCountUpdate)) 
+			if (!flowControllerContinueFlag && !anErrorContext.containsKey(AsynchAEMessage.SkipSubordinateCountUpdate)) 
 			{
 				//	Check if the CAS is a subordinate (has parent CAS).
 				if ( casStateEntry != null && casStateEntry.isSubordinate())

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java?rev=736711&r1=736710&r2=736711&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java Thu Jan 22 09:02:21 2009
@@ -66,11 +66,35 @@
             Delegate delegate = ((AggregateAnalysisEngineController)getController()).lookupDelegate(delegateKey);
             if ( delegate.getEndpoint().isRemote() ) {
               delegate.cancelDelegateTimer();
+              delegate.setState(Delegate.OK_STATE);
               if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
                 UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, this.getClass().getName(),
                         "handle", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                         "UIMAEE_cancelled_timer_FINE", new Object[] { getController().getComponentName(), delegateKey });
               }
+              String casReferenceId = null;
+              //  Check if the GetMeta reply was actually a PING message to check
+              //  delegate's availability. This would be the case if the delegate
+              //  has previously timed out waiting for Process CAS reply.
+              if ( delegate.isAwaitingPingReply() && delegate.getState() == Delegate.OK_STATE) {
+                //  Since this is a reply to a ping we may have delayed some 
+                //  CASes waiting for the ping to come back. Drain the list
+                //  of delayed CASes and send each CAS to the delegate.
+                while ( (casReferenceId = delegate.removeOldestFromPendingDispatchList() ) != null ) {
+                  ((AggregateAnalysisEngineController)getController()).
+                    retryLastCommand(AsynchAEMessage.Process, delegate.getEndpoint(), casReferenceId);
+                }
+                
+                if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, this.getClass().getName(),
+                          "handle", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                          "UIMAEE_aggregate_rcvd_ping_reply__FINE", new Object[] { getController().getComponentName(), delegateKey });
+                }
+                //  Reset delegate flag to indicate that the ping reply was received
+                delegate.resetAwaitingPingReply();
+                //  No need to merge typesystem. We've received a reply to a ping
+                return;
+              }
             }
             if (AsynchAEMessage.Exception == payload)
 						{

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java?rev=736711&r1=736710&r2=736711&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java Thu Jan 22 09:02:21 2009
@@ -164,7 +164,6 @@
 			}
       String delegateKey =((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
       Delegate delegate = ((AggregateAnalysisEngineController)getController()).lookupDelegate(delegateKey);
-      
       boolean casRemovedFromOutstandingList = delegate.removeCasFromOutstandingList(casReferenceId);
 			
 			//  Check if this process reply message is expected. A message is expected if the Cas Id 

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties?rev=736711&r1=736710&r2=736711&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties Thu Jan 22 09:02:21 2009
@@ -193,5 +193,10 @@
 UIMAEE_restart_timer_FINE = Controller: {0} Restarted Timer For Delegate: {1} Cas Id: {2} Expecting Reply In {3} ms
 UIMAEE_increment_retry_count_FINE = Controller: {0} Incremented Retry Count For Delegate: {1} Cas Id: {2} Current Retry Count: {3}
 UIMAEE_cancelled_timer_FINE = Controller: {0} Received GetMeta Reply From Delegate: {1} Cancelling Timer
-
-
+UIMAEE_add_cas_to_delegate_pending_dispatch_FINE = Controller: {0} Added CAS To Delegates List Of CASes Pending Dispatch. Delegate: {1} Cas Id: {2} List Size After Add: {3}
+UIMAEE_removed_cas_from_delegate_pending_dispatch_list__FINE = Controller: {0} Removed CAS Entry From Delegates List Of CASes Pending Dispatch. Delegate: {1} Cas Id: {2} List Size After Remove: {3}
+UIMAEE_aggregate_sending_ping__FINE = Controller: {0} Sending Ping Message (GetMeta) To Check Delegate: {1} Availability.
+UIMAEE_aggregate_rcvd_ping_reply__FINE = Controller: {0} Received Ping Message (GetMeta) Reply From Delegate: {1}.
+UIMAEE_dump_cas_pending_dispatch__FINE = Controller: {0} Cas Id: {1} Pending Dispatch To Delegate: {2}
+UIMAEE_dump_cas_pending_reply__FINE = Controller: {0} Cas Id: {1} Pending Reply From Delegate: {2}
+UIMAEE_force_cas_timeout__INFO = Controller: {0} Disabled Delegate: {1}. Forcing Timeout Of CAS: {2} Found In : {3}  
\ No newline at end of file