You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/07/28 17:27:35 UTC

svn commit: r798563 - /incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java

Author: cwiklik
Date: Tue Jul 28 15:27:32 2009
New Revision: 798563

URL: http://svn.apache.org/viewvc?rev=798563&view=rev
Log:
UIMA-1436 Replaced wait-notify synchronization with semaphores

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=798563&r1=798562&r2=798563&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Tue Jul 28 15:27:32 2009
@@ -30,7 +30,6 @@
 import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
@@ -103,8 +102,6 @@
 
 	protected AsynchAECasManager asynchManager;
 
-	protected Object metadataReplyMonitor = new Object();
-
 	protected boolean remoteService = false;
 
 	protected CollectionReader collectionReader = null;
@@ -138,8 +135,6 @@
 
 	protected Exception exc;
 
-	protected long howManySent = 0;
-
 	// Counter maintaining a number of CASes sent to a service. The counter
 	// is incremented every time a CAS is sent and decremented when the CAS
 	// reply is received. It is also adjusted down in case of a timeout or
@@ -148,10 +143,6 @@
 
 	protected ConcurrentHashMap springContainerRegistry = new ConcurrentHashMap();
 
-	protected volatile boolean receivedMetaReply;
-
-	protected volatile boolean receivedCpcReply;
-
 	protected MessageConsumer consumer = null;
 
 	protected String serializationStrategy = "xmi";
@@ -262,9 +253,9 @@
 	}
 		
 	private void addMessage(PendingMessage msg ) {
-    receivedCpcReply = false;
     pendingMessageQueue.add(msg);
 	}
+
 	protected void acquireCpcReadySemaphore() {
 	   try {
 	      //  Acquire cpcReady semaphore to block sending CPC request until
@@ -355,10 +346,7 @@
       Long key = (Long)it.next();
       CasQueueEntry entry = threadRegistrar.get(key);
       if ( entry != null ) {
-        synchronized( entry.getMonitor() ) {
-          entry.signal();
-          entry.getMonitor().notifyAll();
-        }
+        entry.getSemaphore().release();
       }
    }
 	}
@@ -378,6 +366,9 @@
 	    running = false;
 	    casQueueProducerReady = false;
 	    uimaSerializer.reset();
+      if ( serviceDelegate != null ) {
+        serviceDelegate.cancelDelegateTimer();
+      }
 	    try
 	    {
 	      try {
@@ -387,10 +378,6 @@
 	        ex.printStackTrace();
 	      }
 	      
-        synchronized( threadQueue ) {
-          threadQueue.notifyAll();
-        }
-
 	      //  Unblock threads
 	      if( threadMonitorMap.size() > 0 )
 	      {
@@ -404,24 +391,15 @@
 	          {
 	            continue;
 	          }
-	          synchronized( threadMonitor.getMonitor())
-	          {
-	            threadMonitor.setWasSignaled();
-	            threadMonitor.getMonitor().notifyAll();
-	          }
+	          threadMonitor.getMonitor().release();
 	        }
 	      }
 	      cpcReadySemaphore.release();
         outstandingCasRequests.set(0); // reset global counter of outstanding requests
         
         cpcReplySemaphore.release();
-        receivedCpcReply = true;
+        getMetaSemaphore.release();
 
-        synchronized(metadataReplyMonitor)
-	      {
-	        receivedMetaReply = true;
-	        metadataReplyMonitor.notifyAll();
-	      }
 	      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
 	        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "stop", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopped_as_client_INFO", new Object[] {});
 	      }
