You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2017/07/07 17:52:35 UTC

svn commit: r1801200 - in /uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp: DuccHttpClient.java HttpWorkerThread.java JobProcessComponent.java

Author: cwiklik
Date: Fri Jul  7 17:52:35 2017
New Revision: 1801200

URL: http://svn.apache.org/viewvc?rev=1801200&view=rev
Log:
UIMA-5469 Modified to detect lost connection and begin recovery until a new good connection is made.

Modified:
    uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
    uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
    uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java

Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java?rev=1801200&r1=1801199&r2=1801200&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java Fri Jul  7 17:52:35 2017
@@ -25,69 +25,53 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.http.ConnectionReuseStrategy;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHost;
 import org.apache.http.HttpResponse;
+import org.apache.http.NoHttpResponseException;
 import org.apache.http.StatusLine;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpPost;
+import org.apache.http.conn.HttpHostConnectException;
 import org.apache.http.conn.routing.HttpRoute;
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.DefaultConnectionReuseStrategy;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.http.impl.pool.BasicConnPool;
 import org.apache.http.impl.pool.BasicPoolEntry;
-import org.apache.http.protocol.HttpCoreContext;
-import org.apache.http.protocol.HttpProcessor;
-import org.apache.http.protocol.HttpProcessorBuilder;
-import org.apache.http.protocol.HttpRequestExecutor;
-import org.apache.http.protocol.RequestConnControl;
-import org.apache.http.protocol.RequestContent;
-import org.apache.http.protocol.RequestTargetHost;
-import org.apache.http.protocol.RequestUserAgent;
 import org.apache.http.util.EntityUtils;
 import org.apache.uima.ducc.common.IDuccUser;
 import org.apache.uima.ducc.common.NodeIdentity;
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.XStreamUtils;
 import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
-import org.apache.uima.ducc.container.net.iface.IPerformanceMetrics;
 import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Direction;
 import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Type;
