You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/08/06 22:07:12 UTC

svn commit: r1511109 - in /uima/uima-as/trunk/uimaj-as-core/src/main: java/org/apache/uima/aae/delegate/Delegate.java resources/uimaee_messages.properties

Author: cwiklik
Date: Tue Aug  6 20:07:12 2013
New Revision: 1511109

URL: http://svn.apache.org/r1511109
Log:
UIMA-3155 Support timer per CAS

Modified:
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
    uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java?rev=1511109&r1=1511108&r2=1511109&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java Tue Aug  6 20:07:12 2013
@@ -24,6 +24,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.UIMAEE_Constants;
@@ -95,6 +96,7 @@ public abstract class Delegate {
 
   public abstract String enrichProcessCASTimeoutMessage(int aCommand, String casReferenceId, long timeToWait, String timeoutMessage);
 
+  private AtomicLong getMetaReceiptTime = new AtomicLong();
   
   public Endpoint getNotificationEndpoint() {
     return notificationEndpoint;
@@ -166,7 +168,7 @@ public abstract class Delegate {
       if (!outstandingCasList.isEmpty()) {
         // Get the oldest entry
         entry = outstandingCasList.get(0);
-        if (entry != null) {
+        if (entry != null && !entry.usesDedicatedTimerThread ) {
           restartTimerForCas(entry);
         }
       }
@@ -181,8 +183,10 @@ public abstract class Delegate {
   private void restartTimerForCas(DelegateEntry entry) {
     if (getCasProcessTimeout() > 0) {
       entry.incrementRetryCount();
+      entry.setCommand(AsynchAEMessage.Process);
       // restart timer for retry
-      startDelegateTimer(entry.getCasReferenceId(), AsynchAEMessage.Process);
+//      startDelegateTimer(entry.getCasReferenceId(), AsynchAEMessage.Process);
+      startDelegateTimer(entry);
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(
                 Level.FINE,
@@ -218,15 +222,15 @@ public abstract class Delegate {
     return pendingDispatchList;
   }
 
-  public void addNewCasToOutstandingList(String aCasReferenceId) {
-    addNewCasToOutstandingList(aCasReferenceId, false,0);
+  public void addNewCasToOutstandingList(String aCasReferenceId, boolean useTimerThreadPerCAS) {
+    addNewCasToOutstandingList(aCasReferenceId, false,0,useTimerThreadPerCAS);
   }
 
-  public void addNewCasToOutstandingList(String aCasReferenceId, boolean isCasGeneratingChildren, int casHashCode) {
+  public void addNewCasToOutstandingList(String aCasReferenceId, boolean isCasGeneratingChildren, int casHashCode, boolean useTimerThreadPerCAS) {
     synchronized (outstandingCasList) {
       DelegateEntry entry = null;
       if ((entry = lookupEntry(aCasReferenceId, outstandingCasList)) == null) {
-        entry = new DelegateEntry(aCasReferenceId);
+        entry = new DelegateEntry(aCasReferenceId, useTimerThreadPerCAS);
         entry.setCasHashCode(String.valueOf(casHashCode));
         // Remember the command
         entry.setCommand(AsynchAEMessage.Process);
@@ -247,7 +251,7 @@ public abstract class Delegate {
    *          - CAS ID to add to pending list if not already there
    * 
    */
-  public void addCasToOutstandingList(String aCasReferenceId, int casHashcode) {
+  public void addCasToOutstandingList(String aCasReferenceId, int casHashcode, boolean useTimerThreadPerCAS) {
     synchronized (outstandingCasList) {
       DelegateEntry entry = null;
       // Check if the outstanding list already contains entry for the Cas Id. If it does, retry
@@ -255,16 +259,25 @@ public abstract class Delegate {
       // is calling this method. Increment number of retries and restart the timer.
       if (!outstandingCasList.isEmpty()
               && (entry = lookupEntry(aCasReferenceId, outstandingCasList)) != null) {
-        restartTimerForCas(entry);
+    	  if ( getCasProcessTimeout() > 0 ) {
+        	  entry.setCasHashCode(String.valueOf(casHashcode));
+
+        	  if ( useTimerThreadPerCAS ) {
+                  startDelegateTimer(entry);
+        	  } else {
+            	  restartTimerForCas(entry);
+        	  }
+    	  }
       } else {
         // Create a new entry to be stored in the list of CASes pending reply
-        entry = new DelegateEntry(aCasReferenceId);
+        entry = new DelegateEntry(aCasReferenceId, useTimerThreadPerCAS);
         // Remember the command
         entry.setCommand(AsynchAEMessage.Process);
         entry.setCasHashCode(String.valueOf(casHashcode));
         // Start delegate timer if the pending list is empty
-        if (outstandingCasList.isEmpty() && getCasProcessTimeout() > 0) {
-          startDelegateTimer(aCasReferenceId, AsynchAEMessage.Process);
+        if ( getCasProcessTimeout() > 0 && (useTimerThreadPerCAS || outstandingCasList.isEmpty())) {
+//          startDelegateTimer(aCasReferenceId, AsynchAEMessage.Process);
+            startDelegateTimer(entry);
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
             UIMAFramework.getLogger(CLASS_NAME).logrb(
                     Level.FINE,
@@ -273,9 +286,10 @@ public abstract class Delegate {
                     UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                     "UIMAEE_start_timer_FINE",
                     new Object[] { getComponentName(), delegateKey, aCasReferenceId,
-                        getCasProcessTimeout() });
+                        getCasProcessTimeout()});
           }
         }
+        
         // Append Cas Entry to the end of the list
         outstandingCasList.add(entry);
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
@@ -304,13 +318,13 @@ public abstract class Delegate {
    * @param aCasReferenceId
    *          - CAS ID to add to the delayed list
    */
-  public int addCasToPendingDispatchList(String aCasReferenceId, long casHashCode) {
+  public int addCasToPendingDispatchList(String aCasReferenceId, long casHashCode, boolean useTimerThreadPerCAS) {
     synchronized (pendingDispatchList) {
       
       DelegateEntry entry = null;
       // Create a new entry to be stored in the list of CASes pending
       // dispatch
-      entry = new DelegateEntry(aCasReferenceId);
+      entry = new DelegateEntry(aCasReferenceId, useTimerThreadPerCAS);
       entry.setCasHashCode(String.valueOf(casHashCode));
       // Remember the command
       entry.setCommand(AsynchAEMessage.Process);
@@ -550,9 +564,15 @@ public abstract class Delegate {
     // Before removing the entry check if this is the oldest in the list. This will be
     // used to determine if we need to restart the delegate timer
     DelegateEntry oldestEntry = outstandingCasList.get(0);
-    boolean doStartDelegateTimer = oldestEntry.equals(aDelegateEntry) && getCasProcessTimeout() > 0;
+    boolean doStartDelegateTimer = false; //oldestEntry.equals(aDelegateEntry) && getCasProcessTimeout() > 0;
     outstandingCasList.remove(aDelegateEntry);
-
+    if ( getCasProcessTimeout() > 0 ) {
+        if ( aDelegateEntry.usesDedicatedTimerThread) {
+        	aDelegateEntry.getDelegateTimer().cancel();
+        } else {
+        	doStartDelegateTimer = oldestEntry.equals(aDelegateEntry);
+        }
+    }
     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
       UIMAFramework.getLogger(CLASS_NAME).logrb(
               Level.FINE,
@@ -572,7 +592,8 @@ public abstract class Delegate {
         DelegateEntry delegateEntry = outstandingCasList.get(0);
 
         // Restart the timer for the oldest CAS in the list
-        startDelegateTimer(delegateEntry.getCasReferenceId(), delegateEntry.getCommand());
+//        startDelegateTimer(delegateEntry.getCasReferenceId(), delegateEntry.getCommand());
+        startDelegateTimer(delegateEntry);
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(
                   Level.FINE,
@@ -630,13 +651,18 @@ public abstract class Delegate {
    * Cancels current timer
    */
   public void cancelDelegateGetMetaTimer() {
-   synchronized( getMetaTimerLock ) {
+	  getMetaReceiptTime.set(System.currentTimeMillis());
+	  synchronized( getMetaTimerLock ) {
+	   
      if (getMetaTimer != null) {
        getMetaTimer.cancel();
        getMetaTimer.purge();
      }
    }
   }
+  public long getMetaReceiptTime() {
+	  return getMetaReceiptTime.get();
+  }
   /**
    * Returns a timeout value for a given command type. The values are defined in the deployment
    * descriptor
@@ -680,21 +706,35 @@ public abstract class Delegate {
    * @param aCommand
    *          - command for which the timer is started
    */
-  private void startDelegateTimer(final String aCasReferenceId, final int aCommand) {
+//  private void startDelegateTimer(final String aCasReferenceId, final int aCommand) {
+  private void startDelegateTimer(final DelegateEntry delegateEntry) {
+	  final String aCasReferenceId = delegateEntry.getCasReferenceId();
+	  final int aCommand = delegateEntry.getCommand();
     synchronized( timerLock ) {
       final long timeToWait = getTimeoutValueForCommand(aCommand);
       Date timeToRun = new Date(System.currentTimeMillis() + timeToWait);
       timer = new DelegateTimer("Controller:" + getComponentName() + ":Request TimerThread-Endpoint_impl:"
               + endpoint + ":" + System.nanoTime() + ":Cmd:" + aCommand, true, aCasReferenceId,this);
       final Delegate delegate = this;
+      
+      if ( delegateEntry.usesDedicatedTimerThread) {
+    	  // Save timer so that we can cancel it when a reply comes
+    	  delegateEntry.setDelegateTimer(timer);
+      } 
       timer.schedule(new TimerTask() {
         public void run() {
-          delegate.setState(TIMEOUT_STATE);
+        	if ( delegateEntry.usesDedicatedTimerThread) {
+          	  delegateEntry.getDelegateTimer().cancel();
+            }
+          
           ErrorContext errorContext = new ErrorContext();
           errorContext.add(AsynchAEMessage.Command, aCommand);
           String enrichedMessage = enrichProcessCASTimeoutMessage(aCommand, aCasReferenceId,timeToWait,"Delegate Service:"+delegateKey+" Has Timed Out While Processing CAS:"+aCasReferenceId );
           Exception cause = new MessageTimeoutException(enrichedMessage);
           if (AsynchAEMessage.Process == aCommand) {
+        	  if ( delegate.getMetaReceiptTime() == 0 || ( (delegate.getMetaReceiptTime() + timeToWait) < System.currentTimeMillis() )) {
+            	  delegate.setState(TIMEOUT_STATE);
+        	  }
             if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
               UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
                       "Delegate.TimerTask.run", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
@@ -716,6 +756,7 @@ public abstract class Delegate {
             }
 */            
           } else if (AsynchAEMessage.GetMeta == aCommand) {
+        	  delegate.setState(TIMEOUT_STATE);
             if ( aCasReferenceId != null ) {  // true on GetMeta Ping timeout
                 errorContext.add(AsynchAEMessage.CasReference, aCasReferenceId);
                 errorContext.add(AsynchAEMessage.ErrorCause, AsynchAEMessage.PingTimeout);
@@ -727,18 +768,27 @@ public abstract class Delegate {
                       new Object[] { delegate.getKey(), timeToWait });
             }
           } else if (AsynchAEMessage.CollectionProcessComplete == aCommand) {
-            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+        	  delegate.setState(TIMEOUT_STATE);
+        	  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
               UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
                       "Delegate.TimerTask.run", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                       "UIMAEE_cpc_timeout_no_reply__WARNING",
                       new Object[] { delegate.getKey(), timeToWait });
             }
 
+          } else {
+        	  delegate.setState(TIMEOUT_STATE);
           }
           errorContext.add(AsynchAEMessage.Endpoint, getEndpoint());
           handleError(cause, errorContext);
         }
       }, timeToRun);
+      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, this.getClass().getName(),
+              "startDelegateTimer", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+              "UIMAEE_timer_started_FINE",
+              new Object[] {  aCasReferenceId, timeToWait, delegateEntry.getCasHashCode(), Thread.currentThread().getId(), Thread.currentThread().getName()  });
+      }
     }
   }
   /**
@@ -870,6 +920,25 @@ public abstract class Delegate {
 
     private String casHashCode;
     
+    private DelegateTimer timer;
+    
+    private volatile boolean usesDedicatedTimerThread;
+    
+    public DelegateEntry(String aCasReferenceId, boolean usesDedicatedTimerThread) {
+        casReferenceId = aCasReferenceId;
+        this.usesDedicatedTimerThread = usesDedicatedTimerThread;
+      }
+   
+    public boolean usesDedicatedTimerThread() {
+    	return usesDedicatedTimerThread;
+    }
+    public void setDelegateTimer( DelegateTimer timer) {
+    	this.timer = timer;
+    }
+    public DelegateTimer getDelegateTimer() {
+    	return timer;
+    }
+    
     public String getCasHashCode() {
       return casHashCode;
     }
@@ -878,9 +947,7 @@ public abstract class Delegate {
       this.casHashCode = casHashCode;
     }
 
-    public DelegateEntry(String aCasReferenceId) {
-      casReferenceId = aCasReferenceId;
-    }
+   
 
     public boolean isGeneratingChildren() {
       return generatingChildren;

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties?rev=1511109&r1=1511108&r2=1511109&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties Tue Aug  6 20:07:12 2013
@@ -189,8 +189,8 @@ UIMAEE_next_step_dispatch_completed__FIN
 UIMAEE_cas_not_found__INFO= Controller: {0} Cas: {1} Not Found In {2}. 
 UIMAEE_removed_cas_from_delegate_list__FINE = Controller: {0} Removed CAS From Delegates List Of CASes Pending Reply. Delegate: {1} Cas Id: {2} List Size After Remove: {3}
 UIMAEE_add_cas_to_delegate_pending_reply_FINE = Controller: {0} Added CAS To Delegates List Of CASes Pending Reply. Delegate: {1} Cas Id: {2} List Size After Add: {3}
-UIMAEE_start_timer_FINE = Controller: {0} Started Timer For Delegate: {1} Cas Id: {2} Expecting Reply In {3} ms
-UIMAEE_restart_timer_FINE = Controller: {0} Restarted Timer For Delegate: {1} Cas Id: {2} Expecting Reply In {3} ms
+UIMAEE_start_timer_FINE = Controller: {0} Started Timer For Delegate: {1} Cas Id: {2} Expecting Reply In {3} ms
+UIMAEE_restart_timer_FINE = Controller: {0} Restarted Timer For Delegate: {1} Cas Id: {2} Expecting Reply In {3} ms 
 UIMAEE_increment_retry_count_FINE = Controller: {0} Incremented Retry Count For Delegate: {1} Cas Id: {2} Current Retry Count: {3}
 UIMAEE_cancelled_timer_FINE = Controller: {0} Received GetMeta Reply From Delegate: {1} Cancelling Timer
 UIMAEE_add_cas_to_delegate_pending_dispatch_WARNING = Controller: {0} Added CAS {1} (Hashcode: {2}) To a List Of CASes Pending Dispatch. The Controller Has Previously Timed-out Waiting For a Reply From {3} Service. The Submitted CAS Will Remain in Pending Dispatch List Until GetMeta Ping Succeeds or GetMet Ping Times out. List Size Before Add: {4}
@@ -257,3 +257,4 @@ UIMAEE_ae_instance_destroy_called__INFO=
 UIMAEE_uncaught_error_WARNING=Controller: {0} Handling uncaught Throwable. Stack trace:\n {1}
 UIMAEE_terminal_error_WARNING=Controller:{0} Exiting via System.exit(2) Due to unrecoverable error
 UIMAEE_drop_cas_debug_FINEST=Controller:{0} Drop:{1} CAS:{2} ReplyReceived:{3}
+UIMAEE_timer_started_FINE=Timer Started For CAS: {0} Timeout Value:{1} CAS Hashcode:{2} Timer Thread ID:{3} Timer Thread Name:{4}
\ No newline at end of file