You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by sc...@apache.org on 2010/05/14 15:56:30 UTC

svn commit: r944262 - in /uima/uima-as/branches/mavenAlign: uimaj-as-activemq/ uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/ uimaj-as-jms/ uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/

Author: schor
Date: Fri May 14 13:56:29 2010
New Revision: 944262

URL: http://svn.apache.org/viewvc?rev=944262&view=rev
Log:
[UIMA-1756] merge trunk -> branch

Modified:
    uima/uima-as/branches/mavenAlign/uimaj-as-activemq/   (props changed)
    uima/uima-as/branches/mavenAlign/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
    uima/uima-as/branches/mavenAlign/uimaj-as-jms/   (props changed)
    uima/uima-as/branches/mavenAlign/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java

Propchange: uima/uima-as/branches/mavenAlign/uimaj-as-activemq/
------------------------------------------------------------------------------
    svn:mergeinfo = /uima/uima-as/trunk/uimaj-as-activemq:943952-943956

Modified: uima/uima-as/branches/mavenAlign/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/mavenAlign/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java?rev=944262&r1=944261&r2=944262&view=diff
==============================================================================
--- uima/uima-as/branches/mavenAlign/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java (original)
+++ uima/uima-as/branches/mavenAlign/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java Fri May 14 13:56:29 2010
@@ -86,6 +86,7 @@ public class TestUimaASExtended extends 
 
   public BaseTestSupport superRef = null;
 
+  
   /**
    * Tests Broker startup and shutdown
    */
@@ -747,7 +748,35 @@ public class TestUimaASExtended extends 
         wait(3000);   // allow broker some time to stop  
       }
   }
-  
+  /**
+   * Tests sending CPC after CAS timeout. The service is a Primitive taking 
+   * 6 seconds to process a CAS. The client waits for 5 secs to force
+   * a timeout. This test forces the client to send GetMeta Ping and to
+   * 'hold' the subsequent CPC request.
+   *   
+   * @throws Exception
+   */
+  public void testCpcAfterCasTimeout() throws Exception  {
+    System.out.println("-------------- testCpcAfterCasTimeout -------------");
+     // Instantiate Uima AS Client
+      BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+      deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorAWithLongDelay.xml");
+      Map<String, Object> appCtx = buildContext("tcp://localhost:8118",
+      "NoOpAnnotatorAQueue");
+      appCtx.put(UimaAsynchronousEngine.Timeout, 5000);
+      appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+      initialize(uimaAsEngine, appCtx);
+      waitUntilInitialized();
+      
+      for( int i=0; i < 3; i++ ) {
+        CAS cas = uimaAsEngine.getCAS();
+        cas.setDocumentText("Some Text");
+        uimaAsEngine.sendCAS(cas);  // will time out after 5 secs
+        uimaAsEngine.collectionProcessingComplete();  // the CPC should not
+        // be sent to the service until the timeout occurs.
+      }
+      uimaAsEngine.stop();
+  }
   
   public void testClientProcess() throws Exception {
     System.out.println("-------------- testClientProcess -------------");
@@ -1128,7 +1157,7 @@ public class TestUimaASExtended extends 
 
     Map<String, Object> appCtx = buildContext(String.valueOf(broker.getMasterConnectorURI()),"TopLevelTaeQueue");
     runTest(appCtx, eeUimaEngine, String.valueOf(broker.getMasterConnectorURI()), "TopLevelTaeQueue",
-            2, PROCESS_LATCH);
+            1, PROCESS_LATCH);
   }
   
   public void testScaledSyncAggregateProcess() throws Exception {

Propchange: uima/uima-as/branches/mavenAlign/uimaj-as-jms/
------------------------------------------------------------------------------
    svn:mergeinfo = /uima/uima-as/trunk/uimaj-as-jms:943952-943956

Modified: uima/uima-as/branches/mavenAlign/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/mavenAlign/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=944262&r1=944261&r2=944262&view=diff
==============================================================================
--- uima/uima-as/branches/mavenAlign/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ uima/uima-as/branches/mavenAlign/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Fri May 14 13:56:29 2010
@@ -323,7 +323,7 @@ public abstract class BaseUIMAAsynchrono
       // The semaphore is initially acquired in the initialize(Map) method and typically
       // released when the number of CASes sent equals the number of CASes received. Since
       // no CASes were sent we must do the release here to be able to continue.
-      if (totalCasRequestsSentBetweenCpCs.get() == 0) {
+      if (totalCasRequestsSentBetweenCpCs.get() == 0 && !serviceDelegate.isAwaitingPingReply()) {
         cpcReadySemaphore.release();
       }
       // The cpcReadySemaphore was initially acquired in the initialize() method
@@ -752,6 +752,7 @@ public abstract class BaseUIMAAsynchrono
           return null;
         }
 
+        clientCache.put(casReferenceId, requestToCache);
         PendingMessage msg = new PendingMessage(AsynchAEMessage.Process);
         long t1 = System.nanoTime();
         if (serializationStrategy.equals("xmi")) {
@@ -780,12 +781,17 @@ public abstract class BaseUIMAAsynchrono
         requestToCache.setProcessTimeout(processTimeout);
         requestToCache.clearTimeoutException();
 
-        clientCache.put(casReferenceId, requestToCache);
         // The sendCAS() method is synchronized no need to synchronize the code below
         if (serviceDelegate.getState() == Delegate.TIMEOUT_STATE ) {
           //  Send Ping to service as getMeta request
           if ( !serviceDelegate.isAwaitingPingReply() && sharedConnection.isOpen() ) {
             serviceDelegate.setAwaitingPingReply();
+            // Add the cas to a list of CASes pending reply. Also start the timer if necessary
+            serviceDelegate.addCasToOutstandingList(requestToCache.getCasReferenceId());
+            if ( cpcReadySemaphore.availablePermits() > 0 ) {
+              acquireCpcReadySemaphore();
+            }
+
             // Send PING Request to check delegate's availability
             sendMetaRequest();
             if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
@@ -793,8 +799,6 @@ public abstract class BaseUIMAAsynchrono
                       JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_sending_ping__FINE",
                       new Object[] { serviceDelegate.getKey() });
             }
-            // Add the cas to a list of CASes pending reply. Also start the timer if necessary
-            serviceDelegate.addCasToOutstandingList(requestToCache.getCasReferenceId());
             return casReferenceId;
           } else {
             if ( !requestToCache.isSynchronousInvocation() ) {
@@ -824,8 +828,10 @@ public abstract class BaseUIMAAsynchrono
         // Add message to the pending queue
         addMessage(msg);
       } catch (ResourceProcessException e) {
+        clientCache.remove(casReferenceId);
         throw e;
       } catch (Exception e) {
+        clientCache.remove(casReferenceId);
         throw new ResourceProcessException(e);
       }
       return casReferenceId;