You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/11/10 21:00:31 UTC
svn commit: r834635 -
/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
Author: cwiklik
Date: Tue Nov 10 20:00:30 2009
New Revision: 834635
URL: http://svn.apache.org/viewvc?rev=834635&view=rev
Log:
UIMA-1643 Added new listener. Modified to detect broker failure.
Modified:
incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java?rev=834635&r1=834634&r2=834635&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java Tue Nov 10 20:00:30 2009
@@ -45,6 +45,7 @@
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
import org.apache.uima.cas.CAS;
import org.apache.uima.collection.EntityProcessStatus;
+import org.apache.uima.jms.error.handler.BrokerConnectionException;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.resource.ResourceProcessException;
import org.apache.uima.util.Level;
@@ -103,6 +104,12 @@
protected boolean receivedExpectedParentReferenceId = false;
+ protected int maxPingRetryCount = 4;
+
+ protected volatile boolean countPingRetries = false;
+
+ protected long failedCasCountDueToBrokerFailure = 0;
+
protected String deployService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
String aDeploymentDescriptorPath) throws Exception {
String defaultBrokerURL = System.getProperty("BrokerURL");
@@ -289,7 +296,7 @@
protected void runTestWithMultipleThreads(String serviceDeplyDescriptor, String queueName,
int howManyCASesPerRunningThread, int howManyRunningThreads, int timeout,
- int aGetMetaTimeout, boolean failOnTimeout) throws Exception {
+ int aGetMetaTimeout, boolean failOnTimeout ) throws Exception {
// Instantiate Uima EE Client
isStopped = false;
isStopping = false;
@@ -338,7 +345,7 @@
// Spin runner threads and start sending CASes
for (int i = 0; i < howManyRunningThreads; i++) {
- SynchRunner runner = new SynchRunner(eeUimaEngine, howManyCASesPerRunningThread);
+ SynchRunner runner = new SynchRunner(eeUimaEngine, howManyCASesPerRunningThread, listener);
Thread runnerThread = new Thread(runner, "Runner" + i);
runnerThread.start();
System.out.println("runTest: Started Runner Thread::Id=" + runnerThread.getId());
@@ -447,7 +454,9 @@
long startTime = System.currentTimeMillis();
if (!isStopped) {
// Send an in CAS to the top level service
- sendCAS(aUimaEeEngine, howMany, sendCasAsynchronously);
+ try {
+ sendCAS(aUimaEeEngine, howMany, sendCasAsynchronously);
+ } catch( Exception e) {}
}
// Wait until ALL CASes return from the service
if (t2 != null) {
@@ -618,7 +627,7 @@
protected class UimaAsTestCallbackListener extends UimaAsBaseCallbackListener {
private String casSent = null;
-
+ private int pingTimeoutCount=0;
public synchronized void onBeforeMessageSend(UimaASProcessStatus status) {
casSent = status.getCasReferenceId();
System.out.println("runTest: Received onBeforeMessageSend() Notification With CAS:"
@@ -662,7 +671,10 @@
for (int i = 0; i < list.size(); i++) {
Exception e = (Exception) list.get(i);
- if (e instanceof ServiceShutdownException
+ if ( e instanceof BrokerConnectionException ) {
+ System.out.println("Client Reported Broker Connection Failure");
+ failedCasCountDueToBrokerFailure++;
+ } else if (e instanceof ServiceShutdownException
|| (e.getCause() != null && e.getCause() instanceof ServiceShutdownException)) {
serviceShutdownException = true;
isStopping = true;
@@ -681,6 +693,22 @@
}
isStopping = true;
engine.stop();
+ } else if ( engine != null && e instanceof UimaASProcessCasTimeout) {
+ if ( e.getCause() != null && e.getCause() instanceof UimaASPingTimeout) {
+ if ( countPingRetries ) {
+ if ( pingTimeoutCount > maxPingRetryCount ) {
+ if (cpcLatch != null) {
+ cpcLatch.countDown();
+ }
+ isStopping = true;
+ engine.stop();
+
+ } else {
+ pingTimeoutCount++;
+ }
+
+ }
+ }
}
if (!expectedException) {
e.printStackTrace();
@@ -841,20 +869,60 @@
}
}
+
+  public class SimpleCallbackListener extends UimaAsTestCallbackListener {
+    private String casSent = null;
+
+    /**
+     * Callback method which is called by the Uima EE client when a reply to a process CAS
+     * request is received. The reply carries either the CAS or an exception that occurred
+     * while processing it. Logs the outcome together with the CAS reference id and counts
+     * down processCountLatch; callbacks arriving while isStopping is set are ignored.
+     * @param aCAS the CAS the reply refers to (not inspected here)
+     * @param aProcessStatus status of the reply; the id is read only from a UimaASProcessStatus
+     */
+    public synchronized void entityProcessComplete(CAS aCAS, EntityProcessStatus aProcessStatus) {
+      if (isStopping) {
+        System.out
+                .println(">>>>> runTest: Ignoring entityProcessComplete callback as engine is shutting down");
+        return;
+      }
+
+      if (aProcessStatus instanceof UimaASProcessStatus) {
+        // Read the id before branching so the failure message names the CAS instead of "null".
+        String casReferenceId = ((UimaASProcessStatus) aProcessStatus).getCasReferenceId();
+        if (aProcessStatus.isException()) {
+          System.out.println("--------- Got Exception While Processing CAS:" + casReferenceId);
+        } else {
+          System.out.println("Client Received Reply - CAS:" + casReferenceId);
+        }
+      }
+      processCountLatch.countDown();
+    }
+
+  }
+
+
+
+
/**
* A Runnable class used to test concurrency support in Uima EE client. Each instance of this
* class will start and send specified number of CASes to a service using synchronous
* sendAndReceive API. Each thread sends a CAS and waits for a reply.
*
*/
- protected class SynchRunner implements Runnable {
+ public class SynchRunner implements Runnable {
private BaseUIMAAsynchronousEngine_impl uimaClient = null;
private long howManyCASes = 1;
-
+ private UimaAsTestCallbackListener callbackListener;
public SynchRunner(BaseUIMAAsynchronousEngine_impl aUimaClient, int howMany) {
+ this(aUimaClient, howMany, null);
+ }
+ public SynchRunner(BaseUIMAAsynchronousEngine_impl aUimaClient, int howMany, UimaAsTestCallbackListener aListener) {
uimaClient = aUimaClient;
howManyCASes = howMany;
+ callbackListener = aListener;
}
// Run until All CASes are sent
@@ -871,10 +939,13 @@
String casReferenceId = uimaClient.sendAndReceiveCAS(cas, pt);
status = new UimaASProcessStatusImpl(pt, casReferenceId);
} catch (ResourceProcessException rpe) {
+ //rpe.printStackTrace();
status = new UimaASProcessStatusImpl(pt);
status.addEventStatus("Process", "Failed", rpe);
} finally {
- listener.entityProcessComplete(cas, status);
+ if ( callbackListener != null ) {
+ callbackListener.entityProcessComplete(cas, status);
+ }
cas.release();
}
}