You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2014/12/15 15:21:53 UTC

svn commit: r1645665 - in /uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp: DuccHttpClient.java HttpWorkerThread.java

Author: cwiklik
Date: Mon Dec 15 14:21:53 2014
New Revision: 1645665

URL: http://svn.apache.org/r1645665
Log:
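UIMA-4066: Removed synchronization from DuccHttpClient.execute(); each worker thread now creates and passes in its own PostMethod instance. Added support for virtual nodes (the IP and NodeName environment variables, when set, override the node identity).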

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java?rev=1645665&r1=1645664&r2=1645665&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java Mon Dec 15 14:21:53 2014
@@ -50,13 +50,14 @@ import org.apache.http.protocol.RequestT
 import org.apache.http.protocol.RequestUserAgent;
 import org.apache.http.util.EntityUtils;
 import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.XStreamUtils;
 import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
 import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Direction;
 import org.apache.uima.ducc.container.net.impl.MetaCasTransaction;
 
 public class DuccHttpClient {
-	
+	DuccLogger logger = new DuccLogger(DuccHttpClient.class);
 	HttpRequestExecutor httpexecutor = null;
 	ConnectionReuseStrategy connStrategy = null;
 	HttpCoreContext coreContext = null;
@@ -64,8 +65,6 @@ public class DuccHttpClient {
 	BasicConnPool connPool = null;
 	HttpHost host = null;
 	String target = null;
-//	String hostIP = "";
-//	String hostname = "";
 	NodeIdentity nodeIdentity;
 	String pid = "";
 	ReentrantLock lock = new ReentrantLock();
@@ -73,7 +72,8 @@ public class DuccHttpClient {
 	
 	// New --------------------
     HttpClient httpClient = null;
-    PostMethod postMethod;
+    //PostMethod postMethod;
+	String jdUrl;
 	
 	public void setTimeout( int timeout) {
 		this.timeout = timeout;
@@ -83,13 +83,16 @@ public class DuccHttpClient {
 		connPool.setDefaultMaxPerRoute(scaleout);
 		connPool.setMaxPerRoute(host, scaleout);
 	}
+	public String getJdUrl() {
+		return jdUrl;
+	}
 	public void initialize(String jdUrl) throws Exception {
-        postMethod = new PostMethod(jdUrl);
-        pid = getProcessIP("N/A");
+    //    postMethod = new PostMethod(jdUrl);
+		this.jdUrl = jdUrl;
+		pid = getProcessIP("N/A");
 		nodeIdentity = new NodeIdentity();
 		MultiThreadedHttpConnectionManager cMgr =
 		    new MultiThreadedHttpConnectionManager();
-		
 		httpClient = 
     		new HttpClient(cMgr);
 		 
@@ -153,25 +156,39 @@ public class DuccHttpClient {
 		}
 		return fallback;
 	}
+	private String getIP() {
+		String ip =nodeIdentity.getIp();
+		if ( System.getenv("IP") != null) {
+			ip = System.getenv("IP");
+		}
+		return ip;
+	}
+	private String getNodeName() {
+		String nn =nodeIdentity.getName();
+		if ( System.getenv("NodeName") != null) {
+			nn = System.getenv("NodeName");
+		}
+		return nn;
+	}
     private void addCommonHeaders( BasicHttpRequest request ) {
-    	request.setHeader("IP", nodeIdentity.getIp());
-		request.setHeader("Hostname", nodeIdentity.getName());
+    	request.setHeader("IP", getIP());
+		request.setHeader("Hostname", getNodeName());
 		request.setHeader("ThreadID",
 				String.valueOf(Thread.currentThread().getId()));
 		request.setHeader("PID", pid);
 		
     }
     private void addCommonHeaders( IMetaCasTransaction transaction ) {
-    	transaction.setRequesterAddress(nodeIdentity.getIp());
-    	transaction.setRequesterName(nodeIdentity.getName());
+    	transaction.setRequesterAddress(getIP());
+    	transaction.setRequesterName(getNodeName());
     	transaction.setRequesterProcessId(Integer.valueOf(pid));
     	transaction.setRequesterThreadId((int)Thread.currentThread().getId());
     }
     
     private void addCommonHeaders( PostMethod method ) {
     	synchronized( DuccHttpClient.class) {
-        	method.setRequestHeader("IP", nodeIdentity.getIp());
-        	method.setRequestHeader("Hostname", nodeIdentity.getName());
+        	method.setRequestHeader("IP", getIP());
+        	method.setRequestHeader("Hostname", getNodeName());
         	method.setRequestHeader("ThreadID",
     				String.valueOf(Thread.currentThread().getId()));
         	method.setRequestHeader("PID", pid);
@@ -263,7 +280,7 @@ public class DuccHttpClient {
 	}
 
 	
-	public synchronized IMetaCasTransaction execute( IMetaCasTransaction transaction ) throws Exception {
+	public IMetaCasTransaction execute( IMetaCasTransaction transaction, PostMethod postMethod ) throws Exception {
 		int retry = 2;
 		Exception lastError = null;
 		IMetaCasTransaction reply=null;
@@ -275,22 +292,18 @@ public class DuccHttpClient {
 			try {
 				// Serialize request object to XML
 				String body = XStreamUtils.marshall(transaction);
-				System.out.println("Body Length:"+body.length());
 	            RequestEntity e = new StringRequestEntity(body,"application/xml","UTF-8" );
 	            postMethod.setRequestEntity(e);
-	            System.out.println("Entity Body Length:"+postMethod.getRequestEntity().getContentLength());
-	            //addCommonHeaders(postMethod);
+	            addCommonHeaders(postMethod);
 	            postMethod.setRequestHeader("Content-Length", String.valueOf(body.length()));
 	            // wait for a reply
 	            httpClient.executeMethod(postMethod);
                 String responseData = postMethod.getResponseBodyAsString();	            
 				if ( postMethod.getStatusLine().getStatusCode() != 200) {
-					System.out.println("Unable to Communicate with JD - Error:"+postMethod.getStatusLine());
+					logger.error("execute", null, "Unable to Communicate with JD - Error:"+postMethod.getStatusLine());
 					throw new RuntimeException("JP Http Client Unable to Communicate with JD - Error:"+postMethod.getStatusLine());
 				}
-				System.out.println("<< Response: "+ postMethod.getStatusLine());
-//				String responseData = EntityUtils.toString(postMethod.getEntity());
-				System.out.println(responseData);
+				logger.info("execute", null, "JD Reply Status:"+postMethod.getStatusLine());
 				Object o = XStreamUtils.unmarshall(responseData);
 				if ( o instanceof IMetaCasTransaction) {
 					reply = (MetaCasTransaction)o;
@@ -300,10 +313,9 @@ public class DuccHttpClient {
 				}
 			} catch( Exception t) {
 				lastError = t;
-				t.printStackTrace();
+				logger.error("run", null, t);
 			}
 			finally {
-				System.out.println("==============");
 				postMethod.releaseConnection();
 			}
 			

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1645665&r1=1645664&r2=1645665&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Mon Dec 15 14:21:53 2014
@@ -25,6 +25,7 @@ import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.XStreamUtils;
 import org.apache.uima.ducc.container.jp.JobProcessManager;
@@ -37,11 +38,11 @@ import org.apache.uima.ducc.container.ne
 import org.apache.uima.ducc.container.net.impl.PerformanceMetrics;
 
 public class HttpWorkerThread implements Runnable {
+	DuccLogger logger = new DuccLogger(HttpWorkerThread.class);
 	DuccHttpClient httpClient = null;
 	private IUimaProcessor uimaProcessor;
 	private JobProcessComponent duccComponent;
 	static AtomicInteger counter = new AtomicInteger();
-    private DuccLogger logger;
 	private Object monitor = new Object();
 	private CountDownLatch workerThreadCount = null;
 	private JobProcessManager jobProcessManager = null;
@@ -237,7 +238,6 @@ public class HttpWorkerThread implements
 			JobProcessManager jobProcessManager , CountDownLatch workerThreadCount) {
 		this.duccComponent = component;
 		this.httpClient = httpClient;
-		//this.uimaProcessor = processor;
 		this.jobProcessManager = jobProcessManager;
 		this.workerThreadCount = workerThreadCount;
 	}
@@ -257,6 +257,9 @@ public class HttpWorkerThread implements
 	public void run() {
 		try {
 			initialize(duccComponent.isUimaASJob());
+			// each thread needs its own PostMethod
+			PostMethod postMethod = new PostMethod(httpClient.getJdUrl());
+		
 			//States stateMachine = new States(States.Start);
 //			SMContext ctx = new SMContextImpl(httpClient, States.Start);
 			String command="";
@@ -272,13 +275,15 @@ public class HttpWorkerThread implements
 					transaction.setType(Type.Get);  // Tell JD you want a CAS
 					command = Type.Get.name();
 //					transaction = httpClient.post(transaction);
-					transaction = httpClient.execute(transaction);
-                    
-					// Confirm receipt of the CAS. 
+					transaction = httpClient.execute(transaction, postMethod);
+                    logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd New WI:"+transaction.getMetaCas().getSystemKey());
+					System.out.println("Thread:"+Thread.currentThread().getId()+" Recv'd New WI:"+transaction.getMetaCas().getSystemKey());
+                    // Confirm receipt of the CAS. 
 					transaction.setType(Type.Ack);
 					command = Type.Ack.name();
 ///					httpClient.post(transaction); // Ready to process
-					httpClient.execute(transaction); // Ready to process
+					httpClient.execute(transaction, postMethod); // Ready to process
+                    logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Sent ACK");
 					
 					// if the JD did not provide a CAS, most likely the CR is
 					// done. In such case, reduce frequency of Get requests
@@ -290,7 +295,7 @@ public class HttpWorkerThread implements
 						// been processed and accounted for
 						if ( transaction.getJdState().equals(JdState.Ended) ) {
 							duccComponent.getLogger().warn("run", null, "Exiting Thread "+Thread.currentThread().getId()+" JD Finished Processing");
-							System.out.println("Exiting Thred DriverState=Ended");
+							System.out.println("Exiting Thread DriverState=Ended");
 							break; // the JD completed. Exit the thread
 						}
 						// There is no CAS. It looks like the JD CR is done but there
@@ -307,6 +312,7 @@ public class HttpWorkerThread implements
 						@SuppressWarnings("unchecked")
 						List<Properties> metrics = (List<Properties>) 
 								uimaProcessor.process(transaction.getMetaCas().getUserSpaceCas());
+	                    logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" process() completed");
 						
 						IPerformanceMetrics metricsWrapper =
 								new PerformanceMetrics();
@@ -317,7 +323,9 @@ public class HttpWorkerThread implements
 						transaction.setType(Type.End);
 						command = Type.End.name();
 //						httpClient.post(transaction); // Work Item Processed - End
-						httpClient.execute(transaction); // Work Item Processed - End
+						httpClient.execute(transaction, postMethod); // Work Item Processed - End
+	                    logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" sent END");
+
 					}
 				} catch( SocketTimeoutException e) {
 					duccComponent.getLogger().warn("run", null, "Timed Out While Awaiting Response from JD for "+command+" Request - Retrying ...");