@@ -495,11 +473,12 @@
                   new Object[] {"Time Waiting for CAS", (double)waitingTime / (double)1000000});
             }
             if ( running ) { // only if the client is still running handle the new cas
-              //  Associate the CAS with the entry and wake up the Consumer thread
-              entry.setCas(cas);
-              synchronized( entry.getMonitor() ) {
-                entry.signal();
-                entry.getMonitor().notifyAll();
+              try {
+                entry.getSemaphore().acquire();
+                //  Associate the CAS with the entry and wake up the Consumer thread
+                entry.setCas(cas);
+              } finally {
+                entry.getSemaphore().release();
               }
             } else {
               return; // Client is terminating
@@ -538,16 +517,23 @@
     CasQueueEntry entry = getQueueEntry( Thread.currentThread().getId());
     //  Add this thread entry to the queue of threads waiting for a CAS
     threadQueue.add(entry);
-    //  Wait until the CAS producer adds the CAS to the CasQueueEntry and
-    //  signals CAS availability.
-    while( running && !entry.signaled() ) {
-      //  Wait until the producer is ready
-      synchronized( entry.getMonitor()) {
-        entry.getMonitor().wait(100);
-      }
+    if ( entry != null ) {
+      while (running) {
+        try {
+          // Wait until the CAS producer adds the CAS to the CasQueueEntry and
+          // signals CAS availability.
+          entry.getSemaphore().acquire();
+          if (entry.getCas() == null) {
+            continue;
+          } else {
+            return entry.getCas();
+          }
+        } finally {
+          entry.getSemaphore().release();
+        }   
+      } // while
     }
-    //  This may return null *if* the client is terminating
-    return entry.getCas();
+    return null;   // client has terminated
 	}
 	
 	private CasQueueEntry getQueueEntry(long aThreadId ) {
@@ -564,17 +550,12 @@
 	 return entry;  
 	}
 	
-	protected void reset()
-	{
-		receivedCpcReply = false;
-		receivedMetaReply = false;
+	protected void reset() {
 	}
 
 	private  class CasQueueEntry {
 	  private CAS cas;
-	  private Object monitor = new Object();
 	  private Semaphore semaphore = new Semaphore(1);
-	  private volatile boolean signaled = false;
     public CAS getCas() {
       return cas;
     }
@@ -587,20 +568,7 @@
     public Semaphore getSemaphore() {
       return semaphore;
     }
-        public Object getMonitor() {
-      return monitor;
-    }
-    public void setMonitor(Object monitor) {
-      this.monitor = monitor;
-    }
-    public boolean signaled() {
-      return signaled;
-    }
-    public void signal() {
-      signaled = true;
-    }
     public void reset() {
-      signaled = false;
       cas = null;
     }
 	  
@@ -636,23 +604,20 @@
 	    cpcReplySemaphore.release();
 	  }
 	}
-
+	/**
+	 * Blocks while trying to acquire a semaphore awaiting receipt of GetMeta Reply.
+	 * When the GetMeta is received, or there is a timeout, or the client stops the
+	 * semaphore will be released. 
+	 */
 	protected void waitForMetadataReply()
 	{
-		synchronized (metadataReplyMonitor)
-		{
-			while (!receivedMetaReply)
-			{
-				try
-				{
-					// This monitor is dedicated to single purpose event.
-					metadataReplyMonitor.wait();
-				}
-				catch (Exception e)
-				{
-				}
-			}
-		}
+	  try {
+      getMetaSemaphore.acquire();
+    } catch( InterruptedException e) {
+      
+    } finally {
+      getMetaSemaphore.release();
+    }
 	}
 
 	public String getPerformanceReport()
@@ -789,7 +754,7 @@
 	      }
         // Incremented number of outstanding CASes sent to a service. When a reply comes
         // this counter is decremented
-        outstandingCasRequests.incrementAndGet();
+        long outstandingCasCount = outstandingCasRequests.incrementAndGet();
 	      //  Add message to the pending queue
 	      addMessage(msg);
 	    }
@@ -861,7 +826,6 @@
 			cleanup(); //Make the receiving thread to complete
 			// Release the semaphore acquired in collectionProcessingComplete()
 			cpcReplySemaphore.release();
-      receivedCpcReply = true;
 		}
 	}
 
