You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2017/07/07 17:52:35 UTC
svn commit: r1801200 - in
/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp:
DuccHttpClient.java HttpWorkerThread.java JobProcessComponent.java
Author: cwiklik
Date: Fri Jul 7 17:52:35 2017
New Revision: 1801200
URL: http://svn.apache.org/viewvc?rev=1801200&view=rev
Log:
UIMA-5469 Modified to detect lost connection and begin recovery until a new good connection is made.
Modified:
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java?rev=1801200&r1=1801199&r2=1801200&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java Fri Jul 7 17:52:35 2017
@@ -25,69 +25,53 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
+import org.apache.http.NoHttpResponseException;
import org.apache.http.StatusLine;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
+import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.impl.pool.BasicConnPool;
import org.apache.http.impl.pool.BasicPoolEntry;
-import org.apache.http.protocol.HttpCoreContext;
-import org.apache.http.protocol.HttpProcessor;
-import org.apache.http.protocol.HttpProcessorBuilder;
-import org.apache.http.protocol.HttpRequestExecutor;
-import org.apache.http.protocol.RequestConnControl;
-import org.apache.http.protocol.RequestContent;
-import org.apache.http.protocol.RequestTargetHost;
-import org.apache.http.protocol.RequestUserAgent;
import org.apache.http.util.EntityUtils;
import org.apache.uima.ducc.common.IDuccUser;
import org.apache.uima.ducc.common.NodeIdentity;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.XStreamUtils;
import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
-import org.apache.uima.ducc.container.net.iface.IPerformanceMetrics;
import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Direction;
import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Type;
+import org.apache.uima.ducc.container.net.iface.IPerformanceMetrics;
import org.apache.uima.ducc.container.net.impl.MetaCasTransaction;
import org.apache.uima.ducc.container.net.impl.PerformanceMetrics;
import org.apache.uima.ducc.container.net.impl.TransactionId;
public class DuccHttpClient {
- DuccLogger logger = new DuccLogger(DuccHttpClient.class);
- HttpRequestExecutor httpexecutor = null;
- ConnectionReuseStrategy connStrategy = null;
- HttpCoreContext coreContext = null;
- HttpProcessor httpproc = null;
- BasicConnPool connPool = null;
- HttpHost host = null;
- String target = null;
- NodeIdentity nodeIdentity;
- String pid = "";
- ReentrantLock lock = new ReentrantLock();
- int timeout;
-
- // New --------------------
- HttpClient httpClient = null;
- String jdUrl;
- PoolingHttpClientConnectionManager cMgr = null;
-
- //MultiThreadedHttpConnectionManager cMgr = null;
- int ClientMaxConnections = 0;
- int ClientMaxConnectionsPerRoute = 0;
- int ClientMaxConnectionsPerHostPort = 0;
+ private DuccLogger logger = new DuccLogger(DuccHttpClient.class);
+ private JobProcessComponent duccComponent;
+ private BasicConnPool connPool = null;
+ private HttpHost host = null;
+ private NodeIdentity nodeIdentity;
+ private String pid = "";
+ private ReentrantLock lock = new ReentrantLock();
+ private HttpClient httpClient = null;
+ private String jdUrl;
+ private PoolingHttpClientConnectionManager cMgr = null;
+
+ private int ClientMaxConnections = 0;
+ private int ClientMaxConnectionsPerRoute = 0;
+ private int ClientMaxConnectionsPerHostPort = 0;
- public void setTimeout( int timeout) {
- this.timeout = timeout;
- }
+ public DuccHttpClient(JobProcessComponent duccComponent) {
+ this.duccComponent = duccComponent;
+ }
public void setScaleout(int scaleout) {
connPool.setMaxTotal(scaleout);
connPool.setDefaultMaxPerRoute(scaleout);
@@ -135,39 +119,16 @@ public class DuccHttpClient {
cMgr.shutdown();
}
}
- public void intialize(String url, int port, String application)
- throws Exception {
- target = application;
- httpproc = HttpProcessorBuilder.create().add(new RequestContent())
- .add(new RequestTargetHost()).add(new RequestConnControl())
- .add(new RequestUserAgent("Test/1.1"))
- .add(new org.apache.http.protocol.RequestExpectContinue(true))
- .build();
-
- httpexecutor = new HttpRequestExecutor();
-
- coreContext = HttpCoreContext.create();
- host = new HttpHost(url, port);
- coreContext.setTargetHost(host);
- connPool = new BasicConnPool();
- connStrategy = new DefaultConnectionReuseStrategy();
- pid = getProcessIP("N/A");
- nodeIdentity = new NodeIdentity();
-
- // test connection to the JD
- testConnection();
- System.out.println("HttpClient Initialized");
- }
- public void testConnection() throws Exception {
- // test connection to the JD
- Future<BasicPoolEntry> future = connPool.lease(host, null);
- BasicPoolEntry poolEntry = null;
- try {
- poolEntry= future.get();
- } finally {
- connPool.release(poolEntry, true);
- }
- }
+// public void testConnection() throws Exception {
+// // test connection to the JD
+// Future<BasicPoolEntry> future = connPool.lease(host, null);
+// BasicPoolEntry poolEntry = null;
+// try {
+// poolEntry= future.get();
+// } finally {
+// connPool.release(poolEntry, true);
+// }
+// }
public void close() {
try {
// conn.close();
@@ -248,49 +209,48 @@ public class DuccHttpClient {
addCommonHeaders(transaction);
transaction.setDirection(Direction.Request);
-// while( retry-- > 0 ) {
- try {
+ try {
// Serialize request object to XML
String body = XStreamUtils.marshall(transaction);
- //HttpEntity e = new StringEntity(body,"application/xml","UTF-8" );
HttpEntity e = new StringEntity(body,ContentType.APPLICATION_XML); //, "application/xml","UTF-8" );
postMethod.setEntity(e);
addCommonHeaders(postMethod);
- //postMethod.setRequestHeader("Content-Length", String.valueOf(body.length()));
logger.debug("execute",null, "calling httpClient.executeMethod()");
- // wait for a reply
- //httpClient.executeMethod(postMethod);
- HttpResponse response = httpClient.execute(postMethod);
-
-
-
+ // wait for a reply. When connection fails, retry indefinitely
+ HttpResponse response = null;
+ try {
+ response = httpClient.execute(postMethod);
+ } catch( HttpHostConnectException ex) {
+ // Lost connection to the Task Allocation App
+ // Block until connection is restored
+ response = retryUntilSuccessfull(transaction, postMethod);
+ } catch( NoHttpResponseException ex ) {
+ // Lost connection to the Task Allocation App
+ // Block until connection is restored
+ response = retryUntilSuccessfull(transaction, postMethod);
+
+ }
+ // we may have blocked in retryUntilSuccessfull while this process
+ // was told to stop
+ if ( !duccComponent.isRunning() ) {
+ return null;
+ }
logger.debug("execute",null, "httpClient.executeMethod() returned");
HttpEntity entity = response.getEntity();
String content = EntityUtils.toString(entity);
StatusLine statusLine = response.getStatusLine();
if ( statusLine.getStatusCode() != 200) {
- logger.error("execute", null, "Unable to Communicate with JD - Error:"+statusLine);
- logger.error("execute", null, "Content causing error:"+content);
- System.out.println("Thread::"+Thread.currentThread().getId()+" ERRR::Content causing error:"+content);
- throw new RuntimeException("JP Http Client Unable to Communicate with JD - Error:"+statusLine);
+ logger.error("execute", null, "Unable to Communicate with JD - Error:"+statusLine);
+ logger.error("execute", null, "Content causing error:"+content);
+ System.out.println("Thread::"+Thread.currentThread().getId()+" ERRR::Content causing error:"+content);
+ throw new RuntimeException("JP Http Client Unable to Communicate with JD - Error:"+statusLine);
}
logger.debug("execute", null, "Thread:"+Thread.currentThread().getId()+" JD Reply Status:"+statusLine);
logger.debug("execute", null, "Thread:"+Thread.currentThread().getId()+" Recv'd:"+content);
- // String content = new String(postMethod.getResponseBody());
- /*
- if ( postMethod.getStatusLine().getStatusCode() != 200) {
- logger.error("execute", null, "Unable to Communicate with JD - Error:"+postMethod.getStatusLine());
- logger.error("execute", null, "Content causing error:"+postMethod.getResponseBody());
- System.out.println("Thread::"+Thread.currentThread().getId()+" ERRR::Content causing error:"+postMethod.getResponseBody());
- throw new RuntimeException("JP Http Client Unable to Communicate with JD - Error:"+postMethod.getStatusLine());
- }
- logger.debug("execute", null, "Thread:"+Thread.currentThread().getId()+" JD Reply Status:"+postMethod.getStatusLine());
- logger.debug("execute", null, "Thread:"+Thread.currentThread().getId()+" Recv'd:"+content);
- */
Object o;
try {
o = XStreamUtils.unmarshall(content); //sb.toString());
@@ -301,19 +261,15 @@ public class DuccHttpClient {
}
if ( o instanceof IMetaCasTransaction) {
reply = (MetaCasTransaction)o;
-// break;
} else {
throw new InvalidClassException("Expected IMetaCasTransaction - Instead Received "+o.getClass().getName());
}
- } catch( Exception t) {
- lastError = t;
-// logger.error("run", null, t);
- }
- finally {
- postMethod.releaseConnection();
- }
-
-// }
+ } catch( Exception t) {
+ lastError = t;
+ }
+ finally {
+ postMethod.releaseConnection();
+ }
if ( reply != null ) {
return reply;
} else {
@@ -325,12 +281,43 @@ public class DuccHttpClient {
}
}
}
+ private HttpResponse retryUntilSuccessfull(IMetaCasTransaction transaction, HttpPost postMethod) throws Exception {
+ HttpResponse response=null;
+ // Only one thread attempts recovery. Other threads will block here
+ // until connection to the remote is restored.
+ lock.lock();
+ logger.error("retryUntilSucessfull", null, "Thread:"+Thread.currentThread().getId()+" - Connection Lost to "+postMethod.getURI()+" - Retrying Until Successfull ...");
+
+ // retry indefinitely
+ while( duccComponent.isRunning() ) {
+ try {
+ // retry the command
+ response = httpClient.execute(postMethod);
+ logger.error("retryUntilSucessfull", null, "Thread:"+Thread.currentThread().getId()+" Recovered Connection ...");
+ // success, so release the lock so that other waiting threads
+ // can retry command
+ if ( lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+
+ break;
+
+ } catch( HttpHostConnectException exx ) {
+ // Connection still not available so sleep awhile
+ System.out.println(Thread.currentThread().getId()+" The Service is not available - sleeping and retrying");
+ synchronized(postMethod) {
+ postMethod.wait(duccComponent.getThreadSleepTime());
+ }
+ }
+ }
+ return response;
+ }
public static void main(String[] args) {
try {
HttpPost postMethod = new HttpPost(args[0]);
- DuccHttpClient client = new DuccHttpClient();
+ DuccHttpClient client = new DuccHttpClient(null);
// client.setScaleout(10);
- client.setTimeout(30000);
+ //client.setTimeout(30000);
client.initialize(args[0]);
int minor = 0;
IMetaCasTransaction transaction = new MetaCasTransaction();
Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1801200&r1=1801199&r2=1801200&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Fri Jul 7 17:52:35 2017
@@ -87,8 +87,9 @@ public class HttpWorkerThread implements
logger.debug("HttpWorkerThread.run()", null, "Thread Id:"+Thread.currentThread().getId()+" Requesting next WI from JD");;
// send a request to JD and wait for a reply
transaction = httpClient.execute(transaction, postMethod);
+ //httpClient.testConnection();
// The JD may not provide a Work Item to process.
- if ( transaction.getMetaCas()!= null) {
+ if ( transaction != null && transaction.getMetaCas()!= null) {
logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd WI:"+transaction.getMetaCas().getSystemKey());
// Confirm receipt of the CAS.
transaction.setType(Type.Ack);
@@ -198,13 +199,22 @@ public class HttpWorkerThread implements
logger.info("HttpWorkerThread.run()", null, "Begin Processing Work Items - Thread Id:"+Thread.currentThread().getId());
try {
+ IMetaCasTransaction transaction=null;
+ int major = 0;
+ int minor = 0;
// Enter process loop. Stop this thread on the first process error.
while (duccComponent.isRunning()) {
try {
- int major = IdGenerator.addAndGet(1);
- int minor = 0;
- IMetaCasTransaction transaction = getWork(postMethod, major, minor);
+ major = IdGenerator.addAndGet(1);
+ minor = 0;
+ // the getWork() may block if connection is lost.
+ transaction = getWork(postMethod, major, minor);
+ // first check if we are still running
+ if ( !duccComponent.isRunning() ) {
+ logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Process is Stopping - Terminating This Thread");
+ break;
+ }
if ( !logConnectionToJD ) {
logConnectionToJD = true; // reset flag in case we loose connection to JD in the future
logger.info("run", null, "T["+Thread.currentThread().getId()+"] - Regained Connection to JD");
@@ -221,9 +231,7 @@ public class HttpWorkerThread implements
// do a GET in case JD changes its mind. The JP will
// eventually be stopped by the agent
- // use class level locking to block all but one thread to do retries.
- // This is done to prevent flooding JD with retry requests
- synchronized (HttpWorkerThread.class) {
+ synchronized (HttpWorkerThread.class) {
while(duccComponent.isRunning() ) {
waitAwhile(duccComponent.getThreadSleepTime());
// just awoken, check if the JP is still in Running state
@@ -235,6 +243,7 @@ public class HttpWorkerThread implements
}
}
}
+
}
if ( duccComponent.isRunning()) {
boolean workItemFailed = false;
@@ -275,7 +284,7 @@ public class HttpWorkerThread implements
}
logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" process() completed");
- IPerformanceMetrics metricsWrapper =
+ PerformanceMetrics metricsWrapper =
new PerformanceMetrics();
metricsWrapper.set(metrics);
transaction.getMetaCas().setPerformanceMetrics(metricsWrapper);
@@ -339,7 +348,14 @@ public class HttpWorkerThread implements
}
httpClient.execute(transaction, postMethod); // Work Item Processed - End
- String wid = null;
+ // the execute() can block while recovering lost connection.
+ // first check if we are still running.
+ if ( !duccComponent.isRunning() ) {
+ logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Process is Stopping - Terminating This Thread");
+ break;
+ }
+
+ String wid = null;
try {
wid = transaction.getMetaCas().getSystemKey();
} catch( Exception e) {
@@ -392,15 +408,7 @@ public class HttpWorkerThread implements
} catch( InterruptedException e) {
logger.error("run", null, "WorkerThread Interrupted - Terminating Thread "+Thread.currentThread().getId());
return;
- } catch( HttpHostConnectException e) {
- // Each thread should log once when it looses a connection to its JD. When a connection is recovered
- // re-enable the flag.
- if ( logConnectionToJD ) {
- logConnectionToJD = false;
- logger.error("run", null, "T["+Thread.currentThread().getId()+"] Lost Conection to JD - Will retry "+maxFrameworkErrors+" times - Failure caused by:\t"+e);
- }
- }
- catch (Exception e ) {
+ } catch (Exception e ) {
logger.error("run", null, e);
e.printStackTrace();
// If max framework error count has been reached
Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java?rev=1801200&r1=1801199&r2=1801200&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java Fri Jul 7 17:52:35 2017
@@ -276,7 +276,7 @@ implements IJobProcessor{
int scaleout = (Integer)initMethod.invoke(processorInstance, props, jpArgs);
getLogger().info("start", null,"Ducc JP JobType="+jobType);
- httpClient = new DuccHttpClient();
+ httpClient = new DuccHttpClient(this);
String jdURL="";
try {
jdURL = System.getProperty(FlagsHelper.Name.JdURL.pname());
@@ -302,7 +302,7 @@ implements IJobProcessor{
tpe = Executors.newFixedThreadPool(scaleout, tf);
// initialize http client's timeout
- httpClient.setTimeout(timeout);
+ //httpClient.setTimeout(timeout);
System.out.println("JMX Connect String:"+ processJmxUrl);
getLogger().info("start", null, "Starting "+scaleout+" Process Threads - JMX Connect String:"+ processJmxUrl);