You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/08/06 22:07:12 UTC
svn commit: r1511109 - in /uima/uima-as/trunk/uimaj-as-core/src/main:
java/org/apache/uima/aae/delegate/Delegate.java
resources/uimaee_messages.properties
Author: cwiklik
Date: Tue Aug 6 20:07:12 2013
New Revision: 1511109
URL: http://svn.apache.org/r1511109
Log:
UIMA-3155 Support timer per CAS
Modified:
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java?rev=1511109&r1=1511108&r2=1511109&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java Tue Aug 6 20:07:12 2013
@@ -24,6 +24,7 @@ import java.util.Date;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.UIMAEE_Constants;
@@ -95,6 +96,7 @@ public abstract class Delegate {
public abstract String enrichProcessCASTimeoutMessage(int aCommand, String casReferenceId, long timeToWait, String timeoutMessage);
+ private AtomicLong getMetaReceiptTime = new AtomicLong();
public Endpoint getNotificationEndpoint() {
return notificationEndpoint;
@@ -166,7 +168,7 @@ public abstract class Delegate {
if (!outstandingCasList.isEmpty()) {
// Get the oldest entry
entry = outstandingCasList.get(0);
- if (entry != null) {
+ if (entry != null && !entry.usesDedicatedTimerThread ) {
restartTimerForCas(entry);
}
}
@@ -181,8 +183,10 @@ public abstract class Delegate {
private void restartTimerForCas(DelegateEntry entry) {
if (getCasProcessTimeout() > 0) {
entry.incrementRetryCount();
+ entry.setCommand(AsynchAEMessage.Process);
// restart timer for retry
- startDelegateTimer(entry.getCasReferenceId(), AsynchAEMessage.Process);
+// startDelegateTimer(entry.getCasReferenceId(), AsynchAEMessage.Process);
+ startDelegateTimer(entry);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
@@ -218,15 +222,15 @@ public abstract class Delegate {
return pendingDispatchList;
}
- public void addNewCasToOutstandingList(String aCasReferenceId) {
- addNewCasToOutstandingList(aCasReferenceId, false,0);
+ public void addNewCasToOutstandingList(String aCasReferenceId, boolean useTimerThreadPerCAS) {
+ addNewCasToOutstandingList(aCasReferenceId, false,0,useTimerThreadPerCAS);
}
- public void addNewCasToOutstandingList(String aCasReferenceId, boolean isCasGeneratingChildren, int casHashCode) {
+ public void addNewCasToOutstandingList(String aCasReferenceId, boolean isCasGeneratingChildren, int casHashCode, boolean useTimerThreadPerCAS) {
synchronized (outstandingCasList) {
DelegateEntry entry = null;
if ((entry = lookupEntry(aCasReferenceId, outstandingCasList)) == null) {
- entry = new DelegateEntry(aCasReferenceId);
+ entry = new DelegateEntry(aCasReferenceId, useTimerThreadPerCAS);
entry.setCasHashCode(String.valueOf(casHashCode));
// Remember the command
entry.setCommand(AsynchAEMessage.Process);
@@ -247,7 +251,7 @@ public abstract class Delegate {
* - CAS ID to add to pending list if not already there
*
*/
- public void addCasToOutstandingList(String aCasReferenceId, int casHashcode) {
+ public void addCasToOutstandingList(String aCasReferenceId, int casHashcode, boolean useTimerThreadPerCAS) {
synchronized (outstandingCasList) {
DelegateEntry entry = null;
// Check if the outstanding list already contains entry for the Cas Id. If it does, retry
@@ -255,16 +259,25 @@ public abstract class Delegate {
// is calling this method. Increment number of retries and restart the timer.
if (!outstandingCasList.isEmpty()
&& (entry = lookupEntry(aCasReferenceId, outstandingCasList)) != null) {
- restartTimerForCas(entry);
+ if ( getCasProcessTimeout() > 0 ) {
+ entry.setCasHashCode(String.valueOf(casHashcode));
+
+ if ( useTimerThreadPerCAS ) {
+ startDelegateTimer(entry);
+ } else {
+ restartTimerForCas(entry);
+ }
+ }
} else {
// Create a new entry to be stored in the list of CASes pending reply
- entry = new DelegateEntry(aCasReferenceId);
+ entry = new DelegateEntry(aCasReferenceId, useTimerThreadPerCAS);
// Remember the command
entry.setCommand(AsynchAEMessage.Process);
entry.setCasHashCode(String.valueOf(casHashcode));
// Start delegate timer if the pending list is empty
- if (outstandingCasList.isEmpty() && getCasProcessTimeout() > 0) {
- startDelegateTimer(aCasReferenceId, AsynchAEMessage.Process);
+ if ( getCasProcessTimeout() > 0 && (useTimerThreadPerCAS || outstandingCasList.isEmpty())) {
+// startDelegateTimer(aCasReferenceId, AsynchAEMessage.Process);
+ startDelegateTimer(entry);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
@@ -273,9 +286,10 @@ public abstract class Delegate {
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_start_timer_FINE",
new Object[] { getComponentName(), delegateKey, aCasReferenceId,
- getCasProcessTimeout() });
+ getCasProcessTimeout()});
}
}
+
// Append Cas Entry to the end of the list
outstandingCasList.add(entry);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
@@ -304,13 +318,13 @@ public abstract class Delegate {
* @param aCasReferenceId
* - CAS ID to add to the delayed list
*/
- public int addCasToPendingDispatchList(String aCasReferenceId, long casHashCode) {
+ public int addCasToPendingDispatchList(String aCasReferenceId, long casHashCode, boolean useTimerThreadPerCAS) {
synchronized (pendingDispatchList) {
DelegateEntry entry = null;
// Create a new entry to be stored in the list of CASes pending
// dispatch
- entry = new DelegateEntry(aCasReferenceId);
+ entry = new DelegateEntry(aCasReferenceId, useTimerThreadPerCAS);
entry.setCasHashCode(String.valueOf(casHashCode));
// Remember the command
entry.setCommand(AsynchAEMessage.Process);
@@ -550,9 +564,15 @@ public abstract class Delegate {
// Before removing the entry check if this is the oldest in the list. This will be
// used to determine if we need to restart the delegate timer
DelegateEntry oldestEntry = outstandingCasList.get(0);
- boolean doStartDelegateTimer = oldestEntry.equals(aDelegateEntry) && getCasProcessTimeout() > 0;
+ boolean doStartDelegateTimer = false; //oldestEntry.equals(aDelegateEntry) && getCasProcessTimeout() > 0;
outstandingCasList.remove(aDelegateEntry);
-
+ if ( getCasProcessTimeout() > 0 ) {
+ if ( aDelegateEntry.usesDedicatedTimerThread) {
+ aDelegateEntry.getDelegateTimer().cancel();
+ } else {
+ doStartDelegateTimer = oldestEntry.equals(aDelegateEntry);
+ }
+ }
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
@@ -572,7 +592,8 @@ public abstract class Delegate {
DelegateEntry delegateEntry = outstandingCasList.get(0);
// Restart the timer for the oldest CAS in the list
- startDelegateTimer(delegateEntry.getCasReferenceId(), delegateEntry.getCommand());
+// startDelegateTimer(delegateEntry.getCasReferenceId(), delegateEntry.getCommand());
+ startDelegateTimer(delegateEntry);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
@@ -630,13 +651,18 @@ public abstract class Delegate {
* Cancels current timer
*/
public void cancelDelegateGetMetaTimer() {
- synchronized( getMetaTimerLock ) {
+ getMetaReceiptTime.set(System.currentTimeMillis());
+ synchronized( getMetaTimerLock ) {
+
if (getMetaTimer != null) {
getMetaTimer.cancel();
getMetaTimer.purge();
}
}
}
+ public long getMetaReceiptTime() {
+ return getMetaReceiptTime.get();
+ }
/**
* Returns a timeout value for a given command type. The values are defined in the deployment
* descriptor
@@ -680,21 +706,35 @@ public abstract class Delegate {
* @param aCommand
* - command for which the timer is started
*/
- private void startDelegateTimer(final String aCasReferenceId, final int aCommand) {
+// private void startDelegateTimer(final String aCasReferenceId, final int aCommand) {
+ private void startDelegateTimer(final DelegateEntry delegateEntry) {
+ final String aCasReferenceId = delegateEntry.getCasReferenceId();
+ final int aCommand = delegateEntry.getCommand();
synchronized( timerLock ) {
final long timeToWait = getTimeoutValueForCommand(aCommand);
Date timeToRun = new Date(System.currentTimeMillis() + timeToWait);
timer = new DelegateTimer("Controller:" + getComponentName() + ":Request TimerThread-Endpoint_impl:"
+ endpoint + ":" + System.nanoTime() + ":Cmd:" + aCommand, true, aCasReferenceId,this);
final Delegate delegate = this;
+
+ if ( delegateEntry.usesDedicatedTimerThread) {
+ // Save timer so that we can cancel it when a reply comes
+ delegateEntry.setDelegateTimer(timer);
+ }
timer.schedule(new TimerTask() {
public void run() {
- delegate.setState(TIMEOUT_STATE);
+ if ( delegateEntry.usesDedicatedTimerThread) {
+ delegateEntry.getDelegateTimer().cancel();
+ }
+
ErrorContext errorContext = new ErrorContext();
errorContext.add(AsynchAEMessage.Command, aCommand);
String enrichedMessage = enrichProcessCASTimeoutMessage(aCommand, aCasReferenceId,timeToWait,"Delegate Service:"+delegateKey+" Has Timed Out While Processing CAS:"+aCasReferenceId );
Exception cause = new MessageTimeoutException(enrichedMessage);
if (AsynchAEMessage.Process == aCommand) {
+ if ( delegate.getMetaReceiptTime() == 0 || ( (delegate.getMetaReceiptTime() + timeToWait) < System.currentTimeMillis() )) {
+ delegate.setState(TIMEOUT_STATE);
+ }
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
"Delegate.TimerTask.run", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
@@ -716,6 +756,7 @@ public abstract class Delegate {
}
*/
} else if (AsynchAEMessage.GetMeta == aCommand) {
+ delegate.setState(TIMEOUT_STATE);
if ( aCasReferenceId != null ) { // true on GetMeta Ping timeout
errorContext.add(AsynchAEMessage.CasReference, aCasReferenceId);
errorContext.add(AsynchAEMessage.ErrorCause, AsynchAEMessage.PingTimeout);
@@ -727,18 +768,27 @@ public abstract class Delegate {
new Object[] { delegate.getKey(), timeToWait });
}
} else if (AsynchAEMessage.CollectionProcessComplete == aCommand) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ delegate.setState(TIMEOUT_STATE);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
"Delegate.TimerTask.run", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_cpc_timeout_no_reply__WARNING",
new Object[] { delegate.getKey(), timeToWait });
}
+ } else {
+ delegate.setState(TIMEOUT_STATE);
}
errorContext.add(AsynchAEMessage.Endpoint, getEndpoint());
handleError(cause, errorContext);
}
}, timeToRun);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, this.getClass().getName(),
+ "startDelegateTimer", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_timer_started_FINE",
+ new Object[] { aCasReferenceId, timeToWait, delegateEntry.getCasHashCode(), Thread.currentThread().getId(), Thread.currentThread().getName() });
+ }
}
}
/**
@@ -870,6 +920,25 @@ public abstract class Delegate {
private String casHashCode;
+ private DelegateTimer timer;
+
+ private volatile boolean usesDedicatedTimerThread;
+
+ public DelegateEntry(String aCasReferenceId, boolean usesDedicatedTimerThread) {
+ casReferenceId = aCasReferenceId;
+ this.usesDedicatedTimerThread = usesDedicatedTimerThread;
+ }
+
+ public boolean usesDedicatedTimerThread() {
+ return usesDedicatedTimerThread;
+ }
+ public void setDelegateTimer( DelegateTimer timer) {
+ this.timer = timer;
+ }
+ public DelegateTimer getDelegateTimer() {
+ return timer;
+ }
+
public String getCasHashCode() {
return casHashCode;
}
@@ -878,9 +947,7 @@ public abstract class Delegate {
this.casHashCode = casHashCode;
}
- public DelegateEntry(String aCasReferenceId) {
- casReferenceId = aCasReferenceId;
- }
+
public boolean isGeneratingChildren() {
return generatingChildren;
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties?rev=1511109&r1=1511108&r2=1511109&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties Tue Aug 6 20:07:12 2013
@@ -189,8 +189,8 @@ UIMAEE_next_step_dispatch_completed__FIN
UIMAEE_cas_not_found__INFO= Controller: {0} Cas: {1} Not Found In {2}.
UIMAEE_removed_cas_from_delegate_list__FINE = Controller: {0} Removed CAS From Delegates List Of CASes Pending Reply. Delegate: {1} Cas Id: {2} List Size After Remove: {3}
UIMAEE_add_cas_to_delegate_pending_reply_FINE = Controller: {0} Added CAS To Delegates List Of CASes Pending Reply. Delegate: {1} Cas Id: {2} List Size After Add: {3}
-UIMAEE_start_timer_FINE = Controller: {0} Started Timer For Delegate: {1} Cas Id: {2} Expecting Reply In {3} ms
-UIMAEE_restart_timer_FINE = Controller: {0} Restarted Timer For Delegate: {1} Cas Id: {2} Expecting Reply In {3} ms
+UIMAEE_start_timer_FINE = Controller: {0} Started Timer For Delegate: {1} Cas Id: {2} Expecting Reply In {3}
+UIMAEE_restart_timer_FINE = Controller: {0} Restarted Timer For Delegate: {1} Cas Id: {2} Expecting Reply In {3} ms
UIMAEE_increment_retry_count_FINE = Controller: {0} Incremented Retry Count For Delegate: {1} Cas Id: {2} Current Retry Count: {3}
UIMAEE_cancelled_timer_FINE = Controller: {0} Received GetMeta Reply From Delegate: {1} Cancelling Timer
UIMAEE_add_cas_to_delegate_pending_dispatch_WARNING = Controller: {0} Added CAS {1} (Hashcode: {2}) To a List Of CASes Pending Dispatch. The Controller Has Previously Timed-out Waiting For a Reply From {3} Service. The Submitted CAS Will Remain in Pending Dispatch List Until GetMeta Ping Succeeds or GetMet Ping Times out. List Size Before Add: {4}
@@ -257,3 +257,4 @@ UIMAEE_ae_instance_destroy_called__INFO=
UIMAEE_uncaught_error_WARNING=Controller: {0} Handling uncaught Throwable. Stack trace:\n {1}
UIMAEE_terminal_error_WARNING=Controller:{0} Exiting via System.exit(2) Due to unrecoverable error
UIMAEE_drop_cas_debug_FINEST=Controller:{0} Drop:{1} CAS:{2} ReplyReceived:{3}
+UIMAEE_timer_started_FINE=Timer Started For CAS: {0} Timeout Value:{1} Timer Thread ID:{2} Timer Thread Name:{3}
\ No newline at end of file