You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2012/01/20 17:13:41 UTC
svn commit: r1233979 -
/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
Author: cwiklik
Date: Fri Jan 20 16:13:41 2012
New Revision: 1233979
URL: http://svn.apache.org/viewvc?rev=1233979&view=rev
Log:
UIMA-2354 blocks receiving thread when invokeProcess() returns, to prevent it from getting another CAS while the previous CAS is still in-play
Modified:
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java?rev=1233979&r1=1233978&r2=1233979&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java Fri Jan 20 16:13:41 2012
@@ -20,6 +20,7 @@
package org.apache.uima.aae.handler.input;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.SerializerCache;
@@ -52,6 +53,13 @@ import org.apache.uima.util.Level;
public class ProcessRequestHandler_impl extends HandlerBase {
private static final Class CLASS_NAME = ProcessRequestHandler_impl.class;
+ /*
+ * Declare a semaphore which is used to block UIMA AS aggregate receiving thread until
+ * a CAS is fully processed. This semaphore prevents the receiving thread from grabbing
+ * another CAS from an input queue while a CAS it received previously is still
+ * in-play. Fixes load balancing across multiple UIMA AS aggregate processes.
+ */
+ final ThreadLocal<Semaphore> threadCompletionMonitor = new ThreadLocal<Semaphore>();
private Object mux = new Object();
@@ -225,7 +233,26 @@ public class ProcessRequestHandler_impl
// deserSharedData, casReferenceId);
entry = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData,
casReferenceId, marker, acceptsDeltaCas);
-
+
+ /*
+ * In UIMA AS Aggregate the receiving thread must be blocked until a CAS is fully
+ * processed. This is to prevent the receiving thread from grabbing another CAS,
+ * which would break prefetch throttling. The receiving thread takes a CAS from the service queue,
+ * deserializes CAS, asks the FC for the next step and enqueues the CAS
+ * onto delegate's queue. Once the enqueue completes, the thread is done
+ * and ready to get more CASes from the service queue. The receiving thread must
+ * therefore be blocked right after it enqueues the CAS on the delegate's queue.
+ * To that end, while handling a new CAS, create a shared semaphore and
+ * associate it with a current thread as ThreadLocal variable. Also, associate the
+ * same semaphore with the CAS so that when the CAS is sent back to the client
+ * the receiving thread is unblocked.
+ */
+ if ( !getController().isPrimitive() ) {
+ Semaphore semaphore = new Semaphore(0);
+ // threadCompletionMonitor is a ThreadLocal var
+ threadCompletionMonitor.set(semaphore);
+ entry.setThreadCompletionSemaphore(semaphore);
+ }
long timeToDeserializeCAS = getController().getCpuTime() - t1;
getController().incrementDeserializationTime(timeToDeserializeCAS);
LongNumericStatistic statistic;
@@ -482,6 +509,26 @@ public class ProcessRequestHandler_impl
// *****************************************************************
invokeProcess(entry.getCas(), inputCasReferenceId, casReferenceId, aMessageContext,
newCASProducedBy);
+
+ /**
+ * Below comments apply to UIMA AS aggregate only.
+ * CAS has been handed off to a delegate. Now block the receiving thread until
+ * the CAS is processed or there is a timeout or error. Fetch this thread's ThreadLocal
+ * semaphore to block the thread. It will be unblocked when the aggregate is done with
+ * the CAS.
+ */
+ if (!getController().isPrimitive() ) {
+ Semaphore completionSemaphore = threadCompletionMonitor.get();
+ try {
+ // Block until the CAS is fully processed or there is an error
+ completionSemaphore.acquire();
+ } catch( InterruptedException ex) {
+ } finally {
+ // remove ThreadLocal semaphore
+ threadCompletionMonitor.remove();
+ }
+ }
+
} else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),