You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2014/12/02 16:42:23 UTC

svn commit: r1642918 - in /uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp: HttpWorkerThread.java JobProcessComponent.java

Author: cwiklik
Date: Tue Dec  2 15:42:23 2014
New Revision: 1642918

URL: http://svn.apache.org/r1642918
Log:
UIMA-4140 Added code to quiesce the JP (job process) when the agent requests a stop

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1642918&r1=1642917&r2=1642918&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Tue Dec  2 15:42:23 2014
@@ -20,6 +20,7 @@
 package org.apache.uima.ducc.transport.configuration.jp;
 
 import java.net.SocketTimeoutException;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.uima.ducc.common.utils.DuccLogger;
@@ -36,6 +37,7 @@ public class HttpWorkerThread implements
 	static AtomicInteger counter = new AtomicInteger();
     private DuccLogger logger;
 	private Object monitor = new Object();
+	private CountDownLatch workerThreadCount = null;
 /*
 	interface SMEvent {
 		Event action();
@@ -225,10 +227,11 @@ public class HttpWorkerThread implements
 	}
 	*/
 	public HttpWorkerThread(JobProcessComponent component, DuccHttpClient httpClient,
-			IUimaProcessor processor) {
+			IUimaProcessor processor, CountDownLatch workerThreadCount) {
 		this.duccComponent = component;
 		this.httpClient = httpClient;
 		this.uimaProcessor = processor;
+		this.workerThreadCount = workerThreadCount;
 	}
 
 	public void run() {
@@ -237,7 +240,7 @@ public class HttpWorkerThread implements
 //			SMContext ctx = new SMContextImpl(httpClient, States.Start);
 			String command="";
 			// run forever (or until the process throws IllegalStateException
-			while (true) {  //service.running && ctx.state().process(ctx)) {
+			while (duccComponent.isRunning()) {  //service.running && ctx.state().process(ctx)) {
 
 				try {
 					IMetaCasTransaction transaction = new MetaCasTransaction();
@@ -300,9 +303,11 @@ public class HttpWorkerThread implements
 
 		} catch (Throwable t) {
 			t.printStackTrace();
+			duccComponent.getLogger().warn("run", null, t);
 		} finally {
 			System.out.println("EXITING WorkThread ID:"
 					+ Thread.currentThread().getId());
+			workerThreadCount.countDown();
 		}
 
 	}

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java?rev=1642918&r1=1642917&r2=1642918&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java Tue Dec  2 15:42:23 2014
@@ -19,7 +19,7 @@
 
 package org.apache.uima.ducc.transport.configuration.jp;
 
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -53,6 +53,8 @@ public class JobProcessComponent extends
 	private int timeout = 30000;  // default socket timeout for HTTPClient
 	private int threadSleepTime = 5000; // time to sleep between GET requests if JD sends null CAS
 	private IUimaProcessor uimaProcessor = null; 
+	private CountDownLatch workerThreadCount = null;
+	
 	// define default class to use to invoke methods via reflection
 	private String containerClass = "org.apache.uima.ducc.user.jp.UimaProcessContainer";
 ;
@@ -172,7 +174,7 @@ public class JobProcessComponent extends
 				// Setup Thread Factory 
 				UimaServiceThreadFactory tf = new UimaServiceThreadFactory(Thread
 						.currentThread().getThreadGroup());
-
+				workerThreadCount = new CountDownLatch(uimaProcessor.getScaleout());
 				// Setup Thread pool with thread count = scaleout
 				tpe = Executors.newFixedThreadPool(uimaProcessor.getScaleout(), tf);
 
@@ -192,7 +194,7 @@ public class JobProcessComponent extends
 		    	// Create and start worker threads that pull Work Items from the JD
 		    	Future<?>[] threadHandles = new Future<?>[uimaProcessor.getScaleout()];
 				for (int j = 0; j < uimaProcessor.getScaleout(); j++) {
-					threadHandles[j] = tpe.submit(new HttpWorkerThread(this, client, uimaProcessor));
+					threadHandles[j] = tpe.submit(new HttpWorkerThread(this, client, uimaProcessor, workerThreadCount));
 				}
 				for( Future<?> f : threadHandles ) {
 					f.get();  // wait for worker threads to exit
@@ -223,17 +225,6 @@ public class JobProcessComponent extends
 
 		    	stop();
 		    	super.stop();
-//		    	super.getContext().stop();
-//		    	new Thread() {
-//		    		public void run() {
-//		    			try {
-//		    				System.setProperty("dontKill", "true");
-//			    			uimaProcessor.stop();
-//		    			} catch( Exception e) {
-//		    				e.printStackTrace();
-//		    			}
-//		    		}
-//		    	}.start();
 				
 		    }
 		} catch( Exception e) {
@@ -244,14 +235,16 @@ public class JobProcessComponent extends
 		}
 
 	}
+	public boolean isRunning() {
+		return currentState.equals(ProcessState.Running);
+	}
 	public void stop() {
+		currentState = ProcessState.Stopping;
 		if ( super.isStopping() ) {
 			return;  // already stopping - nothing to do
 		}
-		//configuration.stop();
+
 		System.out.println("... AbstractManagedService - Stopping Service Adapter");
-//		serviceAdapter.stop();
-		System.out.println("... AbstractManagedService - Calling super.stop() ");
 	    try {
         	if (getContext() != null) {
     			for (Route route : getContext().getRoutes()) {
@@ -261,8 +254,9 @@ public class JobProcessComponent extends
     						+ route.getId());
     			}
     		}
-        	//jobProcessManager.
-			//agent.stop();
+        	// block for worker threads to exit run()
+        	workerThreadCount.await();
+        	
         	if ( uimaProcessor != null ) {
             	uimaProcessor.stop();
         	}
@@ -270,7 +264,6 @@ public class JobProcessComponent extends
             	agent.stop();
         	}
 			super.stop();
-			//super.getContext().stop();
 			
 	    } catch( Exception e) {
 	    	e.printStackTrace();