You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2012/01/20 17:13:41 UTC

svn commit: r1233979 - /uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java

Author: cwiklik
Date: Fri Jan 20 16:13:41 2012
New Revision: 1233979

URL: http://svn.apache.org/viewvc?rev=1233979&view=rev
Log:
UIMA-2354 blocks receiving thread when invokeProcess() returns, to prevent it from getting another CAS while the previous CAS is still in-play

Modified:
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java?rev=1233979&r1=1233978&r2=1233979&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java Fri Jan 20 16:13:41 2012
@@ -20,6 +20,7 @@
 package org.apache.uima.aae.handler.input;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
 
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.SerializerCache;
@@ -52,6 +53,13 @@ import org.apache.uima.util.Level;
 
 public class ProcessRequestHandler_impl extends HandlerBase {
   private static final Class CLASS_NAME = ProcessRequestHandler_impl.class;
+  /*
+   * Declare a semaphore which is used to block UIMA AS aggregate receiving thread until 
+   * a CAS is fully processed. This semaphore prevents the receiving thread from grabbing
+   * another CAS from an input queue while a CAS it received previously is still 
+   * in-play. Fixes load balancing across multiple UIMA AS aggregate processes.
+   */
+  final ThreadLocal<Semaphore> threadCompletionMonitor = new ThreadLocal<Semaphore>();
 
   private Object mux = new Object();
 
@@ -225,7 +233,26 @@ public class ProcessRequestHandler_impl 
       // deserSharedData, casReferenceId);
       entry = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData,
               casReferenceId, marker, acceptsDeltaCas);
-
+      
+      /*
+       * In UIMA AS Aggregate the receiving thread must be blocked until a CAS is fully
+       * processed. This is to prevent the receiving thread from grabbing another CAS
+       * breaking prefetch throttling. The receiving thread takes a CAS from service queue,
+       * deserializes CAS, asks the FC for the next step and enqueues the CAS
+       * onto delegate's queue. Once the enqueue completes, the thread is done
+       * and ready to get more CASes from the service queue. The receiving must 
+       * therefor be blocked right after it enqueues the CAS on delegates queue. 
+       * To that end, while handling a new CAS, create a shared semaphore and
+       * associate it with a current thread as ThreadLocal variable. Also, associate the
+       * same semaphore with a CAS so that when the CAS is sent back to the client the
+       * the receiving thread is unblocked.
+      */
+      if ( !getController().isPrimitive() ) {
+        Semaphore semaphore = new Semaphore(0);
+        //  threadCompletionMonitor is a ThreadLocal var
+        threadCompletionMonitor.set(semaphore);
+        entry.setThreadCompletionSemaphore(semaphore);
+      }
       long timeToDeserializeCAS = getController().getCpuTime() - t1;
       getController().incrementDeserializationTime(timeToDeserializeCAS);
       LongNumericStatistic statistic;
@@ -482,6 +509,26 @@ public class ProcessRequestHandler_impl 
         // *****************************************************************
         invokeProcess(entry.getCas(), inputCasReferenceId, casReferenceId, aMessageContext,
                 newCASProducedBy);
+        
+        /**
+         * Below comments apply to UIMA AS aggregate only.
+         * CAS has been handed off to a delegate. Now block the receiving thread until
+         * the CAS is processed or there is a timeout or error. Fetch this thread's ThreadLocal
+         * semaphore to block the thread. It will be unblocked when the aggregate is done with
+         * the CAS.
+         */
+        if (!getController().isPrimitive() ) {
+          Semaphore completionSemaphore = threadCompletionMonitor.get();
+          try {
+            //  Block until the CAS is fully processed or there is an error
+            completionSemaphore.acquire();
+          } catch( InterruptedException ex) {
+          } finally {
+            //  remove ThreadLocal semaphore
+            threadCompletionMonitor.remove();
+          }
+        }
+        
       } else {
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),