@@ -882,7 +846,6 @@
     if ( message.getJMSReplyTo() != null ) {
       serviceDelegate.setFreeCasDestination(message.getJMSReplyTo());
     }
-
 		//  Check if this is a reply for Ping sent in response to a timeout
 		if ( serviceDelegate.isAwaitingPingReply() ) {
       serviceDelegate.resetAwaitingPingReply();
@@ -900,10 +863,7 @@
         Iterator it = threadMonitorMap.entrySet().iterator();
         while( it.hasNext() ) {
           threadMonitor = ((Entry<Long, ThreadMonitor>)it.next()).getValue();
-          synchronized(threadMonitor.getMonitor()) {
-            //  Awake the send thread
-            threadMonitor.getMonitor().notifyAll();
-          }
+          threadMonitor.getMonitor().release();
         }
       } else {
         //  Asynch API used for sending outgoing messages.
@@ -925,59 +885,57 @@
 		}
 		int payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
 		removeFromCache(uniqueIdentifier);
-		if (AsynchAEMessage.Exception == payload)
-		{
-			ProcessTrace pt = new ProcessTrace_impl();
-			UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
-			Exception exception = retrieveExceptionFromMessage(message);
-			clientSideJmxStats.incrementMetaErrorCount();
-			status.addEventStatus("GetMeta", "Failed", exception);
-			notifyListeners(null, status, AsynchAEMessage.GetMeta);
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_exception_msg_INFO",
-					new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference), exception });
-      }
-			synchronized( metadataReplyMonitor )
-			{
-				abort = true;
-				receivedMetaReply = true; // not really but simulate receiving the meta so that we unblock the monitor
-				initialized = false;
-				metadataReplyMonitor.notifyAll();
-			}
-		}
-		else
-		{
-		  //  Check serialization supported by the service against client configuration.
-		  //  If the client is configured to use Binary serialization *but* the service
-		  //  doesnt support it, change the client serialization to xmi. Old services will
-		  //  not return in a reply the type of serialization supported which implies "xmi".
-		  //  New services *always* return "binary" as a default serialization. The client
-		  //  however may still want to serialize messages using xmi though. 
-		  if ( !message.propertyExists(AsynchAEMessage.Serialization)) {
-        //  Dealing with an old service here, check if there is a mismatch with the 
-		    //  client configuration. If the client is configured with binary serialization
-		    //  override this and change serialization to "xmi".
-		    if ( getSerializationStrategy().equalsIgnoreCase("binary")) {
-          System.out.println("\n\t***** WARNING: Service Doesn't Support Binary Serialization. Client Defaulting to XMI Serialization\n");
-          //  Override configured serialization
-          setSerializationStrategy("xmi");
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_serialization_ovveride__WARNING", new Object[] { });
-        }
-		  }
-			String meta = ((TextMessage) message).getText();
-			ByteArrayInputStream bis = new ByteArrayInputStream(meta.getBytes());
-			XMLInputSource in1 = new XMLInputSource(bis, null);
-			// Adam - store ResouceMetaData in field so we can return it from getMetaData().
-			resourceMetadata = (ProcessingResourceMetaData) UIMAFramework.getXMLParser().parseResourceMetaData(in1);
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_handling_meta_reply_FINEST", new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), meta });
-      }
-			asynchManager.addMetadata(resourceMetadata);
-			synchronized (metadataReplyMonitor)
-			{
-				receivedMetaReply = true;
-				metadataReplyMonitor.notifyAll();
-			}
+		
+		try {
+	    if (AsynchAEMessage.Exception == payload)
+	    {
+	      ProcessTrace pt = new ProcessTrace_impl();
+	      UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
+	      Exception exception = retrieveExceptionFromMessage(message);
+	      clientSideJmxStats.incrementMetaErrorCount();
+	      status.addEventStatus("GetMeta", "Failed", exception);
+	      notifyListeners(null, status, AsynchAEMessage.GetMeta);
+	      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+	        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_exception_msg_INFO",
+	          new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference), exception });
+	      }
+        abort = true;
+        initialized = false;
+	    }
+	    else
+	    {
+	      //  Check serialization supported by the service against client configuration.
+	      //  If the client is configured to use Binary serialization *but* the service
+	      //  doesnt support it, change the client serialization to xmi. Old services will
+	      //  not return in a reply the type of serialization supported which implies "xmi".
+	      //  New services *always* return "binary" as a default serialization. The client
+	      //  however may still want to serialize messages using xmi though. 
+	      if ( !message.propertyExists(AsynchAEMessage.Serialization)) {
+	        //  Dealing with an old service here, check if there is a mismatch with the 
+	        //  client configuration. If the client is configured with binary serialization
+	        //  override this and change serialization to "xmi".
+	        if ( getSerializationStrategy().equalsIgnoreCase("binary")) {
+	          System.out.println("\n\t***** WARNING: Service Doesn't Support Binary Serialization. Client Defaulting to XMI Serialization\n");
+	          //  Override configured serialization
+	          setSerializationStrategy("xmi");
+	          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_serialization_ovveride__WARNING", new Object[] { });
+	        }
+	      }
+	      String meta = ((TextMessage) message).getText();
+	      ByteArrayInputStream bis = new ByteArrayInputStream(meta.getBytes());
+	      XMLInputSource in1 = new XMLInputSource(bis, null);
+	      // Adam - store ResouceMetaData in field so we can return it from getMetaData().
+	      resourceMetadata = (ProcessingResourceMetaData) UIMAFramework.getXMLParser().parseResourceMetaData(in1);
+	      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+	        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_handling_meta_reply_FINEST", new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), meta });
+	      }
+	      asynchManager.addMetadata(resourceMetadata);
+	    }
+		  
+		} catch( Exception e) {
+		  throw e;
+		} finally {
+      getMetaSemaphore.release();
 		}
 	}
 
