You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/07/27 18:59:49 UTC

svn commit: r798217 - /incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java

Author: cwiklik
Date: Mon Jul 27 16:59:49 2009
New Revision: 798217

URL: http://svn.apache.org/viewvc?rev=798217&view=rev
Log:
UIMA-1436 Replaced wait-notify with semaphores to signal receipt of GetMeta and CPC replies. Simplified the code that signals when the client is ready to send CPC. Replaced two counters with one to determine the number of outstanding CASes: the code used casesSent and casesReceived counters, and both are now replaced with a single AtomicLong counter that maintains the number of outstanding CASes.

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=798217&r1=798216&r2=798217&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Mon Jul 27 16:59:49 2009
@@ -30,8 +30,10 @@
 import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
@@ -101,22 +103,14 @@
 
 	protected AsynchAECasManager asynchManager;
 
-	protected Object endOfCollectionMonitor = new Object();
-
 	protected Object metadataReplyMonitor = new Object();
 
 	protected boolean remoteService = false;
 
-	protected Object gater = new Object();
-
-	protected int howManyBeforeReplySeen = 0;
-
 	protected CollectionReader collectionReader = null;
 
 	protected volatile boolean running = false;
 
-	protected final Object sendAndReceiveCasMonitor = new Object();
-
 	protected ProcessingResourceMetaData resourceMetadata;
 
 	protected CAS sendAndReceiveCAS = null;
@@ -146,9 +140,11 @@
 
 	protected long howManySent = 0;
 
-	protected long howManyRecvd = 0;
-
-	protected Object cpcGate = new Object();
+	// Counter maintaining a number of CASes sent to a service. The counter
+	// is incremented every time a CAS is sent and decremented when the CAS
+	// reply is received. It is also adjusted down in case of a timeout or
+	// error.
+	protected AtomicLong outstandingCasRequests = new AtomicLong();
 
 	protected ConcurrentHashMap springContainerRegistry = new ConcurrentHashMap();
 
@@ -186,7 +182,18 @@
 	
 	// Create Semaphore that will signal when the producer object is initialized
 	protected Semaphore producerSemaphore = new Semaphore(1);
+  // Create Semaphore that will signal when CPC reply has been received 
+  protected Semaphore cpcSemaphore = new Semaphore(1);
+  // Create Semaphore that will signal when GetMeta reply has been received 
+  protected Semaphore getMetaSemaphore = new Semaphore(1);
+  //  Signals when the client is ready to send CPC request
+  protected Semaphore cpcReadySemaphore = new Semaphore(1);
+  // Signals receipt of a CPC reply
+  protected Semaphore cpcReplySemaphore =
+    new Semaphore(1);
+    
 	protected volatile boolean producerInitialized;
+	
 	abstract public String getEndPointName() throws Exception;
   abstract protected TextMessage createTextMessage() throws Exception;
   abstract protected BytesMessage createBytesMessage() throws Exception;
@@ -258,6 +265,15 @@
     receivedCpcReply = false;
     pendingMessageQueue.add(msg);
 	}
+	protected void acquireCpcReadySemaphore() {
+	   try {
+	      //  Acquire cpcReady semaphore to block sending CPC request until
+	      //  ALL outstanding CASes are received.
+	      cpcReadySemaphore.acquire();
+	    } catch( InterruptedException e) { 
+	      System.out.println("UIMA AS Client Interrupted While Attempting To Acquire cpcReadySemaphore in initialize()");
+	    }
+	}
 	public synchronized void collectionProcessingComplete() throws ResourceProcessException
 	{
 		try
@@ -265,15 +281,11 @@
 		  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
 	      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "collectionProcessingComplete", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_app_cpc_request_FINEST", new Object[] {});
 		  }
+      //  The cpcReadySemaphore was initially acquired in the initialize() method
+      //  so below we wait until ALL CASes are processed. Once all
+		  //  CASes are received the semaphore will be released
+		  acquireCpcReadySemaphore();
 
-				synchronized (cpcGate)
-				{
-					while (howManySent > 0 && howManyRecvd < howManySent)
-					{
-						// This monitor is dedicated to single purpose event.
-						cpcGate.wait();
-					}
-				}
 			if (!running)
 			{
 		     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
@@ -299,13 +311,17 @@
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "collectionProcessingComplete", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_started_cpc_request_timer_FINEST", new Object[] {});
       }
-      //  Add message to the pending queue
+      //  Add CPC message to the pending queue
       addMessage(msg);
