You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2014/12/15 15:21:53 UTC
svn commit: r1645665 - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp:
DuccHttpClient.java HttpWorkerThread.java
Author: cwiklik
Date: Mon Dec 15 14:21:53 2014
New Revision: 1645665
URL: http://svn.apache.org/r1645665
Log:
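UIMA-4066: Removed synchronization from DuccHttpClient.execute(); each worker thread now creates and passes in its own PostMethod instance. Added support for virtual nodes: the IP and NodeName environment variables, when set, override the IP address and host name reported to the JD.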
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java?rev=1645665&r1=1645664&r2=1645665&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java Mon Dec 15 14:21:53 2014
@@ -50,13 +50,14 @@ import org.apache.http.protocol.RequestT
import org.apache.http.protocol.RequestUserAgent;
import org.apache.http.util.EntityUtils;
import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.XStreamUtils;
import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Direction;
import org.apache.uima.ducc.container.net.impl.MetaCasTransaction;
public class DuccHttpClient {
-
+ DuccLogger logger = new DuccLogger(DuccHttpClient.class);
HttpRequestExecutor httpexecutor = null;
ConnectionReuseStrategy connStrategy = null;
HttpCoreContext coreContext = null;
@@ -64,8 +65,6 @@ public class DuccHttpClient {
BasicConnPool connPool = null;
HttpHost host = null;
String target = null;
-// String hostIP = "";
-// String hostname = "";
NodeIdentity nodeIdentity;
String pid = "";
ReentrantLock lock = new ReentrantLock();
@@ -73,7 +72,8 @@ public class DuccHttpClient {
// New --------------------
HttpClient httpClient = null;
- PostMethod postMethod;
+ //PostMethod postMethod;
+ String jdUrl;
public void setTimeout( int timeout) {
this.timeout = timeout;
@@ -83,13 +83,16 @@ public class DuccHttpClient {
connPool.setDefaultMaxPerRoute(scaleout);
connPool.setMaxPerRoute(host, scaleout);
}
+ public String getJdUrl() {
+ return jdUrl;
+ }
public void initialize(String jdUrl) throws Exception {
- postMethod = new PostMethod(jdUrl);
- pid = getProcessIP("N/A");
+ // postMethod = new PostMethod(jdUrl);
+ this.jdUrl = jdUrl;
+ pid = getProcessIP("N/A");
nodeIdentity = new NodeIdentity();
MultiThreadedHttpConnectionManager cMgr =
new MultiThreadedHttpConnectionManager();
-
httpClient =
new HttpClient(cMgr);
@@ -153,25 +156,39 @@ public class DuccHttpClient {
}
return fallback;
}
+ private String getIP() {
+ String ip =nodeIdentity.getIp();
+ if ( System.getenv("IP") != null) {
+ ip = System.getenv("IP");
+ }
+ return ip;
+ }
+ private String getNodeName() {
+ String nn =nodeIdentity.getName();
+ if ( System.getenv("NodeName") != null) {
+ nn = System.getenv("NodeName");
+ }
+ return nn;
+ }
private void addCommonHeaders( BasicHttpRequest request ) {
- request.setHeader("IP", nodeIdentity.getIp());
- request.setHeader("Hostname", nodeIdentity.getName());
+ request.setHeader("IP", getIP());
+ request.setHeader("Hostname", getNodeName());
request.setHeader("ThreadID",
String.valueOf(Thread.currentThread().getId()));
request.setHeader("PID", pid);
}
private void addCommonHeaders( IMetaCasTransaction transaction ) {
- transaction.setRequesterAddress(nodeIdentity.getIp());
- transaction.setRequesterName(nodeIdentity.getName());
+ transaction.setRequesterAddress(getIP());
+ transaction.setRequesterName(getNodeName());
transaction.setRequesterProcessId(Integer.valueOf(pid));
transaction.setRequesterThreadId((int)Thread.currentThread().getId());
}
private void addCommonHeaders( PostMethod method ) {
synchronized( DuccHttpClient.class) {
- method.setRequestHeader("IP", nodeIdentity.getIp());
- method.setRequestHeader("Hostname", nodeIdentity.getName());
+ method.setRequestHeader("IP", getIP());
+ method.setRequestHeader("Hostname", getNodeName());
method.setRequestHeader("ThreadID",
String.valueOf(Thread.currentThread().getId()));
method.setRequestHeader("PID", pid);
@@ -263,7 +280,7 @@ public class DuccHttpClient {
}
- public synchronized IMetaCasTransaction execute( IMetaCasTransaction transaction ) throws Exception {
+ public IMetaCasTransaction execute( IMetaCasTransaction transaction, PostMethod postMethod ) throws Exception {
int retry = 2;
Exception lastError = null;
IMetaCasTransaction reply=null;
@@ -275,22 +292,18 @@ public class DuccHttpClient {
try {
// Serialize request object to XML
String body = XStreamUtils.marshall(transaction);
- System.out.println("Body Length:"+body.length());
RequestEntity e = new StringRequestEntity(body,"application/xml","UTF-8" );
postMethod.setRequestEntity(e);
- System.out.println("Entity Body Length:"+postMethod.getRequestEntity().getContentLength());
- //addCommonHeaders(postMethod);
+ addCommonHeaders(postMethod);
postMethod.setRequestHeader("Content-Length", String.valueOf(body.length()));
// wait for a reply
httpClient.executeMethod(postMethod);
String responseData = postMethod.getResponseBodyAsString();
if ( postMethod.getStatusLine().getStatusCode() != 200) {
- System.out.println("Unable to Communicate with JD - Error:"+postMethod.getStatusLine());
+ logger.error("execute", null, "Unable to Communicate with JD - Error:"+postMethod.getStatusLine());
throw new RuntimeException("JP Http Client Unable to Communicate with JD - Error:"+postMethod.getStatusLine());
}
- System.out.println("<< Response: "+ postMethod.getStatusLine());
-// String responseData = EntityUtils.toString(postMethod.getEntity());
- System.out.println(responseData);
+ logger.info("execute", null, "JD Reply Status:"+postMethod.getStatusLine());
Object o = XStreamUtils.unmarshall(responseData);
if ( o instanceof IMetaCasTransaction) {
reply = (MetaCasTransaction)o;
@@ -300,10 +313,9 @@ public class DuccHttpClient {
}
} catch( Exception t) {
lastError = t;
- t.printStackTrace();
+ logger.error("run", null, t);
}
finally {
- System.out.println("==============");
postMethod.releaseConnection();
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1645665&r1=1645664&r2=1645665&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Mon Dec 15 14:21:53 2014
@@ -25,6 +25,7 @@ import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.XStreamUtils;
import org.apache.uima.ducc.container.jp.JobProcessManager;
@@ -37,11 +38,11 @@ import org.apache.uima.ducc.container.ne
import org.apache.uima.ducc.container.net.impl.PerformanceMetrics;
public class HttpWorkerThread implements Runnable {
+ DuccLogger logger = new DuccLogger(HttpWorkerThread.class);
DuccHttpClient httpClient = null;
private IUimaProcessor uimaProcessor;
private JobProcessComponent duccComponent;
static AtomicInteger counter = new AtomicInteger();
- private DuccLogger logger;
private Object monitor = new Object();
private CountDownLatch workerThreadCount = null;
private JobProcessManager jobProcessManager = null;
@@ -237,7 +238,6 @@ public class HttpWorkerThread implements
JobProcessManager jobProcessManager , CountDownLatch workerThreadCount) {
this.duccComponent = component;
this.httpClient = httpClient;
- //this.uimaProcessor = processor;
this.jobProcessManager = jobProcessManager;
this.workerThreadCount = workerThreadCount;
}
@@ -257,6 +257,9 @@ public class HttpWorkerThread implements
public void run() {
try {
initialize(duccComponent.isUimaASJob());
+ // each thread needs its own PostMethod
+ PostMethod postMethod = new PostMethod(httpClient.getJdUrl());
+
//States stateMachine = new States(States.Start);
// SMContext ctx = new SMContextImpl(httpClient, States.Start);
String command="";
@@ -272,13 +275,15 @@ public class HttpWorkerThread implements
transaction.setType(Type.Get); // Tell JD you want a CAS
command = Type.Get.name();
// transaction = httpClient.post(transaction);
- transaction = httpClient.execute(transaction);
-
- // Confirm receipt of the CAS.
+ transaction = httpClient.execute(transaction, postMethod);
+ logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd New WI:"+transaction.getMetaCas().getSystemKey());
+ System.out.println("Thread:"+Thread.currentThread().getId()+" Recv'd New WI:"+transaction.getMetaCas().getSystemKey());
+ // Confirm receipt of the CAS.
transaction.setType(Type.Ack);
command = Type.Ack.name();
/// httpClient.post(transaction); // Ready to process
- httpClient.execute(transaction); // Ready to process
+ httpClient.execute(transaction, postMethod); // Ready to process
+ logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Sent ACK");
// if the JD did not provide a CAS, most likely the CR is
// done. In such case, reduce frequency of Get requests
@@ -290,7 +295,7 @@ public class HttpWorkerThread implements
// been processed and accounted for
if ( transaction.getJdState().equals(JdState.Ended) ) {
duccComponent.getLogger().warn("run", null, "Exiting Thread "+Thread.currentThread().getId()+" JD Finished Processing");
- System.out.println("Exiting Thred DriverState=Ended");
+ System.out.println("Exiting Thread DriverState=Ended");
break; // the JD completed. Exit the thread
}
// There is no CAS. It looks like the JD CR is done but there
@@ -307,6 +312,7 @@ public class HttpWorkerThread implements
@SuppressWarnings("unchecked")
List<Properties> metrics = (List<Properties>)
uimaProcessor.process(transaction.getMetaCas().getUserSpaceCas());
+ logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" process() completed");
IPerformanceMetrics metricsWrapper =
new PerformanceMetrics();
@@ -317,7 +323,9 @@ public class HttpWorkerThread implements
transaction.setType(Type.End);
command = Type.End.name();
// httpClient.post(transaction); // Work Item Processed - End
- httpClient.execute(transaction); // Work Item Processed - End
+ httpClient.execute(transaction, postMethod); // Work Item Processed - End
+ logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" sent END");
+
}
} catch( SocketTimeoutException e) {
duccComponent.getLogger().warn("run", null, "Timed Out While Awaiting Response from JD for "+command+" Request - Retrying ...");