You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/07/24 16:11:18 UTC
svn commit: r797468 -
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
Author: cwiklik
Date: Fri Jul 24 14:11:18 2009
New Revision: 797468
URL: http://svn.apache.org/viewvc?rev=797468&view=rev
Log:
UIMA-1436 Replaced List with LinkedBlockingQueue to remove wait-notify synchronization
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java?rev=797468&r1=797467&r2=797468&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java Fri Jul 24 14:11:18 2009
@@ -20,6 +20,9 @@
package org.apache.uima.adapter.jms.client;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -56,7 +59,8 @@
// A reference to a shared queue where application threads enqueue messages
// to be sent
- protected List pendingMessageList = null;
+ protected BlockingQueue<PendingMessage> messageQueue =
+ new LinkedBlockingQueue<PendingMessage>();
// Global flag controlling lifecycle of this thread. It will be set to true
// when the
// uima ee engine calls doStop()
@@ -86,10 +90,15 @@
private MessageProducer producer = null;
- public BaseMessageSender(List aPendingMessageList,
- BaseUIMAAsynchronousEngineCommon_impl anEngine) {
- pendingMessageList = aPendingMessageList;
+
+ public BaseMessageSender( BaseUIMAAsynchronousEngineCommon_impl anEngine) {
+ messageQueue = anEngine.pendingMessageQueue;
engine = anEngine;
+ try {
+ // Acquire a permit from the engine's producer semaphore. The run() method
+ // releases it in a finally block once producer initialization has finished.
+ engine.producerSemaphore.acquire();
+ } catch( InterruptedException e) {}
}
/**
@@ -97,11 +106,9 @@
*/
public void doStop() {
done = true;
- synchronized (pendingMessageList) {
- // Notify the worker thread. It is waiting for a signal. If this is
- // not done the thread may hang forever!
- pendingMessageList.notifyAll();
- }
+ // Enqueue an empty message to wake run() if it is blocked in messageQueue.take()
+ PendingMessage emptyMessage = new PendingMessage(0);
+ messageQueue.add(emptyMessage);
}
/**
* Return the Exception that caused the failure in this worker thread
@@ -142,70 +149,42 @@
*/
public void run() {
String destination = null;
+
+ // Create and initialize the producer.
try {
initializeProducer();
+ destination = getDestinationEndpoint();
+ if (destination == null) {
+ throw new InvalidDestinationException(
+ "Unable to determine the destination");
+ }
} catch (Exception e) {
workerThreadFailed = true;
exception = e;
e.printStackTrace();
- // Signal to unblock any client object waiting for initialization of
- // the worker thread
- signal();
return;
- }
- try {
- destination = getDestinationEndpoint();
- if (destination == null) {
- throw new InvalidDestinationException(
- "Unable to determine the destination");
- }
- } catch (Exception e) {
- workerThreadFailed = true;
- exception = e;
- e.printStackTrace();
- return;
+ } finally {
+ engine.producerSemaphore.release();
}
- // Signal the uime ee client engine that the message producer is fully
- // initialized and
- // ready to consume messages
engine.onProducerInitialized();
- signal();
producer = getMessageProducer();
- int counter=0;
+
// Wait for messages from application threads. The uima ee client engine
// will call doStop() which sets the global flag 'done' to true.
PendingMessage pm = null;
while (!done) {
- synchronized (pendingMessageList) {
- // First check if there are any pending messages in the shared
- // 'queue'
- while (pendingMessageList.size() == 0) {
- // Block waiting for a message
- try {
- pendingMessageList.wait(100);
- } catch (InterruptedException e) {
- }
- // Check if the engine is terminating. When the client is stopping
- // it will signal 'pendingMessageList'. Check the state of the client
- // and break out from the wait loop if the client is stopping
- if (done) {
- break; // done in this loop
- }
- }
- // Check if the uima as client is in stopped state. If it is, don't read
- // a message from the queue and just break out from the while loop. When
- // the client is stopped, the 'pendingMessageList' is signaled but there
- // is no message to read. The signal is done to force this thread to
- // break out of wait().
- if (done) {
- break; // done here
- }
- // Remove the oldest message from the shared 'queue'
- pm = (PendingMessage) pendingMessageList.remove(0);
- }
+ // Block until a message is available, then remove the oldest message
+ // from the shared queue
+ try {
+ pm = messageQueue.take();
+ } catch ( InterruptedException e) {
+ }
+ if (done) {
+ break; // done in this loop
+ }
try {
// Request JMS Message from the concrete implementation