You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2020/06/10 13:12:43 UTC

[uima-async-scaleout] 15/17: [UIMA-1756] merge trunk -> branch

This is an automated email from the ASF dual-hosted git repository.

cwiklik pushed a commit to branch mavenAlign
in repository https://gitbox.apache.org/repos/asf/uima-async-scaleout.git

commit 3fb865ba6244e3de88cc752282790d60d13a8dcc
Author: schor <schor>
AuthorDate: Fri May 14 13:56:29 2010 +0000

    [UIMA-1756] merge trunk -> branch
---
 .../apache/uima/ee/test/TestUimaASExtended.java    | 33 ++++++++++++++++++++--
 .../BaseUIMAAsynchronousEngineCommon_impl.java     | 14 ++++++---
 2 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java b/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
index f03c157..6e07ccc 100644
--- a/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
+++ b/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
@@ -86,6 +86,7 @@ public class TestUimaASExtended extends BaseTestSupport {
 
   public BaseTestSupport superRef = null;
 
+  
   /**
    * Tests Broker startup and shutdown
    */
@@ -747,7 +748,35 @@ public class TestUimaASExtended extends BaseTestSupport {
         wait(3000);   // allow broker some time to stop  
       }
   }
-  
+  /**
+   * Tests sending a CPC (collectionProcessingComplete) request after a CAS
+   * timeout. The service is a primitive that takes 6 seconds to process a CAS;
+   * the client waits only 5 seconds, which forces a timeout. This test forces
+   * the client to send a GetMeta ping and to 'hold' the subsequent CPC request.
+   *   
+   * @throws Exception
+   */
+  public void testCpcAfterCasTimeout() throws Exception  {
+    System.out.println("-------------- testCpcAfterCasTimeout -------------");
+     // Instantiate Uima AS Client
+      BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+      deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorAWithLongDelay.xml");
+      Map<String, Object> appCtx = buildContext("tcp://localhost:8118",
+      "NoOpAnnotatorAQueue");
+      appCtx.put(UimaAsynchronousEngine.Timeout, 5000);
+      appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+      initialize(uimaAsEngine, appCtx);
+      waitUntilInitialized();
+      
+      for( int i=0; i < 3; i++ ) {
+        CAS cas = uimaAsEngine.getCAS();
+        cas.setDocumentText("Some Text");
+        uimaAsEngine.sendCAS(cas);  // will time out after 5 secs
+        uimaAsEngine.collectionProcessingComplete();  // the CPC should not
+        // be sent to the service until the timeout occurs.
+      }
+      uimaAsEngine.stop();
+  }
   
   public void testClientProcess() throws Exception {
     System.out.println("-------------- testClientProcess -------------");
@@ -1128,7 +1157,7 @@ public class TestUimaASExtended extends BaseTestSupport {
 
     Map<String, Object> appCtx = buildContext(String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue");
     runTest(appCtx, eeUimaEngine, String.valueOf(broker.getMasterConnectorURI()), "TopLevelTaeQueue",
-            2, PROCESS_LATCH);
+            1, PROCESS_LATCH);
   }
   
   public void testScaledSyncAggregateProcess() throws Exception {
diff --git a/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java b/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
index c98028d..fd9be1a 100644
--- a/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
+++ b/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
@@ -323,7 +323,7 @@ public abstract class BaseUIMAAsynchronousEngineCommon_impl implements UimaAsync
       // The semaphore is initially acquired in the initialize(Map) method and typically
       // released when the number of CASes sent equals the number of CASes received. Since
       // no CASes were sent we must do the release here to be able to continue.
-      if (totalCasRequestsSentBetweenCpCs.get() == 0) {
+      if (totalCasRequestsSentBetweenCpCs.get() == 0 && !serviceDelegate.isAwaitingPingReply()) {
         cpcReadySemaphore.release();
       }
       // The cpcReadySemaphore was initially acquired in the initialize() method
@@ -752,6 +752,7 @@ public abstract class BaseUIMAAsynchronousEngineCommon_impl implements UimaAsync
           return null;
         }
 
+        clientCache.put(casReferenceId, requestToCache);
         PendingMessage msg = new PendingMessage(AsynchAEMessage.Process);
         long t1 = System.nanoTime();
         if (serializationStrategy.equals("xmi")) {
@@ -780,12 +781,17 @@ public abstract class BaseUIMAAsynchronousEngineCommon_impl implements UimaAsync
         requestToCache.setProcessTimeout(processTimeout);
         requestToCache.clearTimeoutException();
 
-        clientCache.put(casReferenceId, requestToCache);
         // The sendCAS() method is synchronized no need to synchronize the code below
         if (serviceDelegate.getState() == Delegate.TIMEOUT_STATE ) {
           //  Send Ping to service as getMeta request
           if ( !serviceDelegate.isAwaitingPingReply() && sharedConnection.isOpen() ) {
             serviceDelegate.setAwaitingPingReply();
+            // Add the cas to a list of CASes pending reply. Also start the timer if necessary
+            serviceDelegate.addCasToOutstandingList(requestToCache.getCasReferenceId());
+            if ( cpcReadySemaphore.availablePermits() > 0 ) {
+              acquireCpcReadySemaphore();
+            }
+
             // Send PING Request to check delegate's availability
             sendMetaRequest();
             if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
@@ -793,8 +799,6 @@ public abstract class BaseUIMAAsynchronousEngineCommon_impl implements UimaAsync
                       JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_sending_ping__FINE",
                       new Object[] { serviceDelegate.getKey() });
             }
-            // Add the cas to a list of CASes pending reply. Also start the timer if necessary
-            serviceDelegate.addCasToOutstandingList(requestToCache.getCasReferenceId());
             return casReferenceId;
           } else {
             if ( !requestToCache.isSynchronousInvocation() ) {
@@ -824,8 +828,10 @@ public abstract class BaseUIMAAsynchronousEngineCommon_impl implements UimaAsync
         // Add message to the pending queue
         addMessage(msg);
       } catch (ResourceProcessException e) {
+        clientCache.remove(casReferenceId);
         throw e;
       } catch (Exception e) {
+        clientCache.remove(casReferenceId);
         throw new ResourceProcessException(e);
       }
       return casReferenceId;