You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2016/03/31 20:57:07 UTC

svn commit: r1737271 - /uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java

Author: cwiklik
Date: Thu Mar 31 18:57:07 2016
New Revision: 1737271

URL: http://svn.apache.org/viewvc?rev=1737271&view=rev
Log:
UIMA-4881: Modified to use a single thread to retry the Work Item (WI) request

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1737271&r1=1737270&r2=1737271&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Thu Mar 31 18:57:07 2016
@@ -71,9 +71,58 @@ public class HttpWorkerThread implements
 		HttpWorkerThread.maxFrameworkFailures = maxFrameworkFailures;
 		maxFrameworkErrors = maxFrameworkFailures.get();
 	}   
+
+	public IMetaCasTransaction getWork(PostMethod postMethod, int major, int minor) throws Exception {
+		String command="";
+
+		IMetaCasTransaction transaction = new MetaCasTransaction();
+		try {
+			TransactionId tid = new TransactionId(major, minor);
+			transaction.setTransactionId(tid);
+			// According to HTTP spec, GET may not contain Body in 
+			// HTTP request. HttpClient actually enforces this. So
+			// do a POST instead of a GET.
+			transaction.setType(Type.Get);  // Tell JD you want a Work Item
+			command = Type.Get.name();
+	    	logger.debug("HttpWorkerThread.run()", null, "Thread Id:"+Thread.currentThread().getId()+" Requesting next WI from JD");;
+			// send a request to JD and wait for a reply
+	    	transaction = httpClient.execute(transaction, postMethod);
+	        // The JD may not provide a Work Item to process.
+	    	if ( transaction.getMetaCas()!= null) {
+				logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd WI:"+transaction.getMetaCas().getSystemKey());
+				// Confirm receipt of the CAS. 
+				transaction.setType(Type.Ack);
+				command = Type.Ack.name();
+				tid = new TransactionId(major, minor++);
+				transaction.setTransactionId(tid);
+				logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" Sending ACK request - WI:"+transaction.getMetaCas().getSystemKey());
+				transaction = httpClient.execute(transaction, postMethod); 
+				if ( transaction.getMetaCas() == null) {
+					// this can be the case when a JD receives ACK late 
+					logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" ACK reply recv'd, however there is no MetaCas. The JD Cancelled the transaction");
+				} else {
+		            logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" ACK reply recv'd");
+				}
+
+	        }
+		} catch( SocketTimeoutException e) {
+			logger.warn("run", null, "Timed Out While Awaiting Response from JD for "+command+" Request - Retrying ...");
+			System.out.println("Time Out While Waiting For a Reply from JD For "+command+" Request");
+		}
+    	return transaction;
+
+	}
+
+	private void waitAwhile(long sleepTime) throws InterruptedException {
+		synchronized (monitor) {
+			// There is no CAS. It looks like the JD CR is done but there
+			// are still WIs being processed. Slow down the rate of requests	
+			monitor.wait(sleepTime);
+		}
+	}
 	@SuppressWarnings("unchecked")
 	public void run() {
-		String command="";
+		//String command="";
 		PostMethod postMethod = null;
 	    logger.info("HttpWorkerThread.run()", null, "Starting JP Process Thread Id:"+Thread.currentThread().getId());
 	    Method processMethod = null;
@@ -82,7 +131,7 @@ public class HttpWorkerThread implements
 	    // ***** DEPLOY ANALYTICS ***********
 	    // First, deploy analytics in a provided process container. Use java reflection to call
 	    // deploy method. The process container has been instantiated in the main thread and
-	    // loaded from ducc-user jar provided in system classpath
+	    // loaded from ducc-user jar provided in system classpath
 	    try {
 			processMethod = processorInstance.getClass().getSuperclass().getDeclaredMethod("process", Object.class);	
 			getKeyMethod = processorInstance.getClass().getSuperclass().getDeclaredMethod("getKey", String.class);	
@@ -154,56 +203,35 @@ public class HttpWorkerThread implements
 				try {
 					int major = IdGenerator.addAndGet(1);
 					int minor = 0;
-
-					IMetaCasTransaction transaction = new MetaCasTransaction();
-					TransactionId tid = new TransactionId(major, minor);
-					transaction.setTransactionId(tid);
-					// According to HTTP spec, GET may not contain Body in 
-					// HTTP request. HttpClient actually enforces this. So
-					// do a POST instead of a GET.
-					transaction.setType(Type.Get);  // Tell JD you want a Work Item
-					command = Type.Get.name();
-			    	logger.debug("HttpWorkerThread.run()", null, "Thread Id:"+Thread.currentThread().getId()+" Requesting next WI from JD");;
-					// send a request to JD and wait for a reply
-			    	transaction = httpClient.execute(transaction, postMethod);
-                    // The JD may not provide a Work Item to process.
-			    	if ( transaction.getMetaCas()!= null) {
-    					logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd WI:"+transaction.getMetaCas().getSystemKey());
-    					// Confirm receipt of the CAS. 
-    					transaction.setType(Type.Ack);
-    					command = Type.Ack.name();
-    					tid = new TransactionId(major, minor++);
-    					transaction.setTransactionId(tid);
-    					logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" Sending ACK request - WI:"+transaction.getMetaCas().getSystemKey());
-    					transaction = httpClient.execute(transaction, postMethod); 
-    					if ( transaction.getMetaCas() == null) {
-    						// this can be the case when a JD receives ACK late 
-        					logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" ACK reply recv'd, however there is no MetaCas. The JD Cancelled the transaction");
-        					continue; // ask for more
-    					}
-                        logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" ACK reply recv'd");
-                    } else {
-    					logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd JD Response, however there is no MetaCas. Sleeping for "+duccComponent.getThreadSleepTime());
-                    }
-
-                    
+					IMetaCasTransaction transaction = getWork(postMethod, major, minor);
+					
 					// if the JD did not provide a Work Item, most likely the CR is
 					// done. In such case, reduce frequency of Get requests
 					// by sleeping in between Get's. Eventually the OR will 
 					// deallocate this process and the thread will exit
 					if ( transaction.getMetaCas() == null || transaction.getMetaCas().getUserSpaceCas() == null) {
+    					logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd JD Response, however there is no MetaCas. Sleeping for "+duccComponent.getThreadSleepTime()+" and retrying");
+    					
 						// the JD says there are no more WIs. Sleep awhile
 						// do a GET in case JD changes its mind. The JP will
 						// eventually be stopped by the agent
-						synchronized (monitor) {
-							try {
-								// There is no CAS. It looks like the JD CR is done but there
-								// are still WIs being processed. Slow down the rate of requests	
-								monitor.wait(duccComponent.getThreadSleepTime());
-							} catch (InterruptedException e) {
+
+    					// use class level locking to block all but one thread to do retries.
+						// This is done to prevent flooding JD with retry requests 
+						synchronized (HttpWorkerThread.class) {
+							while(duccComponent.isRunning() ) {
+								waitAwhile(duccComponent.getThreadSleepTime());
+								// just awoken, check if the JP is still in Running state
+								if ( duccComponent.isRunning()) {
+									transaction = getWork(postMethod, major, ++minor);
+									if ( transaction.getMetaCas() != null ) {
+										break;
+									}
+								}
 							}
 						}
-					} else {
+					} 
+					if ( duccComponent.isRunning()) {
 						boolean workItemFailed = false;
 						// process the Work item. Any exception here will cause the 
 						// thread to terminate and also the JP to stop. The stopping
@@ -279,9 +307,10 @@ public class HttpWorkerThread implements
 						// Dont return serialized CAS to reduce the msg size
 						transaction.getMetaCas().setUserSpaceCas(null);
 						transaction.setType(Type.End);
-						command = Type.End.name();
-
-						tid = new TransactionId(major, minor++);
+						//String command = Type.End.name();
+						
+						minor++; // getWork()  
+						TransactionId tid = new TransactionId(major, minor++);
 						transaction.setTransactionId(tid);
 
 						httpClient.execute(transaction, postMethod); // Work Item Processed - End
@@ -334,10 +363,10 @@ public class HttpWorkerThread implements
                         	break;
                         }
 	                    maxFrameworkFailures.set(maxFrameworkErrors);   // reset framework failures on success
-					}
-				} catch( SocketTimeoutException e) {
-					logger.warn("run", null, "Timed Out While Awaiting Response from JD for "+command+" Request - Retrying ...");
-					System.out.println("Time Out While Waiting For a Reply from JD For "+command+" Request");
+					} 
+				} catch( InterruptedException e) {
+					logger.error("run", null, "WorkerThread Interrupted - Terminating Thread "+Thread.currentThread().getId());
+					return;
 				}
 				catch (Exception e ) {
 					logger.error("run", null, e);