You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2014/12/31 18:57:18 UTC

svn commit: r1648760 - in /uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp: DuccHttpClient.java HttpWorkerThread.java JobProcessComponent.java

Author: cwiklik
Date: Wed Dec 31 17:57:17 2014
New Revision: 1648760

URL: http://svn.apache.org/r1648760
Log:
UIMA-4060 Force worker threads to wait until all of them have initialized UIMA. Once all are initialized, notify the agent with status=READY.

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java?rev=1648760&r1=1648759&r2=1648760&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java Wed Dec 31 17:57:17 2014
@@ -17,6 +17,10 @@
  * under the License.
 */
 package org.apache.uima.ducc.transport.configuration.jp;
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
 import java.io.InvalidClassException;
 import java.lang.management.ManagementFactory;
 import java.util.concurrent.Future;
@@ -305,17 +309,36 @@ public class DuccHttpClient {
 	            RequestEntity e = new StringRequestEntity(body,"application/xml","UTF-8" );
 	            postMethod.setRequestEntity(e);
 	            addCommonHeaders(postMethod);
+	    
 	            postMethod.setRequestHeader("Content-Length", String.valueOf(body.length()));
 	            // wait for a reply
 	            httpClient.executeMethod(postMethod);
-                String responseData = postMethod.getResponseBodyAsString();	            
+                //InputStream responseData = postMethod.getResponseBodyAsStream();
+                
+                //ByteArrayInputStream input = new ByteArray
+                //BufferedInputStream input=new BufferedInputStream(responseData);
+                //ByteArrayOutputStream output=new ByteArrayOutputStream();
+                String content = new String(postMethod.getResponseBody());
+                
+                /*
+                byte b[]=new byte[1024];
+                StringBuffer sb = new StringBuffer();
+                int read=0;
+                while ((read=input.read(b)) > -1) {
+                  output.write(b,0,read);
+                  sb.append(new String(b));
+                }
+                responseData.close();
+                input.close();
+                */
 				if ( postMethod.getStatusLine().getStatusCode() != 200) {
 					logger.error("execute", null, "Unable to Communicate with JD - Error:"+postMethod.getStatusLine());
 					throw new RuntimeException("JP Http Client Unable to Communicate with JD - Error:"+postMethod.getStatusLine());
 				}
 				logger.info("execute", null, "Thread:"+Thread.currentThread().getId()+" JD Reply Status:"+postMethod.getStatusLine());
-				logger.info("execute", null, "Thread:"+Thread.currentThread().getId()+" Recv'd:"+responseData);
-				Object o = XStreamUtils.unmarshall(responseData);
+				logger.info("execute", null, "Thread:"+Thread.currentThread().getId()+" Recv'd:"+content);
+				System.out.println(content);
+				Object o = XStreamUtils.unmarshall(content); //sb.toString());
 				if ( o instanceof IMetaCasTransaction) {
 					reply = (MetaCasTransaction)o;
 					break;

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1648760&r1=1648759&r2=1648760&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Wed Dec 31 17:57:17 2014
@@ -25,7 +25,10 @@ import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.httpclient.HttpMethod;
 import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.params.HttpMethodParams;
+import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.XStreamUtils;
 import org.apache.uima.ducc.container.jp.JobProcessManager;
@@ -45,6 +48,7 @@ public class HttpWorkerThread implements
 	static AtomicInteger counter = new AtomicInteger();
 	private Object monitor = new Object();
 	private CountDownLatch workerThreadCount = null;
+	private CountDownLatch threadReadyCount = null;
 	private JobProcessManager jobProcessManager = null;
 /*
 	interface SMEvent {
@@ -235,11 +239,13 @@ public class HttpWorkerThread implements
 	}
 	*/
 	public HttpWorkerThread(JobProcessComponent component, DuccHttpClient httpClient,
-			JobProcessManager jobProcessManager , CountDownLatch workerThreadCount) {
+			JobProcessManager jobProcessManager , CountDownLatch workerThreadCount,
+			CountDownLatch threadReadyCount) {
 		this.duccComponent = component;
 		this.httpClient = httpClient;
 		this.jobProcessManager = jobProcessManager;
 		this.workerThreadCount = workerThreadCount;
+		this.threadReadyCount = threadReadyCount;
 	}
     private void initialize(boolean isUimaASJob ) throws Exception {
     	// For UIMA-AS job, there should only be one instance of UimaProcessor.
@@ -261,10 +267,19 @@ public class HttpWorkerThread implements
 			initialize(duccComponent.isUimaASJob());
 			// each thread needs its own PostMethod
 			PostMethod postMethod = new PostMethod(httpClient.getJdUrl());
-		
+			// Set request timeout
+			postMethod.getParams().setParameter(HttpMethodParams.SO_TIMEOUT, duccComponent.getTimeout());
 			//States stateMachine = new States(States.Start);
 //			SMContext ctx = new SMContextImpl(httpClient, States.Start);
 			String command="";
+			
+			threadReadyCount.countDown();  // this thread is ready
+			
+			// **************************************************************************
+			// now block and wait until all threads finish deploying and initializing UIMA
+			// **************************************************************************
+			threadReadyCount.await();
+			
 			// run forever (or until the process throws IllegalStateException
 	    	logger.info("HttpWorkerThread.run()", null, "Processing Work Items - Thread Id:"+Thread.currentThread().getId());
 
@@ -332,7 +347,19 @@ public class HttpWorkerThread implements
 							
 							transaction.getMetaCas().setPerformanceMetrics(metricsWrapper);
 							
-						} catch( Exception ee) {
+						} catch( RuntimeException ee) {
+							if ( ee.getCause().equals( AnalysisEngineProcessException.class)) {
+								// This is a process error. It may contain a user-defined
+								// exception in the stack trace. To protect against
+								// ClassNotFound, the entire stack trace was serialized.
+								// Fetch the serialized stack trace and pass it on
+								// to the JD.
+								transaction.getMetaCas().setUserSpaceException(ee.getMessage());
+							} else {
+								logger.error("run", null, ee);
+							}
+							transaction.getMetaCas().setUserSpaceException("Bob");
+						}  catch( Exception ee) {
 							transaction.getMetaCas().setUserSpaceException("Bob");
 							logger.error("run", null, ee);
 						}

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java?rev=1648760&r1=1648759&r2=1648760&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java Wed Dec 31 17:57:17 2014
@@ -55,6 +55,7 @@ public class JobProcessComponent extends
 	private int threadSleepTime = 5000; // time to sleep between GET requests if JD sends null CAS
 //	private IUimaProcessor uimaProcessor = null; 
 	private CountDownLatch workerThreadCount = null;
+	private CountDownLatch threadReadyCount=null;
 	ScheduledThreadPoolExecutor executor = null;
 	ExecutorService tpe = null;
     private volatile boolean uimaASJob=false;
@@ -108,6 +109,9 @@ public class JobProcessComponent extends
 	public void setTimeout(int timeout) {
 		this.timeout = timeout;
 	}
+	public int getTimeout() {
+		return this.timeout;
+	}
 	/**
 	 * This method is called by super during ducc framework boot
 	 * sequence. It creates all the internal components and worker threads
@@ -120,8 +124,8 @@ public class JobProcessComponent extends
 		
 		try {
 			if ( args == null || args.length ==0 || args[0] == null || args[0].trim().length() == 0) {
-				logger.warn("start", null, "Missing Deployment Descriptor - the JP Requires DD argument");
-                throw new RuntimeException("Missing Deployment Descriptor - the JP Requires DD argument");
+				logger.warn("start", null, "Missing Deployment Descriptor - the JP Requires argument. Add DD for UIMA-AS job or AE descriptor for UIMA jobs");
+                throw new RuntimeException("Missing Deployment Descriptor - the JP Requires argument. Add DD for UIMA-AS job or AE descriptor for UIMA jobs");
 			}
 			// the JobProcessConfiguration checked if the below property exists
 			String jps = System.getProperty(FlagsHelper.Name.UserClasspath.pname());
@@ -194,6 +198,9 @@ public class JobProcessComponent extends
 		    	// there is an exception. The IUimaProcessor is a wrapper around
 		    	// processing container where the analysis is being done.
 		    	int scaleout =	jobProcessManager.initialize(jps, jpArgs, containerClass);
+		    	// initialize latch to count number of threads which initialized successfully
+		    	threadReadyCount = new CountDownLatch(scaleout);
+		    	
 //		    	uimaProcessor =	jobProcessManager.deploy(jps, uimaAsArgs, containerClass);
 
 				// Setup Thread Factory 
@@ -207,12 +214,6 @@ public class JobProcessComponent extends
 				client.setTimeout(timeout);
 //				client.setScaleout(scaleout);//uimaProcessor.getScaleout());
 				
-		    	// pipelines deployed and initialized. This process is Ready
-		    	currentState = ProcessState.Running;
-				// Update agent with the most up-to-date state of the pipeline
-			//	monitor.run();
-				// all is well, so notify agent that this process is in Running state
-				agent.notify(currentState, processJmxUrl);
 				System.out.println("JMX Connect String:"+ processJmxUrl);
                 // Create thread pool and begin processing
 		    	getLogger().info("start", null, "Starting "+scaleout+" Process Threads - JMX Connect String:"+ processJmxUrl);
@@ -222,9 +223,17 @@ public class JobProcessComponent extends
 //		    	Future<?>[] threadHandles = new Future<?>[uimaProcessor.getScaleout()];
 //				for (int j = 0; j < uimaProcessor.getScaleout(); j++) {
 				for (int j = 0; j < scaleout; j++) {
-					threadHandles[j] = tpe.submit(new HttpWorkerThread(this, client, jobProcessManager, workerThreadCount));
+					threadHandles[j] = tpe.submit(new HttpWorkerThread(this, client, jobProcessManager, workerThreadCount, threadReadyCount));
 				}
-		    	getLogger().info("start", null, "All Http Worker Threads Started - Waiting For All Threads to Exit");
+				// wait until all process threads initialize
+				threadReadyCount.await();
+		    	// pipelines deployed and initialized. This process is Ready
+		    	currentState = ProcessState.Running;
+				// Update agent with the most up-to-date state of the pipeline
+				// all is well, so notify agent that this process is in Running state
+				agent.notify(currentState, processJmxUrl);
+
+				getLogger().info("start", null, "All Http Worker Threads Started - Waiting For All Threads to Exit");
 
 				for( Future<?> f : threadHandles ) {
 					if ( f != null ) {