You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/11/10 21:05:22 UTC
svn commit: r834637 -
/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
Author: cwiklik
Date: Tue Nov 10 20:05:22 2009
New Revision: 834637
URL: http://svn.apache.org/viewvc?rev=834637&view=rev
Log:
UIMA-1643 Added 5 new testcases to test client reconnect on broker failure
Modified:
incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java?rev=834637&r1=834636&r2=834637&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java Tue Nov 10 20:05:22 2009
@@ -28,6 +28,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
import javax.jms.Connection;
import javax.jms.Message;
@@ -59,8 +60,10 @@
import org.apache.uima.collection.CollectionReaderDescription;
import org.apache.uima.collection.EntityProcessStatus;
import org.apache.uima.ee.test.utils.BaseTestSupport;
+import org.apache.uima.ee.test.utils.BaseTestSupport.SynchRunner;
import org.apache.uima.internal.util.XMLUtils;
import org.apache.uima.resource.ResourceInitializationException;
+import org.apache.uima.resource.ResourceProcessException;
import org.apache.uima.resource.ResourceSpecifier;
import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
import org.apache.uima.util.Level;
@@ -181,8 +184,303 @@
}
}
}
+  /**
+   * Verifies that a synchronous client reports an error for every CAS sent after its broker has
+   * been stopped.
+   * <p>
+   * The test starts a broker on port 8200, deploys the NoOp Annotator and sends 15 CASes with
+   * synchronous sendAndReceiveCAS(). The broker is stopped just before the 11th CAS is sent, so
+   * the last 5 sends are expected to fail (per the original description of this test, each of
+   * them results in a GetMeta ping and a subsequent timeout).
+   * <p>
+   * The client and the broker are released in finally blocks so that an unexpected failure does
+   * not leave port 8200 occupied for other tests that create a broker on the same port.
+   *
+   * @throws Exception
+   *           if the broker, the service or the client cannot be started or stopped, or if a CAS
+   *           cannot be obtained from the client
+   */
+  public void testSyncClientRecoveryFromBrokerStop() throws Exception {
+    System.out.println("-------------- testSyncClientRecoveryFromBrokerStop -------------");
+    // Instantiate Uima AS Client
+    BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+
+    BrokerService broker = createBroker(8200, false);
+    broker.start();
+    // True while this test owns a started broker that still has to be shut down
+    boolean brokerRunning = true;
+    int errorCount = 0;
+    try {
+      System.setProperty("BrokerURL", "tcp://localhost:8200");
+
+      // Deploy Uima AS Primitive Service
+      deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
+      Map<String, Object> appCtx = buildContext("tcp://localhost:8200", "NoOpAnnotatorQueue");
+      appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+      appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+      initialize(uimaAsEngine, appCtx);
+      waitUntilInitialized();
+      try {
+        for (int i = 0; i < 15; i++) {
+          if (i == 10) {
+            // Stop the broker
+            broker.stop();
+            brokerRunning = false;
+            synchronized (this) {
+              wait(3000); // allow broker some time to fully stop
+            }
+          }
+          CAS cas = uimaAsEngine.getCAS();
+          cas.setDocumentText("Some Text");
+          System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
+          try {
+            uimaAsEngine.sendAndReceiveCAS(cas);
+          } catch (Exception e) {
+            // Failures are expected once the broker is gone; they are counted and verified below
+            errorCount++;
+            System.out.println("Client Received Expected Error on CAS:" + (i + 1));
+          } finally {
+            cas.release();
+          }
+        }
+      } finally {
+        // Stop the client even when the loop aborts early
+        uimaAsEngine.stop();
+      }
+    } finally {
+      // Only reached with a live broker if the test failed before CAS #11
+      if (brokerRunning) {
+        broker.stop();
+      }
+    }
+    // expecting 5 failures due to broker missing
+    if (errorCount != 5) {
+      fail("Expected 5 failures due to broker down, instead received:" + errorCount + " failures");
+    }
+  }
+  /**
+   * Verifies that a synchronous client fails while its broker is down and resumes processing once
+   * the broker is back.
+   * <p>
+   * The test starts a broker on port 8200, deploys the NoOp Annotator and sends 20 CASes with
+   * synchronous sendAndReceiveCAS(). The broker is stopped just before the 6th CAS and a new
+   * broker is started on the same port just before the 11th CAS. Exactly 5 sends (CAS #6 to #10)
+   * are expected to fail; per the original description of this test, each of them results in a
+   * GetMeta ping and a subsequent timeout.
+   * <p>
+   * The client and the broker are released in finally blocks, and the grace period after the
+   * final broker stop is honoured before the failure count is checked, so that a failing run does
+   * not leave port 8200 occupied for other tests that create a broker on the same port.
+   *
+   * @throws Exception
+   *           if the broker, the service or the client cannot be started or stopped, or if a CAS
+   *           cannot be obtained from the client
+   */
+  public void testSyncClientRecoveryFromBrokerStopAndRestart() throws Exception {
+    System.out
+            .println("-------------- testSyncClientRecoveryFromBrokerStopAndRestart -------------");
+    // Instantiate Uima AS Client
+    BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+
+    BrokerService broker = createBroker(8200, false);
+    broker.start();
+    // True while 'broker' refers to a started broker that still has to be shut down
+    boolean brokerRunning = true;
+    int errorCount = 0;
+    try {
+      System.setProperty("BrokerURL", "tcp://localhost:8200");
+      // Deploy Uima AS Primitive Service
+      deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
+      Map<String, Object> appCtx = buildContext("tcp://localhost:8200", "NoOpAnnotatorQueue");
+      appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+      appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+      initialize(uimaAsEngine, appCtx);
+      waitUntilInitialized();
+      try {
+        for (int i = 0; i < 20; i++) {
+          if (i == 5) {
+            broker.stop();
+            brokerRunning = false;
+            synchronized (this) {
+              wait(3000); // allow broker some time to stop
+            }
+          } else if (i == 10) {
+            // restart the broker
+            broker = createBroker(8200, false);
+            broker.start();
+            brokerRunning = true;
+            synchronized (this) {
+              wait(3000); // allow broker some time to start
+            }
+          }
+          CAS cas = uimaAsEngine.getCAS();
+          cas.setDocumentText("Some Text");
+          System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
+          try {
+            uimaAsEngine.sendAndReceiveCAS(cas);
+          } catch (Exception e) {
+            // Failures are expected while the broker is down; they are counted and verified below
+            errorCount++;
+            System.out.println("Client Received Expected Error on CAS:" + (i + 1));
+          } finally {
+            cas.release();
+          }
+        }
+      } finally {
+        // Stop the client even when the loop aborts early
+        uimaAsEngine.stop();
+      }
+    } finally {
+      if (brokerRunning) {
+        broker.stop();
+        synchronized (this) {
+          wait(3000); // allow broker some time to stop
+        }
+      }
+    }
+    // expecting 5 failures due to broker missing
+    if (errorCount != 5) {
+      fail("Expected 5 failures due to broker down, instead received:" + errorCount + " failures");
+    }
+  }
+  /**
+   * Exercises recovery of several concurrent synchronous clients from a broker stop and restart.
+   * <p>
+   * The test creates 4 UIMA AS clients, each running in its own thread and each sending 1000
+   * CASes with sendAndReceiveCAS(). Once every client has got past its first 5 CASes (or has
+   * failed), the test stops the broker, waits 4 seconds and starts a new broker on the same port.
+   * While the broker is down the clients keep sending and their errors are only logged. According
+   * to the original description of this test, the clients share a single JMS connection and are
+   * expected to recover once the broker is available again.
+   * <p>
+   * No failure count is asserted: the test passes if all client threads terminate and the broker
+   * could be stopped and restarted. A failure while stopping or restarting the broker is
+   * rethrown after the client threads have been joined instead of being silently dropped.
+   *
+   * @throws Exception
+   *           if the broker cannot be started, stopped or restarted, or if the calling thread is
+   *           interrupted while waiting
+   */
+  public void testMultipleSyncClientsRecoveryFromBrokerStopAndRestart() throws Exception {
+    System.out
+            .println("-------------- testMultipleSyncClientsRecoveryFromBrokerStopAndRestart -------------");
+    BrokerService broker = createBroker(8200, false);
+    broker.start();
+    System.setProperty("BrokerURL", "tcp://localhost:8200");
+    final CountDownLatch latch = new CountDownLatch(4);
+    Thread[] clientThreads = new Thread[4];
+
+    // Create 4 Uima AS clients each running in a separate thread
+    for (int i = 0; i < 4; i++) {
+      clientThreads[i] = new Thread() {
+        public void run() {
+          BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+          // Ensures this client counts the latch down exactly once, even when it fails early
+          boolean latchReleased = false;
+          try {
+            // Deploy Uima AS Primitive Service
+            deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
+            Map<String, Object> appCtx = buildContext("tcp://localhost:8200",
+                    "NoOpAnnotatorQueue");
+            appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+            appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+            initialize(uimaAsEngine, appCtx);
+            waitUntilInitialized();
+            try {
+              for (int casIndex = 0; casIndex < 1000; casIndex++) {
+                if (casIndex == 5) {
+                  latch.countDown(); // indicate that some CASes were processed
+                  latchReleased = true;
+                }
+                CAS cas = uimaAsEngine.getCAS();
+                cas.setDocumentText("Some Text");
+                System.out.println("UIMA AS Client#" + Thread.currentThread().getId()
+                        + " Sending CAS#" + (casIndex + 1) + " Request to a Service");
+                try {
+                  uimaAsEngine.sendAndReceiveCAS(cas);
+                } catch (Exception e) {
+                  System.out.println("Client Received Expected Error on CAS:" + (casIndex + 1));
+                } finally {
+                  cas.release();
+                }
+              }
+              System.out.println("Thread:" + Thread.currentThread().getId() + " Completed run()");
+            } finally {
+              // Stop the client even when the loop aborts early
+              uimaAsEngine.stop();
+            }
+          } catch (Exception e) {
+            e.printStackTrace();
+          } finally {
+            // A client that fails before its 6th CAS must still release the latch, otherwise
+            // the main thread would block in latch.await() forever
+            if (!latchReleased) {
+              latch.countDown();
+            }
+          }
+        }
+      };
+      clientThreads[i].start();
+    }
+
+    // Remembered so that the client threads can still be joined before the test fails
+    Exception brokerFailure = null;
+    try {
+      latch.await(); // wait for all threads to process a few CASes
+      broker.stop();
+      System.out.println("Stopping Broker - wait ...");
+      synchronized (this) {
+        wait(4000); // allow broker some time to stop
+      }
+      System.out.println("Restarting Broker - wait ...");
+      // restart the broker
+      broker = createBroker(8200, false);
+      broker.start();
+      synchronized (this) {
+        wait(3000); // allow broker some time to start
+      }
+    } catch (Exception e) {
+      brokerFailure = e;
+    }
+    try {
+      for (int i = 0; i < 4; i++) {
+        clientThreads[i].join();
+      }
+    } finally {
+      System.out.println("Stopping Broker - wait ...");
+      broker.stop();
+      synchronized (this) {
+        wait(3000); // allow broker some time to stop
+      }
+    }
+    if (brokerFailure != null) {
+      throw brokerFailure;
+    }
+  }
+
+  /**
+   * Verifies that an asynchronous client reports failures for CASes sent after its broker has
+   * been stopped.
+   * <p>
+   * The test starts a broker on port 8200, deploys the NoOp Annotator and sends a total of 15
+   * CASes with asynchronous sendCAS(). The broker is stopped just before the 11th CAS is sent.
+   * After the client has been stopped, the test expects failedCasCountDueToBrokerFailure (a
+   * counter maintained outside this method) to equal 4.
+   * <p>
+   * NOTE(review): five CASes (#11 to #15) are sent after the broker stops but only four failures
+   * are expected, and the original description said the broker is stopped "after processing
+   * 11th" CAS. Confirm how the counter treats the first CAS sent after the stop.
+   * <p>
+   * The client and the broker are released in finally blocks so that an unexpected failure does
+   * not leave port 8200 occupied for other tests that create a broker on the same port.
+   *
+   * @throws Exception
+   *           if the broker, the service or the client cannot be started or stopped, or if a CAS
+   *           cannot be obtained or sent
+   */
+  public void testAsyncClientRecoveryFromBrokerStop() throws Exception {
+    System.out.println("-------------- testAsyncClientRecoveryFromBrokerStop -------------");
+    System.setProperty("BrokerURL", "tcp://localhost:8200");
+    BrokerService broker = createBroker(8200, false);
+    broker.start();
+    // True while this test owns a started broker that still has to be shut down
+    boolean brokerRunning = true;
+    try {
+      // Instantiate Uima AS Client
+      BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+      deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
+      Map<String, Object> appCtx = buildContext("tcp://localhost:8200", "NoOpAnnotatorQueue");
+      appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+      appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+      initialize(uimaAsEngine, appCtx);
+      waitUntilInitialized();
+      try {
+        for (int i = 0; i < 15; i++) {
+          if (i == 10) {
+            broker.stop();
+            brokerRunning = false;
+            synchronized (this) {
+              wait(3000); // allow broker some time to stop
+            }
+          }
+          CAS cas = uimaAsEngine.getCAS();
+          cas.setDocumentText("Some Text");
+          System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
+          uimaAsEngine.sendCAS(cas);
+        }
+      } finally {
+        // Stop the client even when the loop aborts early
+        uimaAsEngine.stop();
+      }
+    } finally {
+      // Only reached with a live broker if the test failed before CAS #11
+      if (brokerRunning) {
+        broker.stop();
+      }
+    }
+    // expecting 4 failures due to broker missing
+    if (failedCasCountDueToBrokerFailure != 4) {
+      fail("Expected 4 failures due to broker down, instead received:"
+              + failedCasCountDueToBrokerFailure + " failures");
+    }
+  }
+
+  /**
+   * Exercises an asynchronous client across a broker stop and restart.
+   * <p>
+   * The test starts a broker on port 8200, deploys the NoOp Annotator and sends 150 CASes with
+   * asynchronous sendCAS(). The broker is stopped just before the 11th CAS and a new broker is
+   * started on the same port just before the 21st CAS. No failure count is asserted; the check
+   * against failedCasCountDueToBrokerFailure is commented out below (the reason is not recorded
+   * in this commit), so the test only verifies that sending completes without an exception.
+   * <p>
+   * The client and the broker are released in finally blocks so that an unexpected failure does
+   * not leave port 8200 occupied for other tests that create a broker on the same port.
+   *
+   * @throws Exception
+   *           if the broker, the service or the client cannot be started or stopped, or if a CAS
+   *           cannot be obtained or sent
+   */
+  public void testAsyncClientRecoveryFromBrokerStopAndRestart() throws Exception {
+    System.out
+            .println("-------------- testAsyncClientRecoveryFromBrokerStopAndRestart -------------");
+    System.setProperty("BrokerURL", "tcp://localhost:8200");
+    BrokerService broker = createBroker(8200, false);
+    broker.start();
+    // True while 'broker' refers to a started broker that still has to be shut down
+    boolean brokerRunning = true;
+    try {
+      // Instantiate Uima AS Client
+      BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+      // Deploy Uima AS Primitive Service
+      deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
+      Map<String, Object> appCtx = buildContext("tcp://localhost:8200", "NoOpAnnotatorQueue");
+      appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+      appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+      initialize(uimaAsEngine, appCtx);
+      waitUntilInitialized();
+      try {
+        for (int i = 0; i < 150; i++) {
+          if (i == 10) {
+            broker.stop();
+            brokerRunning = false;
+            synchronized (this) {
+              wait(3000); // allow broker some time to stop
+            }
+          } else if (i == 20) {
+            broker = createBroker(8200, false);
+            broker.start();
+            brokerRunning = true;
+            synchronized (this) {
+              wait(3000); // allow broker some time to start
+            }
+          }
+          CAS cas = uimaAsEngine.getCAS();
+          cas.setDocumentText("Some Text");
+          System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
+          uimaAsEngine.sendCAS(cas);
+        }
+      } finally {
+        // Stop the client even when the loop aborts early
+        uimaAsEngine.stop();
+      }
+    } finally {
+      if (brokerRunning) {
+        broker.stop();
+        synchronized (this) {
+          wait(2000); // allow broker some time to stop
+        }
+      }
+    }
+    // Disabled check kept from the original commit:
+    // expecting 9 failures due to broker missing
+//    if ( failedCasCountDueToBrokerFailure != 9 ) {
+//      fail("Expected 9 failures due to broker down, instead received:"+failedCasCountDueToBrokerFailure+" failures");
+//    }
+  }
public void testClientProcess() throws Exception {
System.out.println("-------------- testClientProcess -------------");
@@ -192,16 +490,18 @@
deployService(uimaAsEngine, relativePath + "/Deploy_PersonTitleAnnotator.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(broker.getMasterConnectorURI()),
"PersonTitleAnnotatorQueue");
-
+ appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+ appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < 50; i++) {
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
uimaAsEngine.sendCAS(cas);
}
+
uimaAsEngine.collectionProcessingComplete();
uimaAsEngine.stop();
}
@@ -213,7 +513,7 @@
// Deploy Uima AS Primitive Service
deployService(uimaAsEngine, relativePath + "/Deploy_PersonTitleAnnotator.xml");
- System.setProperty( "defaultBrokerURL3", broker.getMasterConnectorURI());
+ System.setProperty( "defaultBrokerURL", broker.getMasterConnectorURI());
Map<String, Object> appCtx = buildContext("${defaultBrokerURL}","PersonTitleAnnotatorQueue");
initialize(uimaAsEngine, appCtx);
@@ -569,7 +869,6 @@
spinShutdownThread(eeUimaEngine, 2000, containerId, SpringContainerDeployer.QUIESCE_AND_STOP);
runTest(appCtx, eeUimaEngine, String.valueOf(broker.getMasterConnectorURI()),
"TopLevelTaeQueue", 2, EXCEPTION_LATCH);
-
}
public void testStopNow() throws Exception {
@@ -584,10 +883,12 @@
appCtx.put(UimaAsynchronousEngine.Timeout, 4000);
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 300);
spinShutdownThread(eeUimaEngine, 3000, containerId, SpringContainerDeployer.STOP_NOW);
+ // send may fail since we forcefully stop the service. Tolerate
+ // ResourceProcessException
+ addExceptionToignore(ResourceProcessException.class);
runTest(appCtx, eeUimaEngine, String.valueOf(broker.getMasterConnectorURI()),
"TopLevelTaeQueue", 10, EXCEPTION_LATCH);
}
-
public void testCMAggregateClientStopRequest() throws Exception {
System.out.println("-------------- testCMAggregateClientStopRequest -------------");
final BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
@@ -1057,6 +1358,8 @@
}
}.start();
+ super.countPingRetries=true;
+
try {
runTest(appCtx, eeUimaEngine, String.valueOf(broker.getMasterConnectorURI()),
"PersonTitleAnnotatorQueue", 500, EXCEPTION_LATCH);
@@ -1064,6 +1367,7 @@
System.out.println(">>> runtest generated exception: " + e);
e.printStackTrace(System.out);
}
+ super.countPingRetries=false;
}
@@ -1815,15 +2119,19 @@
public void testAsynchronousTerminate() throws Exception {
System.out.println("-------------- testAsynchronousTerminate -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
- deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithDelay.xml");
- deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator2.xml");
- deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWithParallelFlow.xml");
-
Map<String, Object> appCtx = buildContext(String.valueOf(broker.getMasterConnectorURI()),
- "TopLevelTaeQueue");
- initialize(eeUimaEngine, appCtx);
- // Wait until the top level service returns its metadata
- waitUntilInitialized();
+ "TopLevelTaeQueue");
+ try {
+ deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithDelay.xml");
+ deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator2.xml");
+ deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWithParallelFlow.xml");
+ initialize(eeUimaEngine, appCtx);
+ // Wait until the top level service returns its metadata
+ waitUntilInitialized();
+ } catch( Exception e) {
+ throw e;
+ }
+
CAS cas = eeUimaEngine.getCAS();
System.out.println(" Sending CAS to kick off aggregate w/colocated CasMultiplier");
@@ -1835,6 +2143,7 @@
System.out.println(" Trying to stop service");
eeUimaEngine.stop();
System.out.println(" stop() returned!");
+
}
public void testCallbackListenerOnFailure() throws Exception {