You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2015/01/22 16:23:36 UTC

svn commit: r1653912 - /uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configurator/jp/HttpWorkerThread.java

Author: cwiklik
Date: Thu Jan 22 15:23:36 2015
New Revision: 1653912

URL: http://svn.apache.org/r1653912
Log:
UIMA-4066 Added transactionID support. Code cleanup. Added comments

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configurator/jp/HttpWorkerThread.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configurator/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configurator/jp/HttpWorkerThread.java?rev=1653912&r1=1653911&r2=1653912&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configurator/jp/HttpWorkerThread.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configurator/jp/HttpWorkerThread.java Thu Jan 22 15:23:36 2015
@@ -27,6 +27,7 @@ import java.net.SocketTimeoutException;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.commons.httpclient.params.HttpMethodParams;
@@ -38,18 +39,19 @@ import org.apache.uima.ducc.container.ne
 import org.apache.uima.ducc.container.net.iface.IPerformanceMetrics;
 import org.apache.uima.ducc.container.net.impl.MetaCasTransaction;
 import org.apache.uima.ducc.container.net.impl.PerformanceMetrics;
+import org.apache.uima.ducc.container.net.impl.TransactionId;
 import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
 
 public class HttpWorkerThread implements Runnable {
 	DuccLogger logger = new DuccLogger(HttpWorkerThread.class);
 	private DuccHttpClient httpClient = null;
-//	private IUimaProcessor uimaProcessor;
 	private JobProcessComponent duccComponent;
 	private Object monitor = new Object();
 	private CountDownLatch workerThreadCount = null;
 	private CountDownLatch threadReadyCount = null;
 	private Object processorInstance = null;
-
+    private static AtomicInteger IdGenerator =
+    		new AtomicInteger();
 	public HttpWorkerThread(JobProcessComponent component, DuccHttpClient httpClient,
 			Object processorInstance, CountDownLatch workerThreadCount,
 			CountDownLatch threadReadyCount) {
@@ -95,17 +97,17 @@ public class HttpWorkerThread implements
 			threadReadyCount.countDown();  // this thread is ready
 			// **************************************************************************
 			// now block and wait until all threads finish deploying and initializing 
-			// analytics in provided process container
+			// analytics in provided process container. Processing begins when
+			// all worker threads initialize their analytics.
 			// **************************************************************************
 			try {
-				threadReadyCount.await();
+				threadReadyCount.await();   // wait for all analytics to initialize
 			} catch( Exception ie) {}
 
-		
-//			workerThreadCount.countDown();
-
 			if (!error) {
 				synchronized(JobProcessComponent.class) {
+					// change the state of this process and notify
+					// Ducc agent that the process is ready and running
 					duccComponent.setState(ProcessState.Running);
 				}
 			}
@@ -113,23 +115,28 @@ public class HttpWorkerThread implements
 	   	}
 			
 			
-		// run forever (or until the process throws IllegalStateException
-	   	logger.info("HttpWorkerThread.run()", null, "Processing Work Items - Thread Id:"+Thread.currentThread().getId());
+	   	logger.info("HttpWorkerThread.run()", null, "Begin Processing Work Items - Thread Id:"+Thread.currentThread().getId());
 		try {
-
-			while (duccComponent.isRunning()) {  //service.running && ctx.state().process(ctx)) {
+			// Enter process loop. Stop this thread on the first process error.
+			while (duccComponent.isRunning()) {  
 
 				try {
+					int major = IdGenerator.addAndGet(1);
+					int minor = 0;
+
 					IMetaCasTransaction transaction = new MetaCasTransaction();
-					//System.out.println("Requesting Work from JD");
+					TransactionId tid = new TransactionId(major, minor);
+					transaction.setTransactionId(tid);
 					// According to HTTP spec, GET may not contain Body in 
 					// HTTP request. HttpClient actually enforces this. So
 					// do a POST instead of a GET.
-					transaction.setType(Type.Get);  // Tell JD you want a CAS
+					transaction.setType(Type.Get);  // Tell JD you want a Work Item
 					command = Type.Get.name();
 			    	logger.debug("HttpWorkerThread.run()", null, "Thread Id:"+Thread.currentThread().getId()+" Requesting next WI from JD");;
-					transaction = httpClient.execute(transaction, postMethod);
-                    if ( transaction.getMetaCas()!= null) {
+					// send a request to JD and wait for a reply
+			    	transaction = httpClient.execute(transaction, postMethod);
+                    // The JD may not provide a Work Item to process.
+			    	if ( transaction.getMetaCas()!= null) {
     					logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd WI:"+transaction.getMetaCas().getSystemKey());
                     } else {
     					logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd JD Response, however there is no MetaCas. Sleeping for "+duccComponent.getThreadSleepTime());
@@ -138,34 +145,43 @@ public class HttpWorkerThread implements
 					// Confirm receipt of the CAS. 
 					transaction.setType(Type.Ack);
 					command = Type.Ack.name();
-					httpClient.execute(transaction, postMethod); // Ready to process
+					tid = new TransactionId(major, minor++);
+					transaction.setTransactionId(tid);
+					httpClient.execute(transaction, postMethod); 
+					
                     logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" Sent ACK");
                     
-					// if the JD did not provide a CAS, most likely the CR is
+					// if the JD did not provide a Work Item, most likely the CR is
 					// done. In such case, reduce frequency of Get requests
-					// by sleeping in between Get's. Eventually the JD will 
-					// confirm that there is no more work and this thread
-					// can exit.
+					// by sleeping in between Get's. Eventually the OR will 
+					// deallocate this process and the thread will exit
 					if ( transaction.getMetaCas() == null || transaction.getMetaCas().getUserSpaceCas() == null) {
 						// the JD says there are no more WIs. Sleep awhile
 						// do a GET in case JD changes its mind. The JP will
 						// eventually be stopped by the agent
 						synchronized (monitor) {
 							try {
+								// There is no CAS. It looks like the JD CR is done but there
+								// are still WIs being processed. Slow down the rate of requests	
 								monitor.wait(duccComponent.getThreadSleepTime());
 							} catch (InterruptedException e) {
 							}
 						}
-						// There is no CAS. It looks like the JD CR is done but there
-						// are still WIs being processed. Slow down the rate of requests	
 					} else {
 						boolean workItemFailed = false;
-						// process the CAS
+						// process the Work item. Any exception here will cause the 
+						// thread to terminate and also the JP to stop. The stopping
+						// is orderly allowing each thread to finish processing of
+						// the current WI. Once the JP notifies the Agent of a problem
+						// the Agent will wait for 1 minute (default) before killing
+						// this process via kill -9
 						try {
+							//    ********** PROCESS() **************
 							// using java reflection, call process to analyze the CAS
 							 List<Properties> metrics = (List<Properties>)processMethod.
 							   invoke(processorInstance, transaction.getMetaCas().getUserSpaceCas());
-							
+							//    ***********************************
+							 
 		                    logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" process() completed");
 							IPerformanceMetrics metricsWrapper =
 									new PerformanceMetrics();
@@ -187,6 +203,8 @@ public class HttpWorkerThread implements
 							// RuntimeException->AnalysisEngineException.message
 							workItemFailed = true;
 							IMetaCas mc = transaction.getMetaCas();
+							
+							// Fetch serialized exception as a blob
 							Method getLastSerializedErrorMethod = processorInstance.getClass().getDeclaredMethod("getLastSerializedError");
 							byte[] serializedException =
 							    (byte[])getLastSerializedErrorMethod.invoke(processorInstance);
@@ -195,6 +213,7 @@ public class HttpWorkerThread implements
 							logger.info("run", null, "Work item processing failed - returning serialized exception to the JD");
 						} catch( Exception ee) {
 							workItemFailed = true;
+							// Serialize exception for the JD.
 							ByteArrayOutputStream baos = new ByteArrayOutputStream();
 						    ObjectOutputStream oos = new ObjectOutputStream( baos );
 						    oos.writeObject( ee);
@@ -202,9 +221,14 @@ public class HttpWorkerThread implements
 							transaction.getMetaCas().setUserSpaceException(baos.toByteArray());
 							logger.error("run", null, ee);
 						}
+						// Dont return serialized CAS to reduce the msg size
 						transaction.getMetaCas().setUserSpaceCas(null);
 						transaction.setType(Type.End);
 						command = Type.End.name();
+
+						tid = new TransactionId(major, minor++);
+						transaction.setTransactionId(tid);
+
 						httpClient.execute(transaction, postMethod); // Work Item Processed - End
                     	String wid = null;
                     	try {