You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by sc...@apache.org on 2010/05/14 15:56:30 UTC
svn commit: r944262 - in /uima/uima-as/branches/mavenAlign:
uimaj-as-activemq/ uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/
uimaj-as-jms/ uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/
Author: schor
Date: Fri May 14 13:56:29 2010
New Revision: 944262
URL: http://svn.apache.org/viewvc?rev=944262&view=rev
Log:
[UIMA-1756] merge trunk -> branch
Modified:
uima/uima-as/branches/mavenAlign/uimaj-as-activemq/ (props changed)
uima/uima-as/branches/mavenAlign/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
uima/uima-as/branches/mavenAlign/uimaj-as-jms/ (props changed)
uima/uima-as/branches/mavenAlign/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
Propchange: uima/uima-as/branches/mavenAlign/uimaj-as-activemq/
------------------------------------------------------------------------------
svn:mergeinfo = /uima/uima-as/trunk/uimaj-as-activemq:943952-943956
Modified: uima/uima-as/branches/mavenAlign/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/mavenAlign/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java?rev=944262&r1=944261&r2=944262&view=diff
==============================================================================
--- uima/uima-as/branches/mavenAlign/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java (original)
+++ uima/uima-as/branches/mavenAlign/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java Fri May 14 13:56:29 2010
@@ -86,6 +86,7 @@ public class TestUimaASExtended extends
public BaseTestSupport superRef = null;
+
/**
* Tests Broker startup and shutdown
*/
@@ -747,7 +748,35 @@ public class TestUimaASExtended extends
wait(3000); // allow broker some time to stop
}
}
-
+ /**
+ * Tests sending CPC after a CAS timeout. The service is a Primitive taking
+ * 6 seconds to process a CAS. The client is configured with a 5 second
+ * timeout to force a timeout. This test forces the client to send a GetMeta
+ * Ping and to 'hold' the subsequent CPC request.
+ *
+ * @throws Exception if any step of the test fails unexpectedly
+ */
+ public void testCpcAfterCasTimeout() throws Exception {
+ System.out.println("-------------- testCpcAfterCasTimeout -------------");
+ // Instantiate Uima AS Client
+ BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+ deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorAWithLongDelay.xml");
+ Map<String, Object> appCtx = buildContext("tcp://localhost:8118",
+ "NoOpAnnotatorAQueue");
+ appCtx.put(UimaAsynchronousEngine.Timeout, 5000); // 5 secs: shorter than the service's 6 sec delay
+ appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100); // CPC timeout (presumably milliseconds - confirm)
+ initialize(uimaAsEngine, appCtx);
+ waitUntilInitialized();
+
+ for( int i=0; i < 3; i++ ) {
+ CAS cas = uimaAsEngine.getCAS();
+ cas.setDocumentText("Some Text");
+ uimaAsEngine.sendCAS(cas); // will time out after 5 secs
+ uimaAsEngine.collectionProcessingComplete(); // the CPC should not
+ // be sent to the service until the timeout occurs.
+ }
+ uimaAsEngine.stop();
+ }
public void testClientProcess() throws Exception {
System.out.println("-------------- testClientProcess -------------");
@@ -1128,7 +1157,7 @@ public class TestUimaASExtended extends
Map<String, Object> appCtx = buildContext(String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue");
runTest(appCtx, eeUimaEngine, String.valueOf(broker.getMasterConnectorURI()), "TopLevelTaeQueue",
- 2, PROCESS_LATCH);
+ 1, PROCESS_LATCH);
}
public void testScaledSyncAggregateProcess() throws Exception {
Propchange: uima/uima-as/branches/mavenAlign/uimaj-as-jms/
------------------------------------------------------------------------------
svn:mergeinfo = /uima/uima-as/trunk/uimaj-as-jms:943952-943956
Modified: uima/uima-as/branches/mavenAlign/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/mavenAlign/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=944262&r1=944261&r2=944262&view=diff
==============================================================================
--- uima/uima-as/branches/mavenAlign/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ uima/uima-as/branches/mavenAlign/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Fri May 14 13:56:29 2010
@@ -323,7 +323,7 @@ public abstract class BaseUIMAAsynchrono
// The semaphore is initially acquired in the initialize(Map) method and typically
// released when the number of CASes sent equals the number of CASes received. Since
// no CASes were sent we must do the release here to be able to continue.
- if (totalCasRequestsSentBetweenCpCs.get() == 0) {
+ if (totalCasRequestsSentBetweenCpCs.get() == 0 && !serviceDelegate.isAwaitingPingReply()) {
cpcReadySemaphore.release();
}
// The cpcReadySemaphore was initially acquired in the initialize() method
@@ -752,6 +752,7 @@ public abstract class BaseUIMAAsynchrono
return null;
}
+ clientCache.put(casReferenceId, requestToCache);
PendingMessage msg = new PendingMessage(AsynchAEMessage.Process);
long t1 = System.nanoTime();
if (serializationStrategy.equals("xmi")) {
@@ -780,12 +781,17 @@ public abstract class BaseUIMAAsynchrono
requestToCache.setProcessTimeout(processTimeout);
requestToCache.clearTimeoutException();
- clientCache.put(casReferenceId, requestToCache);
// The sendCAS() method is synchronized, so there is no need to synchronize the code below
if (serviceDelegate.getState() == Delegate.TIMEOUT_STATE ) {
// Send Ping to service as getMeta request
if ( !serviceDelegate.isAwaitingPingReply() && sharedConnection.isOpen() ) {
serviceDelegate.setAwaitingPingReply();
+ // Add the cas to a list of CASes pending reply. Also start the timer if necessary
+ serviceDelegate.addCasToOutstandingList(requestToCache.getCasReferenceId());
+ if ( cpcReadySemaphore.availablePermits() > 0 ) {
+ acquireCpcReadySemaphore();
+ }
+
// Send PING Request to check delegate's availability
sendMetaRequest();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
@@ -793,8 +799,6 @@ public abstract class BaseUIMAAsynchrono
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_sending_ping__FINE",
new Object[] { serviceDelegate.getKey() });
}
- // Add the cas to a list of CASes pending reply. Also start the timer if necessary
- serviceDelegate.addCasToOutstandingList(requestToCache.getCasReferenceId());
return casReferenceId;
} else {
if ( !requestToCache.isSynchronousInvocation() ) {
@@ -824,8 +828,10 @@ public abstract class BaseUIMAAsynchrono
// Add message to the pending queue
addMessage(msg);
} catch (ResourceProcessException e) {
+ clientCache.remove(casReferenceId);
throw e;
} catch (Exception e) {
+ clientCache.remove(casReferenceId);
throw new ResourceProcessException(e);
}
return casReferenceId;