You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2019/05/01 13:42:39 UTC

svn commit: r1858488 [3/3] - in /uima/uima-ducc/trunk/uima-ducc-pullservice/src: main/java/org/apache/uima/ducc/ps/sd/task/ main/java/org/apache/uima/ducc/ps/sd/task/transport/ main/java/org/apache/uima/ducc/ps/service/protocol/builtin/ main/java/org/a...

Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitTransportTestCase.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitTransportTestCase.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitTransportTestCase.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitTransportTestCase.java Wed May  1 13:42:39 2019
@@ -21,6 +21,7 @@ package org.apache.uima.ducc.ps.transpor
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.net.ServerSocket;
+import java.util.HashMap;
 
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
@@ -46,189 +47,219 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.io.xml.DomDriver;
+
 public class JunitTransportTestCase {
-	private Server server;
-	private final static String app="test";
-	private int httpPort = 12222;
-	private int maxThreads = 20;
-	private int getJettyPort() {
-		while(true) {
-			ServerSocket socket=null;
-			try {
-				socket = new ServerSocket(httpPort);
-				break;
-			} catch( IOException e) {
-				httpPort++;
-			} finally {
-				if ( socket != null ) {
-					try {
-						socket.close();
-					} catch( Exception ee) {}
-					
-				}
-			}
-		}
-		return httpPort;
-	}
-	private int getPort() {
-
-		return httpPort;
-	}
-    @Before
-    public void startJetty() throws Exception
-    {
-
-        
-		QueuedThreadPool threadPool = new QueuedThreadPool();
-		if (maxThreads < threadPool.getMinThreads()) {
-			// logger.warn("JobDriver", jobid,
-			// "Invalid value for jetty MaxThreads("+maxThreads+") - it should be greater or equal to "+threadPool.getMinThreads()+". Defaulting to jettyMaxThreads="+threadPool.getMaxThreads());
-			threadPool.setMaxThreads(threadPool.getMinThreads());
-		} else {
-			threadPool.setMaxThreads(maxThreads);
-		}
-
-	    server = new Server(threadPool);
-
-		// Server connector
-		ServerConnector connector = new ServerConnector(server);
-		connector.setPort(getJettyPort());
-		server.setConnectors(new Connector[] { connector });
-        System.out.println("launching Jetty on Port:"+connector.getPort());
-		ServletContextHandler context = new ServletContextHandler(
-				ServletContextHandler.SESSIONS);
-		context.setContextPath("/");
-		server.setHandler(context);
-
-		context.addServlet(new ServletHolder(new TaskHandlerServlet()), "/"+app);
-        
-        
-		server.start();
-        System.out.println("Jetty Started - Waiting for Messages ...");
-    }
-
-    @After
-    public void stopJetty()
-    {
-        try
-        {
-            server.stop();
+  private Server server;
+
+  private final static String app = "test";
+
+  private int httpPort = 12222;
+
+  private int maxThreads = 20;
+
+  private ThreadLocal<HashMap<Long, XStream>> localXStream = new ThreadLocal<HashMap<Long, XStream>>() {
+    @Override
+    protected HashMap<Long, XStream> initialValue() {
+      return new HashMap<Long, XStream>();
+    }
+  };
+
+  private int getJettyPort() {
+    while (true) {
+      ServerSocket socket = null;
+      try {
+        socket = new ServerSocket(httpPort);
+        break;
+      } catch (IOException e) {
+        httpPort++;
+      } finally {
+        if (socket != null) {
+          try {
+            socket.close();
+          } catch (Exception ee) {
+          }
+
         }
-        catch (Exception e)
-        {
-            e.printStackTrace();
+      }
+    }
+    return httpPort;
+  }
+
+  private int getPort() {
+
+    return httpPort;
+  }
+
+  @Before
+  public void startJetty() throws Exception {
+
+    QueuedThreadPool threadPool = new QueuedThreadPool();
+    if (maxThreads < threadPool.getMinThreads()) {
+      // logger.warn("JobDriver", jobid,
+      // "Invalid value for jetty MaxThreads("+maxThreads+") - it should be greater or equal to
+      // "+threadPool.getMinThreads()+". Defaulting to
+      // jettyMaxThreads="+threadPool.getMaxThreads());
+      threadPool.setMaxThreads(threadPool.getMinThreads());
+    } else {
+      threadPool.setMaxThreads(maxThreads);
+    }
+
+    server = new Server(threadPool);
+
+    // Server connector
+    ServerConnector connector = new ServerConnector(server);
+    connector.setPort(getJettyPort());
+    server.setConnectors(new Connector[] { connector });
+    System.out.println("launching Jetty on Port:" + connector.getPort());
+    ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+    context.setContextPath("/");
+    server.setHandler(context);
+
+    context.addServlet(new ServletHolder(new TaskHandlerServlet()), "/" + app);
+
+    server.start();
+    System.out.println("Jetty Started - Waiting for Messages ...");
+  }
+
+  @After
+  public void stopJetty() {
+    try {
+      server.stop();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    System.out.println("Jetty Stopped");
+  }
+
+  private void wait(DefaultRegistryClient registryClient) {
+    synchronized (registryClient) {
+      try {
+        registryClient.wait(5 * 1000);
+
+      } catch (InterruptedException e) {
+      }
+    }
+  }
+
+  @Test
+  public void testTransportBasicConnectivity() throws Exception {
+    int scaleout = 12;
+    ITargetURI targetUrl = new HttpTargetURI("http://localhost:" + getPort() + "/" + app);
+    DefaultRegistryClient registryClient = new DefaultRegistryClient(targetUrl);
+    HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout);
+    transport.initialize();
+    // String response = transport.getWork("Test");
+    // System.out.println("Test Received Response:"+response);
+
+    // assertThat("Response Code", http.getResponseCode(), (equal((HttpStatus.OK_200)));
+  }
+
+  @Test
+  public void testTransportIOException() throws Exception {
+    localXStream.get().clear();
+    if (localXStream.get().get(Thread.currentThread().getId()) == null) {
+      localXStream.get().put(Thread.currentThread().getId(), new XStream(new DomDriver()));
+    }
+
+    System.out.println(".... Test::testTransportIOException");
+    int scaleout = 12;
+    ITargetURI targetUrl = new HttpTargetURI("http://localhost:" + getPort() + "/" + app);
+    DefaultRegistryClient registryClient = new DefaultRegistryClient(targetUrl);
+    HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout);
+    transport.initialize();
+    System.setProperty("MockHttpPostError", ERROR.IOException.name());
+    IMetaTaskTransaction transaction = new MetaTaskTransaction();
+    // transport.dispatch(XStreamUtils.marshall(transaction),localXStream);
+    transport.dispatch(localXStream.get().get(Thread.currentThread().getId()).toXML(transaction),
+            localXStream);
+
+    wait(registryClient);
+  }
+
+  @Test
+  public void testTransportNoRoutToHostException() throws Exception {
+    localXStream.get().clear();
+    if (localXStream.get().get(Thread.currentThread().getId()) == null) {
+      localXStream.get().put(Thread.currentThread().getId(), new XStream(new DomDriver()));
+    }
+
+    System.out.println(".... Test::testTransportNoRoutToHostException");
+    int scaleout = 12;
+    ITargetURI targetUrl = new HttpTargetURI("http://localhost:" + getPort() + "/" + app);
+    DefaultRegistryClient registryClient = new DefaultRegistryClient(targetUrl);
+    HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout);
+    transport.initialize();
+    System.setProperty("MockHttpPostError", ERROR.NoRouteToHostException.name());
+    IMetaTaskTransaction transaction = new MetaTaskTransaction();
+    // transport.dispatch(XStreamUtils.marshall(transaction),localXStream);
+    transport.dispatch(localXStream.get().get(Thread.currentThread().getId()).toXML(transaction),
+            localXStream);
+    wait(registryClient);
+
+  }
+
+  @Test
+  public void testTransportURISyntaxException() throws Exception {
+    System.out.println(".... Test::testTransportURISyntaxException");
+    localXStream.get().clear();
+    if (localXStream.get().get(Thread.currentThread().getId()) == null) {
+      localXStream.get().put(Thread.currentThread().getId(), new XStream(new DomDriver()));
+    }
+
+    int scaleout = 12;
+    ITargetURI targetUrl = new HttpTargetURI("http://localhost:" + getPort() + "/" + app);
+    DefaultRegistryClient registryClient = new DefaultRegistryClient(targetUrl);
+    HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout);
+    transport.initialize();
+    System.setProperty("MockHttpPostError", ERROR.URISyntaxException.name());
+    IMetaTaskTransaction transaction = new MetaTaskTransaction();
+    // transport.dispatch(XStreamUtils.marshall(transaction), localXStream);
+    transport.dispatch(localXStream.get().get(Thread.currentThread().getId()).toXML(transaction),
+            localXStream);
+    wait(registryClient);
+
+  }
+
+  public class TaskHandlerServlet extends HttpServlet {
+    private static final long serialVersionUID = 1L;
+
+    public TaskHandlerServlet() {
+    }
+
+    protected void doPost(HttpServletRequest request, HttpServletResponse response)
+            throws ServletException, IOException {
+      try {
+        System.out.println("Handling HTTP Post Request");
+        // long post_stime = System.nanoTime();
+        StringBuilder sb = new StringBuilder();
+        BufferedReader reader = request.getReader();
+        String line;
+        while ((line = reader.readLine()) != null) {
+          sb.append(line);
         }
-        System.out.println("Jetty Stopped");
+        String content = sb.toString().trim();
+
+        // System.out.println( "Http Request Body:::"+content);
+
+        String nodeIP = request.getHeader("IP");
+        String nodeName = request.getHeader("Hostname");
+        String threadID = request.getHeader("ThreadID");
+        String pid = request.getHeader("PID");
+        System.out.println("Sender ID:::Node IP" + nodeIP + " Node Name:" + nodeName + " PID:" + pid
+                + " ThreadID:" + threadID);
+        IMetaTaskTransaction transaction = (IMetaTaskTransaction) XStreamUtils.unmarshall(content);
+        transaction.setMetaTask(new MetaTask(1, "", null));
+        transaction.getMetaTask().setUserSpaceTask("Test Message");
+        String serializedResponse = XStreamUtils.marshall(transaction);
+        // System.out.println(serializedResponse);
+        response.getWriter().write(serializedResponse);
+      } catch (Throwable e) {
+        e.printStackTrace();
+        throw new ServletException(e);
+      }
     }
-    private void wait(DefaultRegistryClient registryClient) {
-    	synchronized(registryClient) {
-    		try {
-        		registryClient.wait(5*1000); 
-    			
-    		} catch( InterruptedException e) {}
-    	}
-    }
-    @Test
-    public void testTransportBasicConnectivity() throws Exception
-    { 
-    	int scaleout = 12;
-    	ITargetURI targetUrl = new HttpTargetURI("http://localhost:"+getPort()+"/"+app);
-    	DefaultRegistryClient registryClient =
-    			new DefaultRegistryClient(targetUrl);
-    	HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout);
-    	transport.initialize();
-    	//String response = transport.getWork("Test");
-    	//System.out.println("Test Received Response:"+response);
-
-//        assertThat("Response Code", http.getResponseCode(), (equal((HttpStatus.OK_200)));
-    }
-    @Test
-    public void testTransportIOException() throws Exception
-    { 
-    	System.out.println(".... Test::testTransportIOException");
-    	int scaleout = 12;
-    	ITargetURI targetUrl = new HttpTargetURI("http://localhost:"+getPort()+"/"+app);
-    	DefaultRegistryClient registryClient =
-    			new DefaultRegistryClient(targetUrl);
-   	    HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout);
-    	transport.initialize();
-    	System.setProperty("MockHttpPostError", ERROR.IOException.name());
-    	IMetaTaskTransaction transaction = new MetaTaskTransaction();
-    	transport.dispatch(XStreamUtils.marshall(transaction));
- 
-    	wait(registryClient);
-    }
-    @Test
-    public void testTransportNoRoutToHostException() throws Exception
-    { 
-    	System.out.println(".... Test::testTransportNoRoutToHostException");
-    	int scaleout = 12;
-    	ITargetURI targetUrl = new HttpTargetURI("http://localhost:"+getPort()+"/"+app);
-    	DefaultRegistryClient registryClient =
-    			new DefaultRegistryClient(targetUrl);
-    	HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout);
-    	transport.initialize();
-    	System.setProperty("MockHttpPostError", ERROR.NoRouteToHostException.name());
-    	IMetaTaskTransaction transaction = new MetaTaskTransaction();
-    	transport.dispatch(XStreamUtils.marshall(transaction));
-    	wait(registryClient);
-
-    }
-    @Test
-    public void testTransportURISyntaxException() throws Exception
-    { 
-    	System.out.println(".... Test::testTransportURISyntaxException");
-    	int scaleout = 12;
-    	ITargetURI targetUrl = new HttpTargetURI("http://localhost:"+getPort()+"/"+app);
-    	DefaultRegistryClient registryClient =
-    			new DefaultRegistryClient(targetUrl);
-    	HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout);
-    	transport.initialize();
-    	System.setProperty("MockHttpPostError", ERROR.URISyntaxException.name());
-    	IMetaTaskTransaction transaction = new MetaTaskTransaction();
-    	transport.dispatch(XStreamUtils.marshall(transaction));
-    	wait(registryClient);
-
-    }
-	public class TaskHandlerServlet extends HttpServlet {
-		private static final long serialVersionUID = 1L;
-
-		public TaskHandlerServlet() {
-		}
-
-		protected void doPost(HttpServletRequest request,
-				HttpServletResponse response) throws ServletException,
-				IOException {
-			try {
-				System.out.println("Handling HTTP Post Request");
-				//long post_stime = System.nanoTime();
-				StringBuilder sb = new StringBuilder();
-				BufferedReader reader = request.getReader();
-				String line;
-				while ((line = reader.readLine()) != null) {
-					sb.append(line);
-				}
-				String content = sb.toString().trim();
-
-				//System.out.println( "Http Request Body:::"+content);
-				
-				
-	    		 String nodeIP = request.getHeader("IP");
-	             String nodeName = request.getHeader("Hostname");
-	             String threadID = request.getHeader("ThreadID");
-	             String pid = request.getHeader("PID");
-				System.out.println( "Sender ID:::Node IP"+nodeIP+" Node Name:"+nodeName+" PID:"+pid+" ThreadID:"+threadID);
-				IMetaTaskTransaction transaction = (IMetaTaskTransaction)XStreamUtils.unmarshall(content);
-				transaction.setMetaTask(new MetaTask(1,"",null));
-				transaction.getMetaTask().setUserSpaceTask("Test Message");
-				String serializedResponse = XStreamUtils.marshall(transaction);
-				//System.out.println(serializedResponse);
-				response.getWriter().write(serializedResponse);
-			} catch (Throwable e) {
-				e.printStackTrace();
-				throw new ServletException(e);
-			}
-		}
 
-	}
+  }
 }