You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/01/22 18:02:22 UTC
svn commit: r736711 - in
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main:
java/org/apache/uima/aae/controller/ java/org/apache/uima/aae/delegate/
java/org/apache/uima/aae/error/ java/org/apache/uima/aae/error/handler/
java/org/apache/uima/...
Author: cwiklik
Date: Thu Jan 22 09:02:21 2009
New Revision: 736711
URL: http://svn.apache.org/viewvc?rev=736711&view=rev
Log:
UIMA-1127 Modified to use GetMeta as Ping message on service Timeout
Added:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/UimaASPingTimeout.java
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/GetMetaErrorHandler.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java?rev=736711&r1=736710&r2=736711&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java Thu Jan 22 09:02:21 2009
@@ -114,4 +114,6 @@
public Delegate lookupDelegate( String aDelegateKey );
+ public boolean delayCasIfDelegateInTimedOutState( String aCasReferenceId, String aDelegateKey ) throws AsynchAEException;
+
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=736711&r1=736710&r2=736711&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Thu Jan 22 09:02:21 2009
@@ -1777,10 +1777,44 @@
}
else
{
- getOutputChannel().sendRequest(aCasReferenceId, anEndpoint);
+ // Check delegate's state before sending it a CAS. The delegate
+ // may have previously timed out, in which case the aggregate is pinging
+ // the delegate to check its availability. While the delegate
+ // is in this state, delay CASes by placing them on a list of
+ // CASes pending dispatch. Once the ping reply is received all
+ // delayed CASes will be dispatched to the delegate.
+ if ( !delayCasIfDelegateInTimedOutState( aCasReferenceId, anEndpoint.getEndpoint() ) ) {
+ // The delegate is in the normal state so send it this CAS
+ getOutputChannel().sendRequest(aCasReferenceId, anEndpoint);
+ }
}
}
-
+ /**
+ * Checks the state of a delegate to see if it is in TIMEOUT State.
+ * If it is, push the CAS id onto a list of CASes pending dispatch.
+ * The delegate is in a questionable state and the aggregate sends
+ * a ping message to check delegate's availability. If the delegate
+ * responds to the ping, all CASes in the pending dispatch list will
+ * be immediately dispatched.
+ **/
+ public boolean delayCasIfDelegateInTimedOutState( String aCasReferenceId, String aDelegateKey ) throws AsynchAEException {
+ Delegate delegate = lookupDelegate(aDelegateKey);
+ if (delegate != null && delegate.getState() == Delegate.TIMEOUT_STATE ) {
+ // Add CAS id to the list of delayed CASes.
+ int listSize = delegate.addCasToPendingDispatchList(aCasReferenceId);
+ // If the list was empty (before the add), send the GetMeta request
+ // as a PING to see if the delegate service is alive.
+ if ( listSize == 1 ) {
+ delegate.setAwaitingPingReply();
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "delayCasIfDelegateInTimedOutState", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_aggregate_sending_ping__FINE", new Object[] { getComponentName(), delegate.getKey() });
+ }
+ retryMetadataRequest(delegate.getEndpoint());
+ }
+ return true;
+ }
+ return false; // Cas Not Delayed
+ }
private void dispatchProcessRequest(String aCasReferenceId, Endpoint anEndpoint, boolean addEndpointToCache) throws AsynchAEException
{
if (addEndpointToCache)
@@ -1823,15 +1857,26 @@
private void dispatchProcessRequest(String aCasReferenceId, Endpoint[] anEndpointList, boolean addEndpointToCache) throws AsynchAEException
{
+ List<Endpoint> endpointList = new ArrayList<Endpoint>();
for (int i = 0; i < anEndpointList.length; i++)
{
+ // Check if the delegate previously timed out. If so, add the CAS
+ // Id to the list pending dispatch. This list holds CASes that are
+ // delayed until the service responds to a Ping.
+ if ( delayCasIfDelegateInTimedOutState(aCasReferenceId, anEndpointList[i].getEndpoint())) {
+ // The CAS was delayed until the delegate responds to a Ping
+ continue;
+ } else {
+ endpointList.add(anEndpointList[i]);
+ }
if (addEndpointToCache)
{
getInProcessCache().addEndpoint(anEndpointList[i], aCasReferenceId);
}
}
-
- getOutputChannel().sendRequest(aCasReferenceId, anEndpointList);
+ Endpoint[] endpoints = new Endpoint[endpointList.size()];
+ endpointList.toArray(endpoints);
+ getOutputChannel().sendRequest(aCasReferenceId, endpoints);
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=736711&r1=736710&r2=736711&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Thu Jan 22 09:02:21 2009
@@ -53,6 +53,7 @@
import org.apache.uima.aae.error.ErrorContext;
import org.apache.uima.aae.error.ErrorHandler;
import org.apache.uima.aae.error.ErrorHandlerChain;
+import org.apache.uima.aae.error.MessageTimeoutException;
import org.apache.uima.aae.error.ServiceShutdownException;
import org.apache.uima.aae.error.handler.ProcessCasErrorHandler;
import org.apache.uima.aae.jmx.JmxManagement;
@@ -1008,6 +1009,60 @@
}
((AggregateAnalysisEngineController)this).disableDelegates(list);
+ if ( key != null && key.trim().length() > 0) {
+ // Delegate has been disabled. Cleanup Delegate's lists. Each Delegate
+ // maintains a list of CASes pending reply and a different list of CASes
+ // pending dispatch. The first list contains CASes sent to the delegate.
+ // When a reply is received from the delegate, the CAS is removed from
+ // the list. The second list contains CASes that have been delayed
+ // while the service was in the TIMEDOUT state. These CASes were to
+ // be dispatched to the delegate once its state is reset to OK. It is
+ // reset to OK state when the delegate responds to the client PING
+ // request. Since we have disabled the delegate, remove ALL CASes from
+ // both lists and send them through the ErrorHandler one at a time
+ // as if these CASes timed out.
+
+ Delegate delegate = ((AggregateAnalysisEngineController)this).lookupDelegate(key);
+ // Cancel the delegate timer. No more responses are expected
+ delegate.cancelDelegateTimer();
+
+ // If the delegate has CASes pending reply still, send each CAS
+ // from the pending list through the error handler with
+ // MessageTimeoutException as a cause of error
+ while ( delegate.getCasPendingReplyListSize() > 0 ) {
+ String timedOutCasId = delegate.removeOldestCasFromOutstandingList();
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "handleAction", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_force_cas_timeout__INFO",
+ new Object[] { getComponentName(), key, timedOutCasId, " Pending Reply List" });
+ }
+
+ ErrorContext errorContext = new ErrorContext();
+ errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
+ errorContext.add(AsynchAEMessage.CasReference, timedOutCasId);
+ errorContext.add(AsynchAEMessage.Endpoint, endpoint);
+ getErrorHandlerChain().handle(new MessageTimeoutException(), errorContext, this);
+ }
+ // If the delegate has CASes pending dispatch, send each CAS
+ // from the pending dispatch list through the error handler with
+ // MessageTimeoutException as a cause of error
+ while ( delegate.getCasPendingDispatchListSize() > 0 ) {
+ String timedOutCasId = delegate.removeOldestFromPendingDispatchList();
+
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "handleAction", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_force_cas_timeout__INFO",
+ new Object[] { getComponentName(), key, timedOutCasId, " Pending Dispatch List" });
+ }
+
+ ErrorContext errorContext = new ErrorContext();
+ errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
+ errorContext.add(AsynchAEMessage.CasReference, timedOutCasId);
+ errorContext.add(AsynchAEMessage.Endpoint, endpoint);
+ getErrorHandlerChain().handle(new MessageTimeoutException(), errorContext, this);
+ }
+ }
+
if ( endpoint != null ) {
try {
// Fetch all Cas entries currently awaiting reply from the delegate that was just
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java?rev=736711&r1=736710&r2=736711&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java Thu Jan 22 09:02:21 2009
@@ -190,7 +190,9 @@
return replyReceived;
}
public synchronized void incrementHowManyDelegatesResponded(){
- howManyDelegatesResponded++;
+ if ( howManyDelegatesResponded < numberOfParallelDelegates) {
+ howManyDelegatesResponded++;
+ }
}
public synchronized int howManyDelegatesResponded(){
return howManyDelegatesResponded;
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java?rev=736711&r1=736710&r2=736711&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java Thu Jan 22 09:02:21 2009
@@ -72,7 +72,28 @@
// synchronizes access to the delegate's state
private Object stateMux = new Object();
+
+ // List holding CASes that have been delayed due to a delegate timeout. These
+ // CASes should be sent to the delegate as soon as the getMeta (Ping) reply is received.
+ private List<DelegateEntry> pendingDispatchList = new ArrayList<DelegateEntry>();
+
+ // synchronizes access to the list of CASes pending dispatch
+ private Object pendingDispatchListMux = new Object();
+ // Flag that is set while a getMeta (Ping) reply is awaited from the delegate
+ private volatile boolean awaitingPingReply;
+
+ public boolean isAwaitingPingReply() {
+ return awaitingPingReply;
+ }
+
+ public void setAwaitingPingReply() {
+ this.awaitingPingReply = true;
+ }
+
+ public void resetAwaitingPingReply() {
+ this.awaitingPingReply = false;
+ }
/**
* Returns delegate key
*
@@ -85,8 +106,8 @@
/**
* Sets an {@link Endpoint} object
*
- * @param anEndpoint -
- * an endpoint object
+ * @param anEndpoint
+ * - an endpoint object
*/
public void setEndpoint(Endpoint anEndpoint) {
endpoint = anEndpoint;
@@ -100,59 +121,165 @@
public Endpoint getEndpoint() {
return endpoint;
}
+ /**
+ * Forces Timer restart for the oldest CAS sitting in the list
+ * of CASes pending reply.
+ */
+ public void restartTimerForOldestCasInOutstandingList() {
+ DelegateEntry entry = null;
+ synchronized (outstandingCasListMux) {
+ if (!outstandingCasList.isEmpty()) {
+ // Get the oldest entry
+ entry = outstandingCasList.get(0);
+ if ( entry != null ) {
+ restartTimerForCas(entry);
+ }
+ }
+ }
+ }
+ /**
+ * Restarts timer for a given CAS
+ *
+ * @param entry
+ */
+ private void restartTimerForCas( DelegateEntry entry ) {
+ if (getCasProcessTimeout() > 0) {
+ entry.incrementRetryCount();
+ // restart timer for retry
+ startDelegateTimer(entry.getCasReferenceId(), AsynchAEMessage.Process);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ this.getClass().getName(),
+ "restartTimerForCas",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_restart_timer_FINE",
+ new Object[] { getComponentName(), delegateKey, entry.getCasReferenceId(),
+ getCasProcessTimeout() });
+ }
+ }
+ }
+
/**
- * Adds a given Cas ID to the list of CASes pending reply. A new timer will be started
- * to handle delegate's timeout if either:
- * 1) the list of CASes pending reply is empty AND delegate timeout > 0
- * 2) the list already contains the CAS ID AND delegate timeout > 0. This is a retry logic.
- *
- * @param aCasReferenceId - CAS ID to add to pending list if not already there
- *
+ * Adds a given Cas ID to the list of CASes pending reply. A new timer will be started to handle
+ * delegate's timeout if either: 1) the list of CASes pending reply is empty AND delegate timeout
+ * > 0 2) the list already contains the CAS ID AND delegate timeout > 0. This is a retry logic.
+ *
+ * @param aCasReferenceId
+ * - CAS ID to add to pending list if not already there
+ *
*/
public void addCasToOutstandingList(String aCasReferenceId) {
synchronized (outstandingCasListMux) {
DelegateEntry entry = null;
- // Check if the outstanding list already contains entry for the Cas Id. If it does, retry logic
- // is calling this method. Increment number of retries and restart the timer.
- if ( !outstandingCasList.isEmpty() && (entry = lookupEntry(aCasReferenceId)) != null) {
- entry.incrementRetryCount();
- if ( getCasProcessTimeout() > 0 ) {
- // restart timer for retry
- startDelegateTimer(aCasReferenceId, AsynchAEMessage.Process);
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, this.getClass().getName(),
- "addCasToOutstandingList", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_restart_timer_FINE",
- new Object[] { getComponentName(), delegateKey, aCasReferenceId, getCasProcessTimeout() });
- }
- }
- } else {
- // Create a new entry to be stored in the list of CASes pending reply
+ // Check if the outstanding list already contains entry for the Cas Id. If it does, retry
+ // logic is calling this method. Restart the timer, which also increments the number of
+ // retries, provided a CAS process timeout is configured.
+ if (!outstandingCasList.isEmpty() && (entry = lookupEntry(aCasReferenceId, outstandingCasList)) != null) {
+ restartTimerForCas(entry);
+ } else {
+ // Create a new entry to be stored in the list of CASes pending reply
entry = new DelegateEntry(aCasReferenceId);
- // Remember the command
+ // Remember the command
entry.setCommand(AsynchAEMessage.Process);
// Start delegate timer if the pending list is empty
- if (outstandingCasList.isEmpty() && getCasProcessTimeout() > 0 ) {
+ if (outstandingCasList.isEmpty() && getCasProcessTimeout() > 0) {
startDelegateTimer(aCasReferenceId, AsynchAEMessage.Process);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, this.getClass().getName(),
- "addCasToOutstandingList", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ this.getClass().getName(),
+ "addCasToOutstandingList",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_start_timer_FINE",
- new Object[] { getComponentName(), delegateKey, aCasReferenceId, getCasProcessTimeout() });
+ new Object[] { getComponentName(), delegateKey, aCasReferenceId,
+ getCasProcessTimeout() });
}
}
- // Append Cas Entry to the end of the list
+ // Append Cas Entry to the end of the list
outstandingCasList.add(entry);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, this.getClass().getName(),
- "addCasToOutstandingList", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ this.getClass().getName(),
+ "addCasToOutstandingList",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_add_cas_to_delegate_pending_reply_FINE",
- new Object[] { getComponentName(), delegateKey, aCasReferenceId,outstandingCasList.size() });
+ new Object[] { getComponentName(), delegateKey, aCasReferenceId,
+ outstandingCasList.size() });
}
}
}
}
+
+ /**
+ * Adds given CAS ID to the list of CASes pending dispatch. These CASes are delayed due to a
+ * questionable state of the delegate that most likely timed out on a previous CAS. When the
+ * timeout occurs, the subsequent CASes are queued (delayed) and a GetMeta request is sent to the
+ * delegate. When the delegate responds to GetMeta request, the state of the delegate is reset
+ * back to normal and the CASes queued (delayed) are immediately sent to the delegate.
+ *
+ * @param aCasReferenceId
+ * - CAS ID to add to the delayed list
+ */
+ public int addCasToPendingDispatchList(String aCasReferenceId) {
+ synchronized (pendingDispatchListMux) {
+ DelegateEntry entry = null;
+ // Create a new entry to be stored in the list of CASes pending
+ // dispatch
+ entry = new DelegateEntry(aCasReferenceId);
+ // Remember the command
+ entry.setCommand(AsynchAEMessage.Process);
+ // Append Cas Entry to the end of the list
+ pendingDispatchList.add(entry);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ dumpDelayedList();
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ this.getClass().getName(),
+ "addCasToPendingDispatchList",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_add_cas_to_delegate_pending_dispatch_FINE",
+ new Object[] { getComponentName(), delegateKey, aCasReferenceId,
+ pendingDispatchList.size() });
+ }
+ return pendingDispatchList.size();
+ }
+ }
+ /**
+ * Logs CASes sitting in the list of CASes pending dispatch. These CASes
+ * were delayed due to a bad state of the delegate.
+ */
+ private void dumpDelayedList() {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ for( DelegateEntry entry: pendingDispatchList) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ this.getClass().getName(),
+ "dumpDelayedList",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_dump_cas_pending_dispatch__FINE",
+ new Object[] { getComponentName(), entry.getCasReferenceId(), delegateKey });
+ }
+ }
+ }
+ /**
+ * Logs CASes sitting in the list of CASes pending reply.
+ */
+ private void dumpPendingReplyList() {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ for( DelegateEntry entry: outstandingCasList) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ this.getClass().getName(),
+ "dumpPendingReplyList",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_dump_cas_pending_reply__FINE",
+ new Object[] { getComponentName(), entry.getCasReferenceId(), delegateKey });
+ }
+ }
+ }
/**
* Increments retry count
*
@@ -160,46 +287,114 @@
*/
public void incrementRetryCount(String aCasReferenceId) {
synchronized (outstandingCasListMux) {
- DelegateEntry entry = lookupEntry(aCasReferenceId);
- if ( entry != null ) {
+ DelegateEntry entry = lookupEntry(aCasReferenceId, outstandingCasList);
+ if (entry != null) {
entry.incrementRetryCount();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, this.getClass().getName(),
- "incrementRetryCount", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ this.getClass().getName(),
+ "incrementRetryCount",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_increment_retry_count_FINE",
- new Object[] { getComponentName(), delegateKey, aCasReferenceId, entry.getRetryCount() });
+ new Object[] { getComponentName(), delegateKey, aCasReferenceId,
+ entry.getRetryCount() });
}
}
}
}
- /**
- * Returns {@link DelegateEntry} instance that matches given CAS ID
- * pending reply.
- *
- * @param aCasReferenceId - unique id of a CAS to be searched for
- * @return
- */
- private DelegateEntry lookupEntry(String aCasReferenceId) {
- for (DelegateEntry entry : outstandingCasList) {
+
+ /**
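+ * Returns {@link DelegateEntry} instance from the given list that matches given CAS ID.
+ *
+ * @param aCasReferenceId - unique id of a CAS to be searched for
+ * @param list - list of entries to search
+ * @return - the matching entry, or null if the list has no entry for the CAS ID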
+ */
+ private DelegateEntry lookupEntry(String aCasReferenceId, List<DelegateEntry> list) {
+ for (DelegateEntry entry : list) {
if (entry.getCasReferenceId().equals(aCasReferenceId)) {
return entry;
}
}
return null;
}
+ /**
+ * Removes the oldest entry from the list of CASes pending dispatch.
+ * A CAS is delayed and placed on this list when the delegate status
+ * changes to TIMED_OUT and a PING message is sent to test delegate
+ * availability. Until the PING message is acked by the delegate OR
+ * the PING times out, all CASes are delayed. When the PING is acked
+ * by the delegate ALL delayed CASes are sent to the delegate one at
+ * a time.
+ *
+ * @return - ID of the oldest CAS in the list
+ */
+ public String removeOldestFromPendingDispatchList() {
+ synchronized (pendingDispatchListMux) {
+ if ( pendingDispatchList.size() > 0 ) {
+ String casReferenceId = pendingDispatchList.remove(0).getCasReferenceId();
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ this.getClass().getName(),
+ "removeOldestFromPendingDispatchList",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_removed_cas_from_delegate_pending_dispatch_list__FINE",
+ new Object[] { getComponentName(), delegateKey, casReferenceId,
+ pendingDispatchList.size() });
+ }
+ return casReferenceId;
+ }
+ }
+ return null;
+ }
+
+
+ /**
+ * Removes an entry from the list of CASes pending dispatch that
+ * matches a given CAS Id. A CAS is delayed and placed on this list when the delegate status
+ * changes to TIMED_OUT and a PING message is sent to test delegate
+ * availability. Until the PING message is acked by the delegate OR
+ * the PING times out, all CASes are delayed. When the PING is acked
+ * by the delegate ALL delayed CASes are sent to the delegate one at
+ * a time.
+ *
+ * @return - true if a matching CAS was found in the list and removed, false otherwise
+ */
+ public boolean removeCasFromPendingDispatchList(String aCasReferenceId) {
+ synchronized (pendingDispatchListMux) {
+ DelegateEntry entry = lookupEntry(aCasReferenceId, pendingDispatchList);
+ if (entry != null) {
+ pendingDispatchList.remove(entry);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ this.getClass().getName(),
+ "removeCasFromPendingDispatchList",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_removed_cas_from_delegate_pending_dispatch_list__FINE",
+ new Object[] { getComponentName(), delegateKey, entry.getCasReferenceId(),
+ pendingDispatchList.size() });
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
- * Removes {@link DelegateEntry} from the list of CASes pending reply. The entry is removed
- * when either:
- * 1) reply is received from the delegate before the timeout
- * 2) the timeout occurs with no retry
- * 3) an error occurs and the CAS is dropped as part of Error Handling
+ * Removes {@link DelegateEntry} from the list of CASes pending reply. The entry is removed when
+ * either: 1) reply is received from the delegate before the timeout 2) the timeout occurs with no
+ * retry 3) an error occurs and the CAS is dropped as part of Error Handling
*
- * @param aCasReferenceId - id of the CAS to remove from the list
+ * @param aCasReferenceId
+ * - id of the CAS to remove from the list
*/
public boolean removeCasFromOutstandingList(String aCasReferenceId) {
synchronized (outstandingCasListMux) {
- DelegateEntry entry = lookupEntry(aCasReferenceId);
- if ( entry != null ) {
+ DelegateEntry entry = lookupEntry(aCasReferenceId, outstandingCasList);
+ if (entry != null) {
this.removeCasFromOutstandingList(entry);
return true;
}
@@ -208,9 +403,24 @@
}
/**
+ * Removes the oldest {@link DelegateEntry} (first in the list) from the list of CASes pending
+ * reply. The list is logged at FINE level before the removal. The caller must make sure the
+ * list is not empty, since no emptiness check is made here.
+ *
+ * @return
+ * - id of the CAS removed from the list
+ */
+ public String removeOldestCasFromOutstandingList() {
+ synchronized (outstandingCasListMux) {
+ dumpPendingReplyList();
+ return outstandingCasList.remove(0).getCasReferenceId();
+ }
+ }
+ /**
* Removes {@link DelegateEntry} from the list of CASes pending reply. If the CAS removed was the
* oldest in the list (first in the list) AND there are other CASes in the list pending reply AND
- * the delegate timeout is configured ( timeout > 0) , restart the timer for the next oldest CAS in the list.
+ * the delegate timeout is configured ( timeout > 0) , restart the timer for the next oldest CAS
+ * in the list.
*
* @param aDelegateEntry
*/
@@ -220,46 +430,67 @@
DelegateEntry oldestEntry = outstandingCasList.get(0);
boolean doStartDelegateTimer = oldestEntry.equals(aDelegateEntry) && getCasProcessTimeout() > 0;
outstandingCasList.remove(aDelegateEntry);
-
+
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, this.getClass().getName(),
- "removeCasFromOutstandingList", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ this.getClass().getName(),
+ "removeCasFromOutstandingList",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_removed_cas_from_delegate_list__FINE",
- new Object[] { getComponentName(), delegateKey, aDelegateEntry.getCasReferenceId(), outstandingCasList.size() });
+ new Object[] { getComponentName(), delegateKey, aDelegateEntry.getCasReferenceId(),
+ outstandingCasList.size() });
}
// Restart delegate Timer if the CAS removed was the oldest and the list is not empty
if (doStartDelegateTimer) {
// Cancel previous timer and restart it if there are still CASes in the outstanding list
cancelDelegateTimer();
if (!outstandingCasList.isEmpty()) {
- // get the oldest entry from the list of CASes pending reply
+ // get the oldest entry from the list of CASes pending reply
DelegateEntry delegateEntry = outstandingCasList.get(0);
- // Restart the timer for the oldest CAS in the list
+
+ // Restart the timer for the oldest CAS in the list
startDelegateTimer(delegateEntry.getCasReferenceId(), delegateEntry.getCommand());
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, this.getClass().getName(),
- "removeCasFromOutstandingList", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ this.getClass().getName(),
+ "removeCasFromOutstandingList",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_restart_timer_FINE",
- new Object[] { getComponentName(), delegateKey, delegateEntry.getCasReferenceId(), getCasProcessTimeout() });
+ new Object[] { getComponentName(), delegateKey,
+ delegateEntry.getCasReferenceId(), getCasProcessTimeout() });
}
}
}
-
+
}
+
/**
* Cancels timer and clears a list of CASes pending reply
*/
public void cleanup() {
cancelDelegateTimer();
- synchronized( outstandingCasListMux ) {
+ synchronized (outstandingCasListMux) {
outstandingCasList.clear();
}
+ synchronized( pendingDispatchListMux) {
+ pendingDispatchList.clear();
+ }
}
+
public int getCasPendingReplyListSize() {
- synchronized( outstandingCasListMux ) {
+ synchronized (outstandingCasListMux) {
return outstandingCasList.size();
}
}
+
+ public int getCasPendingDispatchListSize() {
+ synchronized (pendingDispatchListMux) {
+ return pendingDispatchList.size();
+ }
+ }
+
/**
* Cancels current timer
*/
@@ -271,9 +502,11 @@
}
/**
- * Returns a timeout value for a given command type. The values are defined
- * in the deployment descriptor
- * @param aCommand - command for which a timeout value is saught
+ * Returns a timeout value for a given command type. The values are defined in the deployment
+ * descriptor
+ *
+ * @param aCommand
+ * - command for which a timeout value is sought
*
* @return - long time out value
*/
@@ -289,28 +522,34 @@
return -1;
}
}
+
/**
* Starts GetMeta Request timer
*/
public void startGetMetaRequestTimer() {
startDelegateTimer(null, AsynchAEMessage.GetMeta);
}
+
/**
* Starts a timer for a given command
*
- * @param aCasReferenceId - id of a CAS if command = Process, null otherwise
- * @param aCommand - command for which the timer is started
+ * @param aCasReferenceId
+ * - id of a CAS if command = Process, null otherwise
+ * @param aCommand
+ * - command for which the timer is started
*/
private void startDelegateTimer(final String aCasReferenceId, final int aCommand) {
final long timeToWait = getTimeoutValueForCommand(aCommand);
Date timeToRun = new Date(System.currentTimeMillis() + timeToWait);
- timer = new Timer("Controller:" + getComponentName() + ":TimerThread-Endpoint_impl:"
- + endpoint + ":" + System.nanoTime() + ":Cmd:" + aCommand);
+ timer = new Timer("Controller:" + getComponentName() + ":TimerThread-Endpoint_impl:" + endpoint
+ + ":" + System.nanoTime() + ":Cmd:" + aCommand);
final Delegate delegate = this;
timer.schedule(new TimerTask() {
public void run() {
-
cancelDelegateTimer();
+ delegate.setState(TIMEOUT_STATE);
+ ErrorContext errorContext = new ErrorContext();
+
if (AsynchAEMessage.Process == aCommand) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
@@ -318,24 +557,23 @@
"UIMAEE_cas_timeout_no_reply__INFO",
new Object[] { delegate.getKey(), timeToWait, aCasReferenceId });
}
+ errorContext.add(AsynchAEMessage.CasReference, aCasReferenceId);
} else if (AsynchAEMessage.GetMeta == aCommand) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
"Delegate.TimerTask.run", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_meta_timeout_no_reply__INFO", new Object[] { delegate.getKey(), timeToWait });
+ "UIMAEE_meta_timeout_no_reply__INFO",
+ new Object[] { delegate.getKey(), timeToWait });
}
} else if (AsynchAEMessage.CollectionProcessComplete == aCommand) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
"Delegate.TimerTask.run", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_cpc_timeout_no_reply__INFO", new Object[] { delegate.getKey(), timeToWait });
+ "UIMAEE_cpc_timeout_no_reply__INFO",
+ new Object[] { delegate.getKey(), timeToWait });
}
}
- ErrorContext errorContext = new ErrorContext();
- if (aCasReferenceId != null) {
- errorContext.add(AsynchAEMessage.CasReference, aCasReferenceId);
- }
errorContext.add(AsynchAEMessage.Command, aCommand);
errorContext.add(AsynchAEMessage.Endpoint, getEndpoint());
handleError(new MessageTimeoutException(), errorContext);
@@ -373,18 +611,24 @@
}
}
- public void setState(int state) {
+ public void setState(int aState) {
synchronized (stateMux) {
- this.state = state;
+ // Change the state to timeout, only if the current state = OK_STATE
+ // This prevents overriding DISABLED state.
+ if ( aState == TIMEOUT_STATE && this.state != OK_STATE ) {
+ return;
+ }
+ state = aState;
}
}
-
+
public abstract void handleError(Exception e, ErrorContext errorContext);
-
- public abstract String getComponentName();
+
+ public abstract String getComponentName();
+
/**
- * Entry in the list of CASes pending reply. It stores the {@link CacheEntry} containing information
- * about a CAS that was sent to the delegate.
+ * Entry in the list of CASes pending reply. It stores the {@link CacheEntry} containing
+ * information about a CAS that was sent to the delegate.
*
*
*/
@@ -393,8 +637,8 @@
private int command;
- private int retryCount=0;
-
+ private int retryCount = 0;
+
public DelegateEntry(String aCasReferenceId) {
casReferenceId = aCasReferenceId;
}
@@ -414,12 +658,11 @@
public void incrementRetryCount() {
this.retryCount++;
}
-
+
public void resetRetryCount() {
this.retryCount = 0;
}
-
public String getCasReferenceId() {
return casReferenceId;
}
Added: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/UimaASPingTimeout.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/UimaASPingTimeout.java?rev=736711&view=auto
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/UimaASPingTimeout.java (added)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/UimaASPingTimeout.java Thu Jan 22 09:02:21 2009
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.uima.aae.error;
+
+public class UimaASPingTimeout extends Exception
+{
+ public UimaASPingTimeout()
+ {
+ }
+
+ public UimaASPingTimeout(String message)
+ {
+ super(message);
+ }
+
+ public UimaASPingTimeout(Throwable cause)
+ {
+ super(cause);
+ }
+
+ public UimaASPingTimeout(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/GetMetaErrorHandler.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/GetMetaErrorHandler.java?rev=736711&r1=736710&r2=736711&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/GetMetaErrorHandler.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/GetMetaErrorHandler.java Thu Jan 22 09:02:21 2009
@@ -29,6 +29,7 @@
import org.apache.uima.aae.controller.BaseAnalysisEngineController;
import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
+import org.apache.uima.aae.delegate.Delegate;
import org.apache.uima.aae.error.ErrorContext;
import org.apache.uima.aae.error.ErrorHandler;
import org.apache.uima.aae.error.ErrorHandlerBase;
@@ -83,10 +84,10 @@
if ( endpoint != null && aController instanceof AggregateAnalysisEngineController )
{
- Threshold threshold = super.getThreshold(endpoint, delegateMap, aController);
+ Threshold threshold = super.getThreshold(endpoint, delegateMap, aController);
String key = ((AggregateAnalysisEngineController)aController).lookUpDelegateKey(endpoint.getEndpoint());
- // If threshold is not defined, assume action=terminate
- if ( threshold == null || threshold.getMaxRetries() == 0 ||
+ Delegate delegate = ((AggregateAnalysisEngineController)aController).lookupDelegate(key);
+ if ( delegate.isAwaitingPingReply() || threshold == null || threshold.getMaxRetries() == 0 ||
( super.retryLastCommand(AsynchAEMessage.GetMeta, endpoint, aController, key, threshold, anErrorContext) == false )
)
{
@@ -99,7 +100,12 @@
new Object[] {aController.getComponentName(), endpoint.getEndpoint()});
}
aController.terminate();
- aController.notifyListenersWithInitializationStatus((Exception)t);
+ // Notify if the error occurred during initialization of the service.
+ // If the ping times out, there is no need to notify the listener. We
+ // use getMeta request as a ping to check if the service is running.
+ if ( !delegate.isAwaitingPingReply() ) {
+ aController.notifyListenersWithInitializationStatus((Exception)t);
+ }
}
else
{
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java?rev=736711&r1=736710&r2=736711&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java Thu Jan 22 09:02:21 2009
@@ -290,8 +290,12 @@
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, getClass().getName(), "handleError", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_threshold_for_endpoint__CONFIG", new Object[] { aController.getComponentName(), "Process", key });
}
}
- Delegate delegate = ((AggregateAnalysisEngineController)aController).lookupDelegate(key);
- delegate.removeCasFromOutstandingList(casReferenceId);
+ if ( key != null ) {
+ // Received reply from the delegate. Remove the CAS from the
+ // delegate's list of CASes pending reply
+ Delegate delegate = ((AggregateAnalysisEngineController)aController).lookupDelegate(key);
+ delegate.removeCasFromOutstandingList(casReferenceId);
+ }
}
else
{
@@ -416,14 +420,22 @@
flowControllerContinueFlag =
((AggregateAnalysisEngineController) aController).continueOnError(casReferenceId, key, (Exception) t );
}
- catch( Exception exc) {}
+ catch( Exception exc)
+ {
+ exc.printStackTrace();
+
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "handleError",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", exc);
+ }
+ }
}
// Check if the caller has already decremented number of subordinates. This property is only
// set in the Aggregate's finalStep() method before the CAS is sent back to the client. If
// there was a problem sending the CAS to the client, we dont want to update the counter
// again. If an exception is reported elsewhere ( not in finalStep()), the default action is
// to decrement the number of subordinates associated with the parent CAS.
- if ( !anErrorContext.containsKey(AsynchAEMessage.SkipSubordinateCountUpdate))
+ if (!flowControllerContinueFlag && !anErrorContext.containsKey(AsynchAEMessage.SkipSubordinateCountUpdate))
{
// Check if the CAS is a subordinate (has parent CAS).
if ( casStateEntry != null && casStateEntry.isSubordinate())
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java?rev=736711&r1=736710&r2=736711&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java Thu Jan 22 09:02:21 2009
@@ -66,11 +66,35 @@
Delegate delegate = ((AggregateAnalysisEngineController)getController()).lookupDelegate(delegateKey);
if ( delegate.getEndpoint().isRemote() ) {
delegate.cancelDelegateTimer();
+ delegate.setState(Delegate.OK_STATE);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, this.getClass().getName(),
"handle", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_cancelled_timer_FINE", new Object[] { getController().getComponentName(), delegateKey });
}
+ String casReferenceId = null;
+ // Check if the GetMeta reply was actually a PING message to check
+ // delegate's availability. This would be the case if the delegate
+ // has previously timed out waiting for Process CAS reply.
+ if ( delegate.isAwaitingPingReply() && delegate.getState() == Delegate.OK_STATE) {
+ // Since this is a reply to a ping we may have delayed some
+ // CASes waiting for the ping to come back. Drain the list
+ // of delayed CASes and send each CAS to the delegate.
+ while ( (casReferenceId = delegate.removeOldestFromPendingDispatchList() ) != null ) {
+ ((AggregateAnalysisEngineController)getController()).
+ retryLastCommand(AsynchAEMessage.Process, delegate.getEndpoint(), casReferenceId);
+ }
+
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, this.getClass().getName(),
+ "handle", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_aggregate_rcvd_ping_reply__FINE", new Object[] { getController().getComponentName(), delegateKey });
+ }
+ // Reset delegate flag to indicate that the ping reply was received
+ delegate.resetAwaitingPingReply();
+ // No need to merge typesystem. We've received a reply to a ping
+ return;
+ }
}
if (AsynchAEMessage.Exception == payload)
{
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java?rev=736711&r1=736710&r2=736711&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java Thu Jan 22 09:02:21 2009
@@ -164,7 +164,6 @@
}
String delegateKey =((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
Delegate delegate = ((AggregateAnalysisEngineController)getController()).lookupDelegate(delegateKey);
-
boolean casRemovedFromOutstandingList = delegate.removeCasFromOutstandingList(casReferenceId);
// Check if this process reply message is expected. A message is expected if the Cas Id
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties?rev=736711&r1=736710&r2=736711&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties Thu Jan 22 09:02:21 2009
@@ -193,5 +193,10 @@
UIMAEE_restart_timer_FINE = Controller: {0} Restarted Timer For Delegate: {1} Cas Id: {2} Expecting Reply In {3} ms
UIMAEE_increment_retry_count_FINE = Controller: {0} Incremented Retry Count For Delegate: {1} Cas Id: {2} Current Retry Count: {3}
UIMAEE_cancelled_timer_FINE = Controller: {0} Received GetMeta Reply From Delegate: {1} Cancelling Timer
-
-
+UIMAEE_add_cas_to_delegate_pending_dispatch_FINE = Controller: {0} Added CAS To Delegates List Of CASes Pending Dispatch. Delegate: {1} Cas Id: {2} List Size After Add: {3}
+UIMAEE_removed_cas_from_delegate_pending_dispatch_list__FINE = Controller: {0} Removed CAS Entry From Delegates List Of CASes Pending Dispatch. Delegate: {1} Cas Id: {2} List Size After Remove: {3}
+UIMAEE_aggregate_sending_ping__FINE = Controller: {0} Sending Ping Message (GetMeta) To Check Delegate: {1} Availability.
+UIMAEE_aggregate_rcvd_ping_reply__FINE = Controller: {0} Received Ping Message (GetMeta) Reply From Delegate: {1}.
+UIMAEE_dump_cas_pending_dispatch__FINE = Controller: {0} Cas Id: {1} Pending Dispatch To Delegate: {2}
+UIMAEE_dump_cas_pending_reply__FINE = Controller: {0} Cas Id: {1} Pending Reply From Delegate: {2}
+UIMAEE_force_cas_timeout__INFO = Controller: {0} Disabled Delegate: {1}. Forcing Timeout Of CAS: {2} Found In : {3}
\ No newline at end of file