@@ -1061,12 +1019,9 @@
       //  of the reply. The message has been stored in the cache and 
       //  when the thread wakes up due to notification below, it will
       //  retrieve the reply and process it.
-      if ( threadMonitor != null && threadMonitor.getMonitor() != null) {
-        synchronized( threadMonitor.getMonitor() )
-        {
-          threadMonitor.setWasSignaled();
-          threadMonitor.getMonitor().notifyAll();
-        }
+      if ( threadMonitor != null ) {
+        cachedRequest.setReceivedProcessCasReply();
+        threadMonitor.getMonitor().release();
       }
     }
 	  
@@ -1117,9 +1072,6 @@
 		}
 		int payload = -1;
 		String casReferenceId = message.getStringProperty(AsynchAEMessage.CasReference);
-		
-		
-		
 		//	Determine the type of payload in the message (XMI,Cas Reference,Exception,etc)
 		if (message.propertyExists(AsynchAEMessage.Payload))
 		{
@@ -1132,8 +1084,16 @@
     
     if ( casReferenceId != null ) {
       cachedRequest = (ClientRequest)clientCache.get(casReferenceId);
+      //  Incremente number of replies
+      if ( cachedRequest != null && casReferenceId.equals(cachedRequest.getCasReferenceId()) )
+      {
+        // Received a reply, decrement number of outstanding CASes
+        long outstandingCasCount = outstandingCasRequests.decrementAndGet();
+        if ( outstandingCasCount == 0) {
+          cpcReadySemaphore.release();
+        } 
+      }
     }
-
     if (AsynchAEMessage.Exception == payload)
 		{
 			handleException(message, cachedRequest, true);
@@ -1286,9 +1246,7 @@
 	      cachedRequest.setProcessException();
 	    }
 		}
-		receivedCpcReply = true; // change state as if the CPC reply came in. This is done to prevent a hang on CPC request 
 		//  release the semaphore acquired in collectionProcessingComplete
