You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2014/12/02 16:42:23 UTC
svn commit: r1642918 - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp:
HttpWorkerThread.java JobProcessComponent.java
Author: cwiklik
Date: Tue Dec 2 15:42:23 2014
New Revision: 1642918
URL: http://svn.apache.org/r1642918
Log:
UIMA-4140 Added code to quiesce the JP on agent stop request
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1642918&r1=1642917&r2=1642918&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Tue Dec 2 15:42:23 2014
@@ -20,6 +20,7 @@
package org.apache.uima.ducc.transport.configuration.jp;
import java.net.SocketTimeoutException;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.uima.ducc.common.utils.DuccLogger;
@@ -36,6 +37,7 @@ public class HttpWorkerThread implements
static AtomicInteger counter = new AtomicInteger();
private DuccLogger logger;
private Object monitor = new Object();
+ private CountDownLatch workerThreadCount = null;
/*
interface SMEvent {
Event action();
@@ -225,10 +227,11 @@ public class HttpWorkerThread implements
}
*/
public HttpWorkerThread(JobProcessComponent component, DuccHttpClient httpClient,
- IUimaProcessor processor) {
+ IUimaProcessor processor, CountDownLatch workerThreadCount) {
this.duccComponent = component;
this.httpClient = httpClient;
this.uimaProcessor = processor;
+ this.workerThreadCount = workerThreadCount;
}
public void run() {
@@ -237,7 +240,7 @@ public class HttpWorkerThread implements
// SMContext ctx = new SMContextImpl(httpClient, States.Start);
String command="";
// run forever (or until the process throws IllegalStateException
- while (true) { //service.running && ctx.state().process(ctx)) {
+ while (duccComponent.isRunning()) { //service.running && ctx.state().process(ctx)) {
try {
IMetaCasTransaction transaction = new MetaCasTransaction();
@@ -300,9 +303,11 @@ public class HttpWorkerThread implements
} catch (Throwable t) {
t.printStackTrace();
+ duccComponent.getLogger().warn("run", null, t);
} finally {
System.out.println("EXITING WorkThread ID:"
+ Thread.currentThread().getId());
+ workerThreadCount.countDown();
}
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java?rev=1642918&r1=1642917&r2=1642918&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java Tue Dec 2 15:42:23 2014
@@ -19,7 +19,7 @@
package org.apache.uima.ducc.transport.configuration.jp;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -53,6 +53,8 @@ public class JobProcessComponent extends
private int timeout = 30000; // default socket timeout for HTTPClient
private int threadSleepTime = 5000; // time to sleep between GET requests if JD sends null CAS
private IUimaProcessor uimaProcessor = null;
+ private CountDownLatch workerThreadCount = null;
+
// define default class to use to invoke methods via reflection
private String containerClass = "org.apache.uima.ducc.user.jp.UimaProcessContainer";
;
@@ -172,7 +174,7 @@ public class JobProcessComponent extends
// Setup Thread Factory
UimaServiceThreadFactory tf = new UimaServiceThreadFactory(Thread
.currentThread().getThreadGroup());
-
+ workerThreadCount = new CountDownLatch(uimaProcessor.getScaleout());
// Setup Thread pool with thread count = scaleout
tpe = Executors.newFixedThreadPool(uimaProcessor.getScaleout(), tf);
@@ -192,7 +194,7 @@ public class JobProcessComponent extends
// Create and start worker threads that pull Work Items from the JD
Future<?>[] threadHandles = new Future<?>[uimaProcessor.getScaleout()];
for (int j = 0; j < uimaProcessor.getScaleout(); j++) {
- threadHandles[j] = tpe.submit(new HttpWorkerThread(this, client, uimaProcessor));
+ threadHandles[j] = tpe.submit(new HttpWorkerThread(this, client, uimaProcessor, workerThreadCount));
}
for( Future<?> f : threadHandles ) {
f.get(); // wait for worker threads to exit
@@ -223,17 +225,6 @@ public class JobProcessComponent extends
stop();
super.stop();
-// super.getContext().stop();
-// new Thread() {
-// public void run() {
-// try {
-// System.setProperty("dontKill", "true");
-// uimaProcessor.stop();
-// } catch( Exception e) {
-// e.printStackTrace();
-// }
-// }
-// }.start();
}
} catch( Exception e) {
@@ -244,14 +235,16 @@ public class JobProcessComponent extends
}
}
+ public boolean isRunning() {
+ return currentState.equals(ProcessState.Running);
+ }
public void stop() {
+ currentState = ProcessState.Stopping;
if ( super.isStopping() ) {
return; // already stopping - nothing to do
}
- //configuration.stop();
+
System.out.println("... AbstractManagedService - Stopping Service Adapter");
-// serviceAdapter.stop();
- System.out.println("... AbstractManagedService - Calling super.stop() ");
try {
if (getContext() != null) {
for (Route route : getContext().getRoutes()) {
@@ -261,8 +254,9 @@ public class JobProcessComponent extends
+ route.getId());
}
}
- //jobProcessManager.
- //agent.stop();
+ // block for worker threads to exit run()
+ workerThreadCount.await();
+
if ( uimaProcessor != null ) {
uimaProcessor.stop();
}
@@ -270,7 +264,6 @@ public class JobProcessComponent extends
agent.stop();
}
super.stop();
- //super.getContext().stop();
} catch( Exception e) {
e.printStackTrace();