You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2016/03/31 20:57:07 UTC
svn commit: r1737271 -
/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
Author: cwiklik
Date: Thu Mar 31 18:57:07 2016
New Revision: 1737271
URL: http://svn.apache.org/viewvc?rev=1737271&view=rev
Log:
UIMA-4881 modified to use one thread to retry WI request
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1737271&r1=1737270&r2=1737271&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Thu Mar 31 18:57:07 2016
@@ -71,9 +71,58 @@ public class HttpWorkerThread implements
HttpWorkerThread.maxFrameworkFailures = maxFrameworkFailures;
maxFrameworkErrors = maxFrameworkFailures.get();
}
+
+ public IMetaCasTransaction getWork(PostMethod postMethod, int major, int minor) throws Exception {
+ String command="";
+
+ IMetaCasTransaction transaction = new MetaCasTransaction();
+ try {
+ TransactionId tid = new TransactionId(major, minor);
+ transaction.setTransactionId(tid);
+ // According to HTTP spec, GET may not contain Body in
+ // HTTP request. HttpClient actually enforces this. So
+ // do a POST instead of a GET.
+ transaction.setType(Type.Get); // Tell JD you want a Work Item
+ command = Type.Get.name();
+ logger.debug("HttpWorkerThread.run()", null, "Thread Id:"+Thread.currentThread().getId()+" Requesting next WI from JD");;
+ // send a request to JD and wait for a reply
+ transaction = httpClient.execute(transaction, postMethod);
+ // The JD may not provide a Work Item to process.
+ if ( transaction.getMetaCas()!= null) {
+ logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd WI:"+transaction.getMetaCas().getSystemKey());
+ // Confirm receipt of the CAS.
+ transaction.setType(Type.Ack);
+ command = Type.Ack.name();
+ tid = new TransactionId(major, minor++);
+ transaction.setTransactionId(tid);
+ logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" Sending ACK request - WI:"+transaction.getMetaCas().getSystemKey());
+ transaction = httpClient.execute(transaction, postMethod);
+ if ( transaction.getMetaCas() == null) {
+ // this can be the case when a JD receives ACK late
+ logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" ACK reply recv'd, however there is no MetaCas. The JD Cancelled the transaction");
+ } else {
+ logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" ACK reply recv'd");
+ }
+
+ }
+ } catch( SocketTimeoutException e) {
+ logger.warn("run", null, "Timed Out While Awaiting Response from JD for "+command+" Request - Retrying ...");
+ System.out.println("Time Out While Waiting For a Reply from JD For "+command+" Request");
+ }
+ return transaction;
+
+ }
+
+ private void waitAwhile(long sleepTime) throws InterruptedException {
+ synchronized (monitor) {
+ // There is no CAS. It looks like the JD CR is done but there
+ // are still WIs being processed. Slow down the rate of requests
+ monitor.wait(sleepTime);
+ }
+ }
@SuppressWarnings("unchecked")
public void run() {
- String command="";
+ //String command="";
PostMethod postMethod = null;
logger.info("HttpWorkerThread.run()", null, "Starting JP Process Thread Id:"+Thread.currentThread().getId());
Method processMethod = null;
@@ -82,7 +131,7 @@ public class HttpWorkerThread implements
// ***** DEPLOY ANALYTICS ***********
// First, deploy analytics in a provided process container. Use java reflection to call
// deploy method. The process container has been instantiated in the main thread and
- // loaded from ducc-user jar provided in system classpath
+ // loaded from ducc-user j ar provided in system classpath
try {
processMethod = processorInstance.getClass().getSuperclass().getDeclaredMethod("process", Object.class);
getKeyMethod = processorInstance.getClass().getSuperclass().getDeclaredMethod("getKey", String.class);
@@ -154,56 +203,35 @@ public class HttpWorkerThread implements
try {
int major = IdGenerator.addAndGet(1);
int minor = 0;
-
- IMetaCasTransaction transaction = new MetaCasTransaction();
- TransactionId tid = new TransactionId(major, minor);
- transaction.setTransactionId(tid);
- // According to HTTP spec, GET may not contain Body in
- // HTTP request. HttpClient actually enforces this. So
- // do a POST instead of a GET.
- transaction.setType(Type.Get); // Tell JD you want a Work Item
- command = Type.Get.name();
- logger.debug("HttpWorkerThread.run()", null, "Thread Id:"+Thread.currentThread().getId()+" Requesting next WI from JD");;
- // send a request to JD and wait for a reply
- transaction = httpClient.execute(transaction, postMethod);
- // The JD may not provide a Work Item to process.
- if ( transaction.getMetaCas()!= null) {
- logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd WI:"+transaction.getMetaCas().getSystemKey());
- // Confirm receipt of the CAS.
- transaction.setType(Type.Ack);
- command = Type.Ack.name();
- tid = new TransactionId(major, minor++);
- transaction.setTransactionId(tid);
- logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" Sending ACK request - WI:"+transaction.getMetaCas().getSystemKey());
- transaction = httpClient.execute(transaction, postMethod);
- if ( transaction.getMetaCas() == null) {
- // this can be the case when a JD receives ACK late
- logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" ACK reply recv'd, however there is no MetaCas. The JD Cancelled the transaction");
- continue; // ask for more
- }
- logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" ACK reply recv'd");
- } else {
- logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd JD Response, however there is no MetaCas. Sleeping for "+duccComponent.getThreadSleepTime());
- }
-
-
+ IMetaCasTransaction transaction = getWork(postMethod, major, minor);
+
// if the JD did not provide a Work Item, most likely the CR is
// done. In such case, reduce frequency of Get requests
// by sleeping in between Get's. Eventually the OR will
// deallocate this process and the thread will exit
if ( transaction.getMetaCas() == null || transaction.getMetaCas().getUserSpaceCas() == null) {
+ logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd JD Response, however there is no MetaCas. Sleeping for "+duccComponent.getThreadSleepTime()+" and retrying");
+
// the JD says there are no more WIs. Sleep awhile
// do a GET in case JD changes its mind. The JP will
// eventually be stopped by the agent
- synchronized (monitor) {
- try {
- // There is no CAS. It looks like the JD CR is done but there
- // are still WIs being processed. Slow down the rate of requests
- monitor.wait(duccComponent.getThreadSleepTime());
- } catch (InterruptedException e) {
+
+ // use class level locking to block all but one thread to do retries.
+ // This is done to prevent flooding JD with retry requests
+ synchronized (HttpWorkerThread.class) {
+ while(duccComponent.isRunning() ) {
+ waitAwhile(duccComponent.getThreadSleepTime());
+ // just awoken, check if the JP is still in Running state
+ if ( duccComponent.isRunning()) {
+ transaction = getWork(postMethod, major, ++minor);
+ if ( transaction.getMetaCas() != null ) {
+ break;
+ }
+ }
}
}
- } else {
+ }
+ if ( duccComponent.isRunning()) {
boolean workItemFailed = false;
// process the Work item. Any exception here will cause the
// thread to terminate and also the JP to stop. The stopping
@@ -279,9 +307,10 @@ public class HttpWorkerThread implements
// Dont return serialized CAS to reduce the msg size
transaction.getMetaCas().setUserSpaceCas(null);
transaction.setType(Type.End);
- command = Type.End.name();
-
- tid = new TransactionId(major, minor++);
+ //String command = Type.End.name();
+
+ minor++; // getWork()
+ TransactionId tid = new TransactionId(major, minor++);
transaction.setTransactionId(tid);
httpClient.execute(transaction, postMethod); // Work Item Processed - End
@@ -334,10 +363,10 @@ public class HttpWorkerThread implements
break;
}
maxFrameworkFailures.set(maxFrameworkErrors); // reset framework failures on success
- }
- } catch( SocketTimeoutException e) {
- logger.warn("run", null, "Timed Out While Awaiting Response from JD for "+command+" Request - Retrying ...");
- System.out.println("Time Out While Waiting For a Reply from JD For "+command+" Request");
+ }
+ } catch( InterruptedException e) {
+ logger.error("run", null, "WorkerThread Interrupted - Terminating Thread "+Thread.currentThread().getId());
+ return;
}
catch (Exception e ) {
logger.error("run", null, e);