You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2019/05/01 13:42:39 UTC
svn commit: r1858488 [3/3] - in
/uima/uima-ducc/trunk/uima-ducc-pullservice/src:
main/java/org/apache/uima/ducc/ps/sd/task/
main/java/org/apache/uima/ducc/ps/sd/task/transport/
main/java/org/apache/uima/ducc/ps/service/protocol/builtin/
main/java/org/a...
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitTransportTestCase.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitTransportTestCase.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitTransportTestCase.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitTransportTestCase.java Wed May 1 13:42:39 2019
@@ -21,6 +21,7 @@ package org.apache.uima.ducc.ps.transpor
import java.io.BufferedReader;
import java.io.IOException;
import java.net.ServerSocket;
+import java.util.HashMap;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
@@ -46,189 +47,219 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.io.xml.DomDriver;
+
public class JunitTransportTestCase {
- private Server server;
- private final static String app="test";
- private int httpPort = 12222;
- private int maxThreads = 20;
- private int getJettyPort() {
- while(true) {
- ServerSocket socket=null;
- try {
- socket = new ServerSocket(httpPort);
- break;
- } catch( IOException e) {
- httpPort++;
- } finally {
- if ( socket != null ) {
- try {
- socket.close();
- } catch( Exception ee) {}
-
- }
- }
- }
- return httpPort;
- }
- private int getPort() {
-
- return httpPort;
- }
- @Before
- public void startJetty() throws Exception
- {
-
-
- QueuedThreadPool threadPool = new QueuedThreadPool();
- if (maxThreads < threadPool.getMinThreads()) {
- // logger.warn("JobDriver", jobid,
- // "Invalid value for jetty MaxThreads("+maxThreads+") - it should be greater or equal to "+threadPool.getMinThreads()+". Defaulting to jettyMaxThreads="+threadPool.getMaxThreads());
- threadPool.setMaxThreads(threadPool.getMinThreads());
- } else {
- threadPool.setMaxThreads(maxThreads);
- }
-
- server = new Server(threadPool);
-
- // Server connector
- ServerConnector connector = new ServerConnector(server);
- connector.setPort(getJettyPort());
- server.setConnectors(new Connector[] { connector });
- System.out.println("launching Jetty on Port:"+connector.getPort());
- ServletContextHandler context = new ServletContextHandler(
- ServletContextHandler.SESSIONS);
- context.setContextPath("/");
- server.setHandler(context);
-
- context.addServlet(new ServletHolder(new TaskHandlerServlet()), "/"+app);
-
-
- server.start();
- System.out.println("Jetty Started - Waiting for Messages ...");
- }
-
- @After
- public void stopJetty()
- {
- try
- {
- server.stop();
+ private Server server;
+
+ private final static String app = "test";
+
+ private int httpPort = 12222;
+
+ private int maxThreads = 20;
+
+ private ThreadLocal<HashMap<Long, XStream>> localXStream = new ThreadLocal<HashMap<Long, XStream>>() {
+ @Override
+ protected HashMap<Long, XStream> initialValue() {
+ return new HashMap<Long, XStream>();
+ }
+ };
+
+ private int getJettyPort() {
+ while (true) {
+ ServerSocket socket = null;
+ try {
+ socket = new ServerSocket(httpPort);
+ break;
+ } catch (IOException e) {
+ httpPort++;
+ } finally {
+ if (socket != null) {
+ try {
+ socket.close();
+ } catch (Exception ee) {
+ }
+
}
- catch (Exception e)
- {
- e.printStackTrace();
+ }
+ }
+ return httpPort;
+ }
+
+ private int getPort() {
+
+ return httpPort;
+ }
+
+ @Before
+ public void startJetty() throws Exception {
+
+ QueuedThreadPool threadPool = new QueuedThreadPool();
+ if (maxThreads < threadPool.getMinThreads()) {
+ // logger.warn("JobDriver", jobid,
+ // "Invalid value for jetty MaxThreads("+maxThreads+") - it should be greater or equal to
+ // "+threadPool.getMinThreads()+". Defaulting to
+ // jettyMaxThreads="+threadPool.getMaxThreads());
+ threadPool.setMaxThreads(threadPool.getMinThreads());
+ } else {
+ threadPool.setMaxThreads(maxThreads);
+ }
+
+ server = new Server(threadPool);
+
+ // Server connector
+ ServerConnector connector = new ServerConnector(server);
+ connector.setPort(getJettyPort());
+ server.setConnectors(new Connector[] { connector });
+ System.out.println("launching Jetty on Port:" + connector.getPort());
+ ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+ context.setContextPath("/");
+ server.setHandler(context);
+
+ context.addServlet(new ServletHolder(new TaskHandlerServlet()), "/" + app);
+
+ server.start();
+ System.out.println("Jetty Started - Waiting for Messages ...");
+ }
+
+ @After
+ public void stopJetty() {
+ try {
+ server.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ System.out.println("Jetty Stopped");
+ }
+
+ private void wait(DefaultRegistryClient registryClient) {
+ synchronized (registryClient) {
+ try {
+ registryClient.wait(5 * 1000);
+
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ @Test
+ public void testTransportBasicConnectivity() throws Exception {
+ int scaleout = 12;
+ ITargetURI targetUrl = new HttpTargetURI("http://localhost:" + getPort() + "/" + app);
+ DefaultRegistryClient registryClient = new DefaultRegistryClient(targetUrl);
+ HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout);
+ transport.initialize();
+ // String response = transport.getWork("Test");
+ // System.out.println("Test Received Response:"+response);
+
+ // assertThat("Response Code", http.getResponseCode(), (equal((HttpStatus.OK_200)));
+ }
+
+ @Test
+ public void testTransportIOException() throws Exception {
+ localXStream.get().clear();
+ if (localXStream.get().get(Thread.currentThread().getId()) == null) {
+ localXStream.get().put(Thread.currentThread().getId(), new XStream(new DomDriver()));
+ }
+
+ System.out.println(".... Test::testTransportIOException");
+ int scaleout = 12;
+ ITargetURI targetUrl = new HttpTargetURI("http://localhost:" + getPort() + "/" + app);
+ DefaultRegistryClient registryClient = new DefaultRegistryClient(targetUrl);
+ HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout);
+ transport.initialize();
+ System.setProperty("MockHttpPostError", ERROR.IOException.name());
+ IMetaTaskTransaction transaction = new MetaTaskTransaction();
+ // transport.dispatch(XStreamUtils.marshall(transaction),localXStream);
+ transport.dispatch(localXStream.get().get(Thread.currentThread().getId()).toXML(transaction),
+ localXStream);
+
+ wait(registryClient);
+ }
+
+ @Test
+ public void testTransportNoRoutToHostException() throws Exception {
+ localXStream.get().clear();
+ if (localXStream.get().get(Thread.currentThread().getId()) == null) {
+ localXStream.get().put(Thread.currentThread().getId(), new XStream(new DomDriver()));
+ }
+
+ System.out.println(".... Test::testTransportNoRoutToHostException");
+ int scaleout = 12;
+ ITargetURI targetUrl = new HttpTargetURI("http://localhost:" + getPort() + "/" + app);
+ DefaultRegistryClient registryClient = new DefaultRegistryClient(targetUrl);
+ HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout);
+ transport.initialize();
+ System.setProperty("MockHttpPostError", ERROR.NoRouteToHostException.name());
+ IMetaTaskTransaction transaction = new MetaTaskTransaction();
+ // transport.dispatch(XStreamUtils.marshall(transaction),localXStream);
+ transport.dispatch(localXStream.get().get(Thread.currentThread().getId()).toXML(transaction),
+ localXStream);
+ wait(registryClient);
+
+ }
+
+ @Test
+ public void testTransportURISyntaxException() throws Exception {
+ System.out.println(".... Test::testTransportURISyntaxException");
+ localXStream.get().clear();
+ if (localXStream.get().get(Thread.currentThread().getId()) == null) {
+ localXStream.get().put(Thread.currentThread().getId(), new XStream(new DomDriver()));
+ }
+
+ int scaleout = 12;
+ ITargetURI targetUrl = new HttpTargetURI("http://localhost:" + getPort() + "/" + app);
+ DefaultRegistryClient registryClient = new DefaultRegistryClient(targetUrl);
+ HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout);
+ transport.initialize();
+ System.setProperty("MockHttpPostError", ERROR.URISyntaxException.name());
+ IMetaTaskTransaction transaction = new MetaTaskTransaction();
+ // transport.dispatch(XStreamUtils.marshall(transaction), localXStream);
+ transport.dispatch(localXStream.get().get(Thread.currentThread().getId()).toXML(transaction),
+ localXStream);
+ wait(registryClient);
+
+ }
+
+ public class TaskHandlerServlet extends HttpServlet {
+ private static final long serialVersionUID = 1L;
+
+ public TaskHandlerServlet() {
+ }
+
+ protected void doPost(HttpServletRequest request, HttpServletResponse response)
+ throws ServletException, IOException {
+ try {
+ System.out.println("Handling HTTP Post Request");
+ // long post_stime = System.nanoTime();
+ StringBuilder sb = new StringBuilder();
+ BufferedReader reader = request.getReader();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ sb.append(line);
}
- System.out.println("Jetty Stopped");
+ String content = sb.toString().trim();
+
+ // System.out.println( "Http Request Body:::"+content);
+
+ String nodeIP = request.getHeader("IP");
+ String nodeName = request.getHeader("Hostname");
+ String threadID = request.getHeader("ThreadID");
+ String pid = request.getHeader("PID");
+ System.out.println("Sender ID:::Node IP" + nodeIP + " Node Name:" + nodeName + " PID:" + pid
+ + " ThreadID:" + threadID);
+ IMetaTaskTransaction transaction = (IMetaTaskTransaction) XStreamUtils.unmarshall(content);
+ transaction.setMetaTask(new MetaTask(1, "", null));
+ transaction.getMetaTask().setUserSpaceTask("Test Message");
+ String serializedResponse = XStreamUtils.marshall(transaction);
+ // System.out.println(serializedResponse);
+ response.getWriter().write(serializedResponse);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw new ServletException(e);
+ }
}
- private void wait(DefaultRegistryClient registryClient) {
- synchronized(registryClient) {
- try {
- registryClient.wait(5*1000);
-
- } catch( InterruptedException e) {}
- }
- }
- @Test
- public void testTransportBasicConnectivity() throws Exception
- {
- int scaleout = 12;
- ITargetURI targetUrl = new HttpTargetURI("http://localhost:"+getPort()+"/"+app);
- DefaultRegistryClient registryClient =
- new DefaultRegistryClient(targetUrl);
- HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout);
- transport.initialize();
- //String response = transport.getWork("Test");
- //System.out.println("Test Received Response:"+response);
-
-// assertThat("Response Code", http.getResponseCode(), (equal((HttpStatus.OK_200)));
- }
- @Test
- public void testTransportIOException() throws Exception
- {
- System.out.println(".... Test::testTransportIOException");
- int scaleout = 12;
- ITargetURI targetUrl = new HttpTargetURI("http://localhost:"+getPort()+"/"+app);
- DefaultRegistryClient registryClient =
- new DefaultRegistryClient(targetUrl);
- HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout);
- transport.initialize();
- System.setProperty("MockHttpPostError", ERROR.IOException.name());
- IMetaTaskTransaction transaction = new MetaTaskTransaction();
- transport.dispatch(XStreamUtils.marshall(transaction));
-
- wait(registryClient);
- }
- @Test
- public void testTransportNoRoutToHostException() throws Exception
- {
- System.out.println(".... Test::testTransportNoRoutToHostException");
- int scaleout = 12;
- ITargetURI targetUrl = new HttpTargetURI("http://localhost:"+getPort()+"/"+app);
- DefaultRegistryClient registryClient =
- new DefaultRegistryClient(targetUrl);
- HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout);
- transport.initialize();
- System.setProperty("MockHttpPostError", ERROR.NoRouteToHostException.name());
- IMetaTaskTransaction transaction = new MetaTaskTransaction();
- transport.dispatch(XStreamUtils.marshall(transaction));
- wait(registryClient);
-
- }
- @Test
- public void testTransportURISyntaxException() throws Exception
- {
- System.out.println(".... Test::testTransportURISyntaxException");
- int scaleout = 12;
- ITargetURI targetUrl = new HttpTargetURI("http://localhost:"+getPort()+"/"+app);
- DefaultRegistryClient registryClient =
- new DefaultRegistryClient(targetUrl);
- HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout);
- transport.initialize();
- System.setProperty("MockHttpPostError", ERROR.URISyntaxException.name());
- IMetaTaskTransaction transaction = new MetaTaskTransaction();
- transport.dispatch(XStreamUtils.marshall(transaction));
- wait(registryClient);
-
- }
- public class TaskHandlerServlet extends HttpServlet {
- private static final long serialVersionUID = 1L;
-
- public TaskHandlerServlet() {
- }
-
- protected void doPost(HttpServletRequest request,
- HttpServletResponse response) throws ServletException,
- IOException {
- try {
- System.out.println("Handling HTTP Post Request");
- //long post_stime = System.nanoTime();
- StringBuilder sb = new StringBuilder();
- BufferedReader reader = request.getReader();
- String line;
- while ((line = reader.readLine()) != null) {
- sb.append(line);
- }
- String content = sb.toString().trim();
-
- //System.out.println( "Http Request Body:::"+content);
-
-
- String nodeIP = request.getHeader("IP");
- String nodeName = request.getHeader("Hostname");
- String threadID = request.getHeader("ThreadID");
- String pid = request.getHeader("PID");
- System.out.println( "Sender ID:::Node IP"+nodeIP+" Node Name:"+nodeName+" PID:"+pid+" ThreadID:"+threadID);
- IMetaTaskTransaction transaction = (IMetaTaskTransaction)XStreamUtils.unmarshall(content);
- transaction.setMetaTask(new MetaTask(1,"",null));
- transaction.getMetaTask().setUserSpaceTask("Test Message");
- String serializedResponse = XStreamUtils.marshall(transaction);
- //System.out.println(serializedResponse);
- response.getWriter().write(serializedResponse);
- } catch (Throwable e) {
- e.printStackTrace();
- throw new ServletException(e);
- }
- }
- }
+ }
}