You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2014/12/31 18:57:18 UTC
svn commit: r1648760 - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp:
DuccHttpClient.java HttpWorkerThread.java JobProcessComponent.java
Author: cwiklik
Date: Wed Dec 31 17:57:17 2014
New Revision: 1648760
URL: http://svn.apache.org/r1648760
Log:
UIMA-4060 force threads to wait until all initialize UIMA. When initialized notify agent with status=READY
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java?rev=1648760&r1=1648759&r2=1648760&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java Wed Dec 31 17:57:17 2014
@@ -17,6 +17,10 @@
* under the License.
*/
package org.apache.uima.ducc.transport.configuration.jp;
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
import java.io.InvalidClassException;
import java.lang.management.ManagementFactory;
import java.util.concurrent.Future;
@@ -305,17 +309,36 @@ public class DuccHttpClient {
RequestEntity e = new StringRequestEntity(body,"application/xml","UTF-8" );
postMethod.setRequestEntity(e);
addCommonHeaders(postMethod);
+
postMethod.setRequestHeader("Content-Length", String.valueOf(body.length()));
// wait for a reply
httpClient.executeMethod(postMethod);
- String responseData = postMethod.getResponseBodyAsString();
+ //InputStream responseData = postMethod.getResponseBodyAsStream();
+
+ //ByteArrayInputStream input = new ByteArray
+ //BufferedInputStream input=new BufferedInputStream(responseData);
+ //ByteArrayOutputStream output=new ByteArrayOutputStream();
+ String content = new String(postMethod.getResponseBody());
+
+ /*
+ byte b[]=new byte[1024];
+ StringBuffer sb = new StringBuffer();
+ int read=0;
+ while ((read=input.read(b)) > -1) {
+ output.write(b,0,read);
+ sb.append(new String(b));
+ }
+ responseData.close();
+ input.close();
+ */
if ( postMethod.getStatusLine().getStatusCode() != 200) {
logger.error("execute", null, "Unable to Communicate with JD - Error:"+postMethod.getStatusLine());
throw new RuntimeException("JP Http Client Unable to Communicate with JD - Error:"+postMethod.getStatusLine());
}
logger.info("execute", null, "Thread:"+Thread.currentThread().getId()+" JD Reply Status:"+postMethod.getStatusLine());
- logger.info("execute", null, "Thread:"+Thread.currentThread().getId()+" Recv'd:"+responseData);
- Object o = XStreamUtils.unmarshall(responseData);
+ logger.info("execute", null, "Thread:"+Thread.currentThread().getId()+" Recv'd:"+content);
+ System.out.println(content);
+ Object o = XStreamUtils.unmarshall(content); //sb.toString());
if ( o instanceof IMetaCasTransaction) {
reply = (MetaCasTransaction)o;
break;
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1648760&r1=1648759&r2=1648760&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Wed Dec 31 17:57:17 2014
@@ -25,7 +25,10 @@ import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.params.HttpMethodParams;
+import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.XStreamUtils;
import org.apache.uima.ducc.container.jp.JobProcessManager;
@@ -45,6 +48,7 @@ public class HttpWorkerThread implements
static AtomicInteger counter = new AtomicInteger();
private Object monitor = new Object();
private CountDownLatch workerThreadCount = null;
+ private CountDownLatch threadReadyCount = null;
private JobProcessManager jobProcessManager = null;
/*
interface SMEvent {
@@ -235,11 +239,13 @@ public class HttpWorkerThread implements
}
*/
public HttpWorkerThread(JobProcessComponent component, DuccHttpClient httpClient,
- JobProcessManager jobProcessManager , CountDownLatch workerThreadCount) {
+ JobProcessManager jobProcessManager , CountDownLatch workerThreadCount,
+ CountDownLatch threadReadyCount) {
this.duccComponent = component;
this.httpClient = httpClient;
this.jobProcessManager = jobProcessManager;
this.workerThreadCount = workerThreadCount;
+ this.threadReadyCount = threadReadyCount;
}
private void initialize(boolean isUimaASJob ) throws Exception {
// For UIMA-AS job, there should only be one instance of UimaProcessor.
@@ -261,10 +267,19 @@ public class HttpWorkerThread implements
initialize(duccComponent.isUimaASJob());
// each thread needs its own PostMethod
PostMethod postMethod = new PostMethod(httpClient.getJdUrl());
-
+ // Set request timeout
+ postMethod.getParams().setParameter(HttpMethodParams.SO_TIMEOUT, duccComponent.getTimeout());
//States stateMachine = new States(States.Start);
// SMContext ctx = new SMContextImpl(httpClient, States.Start);
String command="";
+
+ threadReadyCount.countDown(); // this thread is ready
+
+ // **************************************************************************
+ // now block and wait until all threads finish deploying and initializing UIMA
+ // **************************************************************************
+ threadReadyCount.await();
+
// run forever (or until the process throws IllegalStateException
logger.info("HttpWorkerThread.run()", null, "Processing Work Items - Thread Id:"+Thread.currentThread().getId());
@@ -332,7 +347,19 @@ public class HttpWorkerThread implements
transaction.getMetaCas().setPerformanceMetrics(metricsWrapper);
- } catch( Exception ee) {
+ } catch( RuntimeException ee) {
+ if ( ee.getCause().equals( AnalysisEngineProcessException.class)) {
+ // This is process error. It may contain user defined
+ // exception in the stack trace. To protect against
+ // ClassNotF ound, the entire stack trace was serialized.
+ // Fetch the serialized stack trace and pass it on to
+ // to the JD.
+ transaction.getMetaCas().setUserSpaceException(ee.getMessage());
+ } else {
+ logger.error("run", null, ee);
+ }
+ transaction.getMetaCas().setUserSpaceException("Bob");
+ } catch( Exception ee) {
transaction.getMetaCas().setUserSpaceException("Bob");
logger.error("run", null, ee);
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java?rev=1648760&r1=1648759&r2=1648760&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java Wed Dec 31 17:57:17 2014
@@ -55,6 +55,7 @@ public class JobProcessComponent extends
private int threadSleepTime = 5000; // time to sleep between GET requests if JD sends null CAS
// private IUimaProcessor uimaProcessor = null;
private CountDownLatch workerThreadCount = null;
+ private CountDownLatch threadReadyCount=null;
ScheduledThreadPoolExecutor executor = null;
ExecutorService tpe = null;
private volatile boolean uimaASJob=false;
@@ -108,6 +109,9 @@ public class JobProcessComponent extends
public void setTimeout(int timeout) {
this.timeout = timeout;
}
+ public int getTimeout() {
+ return this.timeout;
+ }
/**
* This method is called by super during ducc framework boot
* sequence. It creates all the internal components and worker threads
@@ -120,8 +124,8 @@ public class JobProcessComponent extends
try {
if ( args == null || args.length ==0 || args[0] == null || args[0].trim().length() == 0) {
- logger.warn("start", null, "Missing Deployment Descriptor - the JP Requires DD argument");
- throw new RuntimeException("Missing Deployment Descriptor - the JP Requires DD argument");
+ logger.warn("start", null, "Missing Deployment Descriptor - the JP Requires argument. Add DD for UIMA-AS job or AE descriptor for UIMA jobs");
+ throw new RuntimeException("Missing Deployment Descriptor - the JP Requires argument. Add DD for UIMA-AS job or AE descriptor for UIMA jobs");
}
// the JobProcessConfiguration checked if the below property exists
String jps = System.getProperty(FlagsHelper.Name.UserClasspath.pname());
@@ -194,6 +198,9 @@ public class JobProcessComponent extends
// there is an exception. The IUimaProcessor is a wrapper around
// processing container where the analysis is being done.
int scaleout = jobProcessManager.initialize(jps, jpArgs, containerClass);
+ // initialize latch to count number of threads which initialized successfully
+ threadReadyCount = new CountDownLatch(scaleout);
+
// uimaProcessor = jobProcessManager.deploy(jps, uimaAsArgs, containerClass);
// Setup Thread Factory
@@ -207,12 +214,6 @@ public class JobProcessComponent extends
client.setTimeout(timeout);
// client.setScaleout(scaleout);//uimaProcessor.getScaleout());
- // pipelines deployed and initialized. This process is Ready
- currentState = ProcessState.Running;
- // Update agent with the most up-to-date state of the pipeline
- // monitor.run();
- // all is well, so notify agent that this process is in Running state
- agent.notify(currentState, processJmxUrl);
System.out.println("JMX Connect String:"+ processJmxUrl);
// Create thread pool and begin processing
getLogger().info("start", null, "Starting "+scaleout+" Process Threads - JMX Connect String:"+ processJmxUrl);
@@ -222,9 +223,17 @@ public class JobProcessComponent extends
// Future<?>[] threadHandles = new Future<?>[uimaProcessor.getScaleout()];
// for (int j = 0; j < uimaProcessor.getScaleout(); j++) {
for (int j = 0; j < scaleout; j++) {
- threadHandles[j] = tpe.submit(new HttpWorkerThread(this, client, jobProcessManager, workerThreadCount));
+ threadHandles[j] = tpe.submit(new HttpWorkerThread(this, client, jobProcessManager, workerThreadCount, threadReadyCount));
}
- getLogger().info("start", null, "All Http Worker Threads Started - Waiting For All Threads to Exit");
+ // wait until all process threads initialize
+ threadReadyCount.await();
+ // pipelines deployed and initialized. This process is Ready
+ currentState = ProcessState.Running;
+ // Update agent with the most up-to-date state of the pipeline
+ // all is well, so notify agent that this process is in Running state
+ agent.notify(currentState, processJmxUrl);
+
+ getLogger().info("start", null, "All Http Worker Threads Started - Waiting For All Threads to Exit");
for( Future<?> f : threadHandles ) {
if ( f != null ) {