+import org.apache.uima.ducc.container.net.iface.IPerformanceMetrics;
 import org.apache.uima.ducc.container.net.impl.MetaCasTransaction;
 import org.apache.uima.ducc.container.net.impl.PerformanceMetrics;
 import org.apache.uima.ducc.container.net.impl.TransactionId;
 
 public class DuccHttpClient {
-	DuccLogger logger = new DuccLogger(DuccHttpClient.class);
-	HttpRequestExecutor httpexecutor = null;
-	ConnectionReuseStrategy connStrategy = null;
-	HttpCoreContext coreContext = null;
-	HttpProcessor httpproc = null;
-	BasicConnPool connPool = null;
-	HttpHost host = null;
-	String target = null;
-	NodeIdentity nodeIdentity;
-	String pid = "";
-	ReentrantLock lock = new ReentrantLock();
-	int timeout;
-	
-	// New --------------------
-    HttpClient httpClient = null;
-	String jdUrl;
-	PoolingHttpClientConnectionManager cMgr = null;
-
-	//MultiThreadedHttpConnectionManager cMgr = null;
-	 int ClientMaxConnections = 0;
-     int ClientMaxConnectionsPerRoute = 0;
-     int ClientMaxConnectionsPerHostPort = 0;
+	private DuccLogger logger = new DuccLogger(DuccHttpClient.class);
+	private JobProcessComponent duccComponent;
+	private BasicConnPool connPool = null;
+	private HttpHost host = null;
+	private NodeIdentity nodeIdentity;
+	private String pid = "";
+	private ReentrantLock lock = new ReentrantLock();
+	private HttpClient httpClient = null;
+	private String jdUrl;
+	private PoolingHttpClientConnectionManager cMgr = null;
+
+	private int ClientMaxConnections = 0;
+	private int ClientMaxConnectionsPerRoute = 0;
+	private int ClientMaxConnectionsPerHostPort = 0;
      
-	public void setTimeout( int timeout) {
-		this.timeout = timeout;
-	}
+    public DuccHttpClient(JobProcessComponent duccComponent) {
+    	 this.duccComponent = duccComponent;
+    }
 	public void setScaleout(int scaleout) {
 		connPool.setMaxTotal(scaleout);
 		connPool.setDefaultMaxPerRoute(scaleout);
@@ -135,39 +119,16 @@ public class DuccHttpClient {
 			cMgr.shutdown();
 		}
 	}
-	public void intialize(String url, int port, String application)
-			throws Exception {
-		target = application;
-		httpproc = HttpProcessorBuilder.create().add(new RequestContent())
-				.add(new RequestTargetHost()).add(new RequestConnControl())
-				.add(new RequestUserAgent("Test/1.1"))
-				.add(new org.apache.http.protocol.RequestExpectContinue(true))
-				.build();
-		
-		httpexecutor = new HttpRequestExecutor();
-
-		coreContext = HttpCoreContext.create();
-		host = new HttpHost(url, port);
-		coreContext.setTargetHost(host);
-		connPool = new BasicConnPool();
-		connStrategy = new DefaultConnectionReuseStrategy();
-		pid = getProcessIP("N/A");
-		nodeIdentity = new NodeIdentity();
-		
-		// test connection to the JD
-		testConnection();
-		System.out.println("HttpClient Initialized");
-	}
-	public void testConnection() throws Exception {
-		// test connection to the JD
-	    Future<BasicPoolEntry> future = connPool.lease(host,  null);
-		BasicPoolEntry poolEntry = null;
-		try {
-			poolEntry= future.get();
-		} finally {
-			connPool.release(poolEntry, true);
-		}
-	}
+//	public void testConnection() throws Exception {
+//		// test connection to the JD
+//	    Future<BasicPoolEntry> future = connPool.lease(host,  null);
+//		BasicPoolEntry poolEntry = null;
+//		try {
+//			poolEntry= future.get();
+//		} finally {
+//			connPool.release(poolEntry, true);
+//		}
+//	}
 	public void close() {
     	try {
         //	conn.close();
@@ -248,49 +209,48 @@ public class DuccHttpClient {
 		addCommonHeaders(transaction);
 		transaction.setDirection(Direction.Request);
 		
-//		while( retry-- > 0 ) {
-			try {
+		try {
 				// Serialize request object to XML
 				String body = XStreamUtils.marshall(transaction);
-	            //HttpEntity e = new StringEntity(body,"application/xml","UTF-8" );
 	            HttpEntity e = new StringEntity(body,ContentType.APPLICATION_XML); //, "application/xml","UTF-8" );
 	         
 	            postMethod.setEntity(e);
 	            
 	            addCommonHeaders(postMethod);
 	    
-	           //postMethod.setRequestHeader("Content-Length", String.valueOf(body.length()));
 	            logger.debug("execute",null, "calling httpClient.executeMethod()");
-	            // wait for a reply
-	            //httpClient.executeMethod(postMethod);
-	            HttpResponse response = httpClient.execute(postMethod);
-	            
-	            
-	            
+	            // wait for a reply. When connection fails, retry indefinitely
+	            HttpResponse response = null;
+	            try {
+	            	 response = httpClient.execute(postMethod);
+	            } catch( HttpHostConnectException ex) {
+	            	// Lost connection to the Task Allocation App
+	            	// Block until connection is restored
+	            	response = retryUntilSuccessfull(transaction, postMethod);
+	            } catch( NoHttpResponseException ex ) {
+	            	// Lost connection to the Task Allocation App
+	            	// Block until connection is restored
+	            	response = retryUntilSuccessfull(transaction, postMethod);
+	            	
+	            }
+	            // we may have blocked in retryUntilSuccessfull while this process
+	            // was told to stop
+	            if ( !duccComponent.isRunning() ) {
+	            	return null;
+	            }
 	            logger.debug("execute",null, "httpClient.executeMethod() returned");
 	            HttpEntity entity = response.getEntity();
                 String content = EntityUtils.toString(entity);
                 StatusLine statusLine = response.getStatusLine();
                 if ( statusLine.getStatusCode() != 200) {
-                        logger.error("execute", null, "Unable to Communicate with JD - Error:"+statusLine);
-                        logger.error("execute", null, "Content causing error:"+content);
-                        System.out.println("Thread::"+Thread.currentThread().getId()+" ERRR::Content causing error:"+content);
-                        throw new RuntimeException("JP Http Client Unable to Communicate with JD - Error:"+statusLine);
+                    logger.error("execute", null, "Unable to Communicate with JD - Error:"+statusLine);
+                    logger.error("execute", null, "Content causing error:"+content);
+                    System.out.println("Thread::"+Thread.currentThread().getId()+" ERRR::Content causing error:"+content);
+                    throw new RuntimeException("JP Http Client Unable to Communicate with JD - Error:"+statusLine);
                 }
                 logger.debug("execute", null, "Thread:"+Thread.currentThread().getId()+" JD Reply Status:"+statusLine);
                 logger.debug("execute", null, "Thread:"+Thread.currentThread().getId()+" Recv'd:"+content);
 
-               // String content = new String(postMethod.getResponseBody());
-                /*
-				if ( postMethod.getStatusLine().getStatusCode() != 200) {
-					logger.error("execute", null, "Unable to Communicate with JD - Error:"+postMethod.getStatusLine());
-					logger.error("execute", null, "Content causing error:"+postMethod.getResponseBody());
-					System.out.println("Thread::"+Thread.currentThread().getId()+" ERRR::Content causing error:"+postMethod.getResponseBody());
-					throw new RuntimeException("JP Http Client Unable to Communicate with JD - Error:"+postMethod.getStatusLine());
-				}
-				logger.debug("execute", null, "Thread:"+Thread.currentThread().getId()+" JD Reply Status:"+postMethod.getStatusLine());
-				logger.debug("execute", null, "Thread:"+Thread.currentThread().getId()+" Recv'd:"+content);
-			*/
 				Object o;
 				try {
 					o = XStreamUtils.unmarshall(content); //sb.toString());
@@ -301,19 +261,15 @@ public class DuccHttpClient {
 				}
 				if ( o instanceof IMetaCasTransaction) {
 					reply = (MetaCasTransaction)o;
-//					break;
 				} else {
 					throw new InvalidClassException("Expected IMetaCasTransaction - Instead Received "+o.getClass().getName());
 				}
-			} catch( Exception t) {
-				lastError = t;
-//				logger.error("run", null, t);
-			}
-			finally {
-				postMethod.releaseConnection();
-			}
-			
-//		}
+		} catch( Exception t) {
+			lastError = t;
+		}
+		finally {
+			postMethod.releaseConnection();
+		}
 		if ( reply != null ) {
 			return reply;
 		} else {
@@ -325,12 +281,43 @@ public class DuccHttpClient {
 			}
 		} 
 	}
+	private HttpResponse retryUntilSuccessfull(IMetaCasTransaction transaction, HttpPost postMethod) throws Exception {
+        HttpResponse response=null;
+		// Only one thread attempts recovery. Other threads will block here
+		// until connection to the remote is restored. 
+   		lock.lock();
+   		logger.error("retryUntilSucessfull", null, "Thread:"+Thread.currentThread().getId()+" - Connection Lost to "+postMethod.getURI()+" - Retrying Until Successfull ...");
+
+         // retry indefinitely
+		while( duccComponent.isRunning() ) {
+       		try {
+       			// retry the command
+       			response = httpClient.execute(postMethod);
+       			logger.error("retryUntilSucessfull", null, "Thread:"+Thread.currentThread().getId()+" Recovered Connection ...");
+       			// success, so release the lock so that other waiting threads
+       			// can retry command
+       		    if ( lock.isHeldByCurrentThread()) {
+        		    lock.unlock();
+    		    }
+
+       			break;
+       			
+       		} catch( HttpHostConnectException exx ) {
+       			// Connection still not available so sleep awhile
+       			System.out.println(Thread.currentThread().getId()+" The Service is not available - sleeping and retrying");
+       			synchronized(postMethod) {
+       				postMethod.wait(duccComponent.getThreadSleepTime());
+       			}
+       		}
+        }
+		return response;
+	}
 	public static void main(String[] args) {
 		try {
 			HttpPost postMethod = new HttpPost(args[0]);
-			DuccHttpClient client = new DuccHttpClient();
+			DuccHttpClient client = new DuccHttpClient(null);
 		//	client.setScaleout(10);
-			client.setTimeout(30000);
+			//client.setTimeout(30000);
 			client.initialize(args[0]);
 			int minor = 0;
 			IMetaCasTransaction transaction = new MetaCasTransaction();

Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1801200&r1=1801199&r2=1801200&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Fri Jul  7 17:52:35 2017
@@ -87,8 +87,9 @@ public class HttpWorkerThread implements
 	    	logger.debug("HttpWorkerThread.run()", null, "Thread Id:"+Thread.currentThread().getId()+" Requesting next WI from JD");;
 			// send a request to JD and wait for a reply
 	    	transaction = httpClient.execute(transaction, postMethod);
+	    	//httpClient.testConnection();
 	        // The JD may not provide a Work Item to process.
-	    	if ( transaction.getMetaCas()!= null) {
+	    	if ( transaction != null && transaction.getMetaCas()!= null) {
 				logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd WI:"+transaction.getMetaCas().getSystemKey());
 				// Confirm receipt of the CAS. 
 				transaction.setType(Type.Ack);
@@ -198,13 +199,22 @@ public class HttpWorkerThread implements
 			
 	   	logger.info("HttpWorkerThread.run()", null, "Begin Processing Work Items - Thread Id:"+Thread.currentThread().getId());
 		try {
+			IMetaCasTransaction transaction=null;
+			int major = 0;
+			int minor = 0;
 			// Enter process loop. Stop this thread on the first process error.
 			while (duccComponent.isRunning()) {  
 
 				try {
-					int major = IdGenerator.addAndGet(1);
-					int minor = 0;
-					IMetaCasTransaction transaction = getWork(postMethod, major, minor);
+					major = IdGenerator.addAndGet(1);
+					minor = 0;
+					// the getWork() may block if connection is lost. 
+					transaction = getWork(postMethod, major, minor);
+					// first check if we are still running
+					if ( !duccComponent.isRunning() ) {
+    					logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Process is Stopping - Terminating This Thread");
+    					break;
+					}
 					if ( !logConnectionToJD ) {
 						logConnectionToJD = true;   // reset flag in case we loose connection to JD in the future
 						logger.info("run", null, "T["+Thread.currentThread().getId()+"] - Regained Connection to JD");
@@ -221,9 +231,7 @@ public class HttpWorkerThread implements
 						// do a GET in case JD changes its mind. The JP will
 						// eventually be stopped by the agent
 
-    					// use class level locking to block all but one thread to do retries.
-						// This is done to prevent flooding JD with retry requests 
-						synchronized (HttpWorkerThread.class) {
+ 						synchronized (HttpWorkerThread.class) {
 							while(duccComponent.isRunning() ) {
 								waitAwhile(duccComponent.getThreadSleepTime());
 								// just awoken, check if the JP is still in Running state
@@ -235,6 +243,7 @@ public class HttpWorkerThread implements
 								}
 							}
 						}
+						
 					} 
 					if ( duccComponent.isRunning()) {
 						boolean workItemFailed = false;
@@ -275,7 +284,7 @@ public class HttpWorkerThread implements
 							}
 							
 		                    logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" process() completed");
-							IPerformanceMetrics metricsWrapper =
+							PerformanceMetrics metricsWrapper =
 									new PerformanceMetrics();
 							metricsWrapper.set(metrics);
 							transaction.getMetaCas().setPerformanceMetrics(metricsWrapper);
@@ -339,7 +348,14 @@ public class HttpWorkerThread implements
 						}
 
 						httpClient.execute(transaction, postMethod); // Work Item Processed - End
-                    	String wid = null;
+						// the execute() can block while recovering lost connection.
+						// first check if we are still running.
+						if ( !duccComponent.isRunning() ) {
+	    					logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Process is Stopping - Terminating This Thread");
+	    					break;
+						}
+
+						String wid = null;
                     	try {
                     		wid = transaction.getMetaCas().getSystemKey();
                     	} catch( Exception e) {
@@ -392,15 +408,7 @@ public class HttpWorkerThread implements
 				} catch( InterruptedException e) {
 					logger.error("run", null, "WorkerThread Interrupted - Terminating Thread "+Thread.currentThread().getId());
 					return;
-				} catch( HttpHostConnectException e) {
-					// Each thread should log once when it looses a connection to its JD. When a connection is recovered
-					// re-enable the flag.
-					if ( logConnectionToJD ) {
-						logConnectionToJD = false;
-						logger.error("run", null, "T["+Thread.currentThread().getId()+"] Lost Conection to JD - Will retry "+maxFrameworkErrors+" times - Failure caused by:\t"+e);
-					}
-				}
-				catch (Exception e ) {
+				} catch (Exception e ) {
 					logger.error("run", null, e);
 					e.printStackTrace();
 					// If max framework error count has been reached 

Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java?rev=1801200&r1=1801199&r2=1801200&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java Fri Jul  7 17:52:35 2017
@@ -276,7 +276,7 @@ implements IJobProcessor{
 				int scaleout = (Integer)initMethod.invoke(processorInstance, props, jpArgs);
 				
 				getLogger().info("start", null,"Ducc JP JobType="+jobType);
-				httpClient = new DuccHttpClient();
+				httpClient = new DuccHttpClient(this);
 				String jdURL="";
 				try {
 					jdURL = System.getProperty(FlagsHelper.Name.JdURL.pname());
@@ -302,7 +302,7 @@ implements IJobProcessor{
 				tpe = Executors.newFixedThreadPool(scaleout, tf);
 
 				// initialize http client's timeout
-				httpClient.setTimeout(timeout);
+				//httpClient.setTimeout(timeout);
 				
 				System.out.println("JMX Connect String:"+ processJmxUrl);
 		    	getLogger().info("start", null, "Starting "+scaleout+" Process Threads - JMX Connect String:"+ processJmxUrl);