You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2019/05/01 13:42:39 UTC
svn commit: r1858488 [2/3] - in
/uima/uima-ducc/trunk/uima-ducc-pullservice/src:
main/java/org/apache/uima/ducc/ps/sd/task/
main/java/org/apache/uima/ducc/ps/sd/task/transport/
main/java/org/apache/uima/ducc/ps/service/protocol/builtin/
main/java/org/a...
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java Wed May 1 13:42:39 2019
@@ -1,20 +1,20 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.uima.ducc.ps.service.transport.http;
@@ -30,6 +30,7 @@ import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.http.HttpEntity;
@@ -61,358 +62,408 @@ import org.apache.uima.ducc.ps.service.u
import org.apache.uima.util.Level;
import org.apache.uima.util.Logger;
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.io.xml.DomDriver;
+
public class HttpServiceTransport implements IServiceTransport {
- private Logger logger = UIMAFramework.getLogger(HttpServiceTransport.class);
- private HttpClient httpClient = null;
- private PoolingHttpClientConnectionManager cMgr = null;
- private int clientMaxConnections = 1;
- private int clientMaxConnectionsPerRoute = 1;
- private int clientMaxConnectionsPerHostPort = 0;
- private ReentrantLock registryLookupLock = new ReentrantLock();
- private long threadSleepTime=1000; // millis
- private final String nodeIP;
- private final String nodeName;
- private final String pid;
- private ITargetURI currentTargetUrl = new NoOpTargetURI();
- private static final String NA="N/A";
- private TransportStats stats = new TransportStats();
- private IRegistryClient registryClient;
- // holds reference to HttpPost object for every thread. Key=thread id
- private Map<Long,HttpPost> httpPostMap =
- new HashMap<>();
- private volatile boolean stopping = false;
- private volatile boolean running = false;
- private volatile boolean log = true;
-
- public HttpServiceTransport(IRegistryClient registryClient, int scaleout) throws ServiceException {
- this.registryClient = registryClient;
- clientMaxConnections = scaleout;
-
-
- if ( Objects.isNull(System.getenv("DUCC_IP")) || Objects.isNull(System.getenv("DUCC_NODENAME"))) {
- try {
- nodeIP = InetAddress.getLocalHost().getHostAddress();
- nodeName=InetAddress.getLocalHost().getCanonicalHostName();
-
- } catch( UnknownHostException e) {
- throw new RuntimeException(new TransportException("HttpServiceTransport.ctor - Unable to determine Host Name and IP",e));
- }
-
- } else {
- // Use agent provided node identity. This is important when running in a sim mode
- // where nodes are virtual.
- nodeIP = System.getenv("DUCC_IP");
- nodeName = System.getenv("DUCC_NODENAME");
-
- }
- pid = getProcessIP(NA);
- }
- private HttpPost getPostMethodForCurrentThread() {
- HttpPost postMethod;
- if ( !httpPostMap.containsKey(Thread.currentThread().getId())) {
- // each thread needs its own PostMethod
- postMethod =
- new HttpPost(currentTargetUrl.asString());
- httpPostMap.put(Thread.currentThread().getId(),postMethod);
- } else {
- postMethod = httpPostMap.get(Thread.currentThread().getId());
- }
- return postMethod;
- }
- private String getProcessIP(final String fallback) {
- // the following code returns '<pid>@<hostname>'
- String name = ManagementFactory.getRuntimeMXBean().getName();
- int pos = name.indexOf('@');
-
- if (pos < 1) {
- // pid not found
- return fallback;
- }
-
- try {
- return Long.toString(Long.parseLong(name.substring(0, pos)));
- } catch (NumberFormatException e) {
- // ignore
- }
- return fallback;
- }
- private void lookupNewTarget() {
- registryLookupLock.lock();
- while( !stopping ) {
- try {
- String newTarget = registryClient.lookUp(currentTargetUrl.asString());
- currentTargetUrl = TargetURIFactory.newTarget(newTarget);
- break;
- } catch( Exception e) {
- synchronized (httpClient) {
-
- try {
- httpClient.wait(threadSleepTime);
- } catch( InterruptedException ex) {
- Thread.currentThread().interrupt();
- break;
- }
- }
- }
- }
- if (registryLookupLock.isHeldByCurrentThread()) {
- registryLookupLock.unlock();
- }
- }
- public void addRequestorInfo(IMetaTaskTransaction transaction) {
- transaction.setRequesterAddress(nodeIP);
- transaction.setRequesterNodeName(nodeName);
- transaction.setRequesterProcessId(Integer.valueOf(pid));
- transaction.setRequesterThreadId((int)Thread.currentThread().getId());
- if ( logger.isLoggable(Level.FINE )) {
- logger.log(Level.FINE,"ip:"+transaction.getRequesterAddress());
- logger.log(Level.FINE, "nodeName:"+transaction.getRequesterNodeName());
- logger.log(Level.FINE, "processName:"+transaction.getRequesterProcessName());
- logger.log(Level.FINE,"processId:"+transaction.getRequesterProcessId());
- logger.log(Level.FINE, "threadId:"+transaction.getRequesterThreadId());
-
- }
-
- }
- public void initialize() throws ServiceInitializationException {
-
- // use plugged in registry to lookup target to connect to.
- // Sets global: currentTarget
- lookupNewTarget();
-
- cMgr = new PoolingHttpClientConnectionManager();
-
- if (clientMaxConnections > 0) {
- cMgr.setMaxTotal(clientMaxConnections);
- }
- // Set default max connections per route
- if (clientMaxConnectionsPerRoute > 0) {
- cMgr.setDefaultMaxPerRoute(clientMaxConnectionsPerRoute);
- }
- HttpHost httpHost = new HttpHost(currentTargetUrl.asString(), Integer.valueOf(currentTargetUrl.getPort()), currentTargetUrl.getContext());
- if (clientMaxConnectionsPerHostPort > 0) {
- cMgr.setMaxPerRoute(new HttpRoute(httpHost), clientMaxConnectionsPerHostPort);
- }
-
- httpClient = HttpClients.custom().setConnectionManager(cMgr).build();
- running = true;
-
- }
- private void addCommonHeaders( HttpPost method ) {
- synchronized( HttpServiceTransport.class ) {
-
- method.setHeader("IP", nodeIP);
- method.setHeader("Hostname", nodeName);
- method.setHeader("ThreadID",
- String.valueOf(Thread.currentThread().getId()));
- method.setHeader("PID", pid);
-
- }
-
- }
-
- private HttpEntity wrapRequest(String serializedRequest) {
- return new StringEntity(serializedRequest, ContentType.APPLICATION_XML);
- }
-
- private boolean isRunning() {
- return running;
- }
-
- private IMetaTaskTransaction retryUntilSuccessfull(String request, HttpPost postMethod) {
- IMetaTaskTransaction response=null;
-
- // retry until service is stopped
- while (isRunning()) {
- try {
- response = doPost(postMethod);
- break;
-
- } catch (TransportException | IOException | URISyntaxException exx) {
- try {
- Thread.sleep(threadSleepTime);
- } catch( InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- lookupNewTarget();
-
- }
- return response;
-
- }
- private IMetaTaskTransaction doPost(HttpPost postMethod) throws URISyntaxException, IOException, TransportException {
- postMethod.setURI(new URI(currentTargetUrl.asString()));
-
- IMetaTaskTransaction metaTransaction=null;
- HttpResponse response = httpClient.execute(postMethod);
- // if ( stopping ) {
- // throw new TransportException("Service stopping - rejecting request");
- //}
- HttpEntity entity = response.getEntity();
- String serializedResponse = EntityUtils.toString(entity);
- Object transaction=null;
- try {
- transaction = XStreamUtils.unmarshall(serializedResponse);
- } catch(Exception e) {
- logger.log(Level.WARNING,"Process Thread:"+Thread.currentThread().getId()+" Error while deserializing response with XStream",e);
- throw new TransportException(e);
- }
- if ( Objects.isNull(transaction)) {
- throw new InvalidClassException(
- "Expected IMetaTaskTransaction - Instead Received NULL");
-
- } else if ( !(transaction instanceof IMetaTaskTransaction) ) {
- throw new InvalidClassException(
- "Expected IMetaTaskTransaction - Instead Received " + transaction.getClass().getName());
- }
- metaTransaction = (IMetaTaskTransaction) transaction;
-
- StatusLine statusLine = response.getStatusLine();
- if (statusLine.getStatusCode() != 200 ) {
- // all IOExceptions are retried
- throw new IOException(
- "Unexpected HttpClient response status:"+statusLine+ " Content causing error:"+serializedResponse);
- }
-
- stats.incrementSuccessCount();
- return metaTransaction;
- }
- /**
- * Dispatches request to remote driver via doPost(). Its synchronized to prevent over-running the driver with
- * requests from multiple threads. When the transport fails sending GET/ACK/END a single thread
- * will try to recover connection and send the request.
- *
- */
- @Override
- public synchronized IMetaTaskTransaction dispatch(String serializedRequest) throws TransportException {
- //if ( stopping ) {
- // throw new IllegalStateException("Service transport has been stopped, unable to dispatch request");
- // }
- IMetaTaskTransaction transaction=null;
- HttpEntity e = wrapRequest(serializedRequest);
- // Each thread has its own HttpPost method. If current thread
- // doesnt have one, it will be created and added to the local
- // Map. Subsequent requests will fetch it from the map using
- // current thread ID as a key.
- HttpPost postMethod = getPostMethodForCurrentThread();
- addCommonHeaders(postMethod);
- postMethod.setEntity(e);
- try {
- String simulatedException;
- // To test transport errors add command line option -DMockHttpPostError=exception where
- // exception is one of the following Strings:
- //
- // IOException,
- // SocketException,
- // UnknownHostException,
- // NoRouteToHostException,
- // NoHttpResponseException,
- // HttpHostConnectException,
- // URISyntaxException
- // Use JUnit test JunitTransoirtTestCase to test the above errors
-
- if ( ( simulatedException = System.getProperty("MockHttpPostError")) != null ) {
- HttpClientExceptionGenerator mockExceptionGenerator =
- new HttpClientExceptionGenerator(simulatedException);
- mockExceptionGenerator.throwSimulatedException();
- } else {
- transaction = doPost(postMethod);
- }
- } catch( IOException | URISyntaxException ex) {
- if ( stopping ) {
- // looks like the process is in the shutdown mode. Log an exception and dont retry
- logger.log(Level.INFO,"Process Thread:"+Thread.currentThread().getId()+" - Process is already stopping - Caught Exception while calling doPost() \n"+ex);
- throw new TransportException(ex);
- } else {
- if ( log ) {
- log = false;
- stats.incrementErrorCount();
- logger.log(Level.WARNING, this.getClass().getName()+".dispatch() >>>>>>>>>> Handling Exception \n"+ex);
- logger.log(Level.INFO, ">>>>>>>>>> Unable to communicate with target:"+currentTargetUrl.asString()+" - retrying until successfull - with "+threadSleepTime/1000+" seconds wait between retries ");
- }
- transaction = retryUntilSuccessfull(serializedRequest, postMethod);
- log = true;
- logger.log(Level.INFO, "Established connection to target:"+currentTargetUrl.asString());
- }
-
-
- } finally {
- postMethod.releaseConnection();
- }
- return transaction;
-
- }
-
- public void stop(boolean quiesce) {
-
- stopping = true;
- running = false;
- // Use System.out since the logger's ShutdownHook may have closed streams
- System.out.println(Utils.getTimestamp()+">>>>>>> "+Utils.getShortClassname(this.getClass())+" stop() called - mode:"+(quiesce==true?"quiesce":"stop"));
- logger.log(Level.INFO,this.getClass().getName()+" stop() called");
- if ( !quiesce && cMgr != null ) {
- cMgr.shutdown();
- System.out.println(Utils.getTimestamp()+">>>>>>> "+Utils.getShortClassname(this.getClass())+" stopped connection mgr");
- logger.log(Level.INFO,this.getClass().getName()+" stopped connection mgr");
-
- }
- }
- public static void main(String[] args) {
-
- }
-
- public static class HttpClientExceptionGenerator {
- public enum ERROR{ IOException, SocketException, UnknownHostException, NoRouteToHostException,NoHttpResponseException, HttpHostConnectException, URISyntaxException};
-
- Exception exceptionClass=null;
-
- public HttpClientExceptionGenerator(String exc) {
-
- for( ERROR e : ERROR.values()) {
- if ( exc != null && e.name().equals(exc)) {
- switch(e) {
- case IOException:
- exceptionClass = new IOException("Simulated IOException");
- break;
- case URISyntaxException:
- exceptionClass = new URISyntaxException("", "Simulated URISyntaxException");
- break;
- case NoRouteToHostException:
- exceptionClass = new NoRouteToHostException("Simulated NoRouteToHostException");
- break;
- case NoHttpResponseException:
- exceptionClass = new NoHttpResponseException("Simulated NoHttpResponseException");
- break;
- case SocketException:
- exceptionClass = new SocketException("Simulated SocketException");
- break;
- case UnknownHostException:
- exceptionClass = new UnknownHostException("Simulated UnknownHostException");
- break;
-
- default:
-
-
- }
- if ( exceptionClass != null ) {
- break;
- }
- }
- }
- }
- public void throwSimulatedException() throws IOException, URISyntaxException {
- if ( exceptionClass != null ) {
- if ( exceptionClass instanceof IOException ) {
- throw (IOException)exceptionClass;
- } else if ( exceptionClass instanceof URISyntaxException ) {
- throw (URISyntaxException)exceptionClass;
- }
-
- }
- }
-
-
- }
+ private Logger logger = UIMAFramework.getLogger(HttpServiceTransport.class);
+
+ private HttpClient httpClient = null;
+
+ private PoolingHttpClientConnectionManager cMgr = null;
+
+ private int clientMaxConnections = 1;
+
+ private int clientMaxConnectionsPerRoute = 1;
+
+ private int clientMaxConnectionsPerHostPort = 0;
+
+ private ReentrantLock registryLookupLock = new ReentrantLock();
+
+ private long threadSleepTime = 1000; // millis
+
+ private final String nodeIP;
+
+ private final String nodeName;
+
+ private final String pid;
+
+ private ITargetURI currentTargetUrl = new NoOpTargetURI();
+
+ private static final String NA = "N/A";
+
+ private TransportStats stats = new TransportStats();
+
+ private IRegistryClient registryClient;
+
+ // holds reference to HttpPost object for every thread. Key=thread id
+ private Map<Long, HttpPost> httpPostMap = new HashMap<>();
+
+ private volatile boolean stopping = false;
+
+ private volatile boolean running = false;
+
+ private volatile boolean log = true;
+
+ private AtomicLong xstreamTime = new AtomicLong();
+ // private ThreadLocal<HashMap<Long, XStream>> localXStream = new ThreadLocal<HashMap<Long,
+ // XStream>>() {
+ // @Override
+ // protected HashMap<Long, XStream> initialValue() {
+ // return new HashMap<Long, XStream>();
+ // }
+ // };
+
+ public HttpServiceTransport(IRegistryClient registryClient, int scaleout)
+ throws ServiceException {
+ this.registryClient = registryClient;
+ clientMaxConnections = scaleout;
+
+ try {
+ nodeIP = InetAddress.getLocalHost().getHostAddress();
+ nodeName = InetAddress.getLocalHost().getCanonicalHostName();
+ pid = getProcessIP(NA);
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(new TransportException(
+ "HttpServiceTransport.ctor - Unable to determine Host Name and IP", e));
+ }
+
+ }
+
+ private HttpPost getPostMethodForCurrentThread() {
+ HttpPost postMethod;
+ if (!httpPostMap.containsKey(Thread.currentThread().getId())) {
+ // each thread needs its own PostMethod
+ postMethod = new HttpPost(currentTargetUrl.asString());
+ httpPostMap.put(Thread.currentThread().getId(), postMethod);
+ } else {
+ postMethod = httpPostMap.get(Thread.currentThread().getId());
+ }
+ return postMethod;
+ }
+
+ private String getProcessIP(final String fallback) {
+ // the following code returns '<pid>@<hostname>'
+ String name = ManagementFactory.getRuntimeMXBean().getName();
+ int pos = name.indexOf('@');
+
+ if (pos < 1) {
+ // pid not found
+ return fallback;
+ }
+
+ try {
+ return Long.toString(Long.parseLong(name.substring(0, pos)));
+ } catch (NumberFormatException e) {
+ // ignore
+ }
+ return fallback;
+ }
+
+ private void lookupNewTarget() {
+ registryLookupLock.lock();
+ while (!stopping) {
+ try {
+ String newTarget = registryClient.lookUp(currentTargetUrl.asString());
+ currentTargetUrl = TargetURIFactory.newTarget(newTarget);
+ break;
+ } catch (Exception e) {
+ synchronized (httpClient) {
+
+ try {
+ httpClient.wait(threadSleepTime);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+ }
+ if (registryLookupLock.isHeldByCurrentThread()) {
+ registryLookupLock.unlock();
+ }
+ }
+
+ public void addRequestorInfo(IMetaTaskTransaction transaction) {
+ transaction.setRequesterAddress(nodeIP);
+ transaction.setRequesterNodeName(nodeName);
+ transaction.setRequesterProcessId(Integer.valueOf(pid));
+ transaction.setRequesterThreadId((int) Thread.currentThread().getId());
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE, "ip:" + transaction.getRequesterAddress());
+ logger.log(Level.FINE, "nodeName:" + transaction.getRequesterNodeName());
+ logger.log(Level.FINE, "processName:" + transaction.getRequesterProcessName());
+ logger.log(Level.FINE, "processId:" + transaction.getRequesterProcessId());
+ logger.log(Level.FINE, "threadId:" + transaction.getRequesterThreadId());
+
+ }
+
+ }
+
+ public void initialize() throws ServiceInitializationException {
+
+ // Use the plugged-in registry to look up the target to connect to.
+ // On success this sets the currentTargetUrl field.
+ lookupNewTarget();
+
+ cMgr = new PoolingHttpClientConnectionManager();
+
+ if (clientMaxConnections > 0) {
+ cMgr.setMaxTotal(clientMaxConnections);
+ }
+ // Set default max connections per route
+ if (clientMaxConnectionsPerRoute > 0) {
+ cMgr.setDefaultMaxPerRoute(clientMaxConnectionsPerRoute);
+ }
+ HttpHost httpHost = new HttpHost(currentTargetUrl.asString(),
+ Integer.valueOf(currentTargetUrl.getPort()), currentTargetUrl.getContext());
+ if (clientMaxConnectionsPerHostPort > 0) {
+ cMgr.setMaxPerRoute(new HttpRoute(httpHost), clientMaxConnectionsPerHostPort);
+ }
+
+ httpClient = HttpClients.custom().setConnectionManager(cMgr).build();
+ running = true;
+
+ }
+
+ private void addCommonHeaders(HttpPost method) {
+ // synchronized( HttpServiceTransport.class ) {
+
+ method.setHeader("IP", nodeIP);
+ method.setHeader("Hostname", nodeName);
+ method.setHeader("ThreadID", String.valueOf(Thread.currentThread().getId()));
+ method.setHeader("PID", pid);
+
+ // }
+
+ }
+
+ private HttpEntity wrapRequest(String serializedRequest) {
+ return new StringEntity(serializedRequest, ContentType.APPLICATION_XML);
+ }
+
+ private boolean isRunning() {
+ return running;
+ }
+
+ private IMetaTaskTransaction retryUntilSuccessfull(String request, HttpPost postMethod,
+ ThreadLocal<HashMap<Long, XStream>> localXStream) {
+ IMetaTaskTransaction response = null;
+
+ // retry until service is stopped
+ while (isRunning()) {
+ try {
+ response = doPost(postMethod, localXStream);
+ break;
+
+ } catch (TransportException | IOException | URISyntaxException exx) {
+ try {
+ Thread.sleep(threadSleepTime);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ lookupNewTarget();
+
+ }
+ return response;
+
+ }
+
+ private IMetaTaskTransaction doPost(HttpPost postMethod,
+ ThreadLocal<HashMap<Long, XStream>> localXStream)
+ throws URISyntaxException, IOException, TransportException {
+ postMethod.setURI(new URI(currentTargetUrl.asString()));
+
+ IMetaTaskTransaction metaTransaction = null;
+ HttpResponse response = httpClient.execute(postMethod);
+ if (stopping) {
+ throw new TransportException("Service stopping - rejecting request");
+ }
+ HttpEntity entity = response.getEntity();
+ String serializedResponse = EntityUtils.toString(entity);
+ Object transaction = null;
+ try {
+ long t1 = System.currentTimeMillis();
+ // transaction = XStreamUtils.unmarshall(serializedResponse);
+ transaction = localXStream.get().get(Thread.currentThread().getId())
+ .fromXML(serializedResponse);
+ xstreamTime.addAndGet((System.currentTimeMillis() - t1));
+ } catch (Exception e) {
+ logger.log(Level.WARNING, "Process Thread:" + Thread.currentThread().getId()
+ + " Error while deserializing response with XStream", e);
+ throw new TransportException(e);
+ }
+ if (Objects.isNull(transaction)) {
+ throw new InvalidClassException("Expected IMetaTaskTransaction - Instead Received NULL");
+
+ } else if (!(transaction instanceof IMetaTaskTransaction)) {
+ throw new InvalidClassException("Expected IMetaTaskTransaction - Instead Received "
+ + transaction.getClass().getName());
+ }
+ metaTransaction = (IMetaTaskTransaction) transaction;
+
+ StatusLine statusLine = response.getStatusLine();
+ if (statusLine.getStatusCode() != 200) {
+ // all IOExceptions are retried
+ throw new IOException("Unexpected HttpClient response status:" + statusLine
+ + " Content causing error:" + serializedResponse);
+ }
+
+ stats.incrementSuccessCount();
+ return metaTransaction;
+ }
+
+ /**
+ * Dispatches request to remote driver via doPost(). This method is not synchronized, so several
+ * threads may dispatch concurrently, each using its own HttpPost. When the transport fails
+ * sending GET/ACK/END, the calling thread retries until it succeeds or the service is stopped.
+ *
+ */
+ @Override
+ public IMetaTaskTransaction dispatch(String serializedRequest,
+ ThreadLocal<HashMap<Long, XStream>> localXStream) throws TransportException {
+ if (stopping) {
+ throw new IllegalStateException(
+ "Service transport has been stopped, unable to dispatch request");
+ }
+ IMetaTaskTransaction transaction = null;
+ HttpEntity e = wrapRequest(serializedRequest);
+ // Each thread has its own HttpPost method. If the current thread
+ // does not have one, it will be created and added to the local
+ // Map. Subsequent requests will fetch it from the map using
+ // the current thread ID as a key.
+ HttpPost postMethod = getPostMethodForCurrentThread();
+ addCommonHeaders(postMethod);
+ postMethod.setEntity(e);
+ try {
+ String simulatedException;
+ // To test transport errors add command line option
+ // -DMockHttpPostError=exception where
+ // exception is one of the following Strings:
+ //
+ // IOException,
+ // SocketException,
+ // UnknownHostException,
+ // NoRouteToHostException,
+ // NoHttpResponseException,
+ // HttpHostConnectException,
+ // URISyntaxException
+ // Use JUnit test JunitTransportTestCase to test the above errors
+
+ if ((simulatedException = System.getProperty("MockHttpPostError")) != null) {
+ HttpClientExceptionGenerator mockExceptionGenerator = new HttpClientExceptionGenerator(
+ simulatedException);
+ mockExceptionGenerator.throwSimulatedException();
+ } else {
+ transaction = doPost(postMethod, localXStream);
+ }
+ } catch (IOException | URISyntaxException ex) {
+ if (stopping) {
+ // The process appears to be shutting down. Log the exception and do not
+ // retry.
+ logger.log(Level.INFO, "Process Thread:" + Thread.currentThread().getId()
+ + " - Process is already stopping - Caught Exception while calling doPost() \n"
+ + ex);
+ throw new TransportException(ex);
+ } else {
+ if (log) {
+ log = false;
+ stats.incrementErrorCount();
+ logger.log(Level.WARNING,
+ this.getClass().getName() + ".dispatch() >>>>>>>>>> Handling Exception \n" + ex);
+ logger.log(Level.INFO,
+ ">>>>>>>>>> Unable to communicate with target:" + currentTargetUrl.asString()
+ + " - retrying until successfull - with " + threadSleepTime / 1000
+ + " seconds wait between retries ");
+ }
+ transaction = retryUntilSuccessfull(serializedRequest, postMethod, localXStream);
+ log = true;
+ logger.log(Level.INFO, "Established connection to target:" + currentTargetUrl.asString());
+ }
+
+ } finally {
+ postMethod.releaseConnection();
+ }
+ return transaction;
+
+ }
+
+ public void stop(boolean quiesce) {
+
+ stopping = true;
+ running = false;
+ // Use System.out since the logger's ShutdownHook may have closed streams
+ System.out.println(Utils.getTimestamp() + ">>>>>>> " + Utils.getShortClassname(this.getClass())
+ + " stop() called - mode:" + (quiesce == true ? "quiesce" : "stop"));
+ logger.log(Level.INFO, this.getClass().getName() + " stop() called");
+ System.out.println(" ########################################3 Total time in XStream:"
+ + (xstreamTime.get() / 1000) + " secs");
+ if (!quiesce && cMgr != null) {
+ cMgr.shutdown();
+ System.out.println(Utils.getTimestamp() + ">>>>>>> "
+ + Utils.getShortClassname(this.getClass()) + " stopped connection mgr");
+ logger.log(Level.INFO, this.getClass().getName() + " stopped connection mgr");
+
+ }
+ }
+
+ public static void main(String[] args) {
+
+ }
+
+ public static class HttpClientExceptionGenerator {
+ public enum ERROR {
+ IOException, SocketException, UnknownHostException, NoRouteToHostException, NoHttpResponseException, HttpHostConnectException, URISyntaxException
+ };
+
+ Exception exceptionClass = null;
+
+ public HttpClientExceptionGenerator(String exc) {
+
+ for (ERROR e : ERROR.values()) {
+ if (exc != null && e.name().equals(exc)) {
+ switch (e) {
+ case IOException:
+ exceptionClass = new IOException("Simulated IOException");
+ break;
+ case URISyntaxException:
+ exceptionClass = new URISyntaxException("", "Simulated URISyntaxException");
+ break;
+ case NoRouteToHostException:
+ exceptionClass = new NoRouteToHostException("Simulated NoRouteToHostException");
+ break;
+ case NoHttpResponseException:
+ exceptionClass = new NoHttpResponseException("Simulated NoHttpResponseException");
+ break;
+ case SocketException:
+ exceptionClass = new SocketException("Simulated SocketException");
+ break;
+ case UnknownHostException:
+ exceptionClass = new UnknownHostException("Simulated UnknownHostException");
+ break;
+
+ default:
+
+ }
+ if (exceptionClass != null) {
+ break;
+ }
+ }
+ }
+ }
+
+ public void throwSimulatedException() throws IOException, URISyntaxException {
+ if (exceptionClass != null) {
+ if (exceptionClass instanceof IOException) {
+ throw (IOException) exceptionClass;
+ } else if (exceptionClass instanceof URISyntaxException) {
+ throw (URISyntaxException) exceptionClass;
+ }
+
+ }
+ }
+
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java Wed May 1 13:42:39 2019
@@ -21,6 +21,12 @@ package org.apache.uima.ducc.ps;
import java.io.BufferedReader;
import java.io.IOException;
import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -34,6 +40,7 @@ import org.apache.uima.cas.CAS;
import org.apache.uima.ducc.ps.net.iface.IMetaTask;
import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction;
import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction.Direction;
+import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction.Type;
import org.apache.uima.ducc.ps.net.impl.MetaTask;
import org.apache.uima.ducc.ps.service.transport.XStreamUtils;
import org.apache.uima.ducc.ps.service.utils.UimaSerializer;
@@ -48,243 +55,441 @@ import org.eclipse.jetty.servlet.Servlet
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.io.xml.DomDriver;
+
public class Client {
- private Server server;
- private boolean block = false;
- private AtomicLong errorCount = new AtomicLong();
- private final static String app="test";
- private int httpPort = 12222;
- private int maxThreads = 50;
- private static UimaSerializer uimaSerializer = new UimaSerializer();
- private AtomicInteger correlationIdCounter =
- new AtomicInteger(0);
- private AtomicInteger atomicCounter =
- new AtomicInteger(1);
- private AtomicInteger atomicErrorCounter =
- new AtomicInteger(10);
- private volatile boolean noMoreErrors = false;
- protected String getApp() {
- return app;
- }
- protected int getJettyPort() {
- while(true) {
- ServerSocket socket=null;
- try {
- socket = new ServerSocket(httpPort);
- break;
- } catch( IOException e) {
- httpPort++;
- } finally {
- if ( socket != null ) {
- try {
- socket.close();
- } catch( Exception ee) {}
-
- }
- }
- }
- return httpPort;
- }
- protected int getPort() {
-
- return httpPort;
- }
-
- public void startJetty(boolean block) throws Exception {
- this.block = block;
-
-
- QueuedThreadPool threadPool = new QueuedThreadPool();
- if (maxThreads < threadPool.getMinThreads()) {
- System.out.println(
- "Invalid value for jetty MaxThreads("+maxThreads+") - it should be greater or equal to "+threadPool.getMinThreads()+". Defaulting to jettyMaxThreads="+threadPool.getMaxThreads());
- threadPool.setMaxThreads(threadPool.getMinThreads());
- } else {
- threadPool.setMaxThreads(maxThreads);
- }
-
- server = new Server(threadPool);
-
- // Server connector
- ServerConnector connector = new ServerConnector(server);
- connector.setPort(getJettyPort());
- server.setConnectors(new Connector[] { connector });
- System.out.println("launching Jetty on Port:"+connector.getPort());
- ServletContextHandler context = new ServletContextHandler(
- ServletContextHandler.SESSIONS);
- context.setContextPath("/");
- server.setHandler(context);
-
- context.addServlet(new ServletHolder(new TaskHandlerServlet()), "/"+app);
-
-
- server.start();
- System.out.println("Jetty Started - Waiting for Messages ...");
- }
-
- @After
- public void stopJetty()
- {
- try
- {
- if ( server != null ) {
- UIMAFramework.getLogger().log(Level.INFO, "Stopping Jetty");
- server.stop();
-
- }
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- UIMAFramework.getLogger().log(Level.INFO,"Jetty Stopped");
- }
- public class TaskHandlerServlet extends HttpServlet {
- private static final long serialVersionUID = 1L;
-
- public TaskHandlerServlet() {
- }
-
- protected void doPost(HttpServletRequest request,
- HttpServletResponse response) throws ServletException,
- IOException {
- try {
- //System.out.println("Handling HTTP Post Request");
- //long post_stime = System.nanoTime();
- StringBuilder sb = new StringBuilder();
- BufferedReader reader = request.getReader();
- String line;
- while ((line = reader.readLine()) != null) {
- sb.append(line);
- }
- String content = sb.toString().trim();
-
- //System.out.println( "Http Request Body:::"+String.valueOf(content));
-
-
- String nodeIP = request.getHeader("IP");
- String nodeName = request.getHeader("Hostname");
- String threadID = request.getHeader("ThreadID");
- String pid = request.getHeader("PID");
- System.out.println( "Sender ID:::Node IP"+nodeIP+" Node Name:"+nodeName+" PID:"+pid+" ThreadID:"+threadID);
-
- IMetaTaskTransaction imt = null;
-
- imt = (IMetaTaskTransaction) XStreamUtils.unmarshall(content);
- IMetaTaskTransaction.Type type = imt.getType();
- switch(type) {
- case Get:
- System.out.println("---- Driver handling GET Request -- Thread:"+Thread.currentThread().getId());
- imt.setMetaTask(getMetaMetaCas());
- imt.getMetaTask().setAppData("CorrelationID-"+correlationIdCounter.incrementAndGet());
- if ( System.getProperty("simulate.no.work") == null || noMoreErrors) {
- imt.getMetaTask().setUserSpaceTask(getSerializedCAS());
- } else {
- System.out.println("---- Driver handling GET Request -- Client Out of Tasks -- Thread:"+Thread.currentThread().getId());
- imt.getMetaTask().setUserSpaceTask(null);
- if ( atomicErrorCounter.decrementAndGet() == 0 ) {
- noMoreErrors = true;
- }
- }
- // handleMetaCasTransationGet(trans, taskConsumer);
- break;
- case Ack:
- System.out.println("---- Driver handling ACK Request - ");
- //handleMetaCasTransationAck(trans, taskConsumer);
- break;
- case End:
- System.out.println("---- Driver handling END Request - "+imt.getMetaTask().getAppData());
- //handleMetaCasTransationEnd(trans, taskConsumer);
- if ( imt.getMetaTask().getUserSpaceException() != null ) {
- System.out.println("Client received error#"+errorCount.incrementAndGet());
- }
- break;
- case InvestmentReset:
- // handleMetaCasTransationInvestmentReset(trans, rwt);
- break;
- default:
- break;
- }
- // process service request
- //taskProtocolHandler.handle(imt);
-
- //long marshall_stime = System.nanoTime();
- // setup reply
-
- imt.setDirection(Direction.Response);
-
- response.setStatus(HttpServletResponse.SC_OK);
-
- response.setHeader("content-type", "text/xml");
- String body = XStreamUtils.marshall(imt);
-
- if (block ) {
- synchronized(this) {
- this.wait(0);
- }
-
- }
- System.out.println("Sending response");
- response.getWriter().write(body);
-
-
- //response.getWriter().write(content);
- } catch( InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- catch (Throwable e) {
- e.printStackTrace();
- throw new ServletException(e);
- }
- }
-
- }
- public long getErrorCount() {
- return errorCount.get();
- }
- private IMetaTask getMetaCas(String serializedCas) {
- if ( serializedCas == null ) {
- return null;
- }
- return new MetaTask(atomicCounter.incrementAndGet(), "", serializedCas);
- }
-
- private IMetaTask getMetaMetaCas() {
- //IMetaMetaCas mmc = new MetaMetaCas();
-
- String serializedCas = "Bogus";
-
- IMetaTask metaCas = getMetaCas(serializedCas);
-
- // mmc.setMetaCas(metaCas);
- //return mmc;
- return metaCas;
- }
- public String getSerializedCAS() {
- //logger.log(Level.INFO,"getSerializedCAS() Call "+seqno.incrementAndGet()
- // + " - from "+taskConsumer.getType()+":"+taskConsumer.getHostName()+"-"+taskConsumer.getPid()+"-"+taskConsumer.getThreadId() );
- String serializedCas = null;
- try {
- CAS cas = null;
- cas = CasCreationUtils.createCas(new TypeSystemDescription_impl(), null, null);
- cas.setDocumentLanguage("en");
-
- //logger.log(Level.INFO,"delivering: " + text);
- cas.setDocumentText("TEST");
-// cas.setDocumentText("100 "+seqno.incrementAndGet()+" 1000 0");
-
- serializedCas = serialize(cas);
- cas.reset();
- cas.release();
-
- } catch( Exception e) {
- //logger.log(Level.WARNING,"Error",e);
- }
-
- return serializedCas;
- }
- private String serialize(CAS cas) throws Exception {
- String serializedCas = uimaSerializer.serializeCasToXmi(cas);
- return serializedCas;
- }
+ // Embedded Jetty server; created in startJetty() and stopped in stopJetty()
+ private Server server;
+
+ // When true, the servlet's doPost() waits on its monitor before writing the response
+ private boolean block = false;
+
+ // Number of End transactions that carried a user-space exception
+ private AtomicLong errorCount = new AtomicLong();
+
+ // Number of Get transactions handled by the servlet
+ private AtomicLong taskCount = new AtomicLong();
+
+ // Servlet path segment; the servlet is registered at "/" + app
+ private final static String app = "test";
+
+ // First port probed by getJettyPort(); incremented until a bindable port is found
+ private int httpPort = 12222;
+
+ // Upper bound requested for the Jetty thread pool in startJetty()
+ private int maxThreads = 50;
+
+ // Guards the metrics dump in stopJetty() so that it is printed only once
+ private volatile boolean print = true;
+
+ private static UimaSerializer uimaSerializer = new UimaSerializer();
+
+ // Source of the numeric suffix of the "CorrelationID-n" app data set on each Get
+ private AtomicInteger correlationIdCounter = new AtomicInteger(0);
+
+ // Incremented and passed as the first MetaTask constructor argument in getMetaCas()
+ private AtomicInteger atomicCounter = new AtomicInteger(1);
+
+ // With -Dsimulate.no.work set: number of Get requests answered without a task
+ // before noMoreErrors is switched on
+ private AtomicInteger atomicErrorCounter = new AtomicInteger(16);
+
+ // Once true, Get requests receive a serialized CAS again even if simulate.no.work is set
+ private volatile boolean noMoreErrors = false;
+
+ // Per-task timing records, keyed by the value of the "ThreadID" request header
+ Map<String, List<ThreadMetrics>> metrics = new ConcurrentHashMap<>();
+
+ // NOTE(review): idleTime and lastTime are referenced only from commented-out code in this class
+ private AtomicLong idleTime = new AtomicLong();
+
+ private AtomicLong lastTime = new AtomicLong();
+
+ // Accumulated System.currentTimeMillis() deltas spent in XStream.fromXML()
+ private AtomicLong xstreamTime = new AtomicLong();
+
+ // Per-thread cache of XStream instances; the servlet keys the inner map by the current thread id
+ private ThreadLocal<HashMap<Long, XStream>> localXStream = new ThreadLocal<HashMap<Long, XStream>>() {
+ @Override
+ protected HashMap<Long, XStream> initialValue() {
+ return new HashMap<Long, XStream>();
+ }
+ };
+
+ /**
+  * @return the application name under which the task servlet is registered
+  */
+ protected String getApp() {
+   return Client.app;
+ }
+
+ /**
+  * Probes ports starting at the current {@code httpPort}, incrementing it after every failed
+  * bind, until a {@link ServerSocket} can be opened. The probe socket is closed again before
+  * returning.
+  *
+  * @return the first port on which the probe socket could be opened
+  */
+ protected int getJettyPort() {
+   boolean bound = false;
+   while (!bound) {
+     ServerSocket probe = null;
+     try {
+       probe = new ServerSocket(httpPort);
+       bound = true;
+     } catch (IOException e) {
+       // port is taken (or otherwise unusable) - try the next one
+       httpPort++;
+     } finally {
+       if (probe != null) {
+         try {
+           probe.close();
+         } catch (Exception ee) {
+         }
+       }
+     }
+   }
+   return httpPort;
+ }
+
+ /**
+  * @return the current value of {@code httpPort}; unlike {@code getJettyPort()} no probing is
+  *         done
+  */
+ protected int getPort() {
+   return this.httpPort;
+ }
+
+ /**
+  * Starts an embedded Jetty server on the port chosen by {@code getJettyPort()} and registers a
+  * {@link TaskHandlerServlet} at {@code "/" + app}.
+  *
+  * @param block
+  *          when true, the servlet waits on its monitor before writing each response
+  * @throws Exception
+  *           if the server cannot be started
+  */
+ public void startJetty(boolean block) throws Exception {
+   this.block = block;
+
+   QueuedThreadPool threadPool = new QueuedThreadPool();
+   int minThreads = threadPool.getMinThreads();
+   if (maxThreads < minThreads) {
+     // Report the value that is actually applied below (the pool minimum); the message
+     // previously printed the pool's default maximum, which is not what gets configured.
+     System.out.println("Invalid value for jetty MaxThreads(" + maxThreads
+             + ") - it should be greater or equal to " + minThreads
+             + ". Defaulting to jettyMaxThreads=" + minThreads);
+     threadPool.setMaxThreads(minThreads);
+   } else {
+     threadPool.setMaxThreads(maxThreads);
+   }
+
+   server = new Server(threadPool);
+
+   // Server connector
+   ServerConnector connector = new ServerConnector(server);
+   System.out.println(">>>> Jetty Acceptors:" + connector.getAcceptors());
+
+   connector.setPort(getJettyPort());
+   server.setConnectors(new Connector[] { connector });
+   System.out.println("launching Jetty on Port:" + connector.getPort());
+   ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+   context.setContextPath("/");
+   server.setHandler(context);
+
+   context.addServlet(new ServletHolder(new TaskHandlerServlet()), "/" + app);
+
+   server.start();
+   System.out.println("Jetty Started - Waiting for Messages ...");
+ }
+
+ /**
+  * Stops the Jetty server (if one was started) and, the first time it is called, prints the
+  * timing records collected per service thread followed by a single summary line.
+  */
+ @After
+ public void stopJetty() {
+   try {
+     if (server != null) {
+       UIMAFramework.getLogger().log(Level.INFO, "Stopping Jetty");
+       server.stop();
+     }
+
+     if (print) {
+       print = false;
+       for (Entry<String, List<ThreadMetrics>> me : metrics.entrySet()) {
+         System.out.println(describeThread(me.getKey(), me.getValue()));
+       }
+       // Summary is global, so it is printed once rather than once per service thread
+       System.out.println(">>>>> Total Tasks Processed:" + taskCount + " Client Time in xstream:"
+               + (xstreamTime.get() / 1000));
+     }
+   } catch (Exception e) {
+     e.printStackTrace();
+   }
+   UIMAFramework.getLogger().log(Level.INFO, "Jetty Stopped");
+ }
+
+ /**
+  * Formats the timing records of one service thread, one line per task.
+  *
+  * @param threadId
+  *          key under which the records were stored
+  * @param tasks
+  *          records in the order in which the Get requests arrived
+  * @return the multi-line report for this thread
+  */
+ private String describeThread(String threadId, List<ThreadMetrics> tasks) {
+   StringBuilder sb = new StringBuilder();
+   sb.append("Service Thread Id:").append(threadId).append(" Number of Tasks Processed:")
+           .append(tasks.size());
+   ThreadMetrics previous = null;
+   for (ThreadMetrics tm : tasks) {
+     long analysisTime = 0;
+     try {
+       analysisTime = Long.parseLong(tm.getAnalysisTime());
+     } catch (NumberFormatException ignored) {
+       // analysis time was never reported (null) or is not numeric - treat it as 0
+     }
+     long getToEnd = tm.getEndTime() - tm.getGetTime();
+     sb.append("\n\tTask ").append(tm.getCorrelationId())
+             // label restored: without it the number was fused onto the correlation id
+             .append(" Ack Time:").append(tm.getAckTime() - tm.getGetTime())
+             .append(" Get-Ack-End:").append(getToEnd).append(" ms")
+             .append(" Analysis Time:").append(tm.getAnalysisTime())
+             .append(" Overhead:").append(getToEnd - analysisTime);
+     if (previous != null) {
+       sb.append(" Idle Time:").append(tm.getGetTime() - previous.getEndTime());
+     }
+     previous = tm;
+   }
+   return sb.toString();
+ }
+
+ /**
+  * Servlet playing the role of the task driver: it unmarshalls each POSTed
+  * {@link IMetaTaskTransaction}, records timing for Get/Ack/End, and sends the transaction back
+  * as the response.
+  */
+ public class TaskHandlerServlet extends HttpServlet {
+   private static final long serialVersionUID = 1L;
+
+   private static final String ANALYSIS_TIME_OPEN = "<analysisTime>";
+
+   private static final String ANALYSIS_TIME_CLOSE = "</analysisTime>";
+
+   public TaskHandlerServlet() {
+   }
+
+   /**
+    * Handles one transaction. The request body is XML produced by XStream; the "ThreadID"
+    * request header identifies the service thread whose metrics are updated.
+    *
+    * @throws ServletException
+    *           wrapping any failure other than an interrupt while blocked
+    */
+   protected void doPost(HttpServletRequest request, HttpServletResponse response)
+           throws ServletException, IOException {
+     try {
+       StringBuilder sb = new StringBuilder();
+       BufferedReader reader = request.getReader();
+       String line;
+       while ((line = reader.readLine()) != null) {
+         sb.append(line);
+       }
+       String content = sb.toString().trim();
+
+       XStream xstream = xstreamForCurrentThread();
+       String threadID = request.getHeader("ThreadID");
+
+       long t1 = System.currentTimeMillis();
+       IMetaTaskTransaction imt = (IMetaTaskTransaction) xstream.fromXML(content);
+       xstreamTime.addAndGet(System.currentTimeMillis() - t1);
+
+       switch (imt.getType()) {
+         case Get:
+           handleGet(imt, threadID);
+           break;
+         case Ack:
+           handleAck(imt, threadID);
+           break;
+         case End:
+           handleEnd(imt, threadID);
+           break;
+         case InvestmentReset:
+           // nothing to record
+           break;
+         default:
+           break;
+       }
+
+       // setup reply
+       imt.setDirection(Direction.Response);
+       response.setStatus(HttpServletResponse.SC_OK);
+       response.setHeader("content-type", "text/xml");
+       String body = xstream.toXML(imt);
+       if (block) {
+         // blocking mode: hold the reply until this servlet's monitor is notified
+         synchronized (this) {
+           this.wait(0);
+         }
+       }
+       response.getWriter().write(body);
+     } catch (InterruptedException e) {
+       Thread.currentThread().interrupt();
+     } catch (Throwable e) {
+       e.printStackTrace();
+       throw new ServletException(e);
+     }
+   }
+
+   /** Returns the XStream cached for the calling thread, creating it on first use. */
+   private XStream xstreamForCurrentThread() {
+     HashMap<Long, XStream> cache = localXStream.get();
+     Long id = Thread.currentThread().getId();
+     XStream xstream = cache.get(id);
+     if (xstream == null) {
+       xstream = XStreamUtils.getXStreamInstance();
+       cache.put(id, xstream);
+     }
+     return xstream;
+   }
+
+   /** Starts a timing record for the requesting thread and attaches a task to the reply. */
+   private void handleGet(IMetaTaskTransaction imt, String threadID) {
+     taskCount.incrementAndGet();
+     List<ThreadMetrics> tmList = metrics.get(threadID);
+     if (tmList == null) {
+       tmList = new ArrayList<>();
+       metrics.put(threadID, tmList);
+     }
+     ThreadMetrics tm = new ThreadMetrics();
+     tm.setGetTime(System.currentTimeMillis());
+     tmList.add(tm);
+
+     imt.setMetaTask(getMetaMetaCas());
+     imt.getMetaTask().setAppData("CorrelationID-" + correlationIdCounter.incrementAndGet());
+     tm.setCorrelationId(imt.getMetaTask().getAppData());
+     if (System.getProperty("simulate.no.work") == null || noMoreErrors) {
+       imt.getMetaTask().setUserSpaceTask(getSerializedCAS());
+     } else {
+       System.out.println("---- Driver handling GET Request -- Client Out of Tasks -- Thread:"
+               + threadID);
+       imt.getMetaTask().setUserSpaceTask(null);
+       if (atomicErrorCounter.decrementAndGet() == 0) {
+         noMoreErrors = true;
+       }
+     }
+   }
+
+   /** Stamps the acknowledge time on the matching record, if there is one. */
+   private void handleAck(IMetaTaskTransaction imt, String threadID) {
+     ThreadMetrics acked = findTask(imt, threadID);
+     if (acked != null) {
+       acked.setAckTime(System.currentTimeMillis());
+     }
+   }
+
+   /** Stamps end time and analysis time on the matching record and counts task errors. */
+   private void handleEnd(IMetaTaskTransaction imt, String threadID) {
+     ThreadMetrics ended = findTask(imt, threadID);
+     if (ended != null) {
+       ended.setEndTime(System.currentTimeMillis());
+       String performanceMetrics = imt.getMetaTask().getPerformanceMetrics();
+       if (performanceMetrics != null) {
+         String analysisTime = extractAnalysisTime(performanceMetrics);
+         if (analysisTime != null) {
+           ended.setAnalysisTime(analysisTime);
+         }
+       }
+     }
+     if (imt.getMetaTask().getUserSpaceException() != null) {
+       System.out.println("Client received error#" + errorCount.incrementAndGet());
+     }
+   }
+
+   /**
+    * Looks up the record whose correlation id equals the transaction's app data.
+    *
+    * @return the record, or null when the thread has no records, the transaction carries no
+    *         app data, or no record matches
+    */
+   private ThreadMetrics findTask(IMetaTaskTransaction imt, String threadID) {
+     List<ThreadMetrics> tasks = metrics.get(threadID);
+     if (tasks == null) {
+       // Ack/End from a thread that never issued a Get - nothing to update
+       return null;
+     }
+     String correlationId = imt.getMetaTask().getAppData();
+     if (correlationId == null) {
+       return null;
+     }
+     for (ThreadMetrics candidate : tasks) {
+       if (correlationId.equals(candidate.getCorrelationId())) {
+         return candidate;
+       }
+     }
+     return null;
+   }
+
+   /**
+    * Extracts the text between the analysisTime tags.
+    *
+    * @return the enclosed text, or null when either tag is missing
+    */
+   private String extractAnalysisTime(String performanceMetrics) {
+     int open = performanceMetrics.indexOf(ANALYSIS_TIME_OPEN);
+     if (open < 0) {
+       return null;
+     }
+     int start = open + ANALYSIS_TIME_OPEN.length();
+     int end = performanceMetrics.indexOf(ANALYSIS_TIME_CLOSE, start);
+     if (end < 0) {
+       return null;
+     }
+     return performanceMetrics.substring(start, end);
+   }
+
+ }
+
+ /**
+  * @return how many End transactions carrying a user-space exception have been received
+  */
+ public long getErrorCount() {
+   return errorCount.longValue();
+ }
+
+ /**
+  * Wraps a serialized CAS in a new {@link MetaTask}, consuming the next value of
+  * {@code atomicCounter}.
+  *
+  * @return the new task, or null when {@code serializedCas} is null
+  */
+ private IMetaTask getMetaCas(String serializedCas) {
+   return serializedCas == null ? null
+           : new MetaTask(atomicCounter.incrementAndGet(), "", serializedCas);
+ }
+
+ /**
+  * @return a task built by {@code getMetaCas} around the placeholder payload "Bogus"
+  */
+ private IMetaTask getMetaMetaCas() {
+   return getMetaCas("Bogus");
+ }
+
+ /**
+  * Creates a CAS with language "en" and document text "TEST" and serializes it via
+  * {@code serialize(CAS)}.
+  *
+  * @return the serialized CAS, or null when the CAS could not be created or serialized (the
+  *         failure is logged)
+  */
+ public String getSerializedCAS() {
+   String serializedCas = null;
+   CAS cas = null;
+   try {
+     cas = CasCreationUtils.createCas(new TypeSystemDescription_impl(), null, null);
+     cas.setDocumentLanguage("en");
+     cas.setDocumentText("TEST");
+     serializedCas = serialize(cas);
+   } catch (Exception e) {
+     // keep the best-effort contract (null result) but leave a trace of the cause
+     UIMAFramework.getLogger().log(Level.WARNING, "Unable to create or serialize test CAS", e);
+   } finally {
+     if (cas != null) {
+       // release the CAS even when serialization failed
+       try {
+         cas.reset();
+         cas.release();
+       } catch (RuntimeException e) {
+         UIMAFramework.getLogger().log(Level.WARNING, "Unable to release test CAS", e);
+       }
+     }
+   }
+   return serializedCas;
+ }
+
+ /**
+  * Serializes the CAS through the shared {@code uimaSerializer}.
+  *
+  * @throws Exception
+  *           if serialization fails
+  */
+ private String serialize(CAS cas) throws Exception {
+   return uimaSerializer.serializeCasToXmi(cas);
+ }
+
+ /**
+  * Timing record for a single task handed to a service thread. Times are the
+  * {@code System.currentTimeMillis()} values stored by the servlet when the Get, Ack and End
+  * transactions arrive. Declared static because it never touches the enclosing Client.
+  */
+ private static class ThreadMetrics {
+   private long getTime;
+
+   private long ackTime;
+
+   private long endTime;
+
+   private long idleTime;
+
+   private long lastTime;
+
+   // "CorrelationID-n" app data assigned to the task on Get
+   private String correlationId;
+
+   // text taken from the analysisTime element of the performance metrics; may stay null
+   private String analysisTime;
+
+   public long getLastTime() {
+     return lastTime;
+   }
+
+   public void setLastTime(long lastTime) {
+     this.lastTime = lastTime;
+   }
+
+   public String getAnalysisTime() {
+     return analysisTime;
+   }
+
+   public void setAnalysisTime(String analysisTime) {
+     this.analysisTime = analysisTime;
+   }
+
+   public long getIdleTime() {
+     return idleTime;
+   }
+
+   public void setIdleTime(long idleTime) {
+     this.idleTime = idleTime;
+   }
+
+   public String getCorrelationId() {
+     return correlationId;
+   }
+
+   public void setCorrelationId(String correlationId) {
+     this.correlationId = correlationId;
+   }
+
+   public long getGetTime() {
+     return getTime;
+   }
+
+   public void setGetTime(long getTime) {
+     this.getTime = getTime;
+   }
+
+   public long getAckTime() {
+     return ackTime;
+   }
+
+   public void setAckTime(long ackTime) {
+     this.ackTime = ackTime;
+   }
+
+   public long getEndTime() {
+     return endTime;
+   }
+
+   public void setEndTime(long endTime) {
+     this.endTime = endTime;
+   }
+
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java Wed May 1 13:42:39 2019
@@ -21,6 +21,7 @@ package org.apache.uima.ducc.ps.service;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.uima.ducc.ps.Client;
import org.apache.uima.ducc.ps.service.builders.PullServiceStepBuilder;
@@ -30,269 +31,264 @@ import org.apache.uima.ducc.ps.service.p
import org.junit.Test;
public class JunitPullServiceTestCase extends Client {
- private static final long DELAY=5000;
- CountDownLatch threadsReady;
- CountDownLatch stopLatch;
- {
- // static initializer sets amount of time the service delays
- // sending READY to a monitor
- System.setProperty("ducc.service.init.delay", "3000");
- }
- @Test
- public void testPullService() throws Exception {
- System.out.println("----------------- testPullService -------------------");
- int scaleout = 2;
- super.startJetty(false); // don't block
- String analysisEngineDescriptor = "TestAAE";
- System.setProperty("ducc.deploy.JpType", "uima");
-
- IServiceProcessor processor = new
- UimaServiceProcessor(analysisEngineDescriptor);
-
- String tasURL = "http://localhost:"+super.getPort()+"/test";
-
- IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
- .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
- .withOptionalsDone().build();
-
- try {
- service.initialize();
- Timer fTimer = new Timer("testPullService Timer");
- // after 5secs stop the pull service
- fTimer.schedule(new MyTimerTask(service, fTimer, false), DELAY);
-
- service.start();
-
- } catch (ServiceInitializationException e) {
- throw e;
- } catch (Exception e) {
- throw e;
- } finally {
- super.stopJetty();
- }
- }
- @Test
- public void testPullServiceQuiesce() throws Exception {
- System.out.println("----------------- testPullServiceQuiesce -------------------");
- int scaleout = 2;
- super.startJetty(false); // don't block
- String analysisEngineDescriptor = "TestAAE";
- System.setProperty("ducc.deploy.JpType", "uima");
- IServiceProcessor processor = new
- UimaServiceProcessor(analysisEngineDescriptor);
-
- String tasURL = "http://localhost:"+super.getPort()+"/test";
-
- IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
- .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
- .withOptionalsDone().build();
-
- try {
- service.initialize();
- Timer fTimer = new Timer("testPullService Timer");
- // after 5secs stop the pull service
- fTimer.schedule(new MyTimerTask(service, fTimer, true), DELAY);
-
- service.start();
-
- } catch (ServiceInitializationException e) {
- throw e;
- } catch (Exception e) {
- throw e;
- }finally {
- super.stopJetty();
- }
- }
-
- @Test
- public void testPullServiceTimeout() throws Exception {
- System.out.println("----------------- testPullServiceTimeout -------------------");
- super.startJetty(true); // true=client blocks all POST requests
- int scaleout = 12;
- String analysisEngineDescriptor = "TestAAE";
- IServiceProcessor processor = new
- UimaServiceProcessor(analysisEngineDescriptor);
-
- String tasURL ="http://localhost:"+super.getPort()+"/test";
-
- IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
- .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
- .withOptionalsDone().build();
-
- try {
- service.initialize();
- System.out.println("----------- Starting Service .....");
- Timer fTimer = new Timer();
- //after 10sec stop the service
- fTimer.schedule(new MyTimerTask(service, fTimer, false), DELAY);
-
- service.start();
-
-
- } catch (ServiceInitializationException e) {
- throw e;
- } catch (Exception e) {
- throw e;
- }finally {
- super.stopJetty();
- }
- }
-
- @Test
- public void testStopOnFirstError() throws Exception {
- System.out.println("----------------- testStopOnFirstError -------------------");
- int scaleout = 10;
- super.startJetty(false); // don't block
- String analysisEngineDescriptor = "NoOpAE";
- System.setProperty("ducc.deploy.JpType", "uima");
-
- IServiceProcessor processor =
- new UimaServiceProcessor(analysisEngineDescriptor);
- // fail on 1st error
- processor.setErrorHandlerWindow(1, 5);
-
- String tasURL = "http://localhost:"+super.getPort()+"/test";
-
- IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
- .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
- .withOptionalsDone().build();
-
- try {
- System.setProperty("ProcessFail","2");
- service.initialize();
-
- service.start();
-
- } catch (ServiceInitializationException e) {
- throw e;
- } catch (Exception e) {
- throw e;
- } finally {
- System.getProperties().remove("ProcessFail");
- super.stopJetty();
- }
- }
- @Test
- public void testTerminateOn2ErrorsInWindowOf5() throws Exception {
- System.out.println("----------------- testTerminateOn2ErrorsInWindowOf5 -------------------");
- int scaleout = 10;
- super.startJetty(false); // don't block
- String analysisEngineDescriptor = "NoOpAE";
- System.setProperty("ducc.deploy.JpType", "uima");
-
- IServiceProcessor processor =
- new UimaServiceProcessor(analysisEngineDescriptor);
- // fail on 2nd error in a window of 5
- processor.setErrorHandlerWindow(2, 5);
- String tasURL = "http://localhost:"+super.getPort()+"/test";
-
- IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
- .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
- .withOptionalsDone().build();
-
- try {
- // fail task#1 and task#3 which should stop the test
- System.setProperty("ProcessFail","1,3");
- service.initialize();
-
- service.start();
-
- } catch (ServiceInitializationException e) {
- throw e;
- } catch (Exception e) {
- throw e;
- } finally {
- System.getProperties().remove("ProcessFail");
- super.stopJetty();
- }
- }
- @Test
- public void testProcessFailureDefaultErrorHandler() throws Exception {
- System.out.println("----------------- testProcessFailureDefaultErrorHandler -------------------");
- int scaleout = 14;
- super.startJetty(false); // don't block
- String analysisEngineDescriptor = "NoOpAE";
- IServiceProcessor processor = new
- UimaServiceProcessor(analysisEngineDescriptor);
-
- String tasURL = "http://localhost:"+super.getPort()+"/test";
-
- IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
- .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
- .withOptionalsDone().build();
-
- try {
- // fail on 2nd task. This should terminate the test
- System.setProperty("ProcessFail","20");
- service.initialize();
- Timer fTimer = new Timer("testPullService Timer");
- // after 5secs stop the pull service
- fTimer.schedule(new MyTimerTask(service, fTimer, false), 20000);
-
- service.start();
-
- } catch (ServiceInitializationException e) {
- throw e;
- } catch (Exception e) {
- throw e;
- } finally {
- System.getProperties().remove("ProcessFail");
- super.stopJetty();
- }
- }
-
- /*
- @Test
- public void testPullServiceBadClientURL() throws Exception {
- int scaleout = 2;
- super.startJetty(false); // don't block
- String analysisEngineDescriptor = "TestAAE";
- IServiceProcessor processor = new
- UimaServiceProcessor(analysisEngineDescriptor);
-
- String tasURL ="http://localhost2:8080/test";
-
- IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
- .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
- .withOptionalsDone().build();
-
- try {
- service.initialize();
- service.start();
-
-
- } catch (ServiceInitializationException e) {
- throw e;
- } catch (Exception e) {
- throw e;
- }
- }
- */
- class MyTimerTask extends TimerTask {
- final IService service;
- final Timer fTimer;
- final boolean quiesce;
-
- MyTimerTask(IService service, Timer fTimer, boolean quiesce) {
- this.service = service;
- this.fTimer = fTimer;
- this.quiesce = quiesce;
- }
-
- @Override
-
- public void run() {
- this.cancel();
- fTimer.purge();
- fTimer.cancel();
- System.out.println("Timmer popped - stopping service");
- if (quiesce ) {
- service.quiesceAndStop();
- } else {
- service.stop();
- }
- }
+ private static final long DELAY = 20000;
- }
+ CountDownLatch threadsReady;
+
+ CountDownLatch stopLatch;
+ {
+ // Instance initializer (runs whenever a test instance is constructed, not a static
+ // initializer): sets the amount of time the service delays sending READY to a monitor.
+ // NOTE(review): the value is presumably in milliseconds - confirm against the service code.
+ System.setProperty("ducc.service.init.delay", "3000");
+ }
+
+ /**
+  * Runs a pull service (scaleout 20, "NoOpAE" descriptor) against the non-blocking Jetty
+  * client; a timer task stops the service after DELAY milliseconds. Prints the elapsed
+  * wall-clock seconds at the end.
+  */
+ @Test
+ public void testPullService() throws Exception {
+   System.out.println("----------------- testPullService -------------------");
+   final int scaleout = 20;
+   super.startJetty(false); // don't block
+   System.setProperty("ducc.deploy.JpType", "uima");
+   // System.setProperty("simulate.no.work", "3"); UNCOMMENT TO SIMULATE NO WORK
+   IServiceProcessor processor = new UimaServiceProcessor("NoOpAE");
+
+   String tasURL = "http://localhost:" + super.getPort() + "/test";
+
+   IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
+           .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
+           .withOptionalsDone().build();
+   long start = System.currentTimeMillis();
+   try {
+     service.initialize();
+     Timer fTimer = new Timer("testPullService Timer");
+     // stop the pull service once DELAY milliseconds have elapsed
+     fTimer.schedule(new MyTimerTask(service, fTimer, false), DELAY);
+
+     service.start();
+   } finally {
+     // exceptions propagate unchanged; only the cleanup and the timing report run here
+     long end = System.currentTimeMillis();
+     super.stopJetty();
+
+     System.out.println("###################### start-end clock time:" + (end - start) / 1000);
+   }
+ }
+
+ /**
+  * Runs a pull service (scaleout 12, "TestAAE" descriptor) against the non-blocking Jetty
+  * client; the timer task is created with its third argument set to true.
+  * NOTE(review): that flag presumably selects quiesce-and-stop - MyTimerTask is defined
+  * further down the file.
+  */
+ @Test
+ public void testPullServiceQuiesce() throws Exception {
+   System.out.println("----------------- testPullServiceQuiesce -------------------");
+   final int scaleout = 12;
+   super.startJetty(false); // don't block
+   System.setProperty("ducc.deploy.JpType", "uima");
+   IServiceProcessor processor = new UimaServiceProcessor("TestAAE");
+
+   String tasURL = "http://localhost:" + super.getPort() + "/test";
+
+   IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
+           .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
+           .withOptionalsDone().build();
+
+   try {
+     service.initialize();
+     Timer fTimer = new Timer("testPullService Timer");
+     // stop the pull service once DELAY milliseconds have elapsed
+     fTimer.schedule(new MyTimerTask(service, fTimer, true), DELAY);
+
+     service.start();
+   } finally {
+     super.stopJetty();
+   }
+ }
+
+ /**
+  * Runs a pull service (scaleout 12, "TestAAE" descriptor) against a Jetty client started in
+  * blocking mode, so POST requests are never answered; a timer task stops the service after
+  * DELAY milliseconds.
+  */
+ @Test
+ public void testPullServiceTimeout() throws Exception {
+   System.out.println("----------------- testPullServiceTimeout -------------------");
+   super.startJetty(true); // true=client blocks all POST requests
+   final int scaleout = 12;
+   IServiceProcessor processor = new UimaServiceProcessor("TestAAE");
+
+   String tasURL = "http://localhost:" + super.getPort() + "/test";
+
+   IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
+           .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
+           .withOptionalsDone().build();
+
+   try {
+     service.initialize();
+     System.out.println("----------- Starting Service .....");
+     Timer fTimer = new Timer();
+     // stop the service once DELAY milliseconds have elapsed
+     fTimer.schedule(new MyTimerTask(service, fTimer, false), DELAY);
+
+     service.start();
+   } finally {
+     super.stopJetty();
+   }
+ }
+
+ /**
+  * Runs a pull service (scaleout 10, "NoOpAE" descriptor) with the error handler window set to
+  * (1, 5) and the "ProcessFail" system property set to "2". No timer is scheduled; the
+  * property is removed again when the test finishes.
+  */
+ @Test
+ public void testStopOnFirstError() throws Exception {
+   System.out.println("----------------- testStopOnFirstError -------------------");
+   final int scaleout = 10;
+   super.startJetty(false); // don't block
+   System.setProperty("ducc.deploy.JpType", "uima");
+
+   IServiceProcessor processor = new UimaServiceProcessor("NoOpAE");
+   // fail on 1st error
+   processor.setErrorHandlerWindow(1, 5);
+
+   String tasURL = "http://localhost:" + super.getPort() + "/test";
+
+   IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
+           .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
+           .withOptionalsDone().build();
+
+   try {
+     System.setProperty("ProcessFail", "2");
+     service.initialize();
+
+     service.start();
+   } finally {
+     System.getProperties().remove("ProcessFail");
+     super.stopJetty();
+   }
+ }
+
+ /**
+  * Runs a pull service (scaleout 10, "NoOpAE" descriptor) with the error handler window set to
+  * (2, 5) and the "ProcessFail" system property set to "1,3". No timer is scheduled; the
+  * property is removed again when the test finishes.
+  */
+ @Test
+ public void testTerminateOn2ErrorsInWindowOf5() throws Exception {
+   System.out.println("----------------- testTerminateOn2ErrorsInWindowOf5 -------------------");
+   final int scaleout = 10;
+   super.startJetty(false); // don't block
+   System.setProperty("ducc.deploy.JpType", "uima");
+
+   IServiceProcessor processor = new UimaServiceProcessor("NoOpAE");
+   // fail on 2nd error in a window of 5
+   processor.setErrorHandlerWindow(2, 5);
+   String tasURL = "http://localhost:" + super.getPort() + "/test";
+
+   IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
+           .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
+           .withOptionalsDone().build();
+
+   try {
+     // fail task#1 and task#3 which should stop the test
+     System.setProperty("ProcessFail", "1,3");
+     service.initialize();
+
+     service.start();
+   } finally {
+     System.getProperties().remove("ProcessFail");
+     super.stopJetty();
+   }
+ }
+
+ @Test
+ public void testProcessFailureDefaultErrorHandler() throws Exception {
+ System.out
+ .println("----------------- testProcessFailureDefaultErrorHandler -------------------");
+ int scaleout = 14;
+ super.startJetty(false); // don't block
+ String analysisEngineDescriptor = "NoOpAE";
+ IServiceProcessor processor = new UimaServiceProcessor(analysisEngineDescriptor);
+ // No setErrorHandlerWindow() call in this test, so the processor keeps whatever error-handling defaults it has.
+ String tasURL = "http://localhost:" + super.getPort() + "/test";
+
+ IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
+ .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
+ .withOptionalsDone().build();
+
+ try {
+ // ProcessFail is "20" here (not "2"): presumably fails task #20. NOTE(review): confirm this is the intent
+ System.setProperty("ProcessFail", "20");
+ service.initialize();
+ Timer fTimer = new Timer("testPullService Timer");
+ // after 20secs stop the pull service (quiesce=false, so MyTimerTask calls service.stop())
+ fTimer.schedule(new MyTimerTask(service, fTimer, false), 20000);
+
+ service.start();
+
+ } catch (ServiceInitializationException e) {
+ throw e; // rethrown unchanged; this catch adds no handling
+ } catch (Exception e) {
+ throw e; // rethrown unchanged; this catch adds no handling
+ } finally {
+ System.getProperties().remove("ProcessFail"); // always clear the setting made in the try block
+ super.stopJetty();
+ }
+ }
+
+ /*
+ * @Test public void testPullServiceBadClientURL() throws Exception { int scaleout = 2;
+ * super.startJetty(false); // don't block String analysisEngineDescriptor = "TestAAE";
+ * IServiceProcessor processor = new UimaServiceProcessor(analysisEngineDescriptor);
+ *
+ * String tasURL ="http://localhost2:8080/test";
+ *
+ * IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
+ * .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
+ * .withOptionalsDone().build();
+ *
+ * try { service.initialize(); service.start();
+ *
+ *
+ * } catch (ServiceInitializationException e) { throw e; } catch (Exception e) { throw e; } }
+ */
+ class MyTimerTask extends TimerTask { // timer task that shuts down its own timer and then stops the given service
+ final IService service;
+ // timer this task is scheduled on; cancelled from run()
+ final Timer fTimer;
+ // true: run() calls quiesceAndStop(); false: run() calls stop()
+ final boolean quiesce;
+
+ MyTimerTask(IService service, Timer fTimer, boolean quiesce) {
+ this.service = service;
+ this.fTimer = fTimer;
+ this.quiesce = quiesce;
+ }
+
+ @Override
+ // Cancels this task and its timer first, then stops the service according to the quiesce flag.
+ public void run() {
+ this.cancel();
+ fTimer.purge();
+ fTimer.cancel();
+ System.out.println("Timmer popped - stopping service");
+ if (quiesce) {
+ service.quiesceAndStop();
+ } else {
+ service.stop();
+ }
+ }
+ // NOTE(review): non-static inner class, so every instance also references the enclosing test instance.
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java Wed May 1 13:42:39 2019
@@ -28,189 +28,194 @@ import org.apache.uima.ducc.ps.service.e
import org.apache.uima.ducc.ps.service.main.ServiceWrapper;
import org.junit.Test;
-public class JUnitServiceWrapperTestCase extends Client {
- private static final long DELAY=5000;
- {
- // static initializer sets amount of time the service delays
- // sending READY to a monitor
- System.setProperty("ducc.service.init.delay", "3000");
- }
-
-
- @Test
- public void testPullServiceWrapperNoTask() throws Exception {
- // make client return null task in response to GET
- System.setProperty("simulate.no.work", "3");
- System.setProperty("ducc.process.thread.sleep.time", "1000");
- try {
- testPullServiceWrapper();
- } finally {
- System.getProperties().remove("simulate.no.work");
- }
- }
- @Test
- public void testPullServiceWrapper() throws Exception {
- System.out.println("-------------------------- testPullServiceWrapper ----------------------");;
-
- //int scaleout = 2;
- StateMonitor monitor = new StateMonitor();
- monitor.start();
- System.out.println("........... Monitor Port:"+System.getProperty("DUCC_STATE_UPDATE_PORT"));
- super.startJetty(false); // don't block
- String analysisEngineDescriptor = "TestAAE";
- System.setProperty("ducc.deploy.JpType", "uima");
-
- String tasURL = "http://localhost:"+super.getPort()+"/test";
- try {
- System.setProperty("ducc.deploy.JdURL", tasURL);
- System.setProperty("ducc.deploy.JpThreadCount","4");
- System.setProperty("ducc.deploy.service.type", "NotesService");
- System.setProperty("ducc.deploy.JpType", "uima");
-
- ServiceWrapper service = new ServiceWrapper();
-
- Timer fTimer = new Timer("testPullService Timer");
- // after 5secs stop the pull service
- fTimer.schedule(new MyTimerTask(service, fTimer), 40000);
-
- service.initialize(new String[] {analysisEngineDescriptor});
-
- service.start();
-
-
- } catch (ServiceInitializationException e) {
- throw e;
- } catch (Exception e) {
- throw e;
- } finally {
- monitor.stop();
- super.stopJetty();
-
- }
- }
-
- @Test
- public void testPullServiceWrapperWithProcessFailure() throws Exception {
- System.out.println("-------------------------- testPullServiceWrapperWithProcessFailure ----------------------");;
- //int scaleout = 2;
- StateMonitor monitor = new StateMonitor();
- monitor.start();
- System.out.println("........... Monitor Port:"+System.getProperty("DUCC_STATE_UPDATE_PORT"));
- super.startJetty(false); // don't block
- String analysisEngineDescriptor = "NoOpAE";
-
- String tasURL = "http://localhost:"+super.getPort()+"/test";
- try {
- // Force process failure of the first task
- System.setProperty("ProcessFail","1");
-
- System.setProperty("ducc.deploy.JdURL", tasURL);
- System.setProperty("ducc.deploy.JpThreadCount","4");
- System.setProperty("ducc.deploy.service.type", "NotesService");
- System.setProperty("ducc.deploy.JpType", "uima");
- // use default error window (1,1)
- ServiceWrapper service = new ServiceWrapper();
-
- Timer fTimer = new Timer("testPullService Timer");
- // after 5secs stop the pull service
- fTimer.schedule(new MyTimerTask(service, fTimer), 10000);
-
- service.initialize(new String[] {analysisEngineDescriptor});
-
- service.start();
-
-
- } catch (ServiceInitializationException e) {
- throw e;
- } catch (Exception e) {
- throw e;
- } finally {
- monitor.stop();
- System.getProperties().remove("ProcessFail");
- super.stopJetty();
- }
- }
- @Test
- public void testPullServiceWrapperDDGenerator() throws Exception {
- System.out.println("-------------------------- testPullServiceWrapperDDGenerator ----------------------");;
-
- //int scaleout = 2;
- StateMonitor monitor = new StateMonitor();
- monitor.start();
- System.out.println("........... Monitor Port:"+System.getProperty("DUCC_STATE_UPDATE_PORT"));
- super.startJetty(false); // don't block
- // Dont change the name of TestAAE.xml. This is setup to fail file lookup and force
- // generation of AE descriptor.
- String analysisEngineDescriptor = "TestAAE.xml";
- System.setProperty("ducc.deploy.JpType", "uima");
-
- String tasURL = "http://localhost:"+super.getPort()+"/test";
- try {
-
- System.setProperty("ducc.deploy.JdURL", tasURL);
- System.setProperty("ducc.deploy.JpThreadCount","4");
- System.setProperty("ducc.deploy.service.type", "NotesService");
- System.setProperty("ducc.deploy.JpType", "uima");
- System.setProperty("ducc.deploy.JpAeDescriptor","NoOpAE");
- System.setProperty("ducc.deploy.JobDirectory",System.getProperty("user.dir"));
- System.setProperty("ducc.deploy.JpFlowController","org.apache.uima.flow.FixedFlowController");
- System.setProperty("ducc.process.log.dir",System.getProperty("user.dir"));
- System.setProperty("ducc.job.id","2000");
- ServiceWrapper service = new ServiceWrapper();
-
- Timer fTimer = new Timer("testPullService Timer");
- // after 5secs stop the pull service
- fTimer.schedule(new MyTimerTask(service, fTimer), 20000);
-
- service.initialize(new String[] {analysisEngineDescriptor});
-
- service.start();
-
-
- } catch (ServiceInitializationException e) {
- throw e;
- } catch (Exception e) {
- throw e;
- } finally {
- monitor.stop();
- super.stopJetty();
- File directory = new File(System.getProperty("user.dir").
- concat("/").concat(System.getProperty("ducc.job.id")));
-
- if ( directory.exists() ) {
- for (File f : directory.listFiles()) {
- if (f.getName().startsWith("uima-ae-")) {
- f.delete();
- System.out.println("Removed generated descriptor:"+f.getAbsolutePath());
- }
- }
- directory.delete();
-
- }
-
-
- }
- }
- class MyTimerTask extends TimerTask {
- final ServiceWrapper service;
- final Timer fTimer;
-
- MyTimerTask(ServiceWrapper service, Timer fTimer) {
- this.service = service;
- this.fTimer = fTimer;
- }
-
- @Override
-
- public void run() {
- this.cancel();
- fTimer.purge();
- fTimer.cancel();
- System.out.println("Timmer popped - stopping service");
- service.stop();
+public class JUnitServiceWrapperTestCase extends Client {
+ private static final long DELAY = 5000; // NOTE(review): not referenced anywhere in this class as shown
+ {
+ // instance initializer (no "static" keyword, so it runs for each test-class instance); sets amount of
+ // time the service delays sending READY to a monitor
+ System.setProperty("ducc.service.init.delay", "3000");
+ }
+
+ @Test
+ public void testPullServiceWrapperNoTask() throws Exception {
+ // make client return null task in response to GET
+ System.setProperty("simulate.no.work", "3");
+ System.setProperty("ducc.process.thread.sleep.time", "1000"); // NOTE(review): unlike simulate.no.work, never removed below
+ try {
+ testPullServiceWrapper(); // runs the regular wrapper scenario with the two properties above in effect
+ } finally {
+ System.getProperties().remove("simulate.no.work");
+ }
+ }
+
+ @Test
+ public void testPullServiceWrapper() throws Exception {
+ System.out.println("-------------------------- testPullServiceWrapper ----------------------");
+ ; // stray empty statement
+ // Scenario: start a StateMonitor and the Jetty client, then run a ServiceWrapper until the timer stops it.
+ // int scaleout = 2;
+ StateMonitor monitor = new StateMonitor();
+ monitor.start();
+ System.out.println("........... Monitor Port:" + System.getProperty("DUCC_STATE_UPDATE_PORT"));
+ super.startJetty(false); // don't block
+ String analysisEngineDescriptor = "TestAAE";
+ System.setProperty("ducc.deploy.JpType", "uima");
+
+ String tasURL = "http://localhost:" + super.getPort() + "/test";
+ try {
+ System.setProperty("ducc.deploy.JdURL", tasURL);
+ System.setProperty("ducc.deploy.JpThreadCount", "12");
+ System.setProperty("ducc.deploy.service.type", "NotesService");
+ System.setProperty("ducc.deploy.JpType", "uima"); // same value already set before the try block
+ // presumably ServiceWrapper picks its configuration up from the ducc.deploy.* properties - TODO confirm
+ ServiceWrapper service = new ServiceWrapper();
+
+ Timer fTimer = new Timer("testPullService Timer");
+ // after 40secs stop the pull service
+ fTimer.schedule(new MyTimerTask(service, fTimer), 40000);
+ // NOTE(review): timer is scheduled before initialize(); nothing below cancels it if initialize() throws
+ service.initialize(new String[] { analysisEngineDescriptor });
+
+ service.start();
+
+ } catch (ServiceInitializationException e) {
+ throw e; // rethrown unchanged; this catch adds no handling
+ } catch (Exception e) {
+ throw e; // rethrown unchanged; this catch adds no handling
+ } finally {
+ monitor.stop();
+ super.stopJetty();
+ // NOTE(review): the ducc.deploy.* system properties set above are left in place
+ }
+ }
+
+ @Test
+ public void testPullServiceWrapperWithProcessFailure() throws Exception {
+ System.out.println(
+ "-------------------------- testPullServiceWrapperWithProcessFailure ----------------------");
+ ; // stray empty statement
+ // int scaleout = 2;
+ StateMonitor monitor = new StateMonitor();
+ monitor.start();
+ System.out.println("........... Monitor Port:" + System.getProperty("DUCC_STATE_UPDATE_PORT"));
+ super.startJetty(false); // don't block
+ String analysisEngineDescriptor = "NoOpAE";
+ // Scenario: same wrapper set-up as the other tests in this class, plus a ProcessFail setting for task 1.
+ String tasURL = "http://localhost:" + super.getPort() + "/test";
+ try {
+ // Force process failure of the first task
+ System.setProperty("ProcessFail", "1");
+
+ System.setProperty("ducc.deploy.JdURL", tasURL);
+ System.setProperty("ducc.deploy.JpThreadCount", "4");
+ System.setProperty("ducc.deploy.service.type", "NotesService");
+ System.setProperty("ducc.deploy.JpType", "uima");
+ // use default error window (1,1)
+ ServiceWrapper service = new ServiceWrapper();
+
+ Timer fTimer = new Timer("testPullService Timer");
+ // after 10secs stop the pull service
+ fTimer.schedule(new MyTimerTask(service, fTimer), 10000);
+ // NOTE(review): timer is scheduled before initialize(); nothing below cancels it if initialize() throws
+ service.initialize(new String[] { analysisEngineDescriptor });
+
+ service.start();
+
+ } catch (ServiceInitializationException e) {
+ throw e; // rethrown unchanged; this catch adds no handling
+ } catch (Exception e) {
+ throw e; // rethrown unchanged; this catch adds no handling
+ } finally {
+ monitor.stop();
+ System.getProperties().remove("ProcessFail"); // ProcessFail is cleared; the ducc.deploy.* properties are not
+ super.stopJetty();
+ }
+ }
+
+ @Test
+ public void testPullServiceWrapperDDGenerator() throws Exception {
+ System.out.println(
+ "-------------------------- testPullServiceWrapperDDGenerator ----------------------");
+ ; // stray empty statement
+ // Scenario: run a ServiceWrapper with extra ducc.deploy.* / ducc.job.id properties, then delete generated files.
+ // int scaleout = 2;
+ StateMonitor monitor = new StateMonitor();
+ monitor.start();
+ System.out.println("........... Monitor Port:" + System.getProperty("DUCC_STATE_UPDATE_PORT"));
+ super.startJetty(false); // don't block
+ // Don't change the name of TestAAE.xml. This is set up to fail file lookup and force
+ // generation of AE descriptor.
+ String analysisEngineDescriptor = "TestAAE.xml";
+ System.setProperty("ducc.deploy.JpType", "uima");
+
+ String tasURL = "http://localhost:" + super.getPort() + "/test";
+ try {
+
+ System.setProperty("ducc.deploy.JdURL", tasURL);
+ System.setProperty("ducc.deploy.JpThreadCount", "4");
+ System.setProperty("ducc.deploy.service.type", "NotesService");
+ System.setProperty("ducc.deploy.JpType", "uima"); // same value already set before the try block
+ System.setProperty("ducc.deploy.JpAeDescriptor", "NoOpAE");
+ System.setProperty("ducc.deploy.JobDirectory", System.getProperty("user.dir"));
+ System.setProperty("ducc.deploy.JpFlowController",
+ "org.apache.uima.flow.FixedFlowController");
+ System.setProperty("ducc.process.log.dir", System.getProperty("user.dir"));
+ System.setProperty("ducc.job.id", "2000"); // also names the directory cleaned up in the finally block
+ ServiceWrapper service = new ServiceWrapper();
+
+ Timer fTimer = new Timer("testPullService Timer");
+ // after 20secs stop the pull service
+ fTimer.schedule(new MyTimerTask(service, fTimer), 20000);
+ // NOTE(review): timer is scheduled before initialize(); nothing below cancels it if initialize() throws
+ service.initialize(new String[] { analysisEngineDescriptor });
+
+ service.start();
+
+ } catch (ServiceInitializationException e) {
+ throw e; // rethrown unchanged; this catch adds no handling
+ } catch (Exception e) {
+ throw e; // rethrown unchanged; this catch adds no handling
+ } finally {
+ monitor.stop();
+ super.stopJetty();
+ File directory = new File(
+ System.getProperty("user.dir").concat("/").concat(System.getProperty("ducc.job.id")));
+ // Clean-up: delete files named "uima-ae-*" in <user.dir>/<ducc.job.id>, then the directory itself.
+ if (directory.exists()) {
+ for (File f : directory.listFiles()) { // NOTE(review): listFiles() can return null on an I/O error
+ if (f.getName().startsWith("uima-ae-")) {
+ f.delete();
+ System.out.println("Removed generated descriptor:" + f.getAbsolutePath());
+ }
+ }
+ directory.delete();
+ // NOTE(review): delete() results are ignored; the directory remains if any other file is left in it
+ }
+
+ }
+ }
+
+ class MyTimerTask extends TimerTask { // timer task that shuts down its own timer and then stops the given service
+ final ServiceWrapper service;
+ // timer this task is scheduled on; cancelled from run()
+ final Timer fTimer;
+
+ MyTimerTask(ServiceWrapper service, Timer fTimer) {
+ this.service = service;
+ this.fTimer = fTimer;
+ }
+
+ @Override
+ // Cancels this task and its timer first, then calls service.stop().
+ public void run() {
+ this.cancel();
+ fTimer.purge();
+ fTimer.cancel();
+ System.out.println("Timmer popped - stopping service");
+ service.stop();
- }
+ }
- }
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java Wed May 1 13:42:39 2019
@@ -24,6 +24,7 @@ import java.lang.management.ManagementFa
import java.lang.management.RuntimeMXBean;
import java.util.Map;
import java.util.Objects;
+import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.uima.UIMAFramework;
@@ -126,7 +127,24 @@ public class NoOpAE extends CasAnnotator
}
}
}
+ try {
+ //int n = getRandomNumberInRange(2000,3000);
+ int n = getRandomNumberInRange(250,450);
+ //System.out.println(" AE Sleeping for "+n + " millis");
+ Thread.sleep(n);
+
+ } catch( InterruptedException e) {
+
+ }
}
+ private static int getRandomNumberInRange(int min, int max) { // returns a pseudo-random int in [min, max], both inclusive
+ if (min >= max) {
+ throw new IllegalArgumentException("max must be greater than min");
+ }
+ // A new Random is created on every call; nextInt(bound) yields [0, bound), hence the "+ 1" to include max.
+ Random r = new Random();
+ return r.nextInt((max - min) + 1) + min;
+ }
}