You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2016/03/16 19:14:33 UTC
svn commit: r1735276 -
/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
Author: cwiklik
Date: Wed Mar 16 18:14:32 2016
New Revision: 1735276
URL: http://svn.apache.org/viewvc?rev=1735276&view=rev
Log:
UIMA-4830 added new testcase to test many uima-as clients
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java?rev=1735276&r1=1735275&r2=1735276&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java Wed Mar 16 18:14:32 2016
@@ -37,6 +37,7 @@ import java.util.concurrent.CountDownLat
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Message;
@@ -134,6 +135,9 @@ public class TestUimaASExtended extends
}
*/
+
+
+
/**
* Test use of a JMS Service Adapter. Invoke from a synchronous aggregate to emulate usage from
@@ -601,98 +605,6 @@ public class TestUimaASExtended extends
}
- public void testMultipleASClients() throws Exception {
- System.out.println("-------------- testMultipleSyncClientsWithMultipleBrokers -------------");
-
- class RunnableClient implements Runnable {
- String brokerURL;
- BaseTestSupport testSupport;
- BaseUIMAAsynchronousEngine_impl uimaAsEngine;
- String serviceEndpoint;
-
-
- RunnableClient(BaseTestSupport testSupport, String brokerURL,String serviceEndpoint) {
- this.brokerURL = brokerURL;
- this.testSupport = testSupport;
- this.serviceEndpoint = serviceEndpoint;
- uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
- }
- public BaseUIMAAsynchronousEngine_impl getUimaAsClient() {
- return uimaAsEngine;
- }
- public void initialize() throws Exception {
- @SuppressWarnings("unchecked")
- Map<String, Object> appCtx = buildContext(brokerURL, serviceEndpoint);
- appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
- appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
- testSupport.initialize(getUimaAsClient(), appCtx);
- waitUntilInitialized();
-
- }
- public void run() {
- try {
- initialize();
- System.out.println("Thread:"+Thread.currentThread().getId()+" Completed GetMeta() broker:"+brokerURL);
- } catch( Exception e) {
- e.printStackTrace();
- } finally {
- try {
- uimaAsEngine.stop();
- } catch( Exception e) {
- e.printStackTrace();
- }
- }
-
- }
-
- }
-
- ExecutorService executor = Executors.newCachedThreadPool();
-
- // change broker URl in system properties
- System.setProperty("BrokerURL", getMasterConnectorURI(broker).toString());
-
- RunnableClient client1 =
- new RunnableClient(this, getMasterConnectorURI(broker), "NoOpAnnotatorQueue");
- BaseUIMAAsynchronousEngine_impl engine = client1.getUimaAsClient();
- deployService(engine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
-
- final BrokerService broker2 = setupSecondaryBroker(true);
- // change broker URl in system properties
- System.setProperty("BrokerURL", broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString());
-
- RunnableClient client2 =
- new RunnableClient(this, broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue");
- BaseUIMAAsynchronousEngine_impl engine2 = client2.getUimaAsClient();
- deployService(engine2, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
-
-
- for( int x = 0; x < 100; x++) {
- List<Future<?>> list = new ArrayList<Future<?>>();
- String b;
- if ( x % 2 == 0 ) {
- b = getMasterConnectorURI(broker);
- } else {
- b = broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString();
- }
- for (int i = 0; i < 50; i++) {
- RunnableClient client =
- new RunnableClient(this, b, "NoOpAnnotatorQueue");
- list.add(executor.submit(client));;
- }
- for (int i = 0; i < 50; i++) {
- list.get(i).get();
- }
- }
- executor.shutdownNow();
- while( !executor.isShutdown() ) {
- synchronized(broker) {
- broker.wait(0);
- }
- }
- broker2.stop();
- }
-
public void testAggregateHttpTunnelling() throws Exception {
@@ -3404,6 +3316,141 @@ public class TestUimaASExtended extends
"TopLevelTaeQueue", 1, PROCESS_LATCH);
}
}
+  /**
+   * Stress test (UIMA-4830) that starts many short-lived UIMA-AS clients concurrently.
+   * <p>
+   * A NoOp annotator service is deployed against the primary broker and a secondary broker is
+   * started. Then, for 100 rounds, 20 client tasks are submitted to a cached thread pool: even
+   * numbered tasks target the primary broker, odd numbered tasks target a hard-coded failover
+   * URL. Each task initializes a client and always stops it afterwards. Two {@link Worker}
+   * threads wait for the futures of each group before the next round begins.
+   *
+   * @throws Exception if deployment, broker setup/teardown or waiting on the workers fails
+   */
+  public void testMultipleASClients() throws Exception {
+    System.out.println("-------------- testMultipleSyncClientsWithMultipleBrokers -------------");
+
+    /** One client: initializes against the given broker/endpoint, then stops its engine. */
+    class RunnableClient implements Runnable {
+      String brokerURL;
+      BaseTestSupport testSupport;
+      BaseUIMAAsynchronousEngine_impl uimaAsEngine;
+      String serviceEndpoint;
+
+      RunnableClient(BaseTestSupport testSupport, String brokerURL, String serviceEndpoint) {
+        this.brokerURL = brokerURL;
+        this.testSupport = testSupport;
+        this.serviceEndpoint = serviceEndpoint;
+        uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+      }
+
+      public BaseUIMAAsynchronousEngine_impl getUimaAsClient() {
+        return uimaAsEngine;
+      }
+
+      public void initialize() throws Exception {
+        @SuppressWarnings("unchecked")
+        Map<String, Object> appCtx = buildContext(brokerURL, serviceEndpoint);
+        appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+        appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+        testSupport.initialize(getUimaAsClient(), appCtx);
+        waitUntilInitialized();
+      }
+
+      public void run() {
+        try {
+          initialize();
+          System.out.println("Thread:"+Thread.currentThread().getId()+" Completed GetMeta() broker:"+brokerURL);
+        } catch( Exception e) {
+          e.printStackTrace();
+        } finally {
+          // The engine is stopped whether or not initialization succeeded.
+          try {
+            uimaAsEngine.stop();
+          } catch( Exception e) {
+            e.printStackTrace();
+          }
+        }
+      }
+    }
+
+    ExecutorService executor = Executors.newCachedThreadPool();
+    String serviceId1;
+    String serviceId2;
+
+    // change broker URl in system properties
+    System.setProperty("BrokerURL", getMasterConnectorURI(broker).toString());
+
+    RunnableClient client1 =
+            new RunnableClient(this, getMasterConnectorURI(broker), "NoOpAnnotatorQueue");
+    BaseUIMAAsynchronousEngine_impl engine = client1.getUimaAsClient();
+    serviceId1 = deployService(engine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
+
+    final BrokerService broker2 = setupSecondaryBroker(true);
+    // change broker URl in system properties
+    System.setProperty("BrokerURL", broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString());
+
+    // NOTE(review): the failover hosts below are hard-coded; presumably they are meant to be
+    // unreachable so that connection failures are exercised - confirm.
+    RunnableClient client2 =
+            new RunnableClient(this, "failover:tcp://f5n633:51514,tcp://f12n1133:51514","NoOpAnnotatorQueue");//broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue");
+    //new RunnableClient(this, "failover:ssl://f5n6:51514,ssl://f12n11:51514","NoOpAnnotatorQueue");//broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue");
+    BaseUIMAAsynchronousEngine_impl engine2 = client2.getUimaAsClient();
+//    serviceId2 = deployService(engine2, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
+
+    for( int x = 0; x < 100; x++) {
+      // list1 collects the primary-broker clients, list2 the failover-URL clients.
+      List<Future<?>> list1 = new ArrayList<Future<?>>();
+      List<Future<?>> list2 = new ArrayList<Future<?>>();
+      for (int i = 0; i < 20; i++) {
+        String b;
+        List<Future<?>> list;
+        if ( i % 2 == 0 ) {
+          b = getMasterConnectorURI(broker);
+          list = list1;
+        } else {
+          b = "failover:tcp://f5n633:51514,tcp://f12n1133:51514?maxReconnectAttempts=2&timeout=300&transport.maxReconnectAttempts=2&transport.timeout=300&startupMaxReconnectAttempts=1";//broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString();
+//          b = "failover:ssl://f5n6:51514,ssl://f12n11:51514?maxReconnectAttempts=2&timeout=300&transport.maxReconnectAttempts=2&transport.timeout=300&startupMaxReconnectAttempts=1";//broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString();
+          list = list2;
+        }
+        RunnableClient client =
+                new RunnableClient(this, b, "NoOpAnnotatorQueue");
+        list.add(executor.submit(client));
+      }
+
+      // One worker per group so both groups are drained in parallel. The second worker must
+      // receive list2; otherwise the failover clients are never waited on.
+      Worker worker1 = new Worker(list1);
+      Worker worker2 = new Worker(list2);
+      Thread t1 = new Thread(worker1);
+      Thread t2 = new Thread(worker2);
+      t1.start();
+      t2.start();
+
+      t1.join();
+      t2.join();
+    }
+    // engine2.undeploy(serviceId2);
+    engine.undeploy(serviceId1);
+
+    //engine2.stop();
+    executor.shutdownNow();
+    // NOTE(review): isShutdown() is already true once shutdownNow() returns, so this loop body
+    // never runs; isTerminated()/awaitTermination() may have been intended - confirm.
+    while( !executor.isShutdown() ) {
+      synchronized(broker) {
+        broker.wait(100);
+      }
+    }
+    broker2.stop();
+    broker2.waitUntilStopped();
+    //broker.stop();
+    //broker.waitUntilStopped();
+    //System.out.println("Done");
+  }
private Exception getCause(Throwable e) {
Exception cause = (Exception) e;
@@ -3554,4 +3601,27 @@ public class TestUimaASExtended extends
System.out.println("Stopping TestListener Callback Listener Thread");
}
}
+
+  /**
+   * Waits, in order, for every future in the supplied list to complete. A future whose
+   * {@code get()} fails is reported on stderr and cancelled (interrupting its task if running).
+   */
+  private class Worker implements Runnable {
+
+    /** Futures to wait on; the caller's list is used directly, not copied. */
+    private final List<Future<?>> list;
+
+    public Worker(List<Future<?>> list ) {
+      this.list = list;
+    }
+
+    @Override
+    public void run() {
+      for (Future<?> future : list) {
+        try {
+          future.get();//1, TimeUnit.SECONDS);
+        } catch( InterruptedException e) {
+          // This worker thread itself was interrupted: cancel the pending task, restore the
+          // interrupt flag for whoever owns the thread, and stop waiting.
+          e.printStackTrace();
+          future.cancel(true);
+          Thread.currentThread().interrupt();
+          return;
+        } catch( Exception e) {
+          e.printStackTrace();
+          future.cancel(true);
+        }
+      }
+    }
+
+  }
}