You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2019/05/01 13:42:39 UTC

svn commit: r1858488 [2/3] - in /uima/uima-ducc/trunk/uima-ducc-pullservice/src: main/java/org/apache/uima/ducc/ps/sd/task/ main/java/org/apache/uima/ducc/ps/sd/task/transport/ main/java/org/apache/uima/ducc/ps/service/protocol/builtin/ main/java/org/a...

Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java Wed May  1 13:42:39 2019
@@ -1,20 +1,20 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+/*                                                                                                                                                                                                   
+ * Licensed to the Apache Software Foundation (ASF) under one                                                                                                                                        
+ * or more contributor license agreements.  See the NOTICE file                                                                                                                                      
+ * distributed with this work for additional information                                                                                                                                             
+ * regarding copyright ownership.  The ASF licenses this file                                                                                                                                        
+ * to you under the Apache License, Version 2.0 (the                                                                                                                                                 
+ * "License"); you may not use this file except in compliance                                                                                                                                        
+ * with the License.  You may obtain a copy of the License at                                                                                                                                        
+ *                                                                                                                                                                                                   
+ *      http://www.apache.org/licenses/LICENSE-2.0                                                                                                                                                   
+ *                                                                                                                                                                                                   
+ * Unless required by applicable law or agreed to in writing,                                                                                                                                        
+ * software distributed under the License is distributed on an                                                                                                                                       
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY                                                                                                                                            
+ * KIND, either express or implied.  See the License for the                                                                                                                                         
+ * specific language governing permissions and limitations                                                                                                                                           
+ * under the License.                                                                                                                                                                                
 */
 package org.apache.uima.ducc.ps.service.transport.http;
 
