You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2015/01/22 16:23:36 UTC
svn commit: r1653912 -
/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configurator/jp/HttpWorkerThread.java
Author: cwiklik
Date: Thu Jan 22 15:23:36 2015
New Revision: 1653912
URL: http://svn.apache.org/r1653912
Log:
UIMA-4066 Added transactionID support. Code cleanup. Added comments
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configurator/jp/HttpWorkerThread.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configurator/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configurator/jp/HttpWorkerThread.java?rev=1653912&r1=1653911&r2=1653912&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configurator/jp/HttpWorkerThread.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configurator/jp/HttpWorkerThread.java Thu Jan 22 15:23:36 2015
@@ -27,6 +27,7 @@ import java.net.SocketTimeoutException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.params.HttpMethodParams;
@@ -38,18 +39,19 @@ import org.apache.uima.ducc.container.ne
import org.apache.uima.ducc.container.net.iface.IPerformanceMetrics;
import org.apache.uima.ducc.container.net.impl.MetaCasTransaction;
import org.apache.uima.ducc.container.net.impl.PerformanceMetrics;
+import org.apache.uima.ducc.container.net.impl.TransactionId;
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
public class HttpWorkerThread implements Runnable {
DuccLogger logger = new DuccLogger(HttpWorkerThread.class);
private DuccHttpClient httpClient = null;
-// private IUimaProcessor uimaProcessor;
private JobProcessComponent duccComponent;
private Object monitor = new Object();
private CountDownLatch workerThreadCount = null;
private CountDownLatch threadReadyCount = null;
private Object processorInstance = null;
-
+ private static AtomicInteger IdGenerator =
+ new AtomicInteger();
public HttpWorkerThread(JobProcessComponent component, DuccHttpClient httpClient,
Object processorInstance, CountDownLatch workerThreadCount,
CountDownLatch threadReadyCount) {
@@ -95,17 +97,17 @@ public class HttpWorkerThread implements
threadReadyCount.countDown(); // this thread is ready
// **************************************************************************
// now block and wait until all threads finish deploying and initializing
- // analytics in provided process container
+ // analytics in provided process container. Processing begins when
+ // all worker threads initialize their analytics.
// **************************************************************************
try {
- threadReadyCount.await();
+ threadReadyCount.await(); // wait for all analytics to initialize
} catch( Exception ie) {}
-
-// workerThreadCount.countDown();
-
if (!error) {
synchronized(JobProcessComponent.class) {
+ // change the state of this process and notify
+ // Ducc agent that the process is ready and running
duccComponent.setState(ProcessState.Running);
}
}
@@ -113,23 +115,28 @@ public class HttpWorkerThread implements
}
- // run forever (or until the process throws IllegalStateException
- logger.info("HttpWorkerThread.run()", null, "Processing Work Items - Thread Id:"+Thread.currentThread().getId());
+ logger.info("HttpWorkerThread.run()", null, "Begin Processing Work Items - Thread Id:"+Thread.currentThread().getId());
try {
-
- while (duccComponent.isRunning()) { //service.running && ctx.state().process(ctx)) {
+ // Enter process loop. Stop this thread on the first process error.
+ while (duccComponent.isRunning()) {
try {
+ int major = IdGenerator.addAndGet(1);
+ int minor = 0;
+
IMetaCasTransaction transaction = new MetaCasTransaction();
- //System.out.println("Requesting Work from JD");
+ TransactionId tid = new TransactionId(major, minor);
+ transaction.setTransactionId(tid);
// According to HTTP spec, GET may not contain Body in
// HTTP request. HttpClient actually enforces this. So
// do a POST instead of a GET.
- transaction.setType(Type.Get); // Tell JD you want a CAS
+ transaction.setType(Type.Get); // Tell JD you want a Work Item
command = Type.Get.name();
logger.debug("HttpWorkerThread.run()", null, "Thread Id:"+Thread.currentThread().getId()+" Requesting next WI from JD");;
- transaction = httpClient.execute(transaction, postMethod);
- if ( transaction.getMetaCas()!= null) {
+ // send a request to JD and wait for a reply
+ transaction = httpClient.execute(transaction, postMethod);
+ // The JD may not provide a Work Item to process.
+ if ( transaction.getMetaCas()!= null) {
logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd WI:"+transaction.getMetaCas().getSystemKey());
} else {
logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd JD Response, however there is no MetaCas. Sleeping for "+duccComponent.getThreadSleepTime());
@@ -138,34 +145,43 @@ public class HttpWorkerThread implements
// Confirm receipt of the CAS.
transaction.setType(Type.Ack);
command = Type.Ack.name();
- httpClient.execute(transaction, postMethod); // Ready to process
+ tid = new TransactionId(major, minor++);
+ transaction.setTransactionId(tid);
+ httpClient.execute(transaction, postMethod);
+
logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" Sent ACK");
- // if the JD did not provide a CAS, most likely the CR is
+ // if the JD did not provide a Work Item, most likely the CR is
// done. In such case, reduce frequency of Get requests
- // by sleeping in between Get's. Eventually the JD will
- // confirm that there is no more work and this thread
- // can exit.
+ // by sleeping in between Get's. Eventually the OR will
+ // deallocate this process and the thread will exit
if ( transaction.getMetaCas() == null || transaction.getMetaCas().getUserSpaceCas() == null) {
// the JD says there are no more WIs. Sleep awhile
// do a GET in case JD changes its mind. The JP will
// eventually be stopped by the agent
synchronized (monitor) {
try {
+ // There is no CAS. It looks like the JD CR is done but there
+ // are still WIs being processed. Slow down the rate of requests
monitor.wait(duccComponent.getThreadSleepTime());
} catch (InterruptedException e) {
}
}
- // There is no CAS. It looks like the JD CR is done but there
- // are still WIs being processed. Slow down the rate of requests
} else {
boolean workItemFailed = false;
- // process the CAS
+ // Process the Work Item. Any exception here will cause the
+ // thread to terminate and also the JP to stop. The stopping
+ // is orderly, allowing each thread to finish processing
+ // the current WI. Once the JP notifies the Agent of a problem,
+ // the Agent will wait for 1 minute (default) before killing
+ // this process via kill -9.
try {
+ // ********** PROCESS() **************
// using java reflection, call process to analyze the CAS
List<Properties> metrics = (List<Properties>)processMethod.
invoke(processorInstance, transaction.getMetaCas().getUserSpaceCas());
-
+ // ***********************************
+
logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" process() completed");
IPerformanceMetrics metricsWrapper =
new PerformanceMetrics();
@@ -187,6 +203,8 @@ public class HttpWorkerThread implements
// RuntimeException->AnalysisEngineException.message
workItemFailed = true;
IMetaCas mc = transaction.getMetaCas();
+
+ // Fetch serialized exception as a blob
Method getLastSerializedErrorMethod = processorInstance.getClass().getDeclaredMethod("getLastSerializedError");
byte[] serializedException =
(byte[])getLastSerializedErrorMethod.invoke(processorInstance);
@@ -195,6 +213,7 @@ public class HttpWorkerThread implements
logger.info("run", null, "Work item processing failed - returning serialized exception to the JD");
} catch( Exception ee) {
workItemFailed = true;
+ // Serialize exception for the JD.
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream( baos );
oos.writeObject( ee);
@@ -202,9 +221,14 @@ public class HttpWorkerThread implements
transaction.getMetaCas().setUserSpaceException(baos.toByteArray());
logger.error("run", null, ee);
}
+ // Don't return the serialized CAS, to reduce the msg size
transaction.getMetaCas().setUserSpaceCas(null);
transaction.setType(Type.End);
command = Type.End.name();
+
+ tid = new TransactionId(major, minor++);
+ transaction.setTransactionId(tid);
+
httpClient.execute(transaction, postMethod); // Work Item Processed - End
String wid = null;
try {