You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2016/03/16 19:14:33 UTC

svn commit: r1735276 - /uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java

Author: cwiklik
Date: Wed Mar 16 18:14:32 2016
New Revision: 1735276

URL: http://svn.apache.org/viewvc?rev=1735276&view=rev
Log:
UIMA-4830 added new testcase to test many uima-as clients

Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java?rev=1735276&r1=1735275&r2=1735276&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java Wed Mar 16 18:14:32 2016
@@ -37,6 +37,7 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.Message;
@@ -134,6 +135,9 @@ public class TestUimaASExtended extends
 	  }
 */
   
+
+
+ 
   
   /**
    * Test use of a JMS Service Adapter. Invoke from a synchronous aggregate to emulate usage from
@@ -601,98 +605,6 @@ public class TestUimaASExtended extends
 	}
   
 
-  public void testMultipleASClients() throws Exception  {
-	    System.out.println("-------------- testMultipleSyncClientsWithMultipleBrokers -------------");
-	    
-	    class RunnableClient implements Runnable {
-	    	String brokerURL;
-	    	BaseTestSupport testSupport;
-            BaseUIMAAsynchronousEngine_impl uimaAsEngine;
-            String serviceEndpoint;
-
-            
-            RunnableClient(BaseTestSupport testSupport, String brokerURL,String serviceEndpoint) {
-	    		this.brokerURL = brokerURL;
-	    		this.testSupport = testSupport;
-	    		this.serviceEndpoint = serviceEndpoint;
-	    		uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
-	    	}
-	    	public BaseUIMAAsynchronousEngine_impl getUimaAsClient() {
-	    		return uimaAsEngine;
-	    	}
-	    	public void initialize() throws Exception {
-	    		@SuppressWarnings("unchecked")
-			  Map<String, Object> appCtx = buildContext(brokerURL, serviceEndpoint);
-		  	  appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
-		  	  appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
-		  	  testSupport.initialize(getUimaAsClient(), appCtx);
-		  	  waitUntilInitialized();
-		  	  
-	    	}
-			public void run() {
-				try {
-					initialize();
-					System.out.println("Thread:"+Thread.currentThread().getId()+" Completed GetMeta() broker:"+brokerURL);
-				} catch( Exception e) {
-					e.printStackTrace();
-				} finally {
-					try {
-				        uimaAsEngine.stop();
-					} catch( Exception e) {
-						e.printStackTrace();
-					}
-				}
-
-			}
-	    	
-	    }
-	    
-	    ExecutorService executor = Executors.newCachedThreadPool();
-
-	    //	change broker URl in system properties
-	    System.setProperty("BrokerURL", getMasterConnectorURI(broker).toString());
-	    
-	    RunnableClient client1 = 
-	    		new RunnableClient(this, getMasterConnectorURI(broker), "NoOpAnnotatorQueue");
-	    BaseUIMAAsynchronousEngine_impl engine = client1.getUimaAsClient();
-	    deployService(engine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
-
-	    final BrokerService broker2 = setupSecondaryBroker(true);
-	    //	change broker URl in system properties
-	    System.setProperty("BrokerURL", broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString());
-	    
-	    RunnableClient client2 = 
-	    		new RunnableClient(this, broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue");
-	    BaseUIMAAsynchronousEngine_impl engine2 = client2.getUimaAsClient();
-	    deployService(engine2, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
-
-	    
-	    for( int x = 0; x < 100; x++) {
-		    List<Future<?>> list = new ArrayList<Future<?>>();
-		    String b;
-		    if ( x % 2 == 0 ) {
-		    	b = getMasterConnectorURI(broker);
-		    } else {
-		    	b = broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString();
-		    }
-		    for (int i = 0; i < 50; i++) {
-			    RunnableClient client = 
-			    		new RunnableClient(this, b, "NoOpAnnotatorQueue");
-			    list.add(executor.submit(client));;
-		    }
-		    for (int i = 0; i < 50; i++) {
-		    	list.get(i).get();
-		    }
-	    }
-	    executor.shutdownNow();
-	    while( !executor.isShutdown() ) {
-	    	synchronized(broker) {
-	    		broker.wait(0);
-	    	}
-	    }
-	    broker2.stop();
-	}
-
   
   
   public void testAggregateHttpTunnelling() throws Exception {
@@ -3404,6 +3316,141 @@ public class TestUimaASExtended extends
               "TopLevelTaeQueue", 1, PROCESS_LATCH);
     }
   }
+  public void testMultipleASClients() throws Exception  {
+	    System.out.println("-------------- testMultipleSyncClientsWithMultipleBrokers -------------");
+	    
+	    class RunnableClient implements Runnable {
+	    	String brokerURL;
+	    	BaseTestSupport testSupport;
+          BaseUIMAAsynchronousEngine_impl uimaAsEngine;
+          String serviceEndpoint;
+          
+          
+          RunnableClient(BaseTestSupport testSupport, String brokerURL,String serviceEndpoint) {
+	    		this.brokerURL = brokerURL;
+	    		this.testSupport = testSupport;
+	    		this.serviceEndpoint = serviceEndpoint;
+	    		uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+	    	}
+	    	public BaseUIMAAsynchronousEngine_impl getUimaAsClient() {
+	    		return uimaAsEngine;
+	    	}
+	    	public void initialize() throws Exception {
+	    		@SuppressWarnings("unchecked")
+			  Map<String, Object> appCtx = buildContext(brokerURL, serviceEndpoint);
+	    		appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+		  	  appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+		  	  testSupport.initialize(getUimaAsClient(), appCtx);
+		  	  waitUntilInitialized();
+		  	  
+	    	}
+			public void run() {
+				try {
+					initialize();
+					System.out.println("Thread:"+Thread.currentThread().getId()+" Completed GetMeta() broker:"+brokerURL);
+				} catch( Exception e) {
+					e.printStackTrace();
+				} finally {
+					try {
+				        uimaAsEngine.stop();
+					} catch( Exception e) {
+						e.printStackTrace();
+					}
+				}
+
+			}
+	    	
+	    }
+	    
+	    ExecutorService executor = Executors.newCachedThreadPool();
+      String serviceId1;
+      String serviceId2;
+
+	    //	change broker URl in system properties
+	    System.setProperty("BrokerURL", getMasterConnectorURI(broker).toString());
+	    
+	    RunnableClient client1 = 
+	    		new RunnableClient(this, getMasterConnectorURI(broker), "NoOpAnnotatorQueue");
+	    BaseUIMAAsynchronousEngine_impl engine = client1.getUimaAsClient();
+	    serviceId1 = deployService(engine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
+
+	    final BrokerService broker2 = setupSecondaryBroker(true);
+	    //	change broker URl in system properties
+	    System.setProperty("BrokerURL", broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString());
+	    
+	    RunnableClient client2 = 
+	    		new RunnableClient(this, "failover:tcp://f5n633:51514,tcp://f12n1133:51514","NoOpAnnotatorQueue");//broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue");
+	    		//new RunnableClient(this, "failover:ssl://f5n6:51514,ssl://f12n11:51514","NoOpAnnotatorQueue");//broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue");
+	    BaseUIMAAsynchronousEngine_impl engine2 = client2.getUimaAsClient();
+//	    serviceId2 = deployService(engine2, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
+
+	    
+	    for( int x = 0; x < 100; x++) {
+		    List<Future<?>> list1 = new ArrayList<Future<?>>();
+		    List<Future<?>> list2 = new ArrayList<Future<?>>();
+		    String b;
+		  /*
+		    if ( x % 2 == 0 ) {
+		    	b = getMasterConnectorURI(broker);
+		    } else {
+		    	b = "failover:ssl://f5n6:51514,ssl://f12n11:51514";//broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString();
+		    }
+		    */
+		    List<Future<?>> list = new ArrayList<Future<?>>();
+		    for (int i = 0; i < 20; i++) {
+		    	  if ( i % 2 == 0 ) {
+				    	b = getMasterConnectorURI(broker);
+				    	list = list1;
+				    } else {
+				    	b = "failover:tcp://f5n633:51514,tcp://f12n1133:51514?maxReconnectAttempts=2&timeout=300&transport.maxReconnectAttempts=2&transport.timeout=300&startupMaxReconnectAttempts=1";//broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString();
+//				    	b = "failover:ssl://f5n6:51514,ssl://f12n11:51514?maxReconnectAttempts=2&timeout=300&transport.maxReconnectAttempts=2&transport.timeout=300&startupMaxReconnectAttempts=1";//broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString();
+				    	list = list2;
+				    }
+			    RunnableClient client = 
+			    		new RunnableClient(this, b, "NoOpAnnotatorQueue");
+			    list.add(executor.submit(client));;
+		    }
+		    /*
+		    
+			for (int i = 0; i < 10; i++) {
+		    	try {
+			    	list.get(i).get();//1, TimeUnit.SECONDS);
+		    	} catch( Exception e) {
+		    		e.printStackTrace();
+			    	list.get(i).cancel(true);
+		    	}
+		    }
+		    */
+
+	    Worker worker1 = new Worker(list1);
+	    Worker worker2 = new Worker(list1);
+	    Thread t1 = new Thread(worker1);
+	    Thread t2 = new Thread(worker2);
+	    t1.start();
+	    t2.start();
+	    
+	    t1.join();
+	    t2.join();
+	    
+	    list.clear();
+	    
+	    }
+	//    engine2.undeploy(serviceId2);
+	    engine.undeploy(serviceId1);
+	    
+	    //engine2.stop();
+	    executor.shutdownNow();
+	    while( !executor.isShutdown() ) {
+	    	synchronized(broker) {
+	    		broker.wait(100);
+	    	}
+	    }
+	    broker2.stop();
+	    broker2.waitUntilStopped();
+	    //broker.stop();
+	    //broker.waitUntilStopped();
+	    //System.out.println("Done");
+	}
 
   private Exception getCause(Throwable e) {
     Exception cause = (Exception) e;
@@ -3554,4 +3601,27 @@ public class TestUimaASExtended extends
       System.out.println("Stopping TestListener Callback Listener Thread");
     }
   }
+  
+  private class Worker implements Runnable {
+
+	  
+	List<Future<?>> list = new ArrayList<Future<?>>();
+	
+	public Worker(List<Future<?>> list ) {
+		this.list = list;
+	}
+	@Override
+	public void run() {
+		for (int i = 0; i < list.size(); i++) {
+	    	try {
+		    	list.get(i).get();//1, TimeUnit.SECONDS);
+	    	} catch( Exception e) {
+	    		e.printStackTrace();
+		    	list.get(i).cancel(true);
+	    	}
+	    }
+
+	}
+	  
+  }
 }