-      
-
-			// Wait for CPC Reply. This blocks!
+      //  Acquire cpc semaphore. When a CPC reply comes or there is a timeout or the client
+      //  is stopped, the semaphore will be released.
+      try {
+        cpcReplySemaphore.acquire();
+      } catch( InterruptedException ex) {
+        System.out.println("UIMA AS Cllient Interrupted While Acquiring cpcReplySemaphore");
+      } 
+			// Wait for CPC Reply. This blocks on the cpcReplySemaphore
 			waitForCpcReply();
-
 			cancelTimer(uniqueIdentifier);
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "collectionProcessingComplete", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_cancelled_cpc_request_timer_FINEST", new Object[] {});
@@ -395,17 +411,13 @@
 	          }
 	        }
 	      }
-        synchronized (cpcGate)
-        {
-           howManySent = 0;
-           cpcGate.notifyAll();
-        }
-	      synchronized(endOfCollectionMonitor)
-	      {
-	        receivedCpcReply = true;
-	        endOfCollectionMonitor.notifyAll();
-	      }
-	      synchronized(metadataReplyMonitor)
+	      cpcReadySemaphore.release();
+        outstandingCasRequests.set(0); // reset global counter of outstanding requests
+        
+        cpcReplySemaphore.release();
+        receivedCpcReply = true;
+
+        synchronized(metadataReplyMonitor)
 	      {
 	        receivedMetaReply = true;
 	        metadataReplyMonitor.notifyAll();
@@ -428,7 +440,7 @@
 	      // may be sitting in the threadQueue.take() method. The reader will
 	      // first check the state of the 'running' flag and find it false which
 	      // will cause the reader to exit run() method
-	      threadQueue.add(new CasQueueEntry());
+	      threadQueue.add(new CasQueueEntry(new Semaphore(1)));
 	      threadRegistrar.clear();
 	    }
 	    catch (Exception e)
@@ -546,7 +558,7 @@
 	     entry.reset();
 	   }
 	 } else {
-	   entry = new CasQueueEntry();
+	   entry = new CasQueueEntry(new Semaphore(1));
 	   threadRegistrar.put(aThreadId, entry);
 	 }
 	 return entry;  
@@ -561,6 +573,7 @@
 	private  class CasQueueEntry {
 	  private CAS cas;
 	  private Object monitor = new Object();
+	  private Semaphore semaphore = new Semaphore(1);
 	  private volatile boolean signaled = false;
     public CAS getCas() {
       return cas;
@@ -568,7 +581,13 @@
     public void setCas(CAS cas) {
       this.cas = cas;
     }
-    public Object getMonitor() {
+    public CasQueueEntry( Semaphore aSharedSemaphore ) {
+      semaphore = aSharedSemaphore;
+    }
+    public Semaphore getSemaphore() {
+      return semaphore;
+    }
+        public Object getMonitor() {
       return monitor;
     }
     public void setMonitor(Object monitor) {
@@ -608,21 +627,14 @@
 	}
 	protected void waitForCpcReply()
 	{
-		synchronized (endOfCollectionMonitor)
-		{
-			while (!receivedCpcReply)
-			{
-				try
-				{
-					// This monitor is dedicated to single purpose event.
-					endOfCollectionMonitor.wait();
-				}
-				catch (Exception e)
-				{
-				}
-			}
-		}
-
+	  try {
+	    // wait for CPC reply
+	    cpcReplySemaphore.acquire();
+	  } catch( InterruptedException e) {
+	    
+	  } finally {
+	    cpcReplySemaphore.release();
+	  }
 	}
 
 	protected void waitForMetadataReply()
@@ -775,13 +787,11 @@
 	        //  delegate.
 	        return casReferenceId;
 	      }
+        // Incremented number of outstanding CASes sent to a service. When a reply comes
+        // this counter is decremented
+        outstandingCasRequests.incrementAndGet();
 	      //  Add message to the pending queue
 	      addMessage(msg);
-	      
-	      synchronized (cpcGate)
-	      {
-	        howManySent++;
-	      }
 	    }
 	    catch (Exception e)
 	    {
@@ -849,13 +859,9 @@
 		  //  After receiving CPC reply there may be cleanup to do. Delegate this
 		  //  to platform specific implementation (ActiveMQ or WAS)
 			cleanup(); //Make the receiving thread to complete
-			synchronized (endOfCollectionMonitor)
-			{
-			  // Notify sleeping thread that the CPC reply was received
-				receivedCpcReply = true;
-				endOfCollectionMonitor.notifyAll();
-			}
-			
+			// Release the semaphore acquired in collectionProcessingComplete()
+			cpcReplySemaphore.release();
+      receivedCpcReply = true;
 		}
 	}
 