-		cpcReplySemaphore.release();
     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_exception_msg_INFO",
 				new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference), exception });
@@ -1348,15 +1306,6 @@
       }
       removeFromCache(casReferenceId);
       serviceDelegate.removeCasFromOutstandingList(casReferenceId);
-      //  Check if CASes sent == CASes received
-      long outstandingCasCount = 0;
-      if ( ( outstandingCasCount = outstandingCasRequests.get()) == 0) {
-        //  Received ALL replies, lower CPC latch so that a CPC request 
-        //  is sent
-        cpcReadySemaphore.release();
-      } else {
-        System.out.println("UIMA AS Client Outstanding CAS Count="+outstandingCasCount);
-      }
     }
 	}
 	private void completeProcessingReply( CAS cas, String casReferenceId, int payload, boolean doNotify, Message message, ClientRequest cachedRequest, ProcessTrace pt  )
@@ -1368,13 +1317,6 @@
 			{
 				pt = new ProcessTrace_impl();
 			}
-			//	Incremente number of replies
-			if ( casReferenceId.equals(cachedRequest.getCasReferenceId()) )
-			{
-			  // Received a reply, decrement number of outstanding CASes
-			  outstandingCasRequests.decrementAndGet();
-			}
-
 			try
 			{
 				//	Log stats and populate ProcessTrace object
@@ -1407,14 +1349,6 @@
 					}
 				}
 				removeFromCache(casReferenceId);
-				long outstandingCasCount = 0;
-	      if ( ( outstandingCasCount = outstandingCasRequests.get()) == 0) {
-	        System.out.println("UIMA AS Client Releasing cpcReadySemaphore");
-	        cpcReadySemaphore.release();
-//	        cpcReadyLatch.countDown();
-	      } else {
-	        System.out.println("UIMA AS Client Outstanding CAS Count="+outstandingCasCount);
-	      }
 			}
 		}
 	}
@@ -1687,51 +1621,66 @@
 
     ClientRequest cachedRequest = produceNewClientRequestObject();
     cachedRequest.setSynchronousInvocation();
