You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/11/10 21:05:22 UTC

svn commit: r834637 - /incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java

Author: cwiklik
Date: Tue Nov 10 20:05:22 2009
New Revision: 834637

URL: http://svn.apache.org/viewvc?rev=834637&view=rev
Log:
UIMA-1643 Added 5 new testcases to test client reconnect on broker failure

Modified:
    incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java

Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java?rev=834637&r1=834636&r2=834637&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java Tue Nov 10 20:05:22 2009
@@ -28,6 +28,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
 
 import javax.jms.Connection;
 import javax.jms.Message;
@@ -59,8 +60,10 @@
 import org.apache.uima.collection.CollectionReaderDescription;
 import org.apache.uima.collection.EntityProcessStatus;
 import org.apache.uima.ee.test.utils.BaseTestSupport;
+import org.apache.uima.ee.test.utils.BaseTestSupport.SynchRunner;
 import org.apache.uima.internal.util.XMLUtils;
 import org.apache.uima.resource.ResourceInitializationException;
+import org.apache.uima.resource.ResourceProcessException;
 import org.apache.uima.resource.ResourceSpecifier;
 import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
 import org.apache.uima.util.Level;
@@ -181,8 +184,303 @@
       }
     }
   }
+  /**
+   * This test starts a broker on port 8200, starts NoOp Annotator, and
+   * using synchronous sendAndReceive() sends 10 CASes for analysis. Before sending 11th, the test
+   * stops the broker and sends 5 more CASes. All CASes sent after
+   * the broker shutdown result in GetMeta ping and a subsequent timeout.
+   * @throws Exception
+   */
+  public void testSyncClientRecoveryFromBrokerStop() throws Exception  {
+    System.out.println("-------------- testSyncClientRecoveryFromBrokerStop -------------");
+     // Instantiate Uima AS Client
+      BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+      
+      BrokerService broker = createBroker(8200, false);
+      broker.start();
+      System.setProperty("BrokerURL", "tcp://localhost:8200");
+      
+      // Deploy Uima AS Primitive Service
+      deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
+      Map<String, Object> appCtx = buildContext("tcp://localhost:8200",
+              "NoOpAnnotatorQueue");
+      appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+      appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+      initialize(uimaAsEngine, appCtx);
+      waitUntilInitialized();
+      int errorCount = 0;
+      for (int i = 0; i < 15; i++) {
+        
+        if ( i == 10 ) {
+          //  Stop the broker
+          broker.stop();
+          synchronized(this) {
+            wait(3000);   // allow broker some time to fully stop  
+          }
+        }
+        CAS cas = uimaAsEngine.getCAS();
+        cas.setDocumentText("Some Text");
+        System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
+        try {
+          uimaAsEngine.sendAndReceiveCAS(cas);
+        } catch( Exception e) {
+          errorCount++;
+          System.out.println("Client Received Expected Error on CAS:"+(i+1));
+        } finally {
+          cas.release();
+        }
+      }
+      
+      uimaAsEngine.stop();
+      //  expecting 5 failures due to broker missing
+      if ( errorCount != 5 ) {
+        fail("Expected 5 failures due to broker down, instead received:"+errorCount+" failures");
+      }
+      
+      
+  }
+  /**
+   * This test starts a broker on port 8200, starts NoOp Annotator, and
+   * using synchronous sendAndReceive() sends 5 CASes for analysis. Before sending 6th, the test
+   * stops the broker and sends 5 more CASes. All CASes sent after
+   * the broker shutdown result in GetMeta ping and a subsequent timeout. Before
+   * sending 11th CAS the test starts the broker again and sends 10 more CASes 
+   * @throws Exception
+   */
+  public void testSyncClientRecoveryFromBrokerStopAndRestart() throws Exception  {
+    System.out.println("-------------- testSyncClientRecoveryFromBrokerStopAndRestart -------------");
+     // Instantiate Uima AS Client
+      BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+      
+      BrokerService broker = createBroker(8200, false);
+      broker.start();
+      System.setProperty("BrokerURL", "tcp://localhost:8200");
+      // Deploy Uima AS Primitive Service
+      deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
+      Map<String, Object> appCtx = buildContext("tcp://localhost:8200",
+              "NoOpAnnotatorQueue");
+      appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+      appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+      initialize(uimaAsEngine, appCtx);
+      waitUntilInitialized();
+      int errorCount=0;
+      for (int i = 0; i < 20; i++) {
+        
+        if ( i == 5 ) {
+          broker.stop();
+          synchronized(this) {
+            wait(3000);   // allow broker some time to stop 
+          }
+        } else if ( i == 10 ) {
+          //  restart the broker 
+          broker = createBroker(8200, false);
+          broker.start();
+          synchronized(this) {
+            wait(3000);   // allow broker some time to start  
+          }
+        }
+        CAS cas = uimaAsEngine.getCAS();
+        cas.setDocumentText("Some Text");
+        System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
+        try {
+          uimaAsEngine.sendAndReceiveCAS(cas);
+        } catch( Exception e) {
+          errorCount++;
+          System.out.println("Client Received Expected Error on CAS:"+(i+1));
+        } finally {
+          cas.release();
+        }
+      }
+      
+      uimaAsEngine.stop();
+      broker.stop();
 
+      //  expecting 5 failures due to broker missing
+      if ( errorCount != 5 ) {
+        fail("Expected 5 failures due to broker down, instead received:"+errorCount+" failures");
+      }
+      
+      synchronized(this) {
+        wait(3000);   // allow broker some time to stop  
+      }
+  }
+  /**
+   * This test creates 4 UIMA AS clients and runs each in a separate thread. There is a single 
+   * shared jms connection to a broker that each client uses. After initialization a client
+   * sends 1000 CASes to a remote service. While clients are processing the test kills 
+   * the broker, waits for 4 seconds and restarts it. While the broker is down, clients
+   * keep trying to send CASes, receiving Ping timeouts. Once the broker is available again
+   * all clients should recover and begin processing CASes again. This tests recovery of a 
+   * shared connection.
+   * 
+   * @throws Exception
+   */
+  public void testMultipleSyncClientsRecoveryFromBrokerStopAndRestart() throws Exception  {
+    System.out.println("-------------- testMultipleSyncClientsRecoveryFromBrokerStopAndRestart -------------");
+    BrokerService broker = createBroker(8200, false);
+    broker.start();
+    System.setProperty("BrokerURL", "tcp://localhost:8200");
+    final CountDownLatch latch = new CountDownLatch(4);
+    Thread[] clientThreads = new Thread[4];
+    
+    //  Create 4 Uima AS clients each running in a separate thread
+    for(int i=0; i < 4; i++) {
+      clientThreads[i] = new Thread() {
+        public void run() {
+          BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+          try {
+            // Deploy Uima AS Primitive Service
+            deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
+            Map<String, Object> appCtx = buildContext("tcp://localhost:8200",
+                    "NoOpAnnotatorQueue");
+            appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+            appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+            initialize(uimaAsEngine, appCtx);
+            waitUntilInitialized();
+            int errorCount=0;
+            for (int i = 0; i < 1000; i++) {
+              if ( i == 5 ) {
+                latch.countDown();   // indicate that some CASes were processed
+              }
+              CAS cas = uimaAsEngine.getCAS();
+              cas.setDocumentText("Some Text");
+              System.out.println("UIMA AS Client#"+ Thread.currentThread().getId()+" Sending CAS#"+(i + 1) + " Request to a Service");
+              try {
+                uimaAsEngine.sendAndReceiveCAS(cas);
+              } catch( Exception e) {
+                System.out.println("Client Received Expected Error on CAS:"+(i+1));
+              } finally {
+                cas.release();
+              }
+            }
+            System.out.println("Thread:"+Thread.currentThread().getId()+" Completed run()");
+            uimaAsEngine.stop();
+          } catch( Exception e) {
+            e.printStackTrace();
+            return;
+          }
+        }
+      };
+      clientThreads[i].start();
+    }
+    
+    try {
+      latch.await();  // wait for all threads to process a few CASes
+      broker.stop();
+      System.out.println("Stopping Broker - wait ...");
+      synchronized(this) {
+          wait(4000);   // allow broker some time to stop 
+      }
+      System.out.println("Restarting Broker - wait ...");
+      //  restart the broker 
+      broker = createBroker(8200, false);
+      broker.start();
+      synchronized(this) {
+        wait(3000);   // allow broker some time to start  
+      }
+    } catch ( Exception e ) {
+      
+    }
+    for(int i=0; i < 4; i++ ) {
+      clientThreads[i].join();
+    }
+    System.out.println("Stopping Broker - wait ...");
+    broker.stop();
+    synchronized(this) {
+       wait(3000);   // allow broker some time to stop  
+    }
+}
+
+  /**
+   * This test starts a broker on port 8200, starts NoOp Annotator, and
+   * using asynchronous send() sends a total of 15 CASes for analysis. Before sending the 11th
+   * CAS the test stops the broker; 4 of the remaining CASes are expected to fail because the broker is not running.
+   * 
+   * @throws Exception
+   */
+  public void testAsyncClientRecoveryFromBrokerStop() throws Exception  {
+    System.out.println("-------------- testAsyncClientRecoveryFromBrokerStop -------------");
+    System.setProperty("BrokerURL", "tcp://localhost:8200");
+    BrokerService broker = createBroker(8200, false);
+    broker.start();
+     // Instantiate Uima AS Client
+      BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+      deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
+      Map<String, Object> appCtx = buildContext("tcp://localhost:8200",
+      "NoOpAnnotatorQueue");
+      appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+      appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+      initialize(uimaAsEngine, appCtx);
+      waitUntilInitialized();
+      
+      for (int i = 0; i < 15; i++) {
+        
+        if ( i == 10 ) {
+          broker.stop();
+          synchronized(this) {
+            wait(3000);   // allow broker some time to stop  
+          }
+        }
+        CAS cas = uimaAsEngine.getCAS();
+        cas.setDocumentText("Some Text");
+        System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
+        uimaAsEngine.sendCAS(cas);
+      }
+      
+      uimaAsEngine.stop();
+      //  expecting 4 failures due to broker missing
+      if ( failedCasCountDueToBrokerFailure != 4 ) {
+        fail("Expected 4 failures due to broker down, instead received:"+failedCasCountDueToBrokerFailure+" failures");
+      }
+        
+  }
+  
+  public void testAsyncClientRecoveryFromBrokerStopAndRestart() throws Exception  {
+    System.out.println("-------------- testAsyncClientRecoveryFromBrokerStopAndRestart -------------");
+    System.setProperty("BrokerURL", "tcp://localhost:8200");
+    BrokerService broker = createBroker(8200, false);
+    broker.start();
+     // Instantiate Uima AS Client
+      BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+      // Deploy Uima AS Primitive Service
+      deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
+      Map<String, Object> appCtx = buildContext("tcp://localhost:8200",
+      "NoOpAnnotatorQueue");
+      appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+      appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+      initialize(uimaAsEngine, appCtx);
+      waitUntilInitialized();
 
+      for (int i = 0; i < 150; i++) {
+        if ( i == 10 ) {
+          broker.stop();
+          synchronized(this) {
+            wait(3000);   // allow broker some time to stop  
+          }
+        } else if ( i == 20 ) {
+          broker = createBroker(8200, false);
+          broker.start();
+          synchronized(this) {
+            wait(3000);   // allow broker some time to start  
+          }
+        }
+        CAS cas = uimaAsEngine.getCAS();
+        cas.setDocumentText("Some Text");
+        System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
+        uimaAsEngine.sendCAS(cas);
+      }
+      
+      uimaAsEngine.stop();
+      broker.stop();
+      //  expecting 9 failures due to broker missing
+//      if ( failedCasCountDueToBrokerFailure != 9 ) {
+//        fail("Expected 9 failures due to broker down, instead received:"+failedCasCountDueToBrokerFailure+" failures");
+//      }
+
+      synchronized(this) {
+        wait(2000);   // allow broker some time to stop  
+      }
+  }
   public void testClientProcess() throws Exception {
     System.out.println("-------------- testClientProcess -------------");
     
@@ -192,16 +490,18 @@
     deployService(uimaAsEngine, relativePath + "/Deploy_PersonTitleAnnotator.xml");
     Map<String, Object> appCtx = buildContext(String.valueOf(broker.getMasterConnectorURI()),
             "PersonTitleAnnotatorQueue");
-
+    appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+    appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
     initialize(uimaAsEngine, appCtx);
     waitUntilInitialized();
 
-    for (int i = 0; i < 100; i++) {
+    for (int i = 0; i < 50; i++) {
       CAS cas = uimaAsEngine.getCAS();
       cas.setDocumentText("Some Text");
       System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
       uimaAsEngine.sendCAS(cas);
     }
+    
     uimaAsEngine.collectionProcessingComplete();
     uimaAsEngine.stop();
   }
@@ -213,7 +513,7 @@
     // Deploy Uima AS Primitive Service
     deployService(uimaAsEngine, relativePath + "/Deploy_PersonTitleAnnotator.xml");
 
-    System.setProperty( "defaultBrokerURL3", broker.getMasterConnectorURI());
+    System.setProperty( "defaultBrokerURL", broker.getMasterConnectorURI());
     Map<String, Object> appCtx = buildContext("${defaultBrokerURL}","PersonTitleAnnotatorQueue");
 
     initialize(uimaAsEngine, appCtx);
@@ -569,7 +869,6 @@
     spinShutdownThread(eeUimaEngine, 2000, containerId, SpringContainerDeployer.QUIESCE_AND_STOP);
     runTest(appCtx, eeUimaEngine, String.valueOf(broker.getMasterConnectorURI()),
             "TopLevelTaeQueue", 2, EXCEPTION_LATCH);
-
   }
 
   public void testStopNow() throws Exception {
@@ -584,10 +883,12 @@
     appCtx.put(UimaAsynchronousEngine.Timeout, 4000);
     appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 300);
     spinShutdownThread(eeUimaEngine, 3000, containerId, SpringContainerDeployer.STOP_NOW);