@@ -917,7 +923,6 @@
       //  Handled Ping reply
       return;
 		}
-    //cancelTimer(uniqueIdentifier);
 		int payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
 		removeFromCache(uniqueIdentifier);
 		if (AsynchAEMessage.Exception == payload)
@@ -1282,10 +1287,8 @@
 	    }
 		}
 		receivedCpcReply = true; // change state as if the CPC reply came in. This is done to prevent a hang on CPC request 
-		synchronized(endOfCollectionMonitor)
-		{
-			endOfCollectionMonitor.notifyAll();
-		}
+		//  release the semaphore acquired in collectionProcessingComplete
+		cpcReplySemaphore.release();
     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_exception_msg_INFO",
 				new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference), exception });
@@ -1345,12 +1348,14 @@
       }
       removeFromCache(casReferenceId);
       serviceDelegate.removeCasFromOutstandingList(casReferenceId);
-      if (howManyRecvd == howManySent)
-      {
-        synchronized (cpcGate)
-        {
-          cpcGate.notifyAll();
-        }
+      //  Check if CASes sent == CASes received
+      long outstandingCasCount = 0;
+      if ( ( outstandingCasCount = outstandingCasRequests.get()) == 0) {
+        //  Received ALL replies, lower CPC latch so that a CPC request 
+        //  is sent
+        cpcReadySemaphore.release();
+      } else {
+        System.out.println("UIMA AS Client Outstanding CAS Count="+outstandingCasCount);
       }
     }
 	}
@@ -1366,12 +1371,8 @@
 			//	Incremente number of replies
 			if ( casReferenceId.equals(cachedRequest.getCasReferenceId()) )
 			{
-				synchronized(cpcGate)
-				{
-					//	increment number of replies received
-					howManyRecvd++;
-					cpcGate.notifyAll();
-				}
+			  // Received a reply, decrement number of outstanding CASes
+			  outstandingCasRequests.decrementAndGet();
 			}
 
 			try
@@ -1406,13 +1407,14 @@
 					}
 				}
 				removeFromCache(casReferenceId);
-				if (howManyRecvd == howManySent)
-				{
-					synchronized (cpcGate)
-					{
-						cpcGate.notifyAll();
-					}
-				}
+				long outstandingCasCount = 0;
+	      if ( ( outstandingCasCount = outstandingCasRequests.get()) == 0) {
+	        System.out.println("UIMA AS Client Releasing cpcReadySemaphore");
+	        cpcReadySemaphore.release();
+//	        cpcReadyLatch.countDown();
+	      } else {
+	        System.out.println("UIMA AS Client Outstanding CAS Count="+outstandingCasCount);
+	      }
 			}
 		}
 	}
@@ -1841,10 +1843,8 @@
 			status.addEventStatus("CpC", "Failed", new UimaASCollectionProcessCompleteTimeout());
 			notifyListeners(null, status, AsynchAEMessage.CollectionProcessComplete);
 			receivedCpcReply = true;
-			synchronized(endOfCollectionMonitor)
-			{
-				endOfCollectionMonitor.notifyAll();
-			}
+			// release the semaphore acquired in collectionProcessingComplete()
+			cpcReplySemaphore.release();
 			break;
 
 		case (ProcessTimeout):
@@ -1893,22 +1893,10 @@
 		  }
 		  cachedRequest.removeEntry(casReferenceId);
 		  serviceDelegate.removeCasFromOutstandingList(casReferenceId);
-		  synchronized (gater) 
-		  {
-			  if (howManyBeforeReplySeen > 0) 
-			  {
-				  howManyBeforeReplySeen--;
-			  }
-			  gater.notifyAll();
-			  howManyRecvd++; // increment global counter to enable CPC request to be sent when howManySent = howManyRecvd
-	      if (howManyRecvd == howManySent)
-	      {
-	        synchronized (cpcGate)
-	        {
-	          cpcGate.notifyAll();
-	        }
-	      }
-		  }
+		  //  Check if all replies have been received
+		  if ( outstandingCasRequests.get() == 0) {
+        cpcReadySemaphore.release();
+      } 
 		  break;
 		}  // case
  	}
@@ -2207,11 +2195,7 @@
 					{
 						timeOutKind = CpCTimeout;
 						receivedCpcReply = true;// not really but simulate receving the meta so that we unblock the monitor
-						
-						synchronized( cpcGate )
-						{
-							cpcGate.notifyAll();
-						}
+						cpcReadySemaphore.release();
 					}
 					else
 					{