-    
+    //  This is synchronous call, acquire and hold the semaphore before
+    //  dispatching a CAS to a service. The semaphore will be released
+    //  iff: 
+    //  a) reply is received (success or failure with exception)
+    //  b) timeout occurs
+    //  c) client is stopped
+    //  Once the semaphore is acquired and the CAS is dispatched
+    //  the thread will block in trying to acquire the semaphore again
+    //  below.
+    try {
+      threadMonitor.getMonitor().acquire();
+    } catch( InterruptedException e) {
+      System.out.println("UIMA AS Client Received Interrrupt While Acquiring Monitor Semaphore in sendAndReceive()");
+    }
     // send CAS. This call does not block. Instead we will block the sending thread below.
     casReferenceId = sendCAS(aCAS, cachedRequest);
     if (threadMonitor != null && threadMonitor.getMonitor() != null) {
-      // Block here waiting for reply
-      synchronized (threadMonitor.getMonitor()) {
-        //  Block sending thread until a reply is received. The thread
-        //  will be signaled either when a reply to the request just
-        //  sent is received OR a Ping reply was received. The latter
-        //  is necessary to allow handling of CASes delayed due to
-        //  a timeout. A previous request timed out and the service
-        //  state was changed to TIMEDOUT. While the service is in this
-        //  state all sending threads add outstanding CASes to the list
-        //  of CASes pending dispatch and each waits until the state
-        //  of the service changes to OK. The state is changed to OK
-        //  when the client receives a reply to a PING request. When
-        //  the Ping reply comes, the client will signal this thread.
-        //  The thread checks the list of CASes pending dispatch trying
-        //  to find an entry that matches ID of the CAS previously 
-        //  delayed. If the CAS is found in the delayed list, it will 
-        //  be removed from the list and send to the service for 
-        //  processing. The 'wasSignaled' flag is only set when the  
-        //  CAS reply is received. Ping reply logic does not change
-        //  this flag.
-        while (!threadMonitor.wasSignaled && running) {
-          try {
-            threadMonitor.getMonitor().wait();
-            //  Send thread was awoken by either process reply or ping reply 
-            //  If there service is in the ok state and the CAS is in the
-            //  list of CASes pending dispatch, remove the CAS from the list
-            //  and send it to the service.
-            if (cachedRequest.isTimeoutException() || 
-                cachedRequest.isProcessException() ) {
-              // Handled below
-              break;
-            }
-            if ( running && serviceDelegate.getState() == Delegate.OK_STATE && 
-                 serviceDelegate.removeCasFromPendingDispatchList(casReferenceId)) {
-              sendCAS(aCAS, cachedRequest);
-            }
-          } catch (InterruptedException e) {
+      while( running ) {
+        try {
+          //  Block sending thread until a reply is received. The thread
+          //  will be signaled either when a reply to the request just
+          //  sent is received OR a Ping reply was received. The latter
+          //  is necessary to allow handling of CASes delayed due to
+          //  a timeout. A previous request timed out and the service
+          //  state was changed to TIMEDOUT. While the service is in this
+          //  state all sending threads add outstanding CASes to the list
+          //  of CASes pending dispatch and each waits until the state
+          //  of the service changes to OK. The state is changed to OK
+          //  when the client receives a reply to a PING request. When
+          //  the Ping reply comes, the client will signal this thread.
+          //  The thread checks the list of CASes pending dispatch trying
+          //  to find an entry that matches ID of the CAS previously 
+          //  delayed. If the CAS is found in the delayed list, it will 
+          //  be removed from the list and send to the service for 
+          //  processing. The 'wasSignaled' flag is only set when the  
+          //  CAS reply is received. Ping reply logic does not change
+          //  this flag.
+          threadMonitor.getMonitor().acquire();
+          //  Send thread was awoken by either process reply or ping reply 
+          //  If there service is in the ok state and the CAS is in the
+          //  list of CASes pending dispatch, remove the CAS from the list
+          //  and send it to the service.
+          if (cachedRequest.isTimeoutException() || cachedRequest.isProcessException() ) {
+            // Handled outside of the while-loop below
+            break;
           }
+          if ( running && serviceDelegate.getState() == Delegate.OK_STATE && 
+               serviceDelegate.removeCasFromPendingDispatchList(casReferenceId)) {
+            sendCAS(aCAS, cachedRequest);
+          } else {
+            break; // done here, received a reply or the client is not running
+          }
+        } catch( InterruptedException e ) {
+          
+        } finally {
+          threadMonitor.getMonitor().release();
         }
       }
-    }
+    }  // if
+    
     if ( abort ) {
       throw new ResourceProcessException(new RuntimeException("Uima AS Client API Stopping"));
     }
@@ -1754,10 +1703,7 @@
       throw rpe;
     } catch (Exception e) {
       throw new ResourceProcessException(e);
-    } finally {
-      //  reset 'wasSignaled' flag
-      threadMonitor.reset();
-    }
+    } 
     return casReferenceId;
   }
 	private void deserializeAndCompleteProcessingReply( String casReferenceId, Message message, ClientRequest cachedRequest, ProcessTrace pt, boolean doNotify ) throws Exception
@@ -1810,11 +1756,8 @@
       }
 			status.addEventStatus("GetMeta", "Failed", new UimaASMetaRequestTimeout());
 			notifyListeners(null, status, AsynchAEMessage.GetMeta);
-			synchronized (metadataReplyMonitor)
-			{
-				abort = true;
-				metadataReplyMonitor.notifyAll();
-			}
+      abort = true;
+      getMetaSemaphore.release();
 			break;
     case (PingTimeout):
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
@@ -1830,11 +1773,7 @@
           anyCasRequest.getCAS().release();
         }
       }
