You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2014/12/12 18:14:41 UTC

svn commit: r1644980 - in /uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp: DuccHttpClient.java HttpWorkerThread.java JobProcessComponent.java

Author: cwiklik
Date: Fri Dec 12 17:14:41 2014
New Revision: 1644980

URL: http://svn.apache.org/r1644980
Log:
UIMA-4066 simplified HTTPClient related functionality

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java?rev=1644980&r1=1644979&r2=1644980&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java Fri Dec 12 17:14:41 2014
@@ -23,6 +23,11 @@ import java.net.InetAddress;
 import java.util.concurrent.Future;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.RequestEntity;
+import org.apache.commons.httpclient.methods.StringRequestEntity;
 import org.apache.http.ConnectionReuseStrategy;
 import org.apache.http.HttpClientConnection;
 import org.apache.http.HttpEntity;
@@ -66,6 +71,10 @@ public class DuccHttpClient {
 	ReentrantLock lock = new ReentrantLock();
 	int timeout;
 	
+	// Commons HttpClient objects used by initialize(String) and execute()
+    HttpClient httpClient = new HttpClient(new MultiThreadedHttpConnectionManager());
+    PostMethod postMethod;
+	
 	public void setTimeout( int timeout) {
 		this.timeout = timeout;
 	}
@@ -74,6 +83,11 @@ public class DuccHttpClient {
 		connPool.setDefaultMaxPerRoute(scaleout);
 		connPool.setMaxPerRoute(host, scaleout);
 	}
+	public void initialize(String jdUrl) throws Exception { // Prepares this client to POST to jdUrl; the visible body issues no HTTP request itself
+        postMethod = new PostMethod(jdUrl); // POST method that execute() later fills in and sends
+        pid = getProcessIP("N/A"); // value later sent as the "PID" request header; "N/A" is presumably a fallback - TODO confirm
+		nodeIdentity = new NodeIdentity(); // source of the "IP" and "Hostname" request headers
+	}
 	public void intialize(String url, int port, String application)
 			throws Exception {
 		target = application;
@@ -147,6 +161,16 @@ public class DuccHttpClient {
     	transaction.setRequesterProcessId(Integer.valueOf(pid));
     	transaction.setRequesterThreadId((int)Thread.currentThread().getId());
     }
+    
+    private void addCommonHeaders( PostMethod method ) { // Stamps the given POST with headers identifying the requester
+    	method.setRequestHeader("IP", nodeIdentity.getIp());
+    	method.setRequestHeader("Hostname", nodeIdentity.getName());
+    	method.setRequestHeader("ThreadID", // id of the thread calling this method, not of a fixed worker
+				String.valueOf(Thread.currentThread().getId()));
+    	method.setRequestHeader("PID", pid); // pid is assigned in initialize(String)
+		
+    }
+
 	public IMetaCasTransaction get(IMetaCasTransaction transaction) throws Exception {
 		// According to HTTP spec, GET request should not include the body. We need
 		// to send in body to the JD so use POST
@@ -168,6 +192,7 @@ public class DuccHttpClient {
 		int retry = 2;
 		Exception lastError = null;
 		IMetaCasTransaction reply=null;
+
 		while( retry-- > 0 ) {
 			try {
 				// Get the connection from the pool
@@ -209,6 +234,7 @@ public class DuccHttpClient {
 				}
 			} catch( Exception t) {
 				lastError = t;
+				t.printStackTrace();
 			}
 			finally {
 				System.out.println("==============");
@@ -228,4 +254,62 @@ public class DuccHttpClient {
 		} 
 	}
 
+	
+	/**
+	 * Sends the transaction to the JD as an XML POST and returns the unmarshalled reply.
+	 * Makes up to two attempts and rethrows the last failure if neither succeeds.
+	 * @param transaction request to send; common headers and the Request direction are set on it
+	 * @return the transaction unmarshalled from the response body
+	 * @throws Exception the last error seen (non-200 status, I/O failure or unexpected reply type)
+	 */
+	public IMetaCasTransaction execute( IMetaCasTransaction transaction ) throws Exception {
+		int retry = 2;
+		Exception lastError = null;
+		IMetaCasTransaction reply=null;
+		addCommonHeaders(transaction);
+		transaction.setDirection(Direction.Request);
+		// HttpMethod objects are not thread-safe: give each attempt its own PostMethod for the JD URL
+		String jdUrl = postMethod.getURI().getEscapedURI();
+		while( reply == null && retry-- > 0 ) {
+			PostMethod method = new PostMethod(jdUrl);
+			try {
+				// Serialize request object to XML
+				String body = XStreamUtils.marshall(transaction);
+				System.out.println("Body Length:"+body.length());
+				RequestEntity e = new StringRequestEntity(body,"application/xml","UTF-8" );
+				method.setRequestEntity(e);
+				System.out.println("Entity Body Length:"+e.getContentLength());
+				addCommonHeaders(method);
+				// Content-Length is the encoded byte count; body.length() undercounts non-ASCII XML
+				method.setRequestHeader("Content-Length", String.valueOf(e.getContentLength()));
+				// wait for a reply
+				httpClient.executeMethod(method);
+				String responseData = method.getResponseBodyAsString();
+				if ( method.getStatusLine().getStatusCode() != 200) {
+					System.out.println("Unable to Communicate with JD - Error:"+method.getStatusLine());
+					throw new RuntimeException("JP Http Client Unable to Communicate with JD - Error:"+method.getStatusLine());
+				}
+				System.out.println("<< Response: "+ method.getStatusLine());
+				System.out.println(responseData);
+				Object o = XStreamUtils.unmarshall(responseData);
+				if ( !(o instanceof IMetaCasTransaction) ) {
+					// Name a null reply explicitly instead of failing with a NullPointerException
+					String received = ( o == null ) ? "null" : o.getClass().getName();
+					throw new InvalidClassException("Expected IMetaCasTransaction - Instead Received "+received);
+				}
+				reply = (IMetaCasTransaction)o; // the interface that was checked, not one implementation
+			} catch( Exception t) {
+				lastError = t;
+				t.printStackTrace();
+			} finally {
+				System.out.println("==============");
+				method.releaseConnection();
+			}
+		}
+		if ( reply == null ) {
+			throw ( lastError != null ) ? lastError : new RuntimeException("Shouldn't happen ");
+		}
+		return reply;
+	}
+	
 }
\ No newline at end of file

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1644980&r1=1644979&r2=1644980&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Fri Dec 12 17:14:41 2014
@@ -271,12 +271,14 @@ public class HttpWorkerThread implements
 					// do a POST instead of a GET.
 					transaction.setType(Type.Get);  // Tell JD you want a CAS
 					command = Type.Get.name();
-					transaction = httpClient.post(transaction);
+//					transaction = httpClient.post(transaction);
+					transaction = httpClient.execute(transaction);
                     
 					// Confirm receipt of the CAS. 
 					transaction.setType(Type.Ack);
 					command = Type.Ack.name();
-					httpClient.post(transaction); // Ready to process
+///					httpClient.post(transaction); // Ready to process
+					httpClient.execute(transaction); // Ready to process
 					
 					// if the JD did not provide a CAS, most likely the CR is
 					// done. In such case, reduce frequency of Get requests
@@ -314,7 +316,8 @@ public class HttpWorkerThread implements
 						transaction.getMetaCas().setUserSpaceCas(null);
 						transaction.setType(Type.End);
 						command = Type.End.name();
-						httpClient.post(transaction); // Work Item Processed - End
+//						httpClient.post(transaction); // Work Item Processed - End
+						httpClient.execute(transaction); // Work Item Processed - End
 					}
 				} catch( SocketTimeoutException e) {
 					duccComponent.getLogger().warn("run", null, "Timed Out While Awaiting Response from JD for "+command+" Request - Retrying ...");

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java?rev=1644980&r1=1644979&r2=1644980&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java Fri Dec 12 17:14:41 2014
@@ -175,7 +175,8 @@ public class JobProcessComponent extends
 
 					// initialize http client. It tests the connection and fails
 					// if unable to connect
-					client.intialize(host, Integer.valueOf(port), target);
+//					client.intialize(host, Integer.valueOf(port), target);
+					client.initialize(jdURL);
 					logger.info("start", null,"The JP Connected To JD Using URL "+jdURL);
 				} catch( Exception ee ) {
 					if ( ee.getCause() != null && ee instanceof java.net.ConnectException ) {
@@ -199,7 +200,7 @@ public class JobProcessComponent extends
 
 				// initialize http client
 				client.setTimeout(timeout);
-				client.setScaleout(scaleout);//uimaProcessor.getScaleout());
+//				client.setScaleout(scaleout);//uimaProcessor.getScaleout());
 				
 		    	// pipelines deployed and initialized. This process is Ready
 		    	currentState = ProcessState.Running;