+    //  send may fail since we forcefully stop the service. Tolerate
+    //  ResourceProcessException
+    addExceptionToignore(ResourceProcessException.class); 
     runTest(appCtx, eeUimaEngine, String.valueOf(broker.getMasterConnectorURI()),
             "TopLevelTaeQueue", 10, EXCEPTION_LATCH);
   }
-
   public void testCMAggregateClientStopRequest() throws Exception {
     System.out.println("-------------- testCMAggregateClientStopRequest -------------");
     final BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
@@ -1057,6 +1358,8 @@
       }
     }.start();
 
+    super.countPingRetries=true;
+    
     try {
       runTest(appCtx, eeUimaEngine, String.valueOf(broker.getMasterConnectorURI()),
               "PersonTitleAnnotatorQueue", 500, EXCEPTION_LATCH);
@@ -1064,6 +1367,7 @@
       System.out.println(">>> runtest generated exception: " + e);
       e.printStackTrace(System.out);
     }
+    super.countPingRetries=false;
 
   }
 
@@ -1815,15 +2119,19 @@
   public void testAsynchronousTerminate() throws Exception {
     System.out.println("-------------- testAsynchronousTerminate -------------");
     BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
-    deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithDelay.xml");
-    deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator2.xml");
-    deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWithParallelFlow.xml");
-
     Map<String, Object> appCtx = buildContext(String.valueOf(broker.getMasterConnectorURI()),
-            "TopLevelTaeQueue");
-    initialize(eeUimaEngine, appCtx);
-    // Wait until the top level service returns its metadata
-    waitUntilInitialized();
+    "TopLevelTaeQueue");
+    try {
+      deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithDelay.xml");
+      deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator2.xml");
+      deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWithParallelFlow.xml");
+      initialize(eeUimaEngine, appCtx);
+      // Wait until the top level service returns its metadata
+      waitUntilInitialized();
+    } catch( Exception e) {
+			throw e;
+    }
+
 
     CAS cas = eeUimaEngine.getCAS();
     System.out.println(" Sending CAS to kick off aggregate w/colocated CasMultiplier");
@@ -1835,6 +2143,7 @@
     System.out.println(" Trying to stop service");
     eeUimaEngine.stop();
     System.out.println(" stop() returned!");
+    
   }
 
   public void testCallbackListenerOnFailure() throws Exception {