-      synchronized (metadataReplyMonitor)
-      {
-        abort = true;
-        metadataReplyMonitor.notifyAll();
-      }
+      abort = true;
       break;
 		case (CpCTimeout):
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
@@ -1842,7 +1781,6 @@
       }
 			status.addEventStatus("CpC", "Failed", new UimaASCollectionProcessCompleteTimeout());
 			notifyListeners(null, status, AsynchAEMessage.CollectionProcessComplete);
-			receivedCpcReply = true;
 			// release the semaphore acquired in collectionProcessingComplete()
 			cpcReplySemaphore.release();
 			break;
@@ -1875,12 +1813,9 @@
         {
           ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest.getThreadId());
           //  Unblock the sending thread so that it can complete processing with an error
-          if ( threadMonitor != null && threadMonitor.getMonitor() != null) {
-            synchronized( threadMonitor.getMonitor() )
-            {
-              threadMonitor.setWasSignaled();
-              threadMonitor.getMonitor().notifyAll();
-            }
+          if ( threadMonitor != null ) {
+            threadMonitor.getMonitor().release();
+            cachedRequest.setReceivedProcessCasReply(); // should not be needed
           }
         }
       }
@@ -1894,9 +1829,12 @@
 		  cachedRequest.removeEntry(casReferenceId);
 		  serviceDelegate.removeCasFromOutstandingList(casReferenceId);
 		  //  Check if all replies have been received
-		  if ( outstandingCasRequests.get() == 0) {
+		  long outstandingCasCount = outstandingCasRequests.decrementAndGet();
+		  if ( outstandingCasCount == 0) {
         cpcReadySemaphore.release();
-      } 
+      } else {
+         System.out.println("UIMA AS Client Received Process Timeout - Outstanding CAS Count="+outstandingCasCount);
+      }
 		  break;
 		}  // case
  	}
@@ -1935,6 +1873,7 @@
 
 		private String endpoint;
 
+		private volatile boolean receivedProcessCasReply = false;
 		private long threadId=-1;
 		
 		private Message message;
@@ -2068,6 +2007,10 @@
 		{
 			return threadId;
 		}
+		public void setReceivedProcessCasReply()
+		{
+			receivedProcessCasReply = true;
+		}
 		public void setMetadataTimeout( int aTimeout )
 		{
 			metadataTimeout = aTimeout;
@@ -2185,16 +2128,11 @@
 						abort = true;
 						metaTimeoutErrorCount++;
 						clientSideJmxStats.incrementMetaTimeoutErrorCount();
-						synchronized( metadataReplyMonitor )
-						{
-							receivedMetaReply = true; // not really but simulate receving the meta so that we unblock the monitor
-							metadataReplyMonitor.notifyAll();
-						}
+						getMetaSemaphore.release();
 					}
 					else if (isCPCRequest())
 					{
 						timeOutKind = CpCTimeout;
-						receivedCpcReply = true;// not really but simulate receving the meta so that we unblock the monitor
 						cpcReadySemaphore.release();
 					}
 					else
@@ -2282,32 +2220,19 @@
 	protected class ThreadMonitor
 	{
 		private long threadId;
-		private Object monitor = new Object();
-		private volatile boolean wasSignaled = false;
+		private Semaphore monitor = new Semaphore(1);
 		public ThreadMonitor( long aThreadId )
 		{
 			threadId = aThreadId;
 		}
-		public void reset()
-		{
-			wasSignaled = false;
-		}
 		public long getThreadId()
 		{
 			return threadId;
 		}
-		public Object getMonitor()
+		public Semaphore getMonitor()
 		{
 			return monitor;
 		}
-		public void setWasSignaled()
-		{
-			wasSignaled = true;
-		}
-		public boolean wasSignaled()
-		{
-			return wasSignaled;
-		}
 	}