@@ -30,6 +30,7 @@ import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.http.HttpEntity;
@@ -61,358 +62,408 @@ import org.apache.uima.ducc.ps.service.u
 import org.apache.uima.util.Level;
 import org.apache.uima.util.Logger;
 
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.io.xml.DomDriver;
+
 public class HttpServiceTransport implements IServiceTransport {
-	private Logger logger =  UIMAFramework.getLogger(HttpServiceTransport.class);
-	private HttpClient httpClient = null;
-	private PoolingHttpClientConnectionManager cMgr = null;
-	private int clientMaxConnections = 1;
-	private int clientMaxConnectionsPerRoute = 1;
-	private int clientMaxConnectionsPerHostPort = 0;
-	private ReentrantLock registryLookupLock = new ReentrantLock();
-    private long threadSleepTime=1000; // millis
-    private final String nodeIP;
-    private final String nodeName;
-    private final String pid;		
-    private ITargetURI currentTargetUrl = new NoOpTargetURI();
-    private static final  String NA="N/A";
-    private TransportStats stats = new TransportStats();
-    private IRegistryClient registryClient;
-    // holds reference to HttpPost object for every thread. Key=thread id
-    private Map<Long,HttpPost> httpPostMap = 
-    		new HashMap<>();
-    private volatile boolean stopping = false;
-	private volatile boolean running = false;
-	private volatile boolean log = true;
-	
- 	public HttpServiceTransport(IRegistryClient registryClient, int scaleout) throws ServiceException {
-		this.registryClient = registryClient;
-		clientMaxConnections = scaleout;
-
-		
-		if ( Objects.isNull(System.getenv("DUCC_IP")) || Objects.isNull(System.getenv("DUCC_NODENAME"))) {
-			try {
-				nodeIP = InetAddress.getLocalHost().getHostAddress();
-				nodeName=InetAddress.getLocalHost().getCanonicalHostName();
-				
-			} catch( UnknownHostException e) {
-				throw new RuntimeException(new TransportException("HttpServiceTransport.ctor - Unable to determine Host Name and IP",e));
-			}
-			
-		} else {
-			// Use agent provided node identity. This is important when running in a sim mode
-			// where nodes are virtual.
-			nodeIP =  System.getenv("DUCC_IP");
-			nodeName = System.getenv("DUCC_NODENAME");
-
-		}
-		pid = getProcessIP(NA);
-	}
-	private HttpPost getPostMethodForCurrentThread() {
-		HttpPost postMethod;
-		if ( !httpPostMap.containsKey(Thread.currentThread().getId())) {
-			// each thread needs its own PostMethod
-			postMethod =
-			    new HttpPost(currentTargetUrl.asString());
-			httpPostMap.put(Thread.currentThread().getId(),postMethod);
-		} else {
-			postMethod = httpPostMap.get(Thread.currentThread().getId());
-		}
-		return postMethod;
-	}
-	private String getProcessIP(final String fallback) {
-		// the following code returns '<pid>@<hostname>'
-		String name = ManagementFactory.getRuntimeMXBean().getName();
-		int pos = name.indexOf('@');
-
-		if (pos < 1) {
-			// pid not found
-			return fallback;
-		}
-
-		try {
-			return Long.toString(Long.parseLong(name.substring(0, pos)));
-		} catch (NumberFormatException e) {
-			// ignore
-		}
-		return fallback;
-	}
-	private void lookupNewTarget() {
-		registryLookupLock.lock();
-		while( !stopping ) {
-			try {
-				String newTarget = registryClient.lookUp(currentTargetUrl.asString());
-				currentTargetUrl = TargetURIFactory.newTarget(newTarget);
-				break;
-			} catch(  Exception e) {
-				synchronized (httpClient) {
-					
-					try {
-						httpClient.wait(threadSleepTime);
-					} catch( InterruptedException ex) {
-						Thread.currentThread().interrupt();
-						break;
-					}
-				}
-			}
-		}
-		if (registryLookupLock.isHeldByCurrentThread()) {
-			registryLookupLock.unlock();
-		}
-	}
-	public void addRequestorInfo(IMetaTaskTransaction transaction) {
-    	transaction.setRequesterAddress(nodeIP);
-    	transaction.setRequesterNodeName(nodeName);
-    	transaction.setRequesterProcessId(Integer.valueOf(pid));
-    	transaction.setRequesterThreadId((int)Thread.currentThread().getId());
-    	if ( logger.isLoggable(Level.FINE )) {
-        	logger.log(Level.FINE,"ip:"+transaction.getRequesterAddress());
-        	logger.log(Level.FINE, "nodeName:"+transaction.getRequesterNodeName());
-        	logger.log(Level.FINE, "processName:"+transaction.getRequesterProcessName());
-        	logger.log(Level.FINE,"processId:"+transaction.getRequesterProcessId());
-        	logger.log(Level.FINE, "threadId:"+transaction.getRequesterThreadId());
-
-    	}
- 		
-	}
-	public void initialize() throws ServiceInitializationException { 
-		
-		// use plugged in registry to lookup target to connect to.
-		// Sets global: currentTarget
-		lookupNewTarget();
-		
-		cMgr = new PoolingHttpClientConnectionManager();
-
-		if (clientMaxConnections > 0) {
-			cMgr.setMaxTotal(clientMaxConnections);
-		}
-		// Set default max connections per route
-		if (clientMaxConnectionsPerRoute > 0) {
-			cMgr.setDefaultMaxPerRoute(clientMaxConnectionsPerRoute);
-		}
-		HttpHost httpHost = new HttpHost(currentTargetUrl.asString(), Integer.valueOf(currentTargetUrl.getPort()), currentTargetUrl.getContext());
-		if (clientMaxConnectionsPerHostPort > 0) {
-			cMgr.setMaxPerRoute(new HttpRoute(httpHost), clientMaxConnectionsPerHostPort);
-		}
-		
-		httpClient = HttpClients.custom().setConnectionManager(cMgr).build();
-		running = true;
-
-	}
-    private void addCommonHeaders( HttpPost method ) {
-    	synchronized( HttpServiceTransport.class ) {
-    		
-    		 method.setHeader("IP", nodeIP);
-             method.setHeader("Hostname", nodeName);
-             method.setHeader("ThreadID",
-                             String.valueOf(Thread.currentThread().getId()));
-             method.setHeader("PID", pid);
-    		
-    	}
-		
-    }
-
-	private HttpEntity wrapRequest(String serializedRequest) {
-		return new StringEntity(serializedRequest, ContentType.APPLICATION_XML);
-	}
-
-	private boolean isRunning() {
-		return running;
-	}
-
-	private IMetaTaskTransaction retryUntilSuccessfull(String request, HttpPost postMethod) {
-		IMetaTaskTransaction response=null;
-
-		// retry until service is stopped
-		while (isRunning()) {
-			try {
-				response =  doPost(postMethod);
-				break;
-
-			} catch (TransportException | IOException | URISyntaxException exx) {
-				try {
-					Thread.sleep(threadSleepTime);
-				} catch( InterruptedException e) {
-					Thread.currentThread().interrupt();
-				}
-			}
-			lookupNewTarget();
-			 
-		}
-		return response;
-		
-	}
-	private IMetaTaskTransaction doPost(HttpPost postMethod) throws URISyntaxException, IOException, TransportException {
-		postMethod.setURI(new URI(currentTargetUrl.asString()));
-
-		IMetaTaskTransaction metaTransaction=null;
-		HttpResponse response = httpClient.execute(postMethod);
-		//		if ( stopping ) {
-		//	throw new TransportException("Service stopping - rejecting request");
-		//}
-		HttpEntity entity = response.getEntity();
-		String serializedResponse = EntityUtils.toString(entity);
-		Object transaction=null;
-		try {
-			transaction = XStreamUtils.unmarshall(serializedResponse);
-		} catch(Exception e) {
-			logger.log(Level.WARNING,"Process Thread:"+Thread.currentThread().getId()+" Error while deserializing response with XStream",e);
-			throw new TransportException(e);
-		}
-		if ( Objects.isNull(transaction)) {
-			throw new InvalidClassException(
-					"Expected IMetaTaskTransaction - Instead Received NULL");
-
-		} else if ( !(transaction instanceof IMetaTaskTransaction) ) { 
-			throw new InvalidClassException(
-					"Expected IMetaTaskTransaction - Instead Received " + transaction.getClass().getName());
-		}
-		metaTransaction = (IMetaTaskTransaction) transaction;
-		
-		StatusLine statusLine = response.getStatusLine();
-		if (statusLine.getStatusCode() != 200 ) {
-			// all IOExceptions are retried
-			throw new IOException(
-					"Unexpected HttpClient response status:"+statusLine+ " Content causing error:"+serializedResponse);
-		}
-
-		stats.incrementSuccessCount();
-		return metaTransaction;
-	}
-	/**
-	 * Dispatches request to remote driver via doPost(). Its synchronized to prevent over-running the driver with
-	 * requests from multiple threads. When the transport fails sending GET/ACK/END a single thread
-	 * will try to recover connection and send the request.
-	 * 
-	 */
-	@Override
-	public synchronized IMetaTaskTransaction dispatch(String serializedRequest) throws TransportException  {
-	    //if ( stopping ) {
-	    //		throw new IllegalStateException("Service transport has been stopped, unable to dispatch request");
-	    //	}
-		IMetaTaskTransaction transaction=null;
-		HttpEntity e = wrapRequest(serializedRequest);
-		// Each thread has its own HttpPost method. If current thread
-		// doesnt have one, it will be created and added to the local
-		// Map. Subsequent requests will fetch it from the map using
-		// current thread ID as a key.
-		HttpPost postMethod = getPostMethodForCurrentThread();
-		addCommonHeaders(postMethod);
-		postMethod.setEntity(e);
-		try {
-			String simulatedException;
-			// To test transport errors add command line option -DMockHttpPostError=exception where
-			// exception is one of the following Strings:
-			//
-			// IOException, 
-			// SocketException, 
-			// UnknownHostException, 
-			// NoRouteToHostException,
-			// NoHttpResponseException, 
-			// HttpHostConnectException, 
-			// URISyntaxException
-			// Use JUnit test JunitTransoirtTestCase to test the above errors
-			
-			if ( ( simulatedException = System.getProperty("MockHttpPostError")) != null ) {
-				HttpClientExceptionGenerator mockExceptionGenerator = 
-						new HttpClientExceptionGenerator(simulatedException);
-				mockExceptionGenerator.throwSimulatedException();
-			} else {
-				transaction = doPost(postMethod);
-			}
-		} catch( IOException | URISyntaxException ex) {
-			if ( stopping ) {
-				// looks like the process is in the shutdown mode. Log an exception and dont retry
-				logger.log(Level.INFO,"Process Thread:"+Thread.currentThread().getId()+" - Process is already stopping - Caught Exception while calling doPost() \n"+ex);
-				throw new TransportException(ex);
-			} else {
-				if ( log ) {
-					log = false;
-					stats.incrementErrorCount();
-					logger.log(Level.WARNING, this.getClass().getName()+".dispatch() >>>>>>>>>> Handling Exception \n"+ex);
-					logger.log(Level.INFO, ">>>>>>>>>> Unable to communicate with target:"+currentTargetUrl.asString()+" - retrying until successfull - with "+threadSleepTime/1000+" seconds wait between retries  ");
-				}
-				transaction = retryUntilSuccessfull(serializedRequest, postMethod);
-				log = true;
-				logger.log(Level.INFO, "Established connection to target:"+currentTargetUrl.asString());
-			}
-
-
-		} finally {
-			postMethod.releaseConnection();
-		}
-		return transaction;
-		
-	}
-	
-	public void stop(boolean quiesce) {
-
-		stopping = true;
-		running = false;
-		// Use System.out since the logger's ShutdownHook may have closed streams
-		System.out.println(Utils.getTimestamp()+">>>>>>> "+Utils.getShortClassname(this.getClass())+" stop() called - mode:"+(quiesce==true?"quiesce":"stop"));
-		logger.log(Level.INFO,this.getClass().getName()+" stop() called");
-		if ( !quiesce && cMgr != null ) {
-			cMgr.shutdown();
-			System.out.println(Utils.getTimestamp()+">>>>>>> "+Utils.getShortClassname(this.getClass())+" stopped connection mgr");
-			logger.log(Level.INFO,this.getClass().getName()+" stopped connection mgr");
-
-		}
-	}
-	public static void main(String[] args) {
-
-	}
-
-	public static class HttpClientExceptionGenerator {
-		public enum ERROR{ IOException, SocketException, UnknownHostException, NoRouteToHostException,NoHttpResponseException, HttpHostConnectException, URISyntaxException};
-		
-		Exception exceptionClass=null;
-		
-		public HttpClientExceptionGenerator(String exc) {
-			
-			for( ERROR e : ERROR.values()) {
-				if ( exc != null && e.name().equals(exc)) {
-					switch(e) {
-					case IOException:
-						exceptionClass = new IOException("Simulated IOException");
-						break;
-					case URISyntaxException:
-						exceptionClass = new URISyntaxException("", "Simulated URISyntaxException");
-						break;
-					case NoRouteToHostException:
-						exceptionClass = new NoRouteToHostException("Simulated NoRouteToHostException");
-						break;
-					case NoHttpResponseException:
-						exceptionClass = new NoHttpResponseException("Simulated NoHttpResponseException");
-						break;	
-					case SocketException:
-						exceptionClass = new SocketException("Simulated SocketException");
-						break;
-					case UnknownHostException:
-						exceptionClass = new UnknownHostException("Simulated UnknownHostException");
-						break;
-						
-					default:
-						
-							
-					}
-					if ( exceptionClass != null ) {
-						break;
-					}
-				}
-			}
-		}
-		public void throwSimulatedException() throws IOException, URISyntaxException {
-			if ( exceptionClass != null ) {
-				if ( exceptionClass instanceof IOException ) {
-					throw (IOException)exceptionClass;
-				} else if ( exceptionClass instanceof URISyntaxException ) {
-					throw (URISyntaxException)exceptionClass;
-				}
-				
-			}
-		}
-		
-		
-	}
+  private Logger logger = UIMAFramework.getLogger(HttpServiceTransport.class);
+
+  private HttpClient httpClient = null;
+
+  private PoolingHttpClientConnectionManager cMgr = null;
+
+  private int clientMaxConnections = 1;
+
+  private int clientMaxConnectionsPerRoute = 1;
+
+  private int clientMaxConnectionsPerHostPort = 0;
+
+  private ReentrantLock registryLookupLock = new ReentrantLock();
+
+  private long threadSleepTime = 1000; // millis
+
+  private final String nodeIP;
+
+  private final String nodeName;
+
+  private final String pid;
+
+  private ITargetURI currentTargetUrl = new NoOpTargetURI();
+
+  private static final String NA = "N/A";
+
+  private TransportStats stats = new TransportStats();
+
+  private IRegistryClient registryClient;
+
+  // holds reference to HttpPost object for every thread. Key=thread id
+  private Map<Long, HttpPost> httpPostMap = new HashMap<>();
+
+  private volatile boolean stopping = false;
+
+  private volatile boolean running = false;
+
+  private volatile boolean log = true;
+
+  private AtomicLong xstreamTime = new AtomicLong();
+  // private ThreadLocal<HashMap<Long, XStream>> localXStream = new ThreadLocal<HashMap<Long,
+  // XStream>>() {
+  // @Override
+  // protected HashMap<Long, XStream> initialValue() {
+  // return new HashMap<Long, XStream>();
+  // }
+  // };
+
+  public HttpServiceTransport(IRegistryClient registryClient, int scaleout)
+          throws ServiceException {
+    this.registryClient = registryClient;
+    clientMaxConnections = scaleout;
+
+    try {
+      nodeIP = InetAddress.getLocalHost().getHostAddress();
+      nodeName = InetAddress.getLocalHost().getCanonicalHostName();
+      pid = getProcessIP(NA);
+    } catch (UnknownHostException e) {
+      throw new RuntimeException(new TransportException(
+              "HttpServiceTransport.ctor - Unable to determine Host Name and IP", e));
+    }
+
+  }
+
+  private HttpPost getPostMethodForCurrentThread() {
+    HttpPost postMethod;
+    if (!httpPostMap.containsKey(Thread.currentThread().getId())) {
+      // each thread needs its own PostMethod
+      postMethod = new HttpPost(currentTargetUrl.asString());
+      httpPostMap.put(Thread.currentThread().getId(), postMethod);
+    } else {
+      postMethod = httpPostMap.get(Thread.currentThread().getId());
+    }
+    return postMethod;
+  }
+
+  private String getProcessIP(final String fallback) {
+    // the following code returns '<pid>@<hostname>'
+    String name = ManagementFactory.getRuntimeMXBean().getName();
+    int pos = name.indexOf('@');
+
+    if (pos < 1) {
+      // pid not found
+      return fallback;
+    }
+
+    try {
+      return Long.toString(Long.parseLong(name.substring(0, pos)));
+    } catch (NumberFormatException e) {
+      // ignore
+    }
+    return fallback;
+  }
+
+  private void lookupNewTarget() {
+    registryLookupLock.lock();
+    while (!stopping) {
+      try {
+        String newTarget = registryClient.lookUp(currentTargetUrl.asString());
+        currentTargetUrl = TargetURIFactory.newTarget(newTarget);
+        break;
+      } catch (Exception e) {
+        synchronized (httpClient) {
+
+          try {
+            httpClient.wait(threadSleepTime);
+          } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            break;
+          }
+        }
+      }
+    }
+    if (registryLookupLock.isHeldByCurrentThread()) {
+      registryLookupLock.unlock();
+    }
+  }
+
+  public void addRequestorInfo(IMetaTaskTransaction transaction) {
+    transaction.setRequesterAddress(nodeIP);
+    transaction.setRequesterNodeName(nodeName);
+    transaction.setRequesterProcessId(Integer.valueOf(pid));
+    transaction.setRequesterThreadId((int) Thread.currentThread().getId());
+    if (logger.isLoggable(Level.FINE)) {
+      logger.log(Level.FINE, "ip:" + transaction.getRequesterAddress());
+      logger.log(Level.FINE, "nodeName:" + transaction.getRequesterNodeName());
+      logger.log(Level.FINE, "processName:" + transaction.getRequesterProcessName());
+      logger.log(Level.FINE, "processId:" + transaction.getRequesterProcessId());
+      logger.log(Level.FINE, "threadId:" + transaction.getRequesterThreadId());
+
+    }
+
+  }
+
+  public void initialize() throws ServiceInitializationException {
+
+    // Use the plugged-in registry to look up the target to connect to.
+    // Sets the instance field currentTargetUrl.
+    lookupNewTarget();
+
+    cMgr = new PoolingHttpClientConnectionManager();
+
+    if (clientMaxConnections > 0) {
+      cMgr.setMaxTotal(clientMaxConnections);
+    }
+    // Set default max connections per route
+    if (clientMaxConnectionsPerRoute > 0) {
+      cMgr.setDefaultMaxPerRoute(clientMaxConnectionsPerRoute);
+    }
+    HttpHost httpHost = new HttpHost(currentTargetUrl.asString(),
+            Integer.valueOf(currentTargetUrl.getPort()), currentTargetUrl.getContext());
+    if (clientMaxConnectionsPerHostPort > 0) {
+      cMgr.setMaxPerRoute(new HttpRoute(httpHost), clientMaxConnectionsPerHostPort);
+    }
+
+    httpClient = HttpClients.custom().setConnectionManager(cMgr).build();
+    running = true;
+
+  }
+
+  private void addCommonHeaders(HttpPost method) {
+    // synchronized( HttpServiceTransport.class ) {
+
+    method.setHeader("IP", nodeIP);
+    method.setHeader("Hostname", nodeName);
+    method.setHeader("ThreadID", String.valueOf(Thread.currentThread().getId()));
+    method.setHeader("PID", pid);
+
+    // }
+
+  }
+
+  private HttpEntity wrapRequest(String serializedRequest) {
+    return new StringEntity(serializedRequest, ContentType.APPLICATION_XML);
+  }
+
+  private boolean isRunning() {
+    return running;
+  }
+
+  private IMetaTaskTransaction retryUntilSuccessfull(String request, HttpPost postMethod,
+          ThreadLocal<HashMap<Long, XStream>> localXStream) {
+    IMetaTaskTransaction response = null;
+
+    // retry until service is stopped
+    while (isRunning()) {
+      try {
+        response = doPost(postMethod, localXStream);
+        break;
+
+      } catch (TransportException | IOException | URISyntaxException exx) {
+        try {
+          Thread.sleep(threadSleepTime);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+      lookupNewTarget();
+
+    }
+    return response;
+
+  }
+
+  private IMetaTaskTransaction doPost(HttpPost postMethod,
+          ThreadLocal<HashMap<Long, XStream>> localXStream)
+          throws URISyntaxException, IOException, TransportException {
+    postMethod.setURI(new URI(currentTargetUrl.asString()));
+
+    IMetaTaskTransaction metaTransaction = null;
+    HttpResponse response = httpClient.execute(postMethod);
+    if (stopping) {
+      throw new TransportException("Service stopping - rejecting request");
+    }
+    HttpEntity entity = response.getEntity();
+    String serializedResponse = EntityUtils.toString(entity);
+    Object transaction = null;
+    try {
+      long t1 = System.currentTimeMillis();
+      // transaction = XStreamUtils.unmarshall(serializedResponse);
+      transaction = localXStream.get().get(Thread.currentThread().getId())
+              .fromXML(serializedResponse);
+      xstreamTime.addAndGet((System.currentTimeMillis() - t1));
+    } catch (Exception e) {
+      logger.log(Level.WARNING, "Process Thread:" + Thread.currentThread().getId()
+              + " Error while deserializing response with XStream", e);
+      throw new TransportException(e);
+    }
+    if (Objects.isNull(transaction)) {
+      throw new InvalidClassException("Expected IMetaTaskTransaction - Instead Received NULL");
+
+    } else if (!(transaction instanceof IMetaTaskTransaction)) {
+      throw new InvalidClassException("Expected IMetaTaskTransaction - Instead Received "
+              + transaction.getClass().getName());
+    }
+    metaTransaction = (IMetaTaskTransaction) transaction;
+
+    StatusLine statusLine = response.getStatusLine();
+    if (statusLine.getStatusCode() != 200) {
+      // all IOExceptions are retried
+      throw new IOException("Unexpected HttpClient response status:" + statusLine
+              + " Content causing error:" + serializedResponse);
+    }
+
+    stats.incrementSuccessCount();
+    return metaTransaction;
+  }
+
+  /**
+   * Dispatches a request to the remote driver via doPost(). This method is not declared
+   * synchronized, so calls from multiple threads are not serialized here. If sending GET/ACK/END
+   * fails while the service is not stopping, the calling thread retries the request in a loop.
+   * 
+   */
+  @Override
+  public IMetaTaskTransaction dispatch(String serializedRequest,
+          ThreadLocal<HashMap<Long, XStream>> localXStream) throws TransportException {
+    if (stopping) {
+      throw new IllegalStateException(
+              "Service transport has been stopped, unable to dispatch request");
+    }
+    IMetaTaskTransaction transaction = null;
+    HttpEntity e = wrapRequest(serializedRequest);
+    // Each thread has its own HttpPost method. If the current thread
+    // doesn't have one, it will be created and added to the local
+    // Map. Subsequent requests will fetch it from the map using
+    // the current thread ID as a key.
+    HttpPost postMethod = getPostMethodForCurrentThread();
+    addCommonHeaders(postMethod);
+    postMethod.setEntity(e);
+    try {
+      String simulatedException;
+      // To test transport errors add command line option
+      // -DMockHttpPostError=exception where
+      // exception is one of the following Strings:
+      //
+      // IOException,
+      // SocketException,
+      // UnknownHostException,
+      // NoRouteToHostException,
+      // NoHttpResponseException,
+      // HttpHostConnectException,
+      // URISyntaxException
+      // Use JUnit test JunitTransportTestCase to test the above errors
+
+      if ((simulatedException = System.getProperty("MockHttpPostError")) != null) {
+        HttpClientExceptionGenerator mockExceptionGenerator = new HttpClientExceptionGenerator(
+                simulatedException);
+        mockExceptionGenerator.throwSimulatedException();
+      } else {
+        transaction = doPost(postMethod, localXStream);
+      }
+    } catch (IOException | URISyntaxException ex) {
+      if (stopping) {
+        // Looks like the process is in shutdown mode. Log the exception and don't
+        // retry.
+        logger.log(Level.INFO, "Process Thread:" + Thread.currentThread().getId()
+                + " - Process is already stopping - Caught Exception while calling doPost() \n"
+                + ex);
+        throw new TransportException(ex);
+      } else {
+        if (log) {
+          log = false;
+          stats.incrementErrorCount();
+          logger.log(Level.WARNING,
+                  this.getClass().getName() + ".dispatch() >>>>>>>>>> Handling Exception \n" + ex);
+          logger.log(Level.INFO,
+                  ">>>>>>>>>> Unable to communicate with target:" + currentTargetUrl.asString()
+                          + " - retrying until successfull - with " + threadSleepTime / 1000
+                          + " seconds wait between retries  ");
+        }
+        transaction = retryUntilSuccessfull(serializedRequest, postMethod, localXStream);
+        log = true;
+        logger.log(Level.INFO, "Established connection to target:" + currentTargetUrl.asString());
+      }
+
+    } finally {
+      postMethod.releaseConnection();
+    }
+    return transaction;
+
+  }
+
+  public void stop(boolean quiesce) {
+
+    stopping = true;
+    running = false;
+    // Use System.out since the logger's ShutdownHook may have closed streams
+    System.out.println(Utils.getTimestamp() + ">>>>>>> " + Utils.getShortClassname(this.getClass())
+            + " stop() called - mode:" + (quiesce == true ? "quiesce" : "stop"));
+    logger.log(Level.INFO, this.getClass().getName() + " stop() called");
+    System.out.println(" ########################################3 Total time in XStream:"
+            + (xstreamTime.get() / 1000) + " secs");
+    if (!quiesce && cMgr != null) {
+      cMgr.shutdown();
+      System.out.println(Utils.getTimestamp() + ">>>>>>> "
+              + Utils.getShortClassname(this.getClass()) + " stopped connection mgr");
+      logger.log(Level.INFO, this.getClass().getName() + " stopped connection mgr");
+
+    }
+  }
+
+  public static void main(String[] args) {
+
+  }
+
+  public static class HttpClientExceptionGenerator {
+    public enum ERROR {
+      IOException, SocketException, UnknownHostException, NoRouteToHostException, NoHttpResponseException, HttpHostConnectException, URISyntaxException
+    };
+
+    Exception exceptionClass = null;
+
+    public HttpClientExceptionGenerator(String exc) {
+
+      for (ERROR e : ERROR.values()) {
+        if (exc != null && e.name().equals(exc)) {
+          switch (e) {
+            case IOException:
+              exceptionClass = new IOException("Simulated IOException");
+              break;
+            case URISyntaxException:
+              exceptionClass = new URISyntaxException("", "Simulated URISyntaxException");
+              break;
+            case NoRouteToHostException:
+              exceptionClass = new NoRouteToHostException("Simulated NoRouteToHostException");
+              break;
+            case NoHttpResponseException:
+              exceptionClass = new NoHttpResponseException("Simulated NoHttpResponseException");
+              break;
+            case SocketException:
+              exceptionClass = new SocketException("Simulated SocketException");
+              break;
+            case UnknownHostException:
+              exceptionClass = new UnknownHostException("Simulated UnknownHostException");
+              break;
+
+            default:
+
+          }
+          if (exceptionClass != null) {
+            break;
+          }
+        }
+      }
+    }
+
+    public void throwSimulatedException() throws IOException, URISyntaxException {
+      if (exceptionClass != null) {
+        if (exceptionClass instanceof IOException) {
+          throw (IOException) exceptionClass;
+        } else if (exceptionClass instanceof URISyntaxException) {
+          throw (URISyntaxException) exceptionClass;
+        }
+
+      }
+    }
+
+  }
 
 }

Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java Wed May  1 13:42:39 2019
@@ -21,6 +21,12 @@ package org.apache.uima.ducc.ps;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -34,6 +40,7 @@ import org.apache.uima.cas.CAS;
 import org.apache.uima.ducc.ps.net.iface.IMetaTask;
 import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction;
 import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction.Direction;
+import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction.Type;
 import org.apache.uima.ducc.ps.net.impl.MetaTask;
 import org.apache.uima.ducc.ps.service.transport.XStreamUtils;
 import org.apache.uima.ducc.ps.service.utils.UimaSerializer;
@@ -48,243 +55,441 @@ import org.eclipse.jetty.servlet.Servlet
 import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.junit.After;
 
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.io.xml.DomDriver;
+
 public class Client {
-	private Server server;
-	private boolean block = false;
-	private AtomicLong errorCount = new AtomicLong();
-	private final static String app="test";
-	private int httpPort = 12222;
-	private int maxThreads = 50;
-	private static UimaSerializer uimaSerializer = new UimaSerializer();
-	private AtomicInteger correlationIdCounter = 
-			new AtomicInteger(0);
-	private AtomicInteger atomicCounter =
-			new AtomicInteger(1);
-	private AtomicInteger atomicErrorCounter =
-			new AtomicInteger(10);
-	private volatile boolean noMoreErrors = false;
-	protected String getApp() {
-		return app;
-	}
-	protected int getJettyPort() {
-		while(true) {
-			ServerSocket socket=null;
-			try {
-				socket = new ServerSocket(httpPort);
-				break;
-			} catch( IOException e) {
-				httpPort++;
-			} finally {
-				if ( socket != null ) {
-					try {
-						socket.close();
-					} catch( Exception ee) {}
-					
-				}
-			}
-		}
-		return httpPort;
-	}
-	protected int getPort() {
-
-		return httpPort;
-	}
-	   
-	public void startJetty(boolean block) throws Exception {
-		this.block = block;
-	    	
-	        
-			QueuedThreadPool threadPool = new QueuedThreadPool();
-			if (maxThreads < threadPool.getMinThreads()) {
-				System.out.println(
-				"Invalid value for jetty MaxThreads("+maxThreads+") - it should be greater or equal to "+threadPool.getMinThreads()+". Defaulting to jettyMaxThreads="+threadPool.getMaxThreads());
-				threadPool.setMaxThreads(threadPool.getMinThreads());
-			} else {
-				threadPool.setMaxThreads(maxThreads);
-			}
-
-		    server = new Server(threadPool);
-
-			// Server connector
-			ServerConnector connector = new ServerConnector(server);
-			connector.setPort(getJettyPort());
-			server.setConnectors(new Connector[] { connector });
-	        System.out.println("launching Jetty on Port:"+connector.getPort());
-			ServletContextHandler context = new ServletContextHandler(
-					ServletContextHandler.SESSIONS);
-			context.setContextPath("/");
-			server.setHandler(context);
-
-			context.addServlet(new ServletHolder(new TaskHandlerServlet()), "/"+app);
-	        
-	        
-			server.start();
-	        System.out.println("Jetty Started - Waiting for Messages ...");
-	    }
-
-	    @After
-	    public void stopJetty()
-	    {
-	        try
-	        {
-	        	if ( server != null ) {
-		        	UIMAFramework.getLogger().log(Level.INFO, "Stopping Jetty");
-		            server.stop();
-	        		
-	        	}
-	        }
-	        catch (Exception e)
-	        {
-	            e.printStackTrace();
-	        }
-	        UIMAFramework.getLogger().log(Level.INFO,"Jetty Stopped");
-	    }
-		public class TaskHandlerServlet extends HttpServlet {
-			private static final long serialVersionUID = 1L;
-
-			public TaskHandlerServlet() {
-			}
-
-			protected void doPost(HttpServletRequest request,
-					HttpServletResponse response) throws ServletException,
-					IOException {
-				try {
-					//System.out.println("Handling HTTP Post Request");
-					//long post_stime = System.nanoTime();
-					StringBuilder sb = new StringBuilder();
-					BufferedReader reader = request.getReader();
-					String line;
-					while ((line = reader.readLine()) != null) {
-						sb.append(line);
-					}
-					String content = sb.toString().trim();
-
-					//System.out.println( "Http Request Body:::"+String.valueOf(content));
-					
-					
-		    		 String nodeIP = request.getHeader("IP");
-		             String nodeName = request.getHeader("Hostname");
-		             String threadID = request.getHeader("ThreadID");
-		             String pid = request.getHeader("PID");
-					System.out.println( "Sender ID:::Node IP"+nodeIP+" Node Name:"+nodeName+" PID:"+pid+" ThreadID:"+threadID);
-
-					IMetaTaskTransaction imt = null;
-
-					imt = (IMetaTaskTransaction) XStreamUtils.unmarshall(content);
-					IMetaTaskTransaction.Type type = imt.getType();
-					switch(type) {
-					case Get:
-						System.out.println("---- Driver handling GET Request -- Thread:"+Thread.currentThread().getId());
-						imt.setMetaTask(getMetaMetaCas());
-						imt.getMetaTask().setAppData("CorrelationID-"+correlationIdCounter.incrementAndGet());
-						if ( System.getProperty("simulate.no.work") == null || noMoreErrors) {
-							imt.getMetaTask().setUserSpaceTask(getSerializedCAS());
-						} else {
-							System.out.println("---- Driver handling GET Request -- Client Out of Tasks -- Thread:"+Thread.currentThread().getId());
-							imt.getMetaTask().setUserSpaceTask(null);
-							if ( atomicErrorCounter.decrementAndGet() == 0 ) {
-								noMoreErrors = true;
-							}
-						}
-					//	handleMetaCasTransationGet(trans, taskConsumer);
-						break;
-					case Ack:
-						System.out.println("---- Driver handling ACK Request - ");
-						//handleMetaCasTransationAck(trans, taskConsumer);
-						break;
-					case End:
-						System.out.println("---- Driver handling END Request - "+imt.getMetaTask().getAppData());
-						//handleMetaCasTransationEnd(trans, taskConsumer);
-						if ( imt.getMetaTask().getUserSpaceException() != null ) {
-							System.out.println("Client received error#"+errorCount.incrementAndGet());
-						}
-						break;
-					case InvestmentReset:
-					//	handleMetaCasTransationInvestmentReset(trans, rwt);
-						break;
-					default:
-						break;
-					}
-					// process service request
-					//taskProtocolHandler.handle(imt);
-
-					//long marshall_stime = System.nanoTime();
-					// setup reply
-					
-					imt.setDirection(Direction.Response);
-
-					response.setStatus(HttpServletResponse.SC_OK);
-
-					response.setHeader("content-type", "text/xml");
-					String body = XStreamUtils.marshall(imt);
-
-					if (block ) {
-						synchronized(this) {
-							this.wait(0);
-						}
-						
-					}
-					System.out.println("Sending response");
-					response.getWriter().write(body);
-					
-					
-					//response.getWriter().write(content);
-				} catch( InterruptedException e) {
-					Thread.currentThread().interrupt();
-				}
-				catch (Throwable e) {
-					e.printStackTrace();
-					throw new ServletException(e);
-				}
-			}
-
-		}
-		public long getErrorCount() {
-			return errorCount.get();
-		}
-		private IMetaTask getMetaCas(String serializedCas) {
-			if ( serializedCas == null ) {
-				return null;
-			}
-			return new MetaTask(atomicCounter.incrementAndGet(), "", serializedCas);
-		}
-
-		private IMetaTask getMetaMetaCas() {
-			//IMetaMetaCas mmc = new MetaMetaCas();
-					
-			String serializedCas = "Bogus";
-
-			IMetaTask metaCas = getMetaCas(serializedCas);
-			
-		//	mmc.setMetaCas(metaCas);
-			//return mmc;
-			return metaCas;
-		}
-		public String getSerializedCAS() {
-			//logger.log(Level.INFO,"getSerializedCAS() Call "+seqno.incrementAndGet()
-			//        + " - from "+taskConsumer.getType()+":"+taskConsumer.getHostName()+"-"+taskConsumer.getPid()+"-"+taskConsumer.getThreadId() );
-			String serializedCas = null;
-			try {
-				CAS cas = null;
-				cas = CasCreationUtils.createCas(new TypeSystemDescription_impl(), null, null);
-				cas.setDocumentLanguage("en");
-				
-				//logger.log(Level.INFO,"delivering: " + text);
-				cas.setDocumentText("TEST");
-//				cas.setDocumentText("100 "+seqno.incrementAndGet()+" 1000 0");
-
-				serializedCas = serialize(cas);
-				cas.reset();
-				cas.release();
-
-			} catch( Exception e) {
-				//logger.log(Level.WARNING,"Error",e);
-			}
-
-			return serializedCas;
-		}
-		private String serialize(CAS cas) throws Exception {
-			String serializedCas = uimaSerializer.serializeCasToXmi(cas);
-			return serializedCas;
-		}
+  private Server server;
+
+  private boolean block = false;
+
+  private AtomicLong errorCount = new AtomicLong();
+
+  private AtomicLong taskCount = new AtomicLong();
+
+  private final static String app = "test";
+
+  private int httpPort = 12222;
+
+  private int maxThreads = 50;
+
+  private volatile boolean print = true;
+
+  private static UimaSerializer uimaSerializer = new UimaSerializer();
+
+  private AtomicInteger correlationIdCounter = new AtomicInteger(0);
+
+  private AtomicInteger atomicCounter = new AtomicInteger(1);
+
+  private AtomicInteger atomicErrorCounter = new AtomicInteger(16);
+
+  private volatile boolean noMoreErrors = false;
+
+  Map<String, List<ThreadMetrics>> metrics = new ConcurrentHashMap<>();
+
+  private AtomicLong idleTime = new AtomicLong();
+
+  private AtomicLong lastTime = new AtomicLong();
+
+  private AtomicLong xstreamTime = new AtomicLong();
+
+  private ThreadLocal<HashMap<Long, XStream>> localXStream = new ThreadLocal<HashMap<Long, XStream>>() {
+    @Override
+    protected HashMap<Long, XStream> initialValue() {
+      return new HashMap<Long, XStream>();
+    }
+  };
+
+  protected String getApp() {
+    return app;
+  }
+
+  protected int getJettyPort() {
+    while (true) {
+      ServerSocket socket = null;
+      try {
+        socket = new ServerSocket(httpPort);
+        break;
+      } catch (IOException e) {
+        httpPort++;
+      } finally {
+        if (socket != null) {
+          try {
+            socket.close();
+          } catch (Exception ee) {
+          }
+
+        }
+      }
+    }
+    return httpPort;
+  }
+
+  protected int getPort() {
+
+    return httpPort;
+  }
+
+  public void startJetty(boolean block) throws Exception {
+    this.block = block;
+
+    QueuedThreadPool threadPool = new QueuedThreadPool();
+    if (maxThreads < threadPool.getMinThreads()) {
+      System.out.println("Invalid value for jetty MaxThreads(" + maxThreads
+              + ") - it should be greater or equal to " + threadPool.getMinThreads()
+              + ". Defaulting to jettyMaxThreads=" + threadPool.getMaxThreads());
+      threadPool.setMaxThreads(threadPool.getMinThreads());
+    } else {
+      threadPool.setMaxThreads(maxThreads);
+    }
+
+    server = new Server(threadPool);
+
+    // Server connector
+    ServerConnector connector = new ServerConnector(server);
+    System.out.println(">>>> Jetty Acceptors:" + connector.getAcceptors());
+
+    connector.setPort(getJettyPort());
+    server.setConnectors(new Connector[] { connector });
+    System.out.println("launching Jetty on Port:" + connector.getPort());
+    ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+    context.setContextPath("/");
+    server.setHandler(context);
+
+    context.addServlet(new ServletHolder(new TaskHandlerServlet()), "/" + app);
+
+    server.start();
+    System.out.println("Jetty Started - Waiting for Messages ...");
+  }
+
+  @After
+  public void stopJetty() {
+    try {
+      if (server != null) {
+        UIMAFramework.getLogger().log(Level.INFO, "Stopping Jetty");
+        server.stop();
+
+      }
+      // System.out.println(">>>>>>>>>>>>>>> IDLE TIME:"+(idleTime.longValue()/1000));
+
+      if (print) {
+        print = false;
+
+        for (Entry<String, List<ThreadMetrics>> me : metrics.entrySet()) {
+          StringBuilder sb = new StringBuilder();
+          sb.append("Service Thread Id:").append(me.getKey()).append(" Number of Tasks Processed:")
+                  .append(me.getValue().size());
+          int i = 0;
+          for (ThreadMetrics tm : me.getValue()) {
+            long analysisTime = 0;
+            try {
+              analysisTime = Long.parseLong(tm.getAnalysisTime());
+            } catch (Exception e) {
+            }
+            ThreadMetrics previous = null;
+            if (i > 0) {
+              previous = me.getValue().get(i - 1);
+            }
+            sb.append("\n\tTask ").append(tm.getCorrelationId()).// append(" Ack Time:").
+                    append(tm.getAckTime() - tm.getGetTime()).append(" Get-Ack-End:")
+                    .append(tm.getEndTime() - tm.getGetTime()).append(" ms")
+                    .append(" Analysis Time:").append(tm.getAnalysisTime()).append(" Overhead:")
+                    .append((tm.getEndTime() - tm.getGetTime()) - analysisTime);
+            if (previous != null) {
+              sb.append(" Idle Time:").append(tm.getGetTime() - previous.getEndTime());
+            }
+            i++;
+
+          }
+
+          System.out.println(sb.toString());
+          System.out.println(">>>>> Total Tasks Processed:" + taskCount + " Client Time in xstream:"
+                  + (xstreamTime.get() / 1000));
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    UIMAFramework.getLogger().log(Level.INFO, "Jetty Stopped");
+  }
+
+  public class TaskHandlerServlet extends HttpServlet {
+    private static final long serialVersionUID = 1L;
+
+    public TaskHandlerServlet() {
+    }
+
+    protected void doPost(HttpServletRequest request, HttpServletResponse response)
+            throws ServletException, IOException {
+      try {
+        // System.out.println("Handling HTTP Post Request");
+        // long post_stime = System.nanoTime();
+        StringBuilder sb = new StringBuilder();
+        BufferedReader reader = request.getReader();
+        String line;
+        while ((line = reader.readLine()) != null) {
+          sb.append(line);
+        }
+        String content = sb.toString().trim();
+
+        // System.out.println( "Http Request Body:::"+String.valueOf(content));
+
+        if (localXStream.get().get(Thread.currentThread().getId()) == null) {
+          localXStream.get().put(Thread.currentThread().getId(), XStreamUtils.getXStreamInstance());// new
+                                                                                                    // XStream(new
+                                                                                                    // DomDriver()));
+        }
+
+        String nodeIP = request.getHeader("IP");
+        String nodeName = request.getHeader("Hostname");
+        String threadID = request.getHeader("ThreadID");
+        String pid = request.getHeader("PID");
+        // System.out.println( "Sender ID:::Node IP"+nodeIP+" Node Name:"+nodeName+" PID:"+pid+"
+        // ThreadID:"+threadID);
+
+        IMetaTaskTransaction imt = null;
+        long t1 = System.currentTimeMillis();
+        // imt = (IMetaTaskTransaction) XStreamUtils.unmarshall(content);
+        imt = (IMetaTaskTransaction) localXStream.get().get(Thread.currentThread().getId())
+                .fromXML(content);
+        xstreamTime.addAndGet(System.currentTimeMillis() - t1);
+        IMetaTaskTransaction.Type type = imt.getType();
+        switch (type) {
+          case Get:
+            // idleTime.addAndGet(System.currentTimeMillis() - lastTime.longValue());
+            // lastTime.set(System.currentTimeMillis());
+
+            taskCount.incrementAndGet();
+            // System.out.println("---- Driver handling GET Request -- Thread:"+threadID);
+            List<ThreadMetrics> tmList = null;
+            if (metrics.containsKey(threadID)) {
+              tmList = metrics.get(threadID);
+            } else {
+              tmList = new ArrayList<>();
+              metrics.put(threadID, tmList);
+
+            }
+            ThreadMetrics tm = new ThreadMetrics();
+            tm.setGetTime(System.currentTimeMillis());
+            tmList.add(tm);
+
+            imt.setMetaTask(getMetaMetaCas());
+
+            imt.getMetaTask().setAppData("CorrelationID-" + correlationIdCounter.incrementAndGet());
+            tm.setCorrelationId(imt.getMetaTask().getAppData());
+            if (System.getProperty("simulate.no.work") == null || noMoreErrors) {
+              imt.getMetaTask().setUserSpaceTask(getSerializedCAS());
+            } else {
+              System.out
+                      .println("---- Driver handling GET Request -- Client Out of Tasks -- Thread:"
+                              + threadID);
+              imt.getMetaTask().setUserSpaceTask(null);
+              if (atomicErrorCounter.decrementAndGet() == 0) {
+                noMoreErrors = true;
+              }
+            }
+            // handleMetaCasTransationGet(trans, taskConsumer);
+            break;
+          case Ack:
+            // System.out.println("---- Driver handling ACK Request - ");
+            List<ThreadMetrics> tmList2 = metrics.get(threadID);
+            for (ThreadMetrics tm2 : tmList2) {
+              if (imt.getMetaTask().getAppData().equals(tm2.getCorrelationId())) {
+                tm2.setAckTime(System.currentTimeMillis());
+                break;
+              }
+            }
+
+            // handleMetaCasTransationAck(trans, taskConsumer);
+            break;
+          case End:
+            // System.out.println("---- Driver handling END Request -
+            // "+imt.getMetaTask().getAppData());
+            List<ThreadMetrics> tmList3 = metrics.get(threadID);
+            for (ThreadMetrics tm3 : tmList3) {
+              if (imt.getMetaTask().getAppData().equals(tm3.getCorrelationId())) {
+                tm3.setEndTime(System.currentTimeMillis());
+                if (imt.getMetaTask().getPerformanceMetrics() != null) {
+                  String metrics = imt.getMetaTask().getPerformanceMetrics();
+                  int start = metrics.indexOf("<analysisTime>") + "<analysisTime>".length();
+                  int end = metrics.indexOf("</analysisTime>");
+                  String analysisTime = metrics.substring(start, end);
+                  // System.out.println(">>>>>>>>>>>>>>>>> Analysis Time:"+analysisTime);
+                  tm3.setAnalysisTime(analysisTime);
+                }
+                break;
+              }
+            }
+            // handleMetaCasTransationEnd(trans, taskConsumer);
+            if (imt.getMetaTask().getUserSpaceException() != null) {
+              System.out.println("Client received error#" + errorCount.incrementAndGet());
+            }
+
+            break;
+          case InvestmentReset:
+            // handleMetaCasTransationInvestmentReset(trans, rwt);
+            break;
+          default:
+            break;
+        }
+        // process service request
+        // taskProtocolHandler.handle(imt);
+
+        // long marshall_stime = System.nanoTime();
+        // setup reply
+
+        imt.setDirection(Direction.Response);
+
+        response.setStatus(HttpServletResponse.SC_OK);
+
+        response.setHeader("content-type", "text/xml");
+        // String body = XStreamUtils.marshall(imt);
+        String body = localXStream.get().get(Thread.currentThread().getId()).toXML(imt);
+        if (block) {
+          synchronized (this) {
+            this.wait(0);
+          }
+
+        }
+
+        // System.out.println("Sending response");
+        response.getWriter().write(body);
+
+        // response.getWriter().write(content);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } catch (Throwable e) {
+        e.printStackTrace();
+        throw new ServletException(e);
+      }
+    }
+
+  }
+
+  public long getErrorCount() {
+    return errorCount.get();
+  }
+
+  private IMetaTask getMetaCas(String serializedCas) {
+    if (serializedCas == null) {
+      return null;
+    }
+    return new MetaTask(atomicCounter.incrementAndGet(), "", serializedCas);
+  }
+
+  private IMetaTask getMetaMetaCas() {
+    // IMetaMetaCas mmc = new MetaMetaCas();
+
+    String serializedCas = "Bogus";
+
+    IMetaTask metaCas = getMetaCas(serializedCas);
+
+    // mmc.setMetaCas(metaCas);
+    // return mmc;
+    return metaCas;
+  }
+
+  public String getSerializedCAS() {
+    // logger.log(Level.INFO,"getSerializedCAS() Call "+seqno.incrementAndGet()
+    // + " - from
+    // "+taskConsumer.getType()+":"+taskConsumer.getHostName()+"-"+taskConsumer.getPid()+"-"+taskConsumer.getThreadId()
+    // );
+    String serializedCas = null;
+    try {
+      CAS cas = null;
+      cas = CasCreationUtils.createCas(new TypeSystemDescription_impl(), null, null);
+      cas.setDocumentLanguage("en");
+
+      // logger.log(Level.INFO,"delivering: " + text);
+      cas.setDocumentText("TEST");
+      // cas.setDocumentText("100 "+seqno.incrementAndGet()+" 1000 0");
+
+      serializedCas = serialize(cas);
+      cas.reset();
+      cas.release();
+
+    } catch (Exception e) {
+      // logger.log(Level.WARNING,"Error",e);
+    }
+
+    return serializedCas;
+  }
+
+  private String serialize(CAS cas) throws Exception {
+    String serializedCas = uimaSerializer.serializeCasToXmi(cas);
+    return serializedCas;
+  }
+
+  private class ThreadMetrics {
+    long getTime;
+
+    long ackTime;
+
+    long endTime;
+
+    long idleTime;
+
+    long lastTime;
+
+    String correlationId;
+
+    String analysisTime;
+
+    public long getLastTime() {
+      return lastTime;
+    }
+
+    public void setLastTime(long lastTime) {
+      this.lastTime = lastTime;
+    }
+
+    public String getAnalysisTime() {
+      return analysisTime;
+    }
+
+    public void setAnalysisTime(String analysisTime) {
+      this.analysisTime = analysisTime;
+    }
+
+    public long getIdleTime() {
+      return idleTime;
+    }
+
+    public void setIdleTime(long idleTime) {
+      this.idleTime = idleTime;
+    }
+
+    public String getCorrelationId() {
+      return correlationId;
+    }
+
+    public void setCorrelationId(String correlationId) {
+      this.correlationId = correlationId;
+    }
+
+    public long getGetTime() {
+      return getTime;
+    }
+
+    public void setGetTime(long getTime) {
+      this.getTime = getTime;
+    }
+
+    public long getAckTime() {
+      return ackTime;
+    }
+
+    public void setAckTime(long ackTime) {
+      this.ackTime = ackTime;
+    }
+
+    public long getEndTime() {
+      return endTime;
+    }
+
+    public void setEndTime(long endTime) {
+      this.endTime = endTime;
+    }
+
+  }
 }

Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java Wed May  1 13:42:39 2019
@@ -21,6 +21,7 @@ package org.apache.uima.ducc.ps.service;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.uima.ducc.ps.Client;
 import org.apache.uima.ducc.ps.service.builders.PullServiceStepBuilder;
@@ -30,269 +31,264 @@ import org.apache.uima.ducc.ps.service.p
 import org.junit.Test;
 
 public class JunitPullServiceTestCase extends Client {
-	private static final long  DELAY=5000;
-	CountDownLatch threadsReady;
-	CountDownLatch stopLatch;
-	{
-		// static initializer sets amount of time the service delays
-		// sending READY to a monitor
-		System.setProperty("ducc.service.init.delay", "3000");
-	}
-	@Test
-	public void testPullService() throws Exception {
-		System.out.println("----------------- testPullService -------------------");
-		int scaleout = 2;
-		super.startJetty(false);  // don't block
-		String analysisEngineDescriptor = "TestAAE";
-		System.setProperty("ducc.deploy.JpType", "uima");
-
-		IServiceProcessor processor = new
-				UimaServiceProcessor(analysisEngineDescriptor);
-
-		String tasURL = "http://localhost:"+super.getPort()+"/test";
-
-		IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
-				.withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
-				.withOptionalsDone().build();
-
-		try {
-			service.initialize();
-			Timer fTimer = new Timer("testPullService Timer");
-			// after 5secs stop the pull service
-			fTimer.schedule(new MyTimerTask(service, fTimer, false), DELAY);
-
-			service.start();
-
-		} catch (ServiceInitializationException e) {
-			throw e;
-		} catch (Exception e) {
-			throw e;
-		} finally {
-			super.stopJetty();
-		}
-	}
-	@Test
-	public void testPullServiceQuiesce() throws Exception {
-		System.out.println("----------------- testPullServiceQuiesce -------------------");
-		int scaleout = 2;
-		super.startJetty(false);  // don't block
-		String analysisEngineDescriptor = "TestAAE";
-		System.setProperty("ducc.deploy.JpType", "uima");
-		IServiceProcessor processor = new
-				UimaServiceProcessor(analysisEngineDescriptor);
-
-		String tasURL = "http://localhost:"+super.getPort()+"/test";
-
-		IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
-				.withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
-				.withOptionalsDone().build();
-
-		try {
-			service.initialize();
-			Timer fTimer = new Timer("testPullService Timer");
-			// after 5secs stop the pull service
-			fTimer.schedule(new MyTimerTask(service, fTimer, true), DELAY);
-
-			service.start();
-
-		} catch (ServiceInitializationException e) {
-			throw e;
-		} catch (Exception e) {
-			throw e;
-		}finally {
-			super.stopJetty();
-		}
-	}
-
-	@Test
-	public void testPullServiceTimeout() throws Exception {
-		System.out.println("----------------- testPullServiceTimeout -------------------");
-		super.startJetty(true);  // true=client blocks all POST requests
-		int scaleout = 12;
-		String analysisEngineDescriptor = "TestAAE";
-		IServiceProcessor processor = new
-				UimaServiceProcessor(analysisEngineDescriptor);
-
-		String tasURL ="http://localhost:"+super.getPort()+"/test";
-
-		IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
-				.withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
-				.withOptionalsDone().build();
-
-		try {
-			service.initialize();
-			System.out.println("----------- Starting Service .....");
-			Timer fTimer = new Timer();
-			//after 10sec stop the service
-			fTimer.schedule(new MyTimerTask(service, fTimer, false), DELAY);
-
-			service.start();
-
-
-		} catch (ServiceInitializationException e) {
-			throw e;
-		} catch (Exception e) {
-			throw e;
-		}finally {
-			super.stopJetty();
-		}
-	}
-
-	@Test
-	public void testStopOnFirstError() throws Exception {
-		System.out.println("----------------- testStopOnFirstError -------------------");
-		int scaleout = 10;
-		super.startJetty(false);  // don't block
-		String analysisEngineDescriptor = "NoOpAE";
-		System.setProperty("ducc.deploy.JpType", "uima");
-
-		IServiceProcessor processor =
-				new UimaServiceProcessor(analysisEngineDescriptor);
-		// fail on 1st error
-		processor.setErrorHandlerWindow(1,  5);
-
-		String tasURL = "http://localhost:"+super.getPort()+"/test";
-
-		IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
-				.withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
-				.withOptionalsDone().build();
-
-		try {
-			System.setProperty("ProcessFail","2");
-			service.initialize();
-
-			service.start();
-
-		} catch (ServiceInitializationException e) {
-			throw e;
-		} catch (Exception e) {
-			throw e;
-		} finally {
-			System.getProperties().remove("ProcessFail");
-			super.stopJetty();
-		}
-	}
-	@Test
-	public void testTerminateOn2ErrorsInWindowOf5() throws Exception {
-		System.out.println("----------------- testTerminateOn2ErrorsInWindowOf5 -------------------");
-		int scaleout = 10;
-		super.startJetty(false);  // don't block
-		String analysisEngineDescriptor = "NoOpAE";
-		System.setProperty("ducc.deploy.JpType", "uima");
-
-		IServiceProcessor processor =
-				new UimaServiceProcessor(analysisEngineDescriptor);
-		// fail on 2nd error in a window of 5
-		processor.setErrorHandlerWindow(2,  5);
-		String tasURL = "http://localhost:"+super.getPort()+"/test";
-
-		IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
-				.withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
-				.withOptionalsDone().build();
-
-		try {
-			// fail task#1 and task#3 which should stop the test
-			System.setProperty("ProcessFail","1,3");
-			service.initialize();
-
-			service.start();
-
-		} catch (ServiceInitializationException e) {
-			throw e;
-		} catch (Exception e) {
-			throw e;
-		} finally {
-			System.getProperties().remove("ProcessFail");
-			super.stopJetty();
-		}
-	}
-	@Test
-	public void testProcessFailureDefaultErrorHandler() throws Exception {
-		System.out.println("----------------- testProcessFailureDefaultErrorHandler -------------------");
-		int scaleout = 14;
-		super.startJetty(false);  // don't block
-		String analysisEngineDescriptor = "NoOpAE";
-		IServiceProcessor processor = new
-				UimaServiceProcessor(analysisEngineDescriptor);
-
-		String tasURL = "http://localhost:"+super.getPort()+"/test";
-
-		IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
-				.withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
-				.withOptionalsDone().build();
-
-		try {
-			// fail on 2nd task. This should terminate the test
-			 System.setProperty("ProcessFail","20");
-			service.initialize();
-			Timer fTimer = new Timer("testPullService Timer");
-			// after 5secs stop the pull service
-			fTimer.schedule(new MyTimerTask(service, fTimer, false), 20000);
-
-			service.start();
-
-		} catch (ServiceInitializationException e) {
-			throw e;
-		} catch (Exception e) {
-			throw e;
-		} finally {
-			System.getProperties().remove("ProcessFail");
-			super.stopJetty();
-		}
-	}
-
-	/*
-	@Test
-	public void testPullServiceBadClientURL() throws Exception {
-		int scaleout = 2;
-		super.startJetty(false);  // don't block
-		String analysisEngineDescriptor = "TestAAE";
-		IServiceProcessor processor = new
-				UimaServiceProcessor(analysisEngineDescriptor);
-
-		String tasURL ="http://localhost2:8080/test";
-
-		IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
-				.withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
-				.withOptionalsDone().build();
-
-		try {
-			service.initialize();
-			service.start();
-
-
-		} catch (ServiceInitializationException e) {
-			throw e;
-		} catch (Exception e) {
-			throw e;
-		}
-	}
-	*/
-	class MyTimerTask extends TimerTask {
-		final IService service;
-		final Timer fTimer;
-		final boolean quiesce;
-
-		MyTimerTask(IService service, Timer fTimer, boolean quiesce) {
-			this.service = service;
-			this.fTimer = fTimer;
-			this.quiesce = quiesce;
-		}
-
-		@Override
-
-		public void run() {
-			this.cancel();
-			fTimer.purge();
-			fTimer.cancel();
-			System.out.println("Timmer popped - stopping service");
-			if (quiesce ) {
-				service.quiesceAndStop();
-			} else {
-				service.stop();
-			}
-		}
+  private static final long DELAY = 20000; // ms delay passed to Timer.schedule for the stop task
 
-	}
+  CountDownLatch threadsReady;
+
+  CountDownLatch stopLatch;
+  {
+    // instance initializer (runs for every new test-class instance): sets the amount of
+    // time the service delays sending READY to a monitor
+    System.setProperty("ducc.service.init.delay", "3000");
+  }
+
+  /** Runs NoOpAE at scaleout 20; a timer calls stop() (no quiesce) after DELAY ms. */
+  @Test
+  public void testPullService() throws Exception {
+    System.out.println("----------------- testPullService -------------------");
+    int scaleout = 20;
+    super.startJetty(false); // don't block
+    // String analysisEngineDescriptor = "TestAAE";
+    String analysisEngineDescriptor = "NoOpAE";
+    System.setProperty("ducc.deploy.JpType", "uima");
+    // System.setProperty("simulate.no.work", "3"); UNCOMMENT TO SIMULATE NO WORK
+    IServiceProcessor processor = new UimaServiceProcessor(analysisEngineDescriptor);
+
+    String tasURL = "http://localhost:" + super.getPort() + "/test";
+
+    IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
+            .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
+            .withOptionalsDone().build();
+    long start = System.currentTimeMillis();
+    Timer fTimer = null;
+    try {
+      service.initialize();
+      fTimer = new Timer("testPullService Timer");
+      // stop the pull service once DELAY milliseconds have elapsed
+      fTimer.schedule(new MyTimerTask(service, fTimer, false), DELAY);
+
+      service.start();
+    } finally {
+      long end = System.currentTimeMillis();
+      if (fTimer != null) {
+        fTimer.cancel(); // a still-pending stop task must not fire after this test
+      }
+      super.stopJetty();
+
+      System.out.println("###################### start-end clock time:" + (end - start) / 1000);
+    }
+  }
+
+  /** Runs TestAAE at scaleout 12; a timer calls quiesceAndStop() after DELAY ms. */
+  @Test
+  public void testPullServiceQuiesce() throws Exception {
+    System.out.println("----------------- testPullServiceQuiesce -------------------");
+    int scaleout = 12;
+    super.startJetty(false); // don't block
+    String analysisEngineDescriptor = "TestAAE";
+    System.setProperty("ducc.deploy.JpType", "uima");
+    IServiceProcessor processor = new UimaServiceProcessor(analysisEngineDescriptor);
+
+    String tasURL = "http://localhost:" + super.getPort() + "/test";
+
+    IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
+            .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
+            .withOptionalsDone().build();
+
+    Timer fTimer = null;
+    try {
+      service.initialize();
+      fTimer = new Timer("testPullService Timer");
+      // quiesce and stop the pull service once DELAY milliseconds have elapsed
+      fTimer.schedule(new MyTimerTask(service, fTimer, true), DELAY);
+
+      service.start();
+    } finally {
+      if (fTimer != null) {
+        fTimer.cancel(); // a still-pending stop task must not fire after this test
+      }
+      super.stopJetty();
+    }
+  }
+
+  /** Runs TestAAE while the client blocks all POSTs; a timer calls stop() after DELAY ms. */
+  @Test
+  public void testPullServiceTimeout() throws Exception {
+    System.out.println("----------------- testPullServiceTimeout -------------------");
+    super.startJetty(true); // true=client blocks all POST requests
+    int scaleout = 12;
+    String analysisEngineDescriptor = "TestAAE";
+    IServiceProcessor processor = new UimaServiceProcessor(analysisEngineDescriptor);
+
+    String tasURL = "http://localhost:" + super.getPort() + "/test";
+
+    IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
+            .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
+            .withOptionalsDone().build();
+
+    Timer fTimer = null;
+    try {
+      service.initialize();
+      System.out.println("----------- Starting Service .....");
+      fTimer = new Timer();
+      // stop the service once DELAY milliseconds have elapsed
+      fTimer.schedule(new MyTimerTask(service, fTimer, false), DELAY);
+
+      service.start();
+    } finally {
+      if (fTimer != null) {
+        fTimer.cancel(); // a still-pending stop task must not fire after this test
+      }
+      super.stopJetty();
+    }
+  }
+
+  /**
+   * Runs NoOpAE at scaleout 10 with error window (1, 5) while the ProcessFail property is "2".
+   * No stop timer is scheduled in this test.
+   */
+  @Test
+  public void testStopOnFirstError() throws Exception {
+    System.out.println("----------------- testStopOnFirstError -------------------");
+    int scaleoutSize = 10;
+    super.startJetty(false); // don't block
+    String aeDescriptorName = "NoOpAE";
+    System.setProperty("ducc.deploy.JpType", "uima");
+
+    IServiceProcessor noOpProcessor = new UimaServiceProcessor(aeDescriptorName);
+    // fail on 1st error
+    noOpProcessor.setErrorHandlerWindow(1, 5);
+
+    String clientUrl = "http://localhost:" + super.getPort() + "/test";
+
+    IService pullService = PullServiceStepBuilder.newBuilder().withProcessor(noOpProcessor)
+            .withClientURL(clientUrl).withType("Note Service").withScaleout(scaleoutSize)
+            .withOptionalsDone().build();
+
+    try {
+      System.setProperty("ProcessFail", "2");
+      pullService.initialize();
+
+      pullService.start();
+    } finally {
+      // always clear the failure trigger and shut the Jetty client down
+      System.getProperties().remove("ProcessFail");
+      super.stopJetty();
+    }
+  }
+
+  /**
+   * Runs NoOpAE at scaleout 10 with error window (2, 5) while ProcessFail is "1,3".
+   * No stop timer is scheduled in this test.
+   */
+  @Test
+  public void testTerminateOn2ErrorsInWindowOf5() throws Exception {
+    System.out.println("----------------- testTerminateOn2ErrorsInWindowOf5 -------------------");
+    int scaleoutSize = 10;
+    super.startJetty(false); // don't block
+    String aeDescriptorName = "NoOpAE";
+    System.setProperty("ducc.deploy.JpType", "uima");
+
+    IServiceProcessor noOpProcessor = new UimaServiceProcessor(aeDescriptorName);
+    // fail on 2nd error in a window of 5
+    noOpProcessor.setErrorHandlerWindow(2, 5);
+    String clientUrl = "http://localhost:" + super.getPort() + "/test";
+
+    IService pullService = PullServiceStepBuilder.newBuilder().withProcessor(noOpProcessor)
+            .withClientURL(clientUrl).withType("Note Service").withScaleout(scaleoutSize)
+            .withOptionalsDone().build();
+
+    try {
+      // fail task#1 and task#3 which should stop the test
+      System.setProperty("ProcessFail", "1,3");
+      pullService.initialize();
+
+      pullService.start();
+    } finally {
+      // always clear the failure trigger and shut the Jetty client down
+      System.getProperties().remove("ProcessFail");
+      super.stopJetty();
+    }
+  }
+
+  /** Runs NoOpAE at scaleout 14 with ProcessFail "20"; a timer calls stop() after DELAY ms. */
+  @Test
+  public void testProcessFailureDefaultErrorHandler() throws Exception {
+    System.out
+            .println("----------------- testProcessFailureDefaultErrorHandler -------------------");
+    int scaleout = 14;
+    super.startJetty(false); // don't block
+    String analysisEngineDescriptor = "NoOpAE";
+    IServiceProcessor processor = new UimaServiceProcessor(analysisEngineDescriptor);
+
+    String tasURL = "http://localhost:" + super.getPort() + "/test";
+
+    IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
+            .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
+            .withOptionalsDone().build();
+
+    Timer fTimer = null;
+    try {
+      // presumably fails task#20 (not the 2nd task); this should terminate the test
+      System.setProperty("ProcessFail", "20");
+      service.initialize();
+      fTimer = new Timer("testPullService Timer");
+      // fallback: stop the pull service once DELAY milliseconds have elapsed
+      fTimer.schedule(new MyTimerTask(service, fTimer, false), DELAY);
+
+      service.start();
+    } finally {
+      if (fTimer != null) {
+        fTimer.cancel(); // a still-pending stop task must not fire after this test
+      }
+      System.getProperties().remove("ProcessFail");
+      super.stopJetty();
+    }
+  }
+
+  /*
+   * @Test public void testPullServiceBadClientURL() throws Exception { int scaleout = 2;
+   * super.startJetty(false); // don't block String analysisEngineDescriptor = "TestAAE";
+   * IServiceProcessor processor = new UimaServiceProcessor(analysisEngineDescriptor);
+   * 
+   * String tasURL ="http://localhost2:8080/test";
+   * 
+   * IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
+   * .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
+   * .withOptionalsDone().build();
+   * 
+   * try { service.initialize(); service.start();
+   * 
+   * 
+   * } catch (ServiceInitializationException e) { throw e; } catch (Exception e) { throw e; } }
+   */
+  /** Timer task that cancels itself and its timer, then stops or quiesces the service. */
+  class MyTimerTask extends TimerTask {
+    final IService service;
+
+    final Timer fTimer;
+
+    final boolean quiesce; // true: quiesceAndStop(), false: stop()
+
+    MyTimerTask(IService service, Timer fTimer, boolean quiesce) {
+      this.service = service;
+      this.fTimer = fTimer;
+      this.quiesce = quiesce;
+    }
+
+    @Override
+    public void run() {
+      // retire this task and the timer it was handed before touching the service
+      cancel();
+      fTimer.purge();
+      fTimer.cancel();
+      System.out.println("Timmer popped - stopping service");
+      if (!quiesce) {
+        service.stop();
+        return;
+      }
+      service.quiesceAndStop();
+    }
+  }
 
 }

Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java Wed May  1 13:42:39 2019
@@ -28,189 +28,194 @@ import org.apache.uima.ducc.ps.service.e
 import org.apache.uima.ducc.ps.service.main.ServiceWrapper;
 import org.junit.Test;
 
-public class JUnitServiceWrapperTestCase extends Client  {
-	private static final long  DELAY=5000;
-	{
-		// static initializer sets amount of time the service delays
-		// sending READY to a monitor
-		System.setProperty("ducc.service.init.delay", "3000");
-	}
-	
-	
-	@Test
-	public void testPullServiceWrapperNoTask() throws Exception {
-		// make client return null task in response to GET
-		System.setProperty("simulate.no.work", "3");
-		System.setProperty("ducc.process.thread.sleep.time", "1000");
-		try {
-			testPullServiceWrapper();
-		} finally {
-			System.getProperties().remove("simulate.no.work");
-		}
-	}
-	@Test
-	public void testPullServiceWrapper() throws Exception {
-		System.out.println("-------------------------- testPullServiceWrapper ----------------------");;
-
-		//int scaleout = 2;
-		StateMonitor monitor = new StateMonitor();
-		monitor.start();
-		System.out.println("........... Monitor Port:"+System.getProperty("DUCC_STATE_UPDATE_PORT"));
-		super.startJetty(false);  // don't block
-		String analysisEngineDescriptor = "TestAAE";
-		System.setProperty("ducc.deploy.JpType", "uima");
-
-		String tasURL = "http://localhost:"+super.getPort()+"/test";
-		try {
-			System.setProperty("ducc.deploy.JdURL", tasURL);
-			System.setProperty("ducc.deploy.JpThreadCount","4");
-			System.setProperty("ducc.deploy.service.type", "NotesService");
-			System.setProperty("ducc.deploy.JpType", "uima");
-
-			ServiceWrapper service = new ServiceWrapper();
-
-			Timer fTimer = new Timer("testPullService Timer");
-			// after 5secs stop the pull service
-			fTimer.schedule(new MyTimerTask(service, fTimer), 40000);
-				
-			service.initialize(new String[] {analysisEngineDescriptor});
-
-			service.start();
-
-
-		} catch (ServiceInitializationException e) {
-			throw e;
-		} catch (Exception e) {
-			throw e;
-		} finally {
-			monitor.stop();
-			super.stopJetty();
-
-		}
-	}
-	
-	@Test
-	public void testPullServiceWrapperWithProcessFailure() throws Exception {
-		System.out.println("-------------------------- testPullServiceWrapperWithProcessFailure ----------------------");;
-		//int scaleout = 2;
-		StateMonitor monitor = new StateMonitor();
-		monitor.start();
-		System.out.println("........... Monitor Port:"+System.getProperty("DUCC_STATE_UPDATE_PORT"));
-		super.startJetty(false);  // don't block
-		String analysisEngineDescriptor = "NoOpAE";
-
-		String tasURL = "http://localhost:"+super.getPort()+"/test";
-		try {
-			// Force process failure of the first task
-			System.setProperty("ProcessFail","1");
-			 
-			System.setProperty("ducc.deploy.JdURL", tasURL);
-			System.setProperty("ducc.deploy.JpThreadCount","4");
-			System.setProperty("ducc.deploy.service.type", "NotesService");
-			System.setProperty("ducc.deploy.JpType", "uima");
-			// use default error window (1,1)
-			ServiceWrapper service = new ServiceWrapper();
-
-			Timer fTimer = new Timer("testPullService Timer");
-			// after 5secs stop the pull service
-			fTimer.schedule(new MyTimerTask(service, fTimer), 10000);
-				
-			service.initialize(new String[] {analysisEngineDescriptor});
-
-			service.start();
-
-
-		} catch (ServiceInitializationException e) {
-			throw e;
-		} catch (Exception e) {
-			throw e;
-		} finally {
-			monitor.stop();
-			System.getProperties().remove("ProcessFail");
-			super.stopJetty();
-		}
-	}
-	@Test
-	public void testPullServiceWrapperDDGenerator() throws Exception {
-		System.out.println("-------------------------- testPullServiceWrapperDDGenerator ----------------------");;
-
-		//int scaleout = 2;
-		StateMonitor monitor = new StateMonitor();
-		monitor.start();
-		System.out.println("........... Monitor Port:"+System.getProperty("DUCC_STATE_UPDATE_PORT"));
-		super.startJetty(false);  // don't block
-		// Dont change the name of TestAAE.xml. This is setup to fail file lookup and force 
-		// generation of AE descriptor.
-		String analysisEngineDescriptor = "TestAAE.xml";
-		System.setProperty("ducc.deploy.JpType", "uima");
-
-		String tasURL = "http://localhost:"+super.getPort()+"/test";
-		try {
-			
-			System.setProperty("ducc.deploy.JdURL", tasURL);
-			System.setProperty("ducc.deploy.JpThreadCount","4");
-			System.setProperty("ducc.deploy.service.type", "NotesService");
-			System.setProperty("ducc.deploy.JpType", "uima");
-			System.setProperty("ducc.deploy.JpAeDescriptor","NoOpAE");
-			System.setProperty("ducc.deploy.JobDirectory",System.getProperty("user.dir"));
-			System.setProperty("ducc.deploy.JpFlowController","org.apache.uima.flow.FixedFlowController");
-			System.setProperty("ducc.process.log.dir",System.getProperty("user.dir"));
-			System.setProperty("ducc.job.id","2000");
-			ServiceWrapper service = new ServiceWrapper();
-
-			Timer fTimer = new Timer("testPullService Timer");
-			// after 5secs stop the pull service
-			fTimer.schedule(new MyTimerTask(service, fTimer), 20000);
-				
-			service.initialize(new String[] {analysisEngineDescriptor});
-
-			service.start();
-			
-
-		} catch (ServiceInitializationException e) {
-			throw e;
-		} catch (Exception e) {
-			throw e;
-		} finally {
-			monitor.stop();
-			super.stopJetty();
-			File directory = new File(System.getProperty("user.dir").
-					 concat("/").concat(System.getProperty("ducc.job.id")));
-			
-			if ( directory.exists() ) {
-				for (File f : directory.listFiles()) {
-				    if (f.getName().startsWith("uima-ae-")) {
-				        f.delete();
-				        System.out.println("Removed generated descriptor:"+f.getAbsolutePath());
-				    }
-				}
-				directory.delete();
-				
-			}
-
-
-		}
-	}
-	class MyTimerTask extends TimerTask {
-		final ServiceWrapper service;
-		final Timer fTimer;
-
-		MyTimerTask(ServiceWrapper service, Timer fTimer) {
-			this.service = service;
-			this.fTimer = fTimer;
-		}
-
-		@Override
-
-		public void run() {
-			this.cancel();
-			fTimer.purge();
-			fTimer.cancel();
-			System.out.println("Timmer popped - stopping service");
-			service.stop();
+public class JUnitServiceWrapperTestCase extends Client {
+  private static final long DELAY = 5000; // not referenced by any member of this class
+  {
+    // instance initializer (runs for every new test-class instance): sets the amount of
+    // time the service delays sending READY to a monitor
+    System.setProperty("ducc.service.init.delay", "3000");
+  }
+
+  @Test
+  public void testPullServiceWrapperNoTask() throws Exception {
+    System.setProperty("simulate.no.work", "3"); // make client return null task on GET
+    System.setProperty("ducc.process.thread.sleep.time", "1000");
+    try {
+      testPullServiceWrapper();
+    } finally { // clear both overrides so they cannot leak into later tests
+      System.getProperties().remove("simulate.no.work");
+      System.getProperties().remove("ducc.process.thread.sleep.time");
+    }
+  }
+
+  /**
+   * Runs ServiceWrapper on TestAAE with JpThreadCount 12; a timer calls stop() after 40 seconds.
+   */
+  @Test
+  public void testPullServiceWrapper() throws Exception {
+    System.out.println("-------------------------- testPullServiceWrapper ----------------------");
+
+    // int scaleout = 2;
+    StateMonitor monitor = new StateMonitor();
+    monitor.start();
+    System.out.println("........... Monitor Port:" + System.getProperty("DUCC_STATE_UPDATE_PORT"));
+    super.startJetty(false); // don't block
+    String analysisEngineDescriptor = "TestAAE";
+    System.setProperty("ducc.deploy.JpType", "uima");
+
+    String tasURL = "http://localhost:" + super.getPort() + "/test";
+    Timer fTimer = null;
+    try {
+      System.setProperty("ducc.deploy.JdURL", tasURL);
+      System.setProperty("ducc.deploy.JpThreadCount", "12");
+      System.setProperty("ducc.deploy.service.type", "NotesService");
+      System.setProperty("ducc.deploy.JpType", "uima");
+
+      ServiceWrapper service = new ServiceWrapper();
+
+      fTimer = new Timer("testPullService Timer");
+      // armed before initialize(): stop the pull service once 40 seconds have elapsed
+      fTimer.schedule(new MyTimerTask(service, fTimer), 40000);
+
+      service.initialize(new String[] { analysisEngineDescriptor });
+
+      service.start();
+    } finally {
+      if (fTimer != null) {
+        fTimer.cancel(); // a still-pending stop task must not fire after this test
+      }
+      monitor.stop();
+      super.stopJetty();
+    }
+  }
+
+  /**
+   * Runs ServiceWrapper on NoOpAE with ProcessFail "1"; a timer calls stop() after 10 seconds.
+   */
+  @Test
+  public void testPullServiceWrapperWithProcessFailure() throws Exception {
+    System.out.println(
+            "-------------------------- testPullServiceWrapperWithProcessFailure ----------------------");
+    // int scaleout = 2;
+    StateMonitor monitor = new StateMonitor();
+    monitor.start();
+    System.out.println("........... Monitor Port:" + System.getProperty("DUCC_STATE_UPDATE_PORT"));
+    super.startJetty(false); // don't block
+    String analysisEngineDescriptor = "NoOpAE";
+
+    String tasURL = "http://localhost:" + super.getPort() + "/test";
+    Timer fTimer = null;
+    try {
+      // Force process failure of the first task
+      System.setProperty("ProcessFail", "1");
+
+      System.setProperty("ducc.deploy.JdURL", tasURL);
+      System.setProperty("ducc.deploy.JpThreadCount", "4");
+      System.setProperty("ducc.deploy.service.type", "NotesService");
+      System.setProperty("ducc.deploy.JpType", "uima");
+      // use default error window (1,1)
+      ServiceWrapper service = new ServiceWrapper();
+
+      fTimer = new Timer("testPullService Timer");
+      // armed before initialize(): stop the pull service once 10 seconds have elapsed
+      fTimer.schedule(new MyTimerTask(service, fTimer), 10000);
+
+      service.initialize(new String[] { analysisEngineDescriptor });
+      service.start();
+    } finally {
+      if (fTimer != null) {
+        fTimer.cancel(); // a still-pending stop task must not fire after this test
+      }
+      monitor.stop();
+      System.getProperties().remove("ProcessFail");
+      super.stopJetty();
+    }
+  }
+
+  /**
+   * Runs ServiceWrapper with a descriptor name chosen to fail file lookup (forcing AE descriptor
+   * generation), then deletes the generated uima-ae-* files and the job directory.
+   */
+  @Test
+  public void testPullServiceWrapperDDGenerator() throws Exception {
+    System.out.println(
+            "-------------------------- testPullServiceWrapperDDGenerator ----------------------");
+
+    // int scaleout = 2;
+    StateMonitor monitor = new StateMonitor();
+    monitor.start();
+    System.out.println("........... Monitor Port:" + System.getProperty("DUCC_STATE_UPDATE_PORT"));
+    super.startJetty(false); // don't block
+    // Dont change the name of TestAAE.xml. This is setup to fail file lookup and force
+    // generation of AE descriptor.
+    String analysisEngineDescriptor = "TestAAE.xml";
+    System.setProperty("ducc.deploy.JpType", "uima");
+
+    String tasURL = "http://localhost:" + super.getPort() + "/test";
+    Timer fTimer = null;
+    try {
+      System.setProperty("ducc.deploy.JdURL", tasURL);
+      System.setProperty("ducc.deploy.JpThreadCount", "4");
+      System.setProperty("ducc.deploy.service.type", "NotesService");
+      System.setProperty("ducc.deploy.JpType", "uima");
+      System.setProperty("ducc.deploy.JpAeDescriptor", "NoOpAE");
+      System.setProperty("ducc.deploy.JobDirectory", System.getProperty("user.dir"));
+      System.setProperty("ducc.deploy.JpFlowController",
+              "org.apache.uima.flow.FixedFlowController");
+      System.setProperty("ducc.process.log.dir", System.getProperty("user.dir"));
+      System.setProperty("ducc.job.id", "2000");
+      ServiceWrapper service = new ServiceWrapper();
+
+      fTimer = new Timer("testPullService Timer");
+      // armed before initialize(): stop the pull service once 20 seconds have elapsed
+      fTimer.schedule(new MyTimerTask(service, fTimer), 20000);
+
+      service.initialize(new String[] { analysisEngineDescriptor });
+
+      service.start();
+    } finally {
+      if (fTimer != null) {
+        fTimer.cancel(); // a still-pending stop task must not fire after this test
+      }
+      monitor.stop();
+      super.stopJetty();
+      File directory = new File(
+              System.getProperty("user.dir").concat("/").concat(System.getProperty("ducc.job.id")));
+      // listFiles() yields null when the path is missing or is not a readable directory
+      File[] generated = directory.listFiles();
+      if (generated != null) {
+        for (File f : generated) {
+          if (f.getName().startsWith("uima-ae-")) {
+            f.delete();
+            System.out.println("Removed generated descriptor:" + f.getAbsolutePath());
+          }
+        }
+        directory.delete();
+      }
+    }
+  }
+
+  class MyTimerTask extends TimerTask { // stops the given ServiceWrapper when the timer fires
+    final ServiceWrapper service;
+    // timer handed in by the test; cancelled from run()
+    final Timer fTimer;
+
+    MyTimerTask(ServiceWrapper service, Timer fTimer) {
+      this.service = service;
+      this.fTimer = fTimer;
+    }
+
+    @Override
+    // Cancels this task, purges and cancels the timer, then calls service.stop().
+    public void run() {
+      this.cancel();
+      fTimer.purge();
+      fTimer.cancel();
+      System.out.println("Timmer popped - stopping service");
+      service.stop();
 
-		}
+    }
 
-	}
+  }
 
 }

Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java Wed May  1 13:42:39 2019
@@ -24,6 +24,7 @@ import java.lang.management.ManagementFa
 import java.lang.management.RuntimeMXBean;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.uima.UIMAFramework;
@@ -126,7 +127,24 @@ public class NoOpAE extends CasAnnotator
    				}
     		}
    		}
+   		try {
+   			//int n = getRandomNumberInRange(2000,3000);
+   			int n = getRandomNumberInRange(250,450);
+   			//System.out.println(" AE Sleeping for "+n + " millis");
+   	   	 Thread.sleep(n);
+   			
+   		} catch( InterruptedException e) {
+   			
+   		}
     }
+    /** Returns a pseudo-random int in the inclusive range [min, max]; requires min < max. */
+    private static int getRandomNumberInRange(int min, int max) {
 
+		if (min >= max) {
+			throw new IllegalArgumentException("max must be greater than min");
+		}
+		// long bound avoids int overflow of (max - min) + 1; no new Random per call
+		return (int) java.util.concurrent.ThreadLocalRandom.current().nextLong(min, (long) max + 1);
+	}
 
 }