You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/11/10 21:00:31 UTC

svn commit: r834635 - /incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java

Author: cwiklik
Date: Tue Nov 10 20:00:30 2009
New Revision: 834635

URL: http://svn.apache.org/viewvc?rev=834635&view=rev
Log:
UIMA-1643 Added new listener. Modified to detect broker failure.

Modified:
    incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java

Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java?rev=834635&r1=834634&r2=834635&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java Tue Nov 10 20:00:30 2009
@@ -45,6 +45,7 @@
 import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.collection.EntityProcessStatus;
+import org.apache.uima.jms.error.handler.BrokerConnectionException;
 import org.apache.uima.resource.ResourceInitializationException;
 import org.apache.uima.resource.ResourceProcessException;
 import org.apache.uima.util.Level;
@@ -103,6 +104,12 @@
 
   protected boolean receivedExpectedParentReferenceId = false;
 
+  protected int maxPingRetryCount = 4;
+  
+  protected volatile boolean countPingRetries = false;
+
+  protected long failedCasCountDueToBrokerFailure = 0;
+  
   protected String deployService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
           String aDeploymentDescriptorPath) throws Exception {
     String defaultBrokerURL = System.getProperty("BrokerURL");
@@ -289,7 +296,7 @@
 
   protected void runTestWithMultipleThreads(String serviceDeplyDescriptor, String queueName,
           int howManyCASesPerRunningThread, int howManyRunningThreads, int timeout,
-          int aGetMetaTimeout, boolean failOnTimeout) throws Exception {
+          int aGetMetaTimeout, boolean failOnTimeout ) throws Exception {
     // Instantiate Uima EE Client
     isStopped = false;
     isStopping = false;
@@ -338,7 +345,7 @@
 
     // Spin runner threads and start sending CASes
     for (int i = 0; i < howManyRunningThreads; i++) {
-      SynchRunner runner = new SynchRunner(eeUimaEngine, howManyCASesPerRunningThread);
+      SynchRunner runner = new SynchRunner(eeUimaEngine, howManyCASesPerRunningThread, listener);
       Thread runnerThread = new Thread(runner, "Runner" + i);
       runnerThread.start();
       System.out.println("runTest: Started Runner Thread::Id=" + runnerThread.getId());
@@ -447,7 +454,9 @@
         long startTime = System.currentTimeMillis();
         if (!isStopped) {
           // Send an in CAS to the top level service
-          sendCAS(aUimaEeEngine, howMany, sendCasAsynchronously);
+          try {
+            sendCAS(aUimaEeEngine, howMany, sendCasAsynchronously);
+          } catch( Exception e) {}
         }
         // Wait until ALL CASes return from the service
         if (t2 != null) {
@@ -618,7 +627,7 @@
   protected class UimaAsTestCallbackListener extends UimaAsBaseCallbackListener {
 
     private String casSent = null;
-
+    private int pingTimeoutCount=0;
     public synchronized void onBeforeMessageSend(UimaASProcessStatus status) {
       casSent = status.getCasReferenceId();
       System.out.println("runTest: Received onBeforeMessageSend() Notification With CAS:"
@@ -662,7 +671,10 @@
 
         for (int i = 0; i < list.size(); i++) {
           Exception e = (Exception) list.get(i);
-          if (e instanceof ServiceShutdownException
+          if ( e instanceof BrokerConnectionException ) {
+            System.out.println("Client Reported Broker Connection Failure");
+            failedCasCountDueToBrokerFailure++;
+          } else if (e instanceof ServiceShutdownException
                   || (e.getCause() != null && e.getCause() instanceof ServiceShutdownException)) {
             serviceShutdownException = true;
             isStopping = true;
@@ -681,6 +693,22 @@
             }
             isStopping = true;
             engine.stop();
+          } else if ( engine != null && e instanceof UimaASProcessCasTimeout) {
+            if ( e.getCause() != null && e.getCause() instanceof UimaASPingTimeout) {
+              if ( countPingRetries ) {
+                if ( pingTimeoutCount > maxPingRetryCount ) {
+                  if (cpcLatch != null) {
+                    cpcLatch.countDown();
+                  }
+                  isStopping = true;
+                  engine.stop();
+                  
+                } else {
+                  pingTimeoutCount++;
+                }
+                
+              }
+            }
           }
           if (!expectedException) {
             e.printStackTrace();
@@ -841,20 +869,60 @@
     }
   }
 
+
+  /** Listener that only logs each process-CAS reply and counts down processCountLatch. */
+  public class SimpleCallbackListener extends UimaAsTestCallbackListener {
+
+    // Not read or written anywhere within this class; retained as committed.
+    private String casSent = null;
+
+    /**
+     * Callback method called by the Uima EE client when a reply to a process CAS request is
+     * received. The reply contains either the CAS or an exception that occurred while
+     * processing the CAS. Callbacks arriving while isStopping is set are ignored.
+     */
+    public synchronized void entityProcessComplete(CAS aCAS, EntityProcessStatus aProcessStatus) {
+      if (isStopping) {
+        System.out
+                .println(">>>>> runTest: Ignoring entityProcessComplete callback as engine is shutting down");
+        return;
+      }
+
+      if (aProcessStatus instanceof UimaASProcessStatus) {
+        // Read the id before branching so the exception message no longer always prints null.
+        String casReferenceId = ((UimaASProcessStatus) aProcessStatus).getCasReferenceId();
+        if (aProcessStatus.isException()) {
+          System.out.println("--------- Got Exception While Processing CAS:" + casReferenceId);
+        } else {
+          System.out.println("Client Received Reply - CAS:" + casReferenceId);
+        }
+      }
+      // Every callback that is not ignored releases one count, success or failure.
+      processCountLatch.countDown();
+    }
+  }
+  
+  
+  
+  
   /**
    * A Runnable class used to test concurrency support in Uima EE client. Each instance of this
    * class will start and send specified number of CASes to a service using synchronous
    * sendAndReceive API. Each thread sends a CAS and waits for a reply.
    * 
    */
-  protected class SynchRunner implements Runnable {
+  public class SynchRunner implements Runnable {
     private BaseUIMAAsynchronousEngine_impl uimaClient = null;
 
     private long howManyCASes = 1;
-
+    private UimaAsTestCallbackListener callbackListener;
     public SynchRunner(BaseUIMAAsynchronousEngine_impl aUimaClient, int howMany) {
+      this(aUimaClient, howMany, null);
+    }
+    public SynchRunner(BaseUIMAAsynchronousEngine_impl aUimaClient, int howMany, UimaAsTestCallbackListener aListener) {
       uimaClient = aUimaClient;
       howManyCASes = howMany;
+      callbackListener = aListener;
     }
 
     // Run until All CASes are sent
@@ -871,10 +939,13 @@
             String casReferenceId = uimaClient.sendAndReceiveCAS(cas, pt);
             status = new UimaASProcessStatusImpl(pt, casReferenceId);
           } catch (ResourceProcessException rpe) {
+            //rpe.printStackTrace();
             status = new UimaASProcessStatusImpl(pt);
             status.addEventStatus("Process", "Failed", rpe);
           } finally {
-            listener.entityProcessComplete(cas, status);
+            if ( callbackListener != null ) {
+              callbackListener.entityProcessComplete(cas, status);
+            }
             cas.release();
           }
         }