You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2014/12/12 18:14:41 UTC
svn commit: r1644980 - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp:
DuccHttpClient.java HttpWorkerThread.java JobProcessComponent.java
Author: cwiklik
Date: Fri Dec 12 17:14:41 2014
New Revision: 1644980
URL: http://svn.apache.org/r1644980
Log:
UIMA-4066 simplified HTTPClient related functionality
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java?rev=1644980&r1=1644979&r2=1644980&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java Fri Dec 12 17:14:41 2014
@@ -23,6 +23,11 @@ import java.net.InetAddress;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.RequestEntity;
+import org.apache.commons.httpclient.methods.StringRequestEntity;
import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpClientConnection;
import org.apache.http.HttpEntity;
@@ -66,6 +71,10 @@ public class DuccHttpClient {
ReentrantLock lock = new ReentrantLock();
int timeout;
+ // New --------------------
+ HttpClient httpClient = new HttpClient(new MultiThreadedHttpConnectionManager());
+ PostMethod postMethod;
+
// Records the timeout value on this client. The unit is not visible in this patch.
// NOTE(review): the execute() method added in this revision does not read this
// field - confirm the timeout is still applied to the new request path.
public void setTimeout( int timeout) {
this.timeout = timeout;
}
@@ -74,6 +83,11 @@ public class DuccHttpClient {
connPool.setDefaultMaxPerRoute(scaleout);
connPool.setMaxPerRoute(host, scaleout);
}
+ /**
+  * Prepares this client for talking to the JD at the given URL.
+  *
+  * Creates the POST method bound to {@code jdUrl} and captures the identity
+  * values (pid, node identity) that are later sent as request headers.
+  * No request is issued to the JD from this method.
+  *
+  * @param jdUrl URL of the JD endpoint; must not be null or blank
+  * @throws IllegalArgumentException if {@code jdUrl} is null or blank
+  * @throws Exception if the process id or node identity cannot be determined
+  */
+ public void initialize(String jdUrl) throws Exception {
+   // Fail fast: a missing URL would otherwise only surface later, as an
+   // obscure failure on the first execute() call.
+   if ( jdUrl == null || jdUrl.trim().length() == 0 ) {
+     throw new IllegalArgumentException("JD URL must be specified");
+   }
+   postMethod = new PostMethod(jdUrl);
+   // NOTE(review): "N/A" is presumably the fallback used when the pid
+   // cannot be determined - confirm against getProcessIP().
+   pid = getProcessIP("N/A");
+   nodeIdentity = new NodeIdentity();
+ }
public void intialize(String url, int port, String application)
throws Exception {
target = application;
@@ -147,6 +161,16 @@ public class DuccHttpClient {
transaction.setRequesterProcessId(Integer.valueOf(pid));
transaction.setRequesterThreadId((int)Thread.currentThread().getId());
}
+
+ private void addCommonHeaders( PostMethod method ) {
+ method.setRequestHeader("IP", nodeIdentity.getIp());
+ method.setRequestHeader("Hostname", nodeIdentity.getName());
+ method.setRequestHeader("ThreadID",
+ String.valueOf(Thread.currentThread().getId()));
+ method.setRequestHeader("PID", pid);
+
+ }
+
public IMetaCasTransaction get(IMetaCasTransaction transaction) throws Exception {
// According to HTTP spec, GET request should not include the body. We need
// to send in body to the JD so use POST
@@ -168,6 +192,7 @@ public class DuccHttpClient {
int retry = 2;
Exception lastError = null;
IMetaCasTransaction reply=null;
+
while( retry-- > 0 ) {
try {
// Get the connection from the pool
@@ -209,6 +234,7 @@ public class DuccHttpClient {
}
} catch( Exception t) {
lastError = t;
+ t.printStackTrace();
}
finally {
System.out.println("==============");
@@ -228,4 +254,62 @@ public class DuccHttpClient {
}
}
+
+ public IMetaCasTransaction execute( IMetaCasTransaction transaction ) throws Exception {
+ int retry = 2;
+ Exception lastError = null;
+ IMetaCasTransaction reply=null;
+
+ addCommonHeaders(transaction);
+ transaction.setDirection(Direction.Request);
+
+ while( retry-- > 0 ) {
+ try {
+ // Serialize request object to XML
+ String body = XStreamUtils.marshall(transaction);
+ System.out.println("Body Length:"+body.length());
+ RequestEntity e = new StringRequestEntity(body,"application/xml","UTF-8" );
+ postMethod.setRequestEntity(e);
+ System.out.println("Entity Body Length:"+postMethod.getRequestEntity().getContentLength());
+ addCommonHeaders(postMethod);
+ postMethod.setRequestHeader("Content-Length", String.valueOf(body.length()));
+ // wait for a reply
+ httpClient.executeMethod(postMethod);
+ String responseData = postMethod.getResponseBodyAsString();
+ if ( postMethod.getStatusLine().getStatusCode() != 200) {
+ System.out.println("Unable to Communicate with JD - Error:"+postMethod.getStatusLine());
+ throw new RuntimeException("JP Http Client Unable to Communicate with JD - Error:"+postMethod.getStatusLine());
+ }
+ System.out.println("<< Response: "+ postMethod.getStatusLine());
+// String responseData = EntityUtils.toString(postMethod.getEntity());
+ System.out.println(responseData);
+ Object o = XStreamUtils.unmarshall(responseData);
+ if ( o instanceof IMetaCasTransaction) {
+ reply = (MetaCasTransaction)o;
+ break;
+ } else {
+ throw new InvalidClassException("Expected IMetaCasTransaction - Instead Received "+o.getClass().getName());
+ }
+ } catch( Exception t) {
+ lastError = t;
+ t.printStackTrace();
+ }
+ finally {
+ System.out.println("==============");
+ postMethod.releaseConnection();
+ }
+
+ }
+ if ( reply != null ) {
+ return reply;
+ } else {
+ if ( lastError != null ){
+ throw lastError;
+
+ } else {
+ throw new RuntimeException("Shouldn't happen ");
+ }
+ }
+ }
+
}
\ No newline at end of file
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1644980&r1=1644979&r2=1644980&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Fri Dec 12 17:14:41 2014
@@ -271,12 +271,14 @@ public class HttpWorkerThread implements
// do a POST instead of a GET.
transaction.setType(Type.Get); // Tell JD you want a CAS
command = Type.Get.name();
- transaction = httpClient.post(transaction);
+// transaction = httpClient.post(transaction);
+ transaction = httpClient.execute(transaction);
// Confirm receipt of the CAS.
transaction.setType(Type.Ack);
command = Type.Ack.name();
- httpClient.post(transaction); // Ready to process
+/// httpClient.post(transaction); // Ready to process
+ httpClient.execute(transaction); // Ready to process
// if the JD did not provide a CAS, most likely the CR is
// done. In such case, reduce frequency of Get requests
@@ -314,7 +316,8 @@ public class HttpWorkerThread implements
transaction.getMetaCas().setUserSpaceCas(null);
transaction.setType(Type.End);
command = Type.End.name();
- httpClient.post(transaction); // Work Item Processed - End
+// httpClient.post(transaction); // Work Item Processed - End
+ httpClient.execute(transaction); // Work Item Processed - End
}
} catch( SocketTimeoutException e) {
duccComponent.getLogger().warn("run", null, "Timed Out While Awaiting Response from JD for "+command+" Request - Retrying ...");
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java?rev=1644980&r1=1644979&r2=1644980&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java Fri Dec 12 17:14:41 2014
@@ -175,7 +175,8 @@ public class JobProcessComponent extends
// initialize http client. It tests the connection and fails
// if unable to connect
- client.intialize(host, Integer.valueOf(port), target);
+// client.intialize(host, Integer.valueOf(port), target);
+ client.initialize(jdURL);
logger.info("start", null,"The JP Connected To JD Using URL "+jdURL);
} catch( Exception ee ) {
if ( ee.getCause() != null && ee instanceof java.net.ConnectException ) {
@@ -199,7 +200,7 @@ public class JobProcessComponent extends
// initialize http client
client.setTimeout(timeout);
- client.setScaleout(scaleout);//uimaProcessor.getScaleout());
+// client.setScaleout(scaleout);//uimaProcessor.getScaleout());
// pipelines deployed and initialized. This process is Ready
currentState = ProcessState.Running;