You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/07/24 16:11:18 UTC

svn commit: r797468 - /incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java

Author: cwiklik
Date: Fri Jul 24 14:11:18 2009
New Revision: 797468

URL: http://svn.apache.org/viewvc?rev=797468&view=rev
Log:
UIMA-1436 Replaced List with LinkedBlockingQueue to remove wait-notify synchronization

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java?rev=797468&r1=797467&r2=797468&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java Fri Jul 24 14:11:18 2009
@@ -20,6 +20,9 @@
 package org.apache.uima.adapter.jms.client;
 
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
 
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
@@ -56,7 +59,8 @@
 
 	// A reference to a shared queue where application threads enqueue messages
 	// to be sent
-	protected List pendingMessageList = null;
+  protected BlockingQueue<PendingMessage> messageQueue = 
+    new LinkedBlockingQueue<PendingMessage>();
 	// Global flag controlling lifecycle of this thread. It will be set to true
 	// when the
 	// uima ee engine calls doStop()
@@ -86,10 +90,15 @@
 
 	private MessageProducer producer = null;
 	
-	public BaseMessageSender(List aPendingMessageList,
-			BaseUIMAAsynchronousEngineCommon_impl anEngine) {
-		pendingMessageList = aPendingMessageList;
+	
+	public BaseMessageSender(	BaseUIMAAsynchronousEngineCommon_impl anEngine) {
+	  messageQueue = anEngine.pendingMessageQueue;
 		engine = anEngine;
+		try {
+		  //  Acquire a shared lock. Release it in the run() method once we initialize
+		  //  the producer.
+	    engine.producerSemaphore.acquire();
+		} catch( InterruptedException e) {}
 	}
 
   /**
@@ -97,11 +106,9 @@
    */
   public void doStop() {
     done = true;
-    synchronized (pendingMessageList) {
-      // Notify the worker thread. It is waiting for a signal. If this is
-      // not done the thread may hang forever!
-      pendingMessageList.notifyAll();
-    }
+    //  Create an empty message to deliver to the queue that is blocking
+    PendingMessage emptyMessage  = new PendingMessage(0);
+    messageQueue.add(emptyMessage);
   }
 	/**
 	 * Return the Exception that caused the failure in this worker thread
@@ -142,70 +149,42 @@
 	 */
 	public void run() {
 		String destination = null;
+		
+		//  Create and initialize the producer. 
 		try {
 			initializeProducer();
+      destination = getDestinationEndpoint();
+      if (destination == null) {
+        throw new InvalidDestinationException(
+            "Unable to determine the destination");
+      }
 		} catch (Exception e) {
 			workerThreadFailed = true;
 			exception = e;
 			e.printStackTrace();
-			// Signal to unblock any client object waiting for initialization of
-			// the worker thread
-			signal();
 			return;
-		}
 
-		try {
-			destination = getDestinationEndpoint();
-			if (destination == null) {
-				throw new InvalidDestinationException(
-						"Unable to determine the destination");
-			}
-		} catch (Exception e) {
-			workerThreadFailed = true;
-			exception = e;
-			e.printStackTrace();
-			return;
+		} finally {
+      engine.producerSemaphore.release();
 		}
 
-		// Signal the uime ee client engine that the message producer is fully
-		// initialized and
-		// ready to consume messages
 		engine.onProducerInitialized();
-		signal();
 
 		producer = getMessageProducer();
-		int counter=0;
+		
 		// Wait for messages from application threads. The uima ee client engine
 		// will call doStop() which sets the global flag 'done' to true.
 		PendingMessage pm = null;
 		while (!done) {
-			synchronized (pendingMessageList) {
-				// First check if there are any pending messages in the shared
-				// 	'queue'
-				while (pendingMessageList.size() == 0) {
-					// Block waiting for a message
-					try {
-						pendingMessageList.wait(100);
-					} catch (InterruptedException e) {
-					}
-					//	Check if the engine is terminating. When the client is stopping
-					//	it will signal 'pendingMessageList'. Check the state of the client
-					//	and break out from the wait loop if the client is stopping
-					if (done) {
-						break; // done in this loop
-					}
-				}
-				// Check if the uima as client is in stopped state. If it is, don't read
-				//	a message from the queue and just break out from the while loop. When
-				//	the client is stopped, the 'pendingMessageList' is signaled but there
-				//	is no message to read. The signal is done to force this thread to
-				//	break out of wait().
-				if (done) {
-					break; // done here
-				}
-				// Remove the oldest message from the shared 'queue'
-				pm = (PendingMessage) pendingMessageList.remove(0);
-			}
+      // Remove the oldest message from the shared 'queue'
+//		  //  Wait for a new message
+      try {
+        pm = messageQueue.take();
+      } catch ( InterruptedException e) {
+      }
+      if (done) {
+        break; // done in this loop
+      }
 
 			try {
 				//	Request JMS Message from the concrete implementation