You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/07/24 16:06:50 UTC
svn commit: r797466 -
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
Author: cwiklik
Date: Fri Jul 24 14:06:50 2009
New Revision: 797466
URL: http://svn.apache.org/viewvc?rev=797466&view=rev
Log:
UIMA-1436 Modified to use semaphore instead of wait-notify when initializing JMS producer
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=797466&r1=797465&r2=797466&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Fri Jul 24 14:06:50 2009
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.Semaphore;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@@ -88,17 +89,15 @@
private Session session = null;
private Session consumerSession = null;
- //private Connection connection = null;
private volatile boolean serviceInitializationException;
private volatile boolean serviceInitializationCompleted;
- private Object serviceMonitor = new Object();
+ private Semaphore serviceSemaphore = new Semaphore(1);
private Queue consumerDestination = null;
private Session producerSession = null;
private JmxManager jmxManager = null;
private String applicationName = "UimaASClient";
- //private volatile boolean usesSharedConnection = false;
private static SharedConnection sharedConnection = null;
private Object stopMux = new Object();
@@ -352,9 +351,12 @@
// jms queue) and the worker thread consumes them. The worker thread is not
// serialializing CASes. This work is done in application threads.
- // create a worker object. This doesnt start the thread yet
+ // create a Message Dispatcher object. In its constructor it acquires a shared
+ // semaphore producerSemaphore and holds it until the producer is created an
+ // and initialized. Once this happens or there is an error, the semaphore is
+ // released.
sender =
- new ActiveMQMessageSender( aConnection, super.pendingMessageList, aQueueName, this);
+ new ActiveMQMessageSender( aConnection, aQueueName, this);
producerInitialized = false;
Thread t = new Thread( (BaseMessageSender) sender);
// Start the worker thread. The jms session and message producer are created. Once
@@ -366,16 +368,18 @@
// pendingMessageList and it is sent to a destination.
t.start();
- // Wait until the worker thread is fully initialized
- synchronized( sender )
- {
- while( !producerInitialized )
- {
- // blocks here. The worker thread will signal when it is fully initialized
- sender.wait();
- }
- }
- // Check if the worker thread failed to initialize
+
+ try {
+ // Block waiting for the Sender to complete initializing the Producer.
+ // The sender will release the lock once it instantiates and initializes
+ // the Producer object or if there is an error
+ producerSemaphore.acquire();
+ } catch ( InterruptedException ex ) {
+
+ } finally {
+ producerSemaphore.release();
+ }
+ // Check if the worker thread failed to initialize.
if ( sender.failed())
{
// Worker thread failed to initialize. Log the reason and stop the uima ee client
@@ -439,7 +443,6 @@
{
rm = UIMAFramework.newDefaultResourceManager();
}
-
if (anApplicationContext.containsKey(UIMAFramework.CAS_INITIAL_HEAP_SIZE))
{
String cas_initial_heap_size = (String) anApplicationContext.get(UIMAFramework.CAS_INITIAL_HEAP_SIZE);
@@ -488,7 +491,7 @@
super.serviceDelegate = new ClientServiceDelegate(endpoint,applicationName,this);
super.serviceDelegate.setCasProcessTimeout(processTimeout);
super.serviceDelegate.setGetMetaTimeout(metadataTimeout);
- try
+ try
{
// Generate unique identifier
String uuid = UUIDGenerator.generate();
@@ -510,10 +513,13 @@
initializeProducer(brokerURI, endpoint);
initializeConsumer(brokerURI);
}
+
// Increment number of client instances. SharedConnection object is a static
// and is used to share a single JMS connection. The connection is closed
// when the last client finishes processing and calls stop().
- sharedConnection.incrementClientCount();
+ if ( sharedConnection != null ) {
+ sharedConnection.incrementClientCount();
+ }
running = true;
sendMetaRequest();
waitForMetadataReply();
@@ -788,26 +794,44 @@
protected void waitForServiceNotification() throws Exception
{
-
- synchronized( serviceMonitor )
- {
- while( !serviceInitializationCompleted )
- {
- if ( serviceInitializationException )
- {
- throw new ResourceInitializationException();
- }
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "waitForServiceNotification", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_awaiting_container_init__INFO", new Object[] {});
- }
-
- serviceMonitor.wait();
- if ( serviceInitializationException )
- {
- throw new ResourceInitializationException();
- }
- }
- }
+ while( !serviceInitializationCompleted ) {
+ if ( serviceInitializationException ) {
+ throw new ResourceInitializationException();
+ }
+ if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "waitForServiceNotification", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_awaiting_container_init__INFO", new Object[] {});
+ }
+ try {
+ serviceSemaphore.acquire();
+ } catch( InterruptedException e) {
+ } finally {
+ serviceSemaphore.release();
+ }
+ if ( serviceInitializationException ) {
+ throw new ResourceInitializationException();
+ }
+ }
+//
+//
+// synchronized( serviceMonitor )
+// {
+// while( !serviceInitializationCompleted )
+// {
+// if ( serviceInitializationException )
+// {
+// throw new ResourceInitializationException();
+// }
+// if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
+// UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "waitForServiceNotification", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_awaiting_container_init__INFO", new Object[] {});
+// }
+//
+// serviceMonitor.wait();
+// if ( serviceInitializationException )
+// {
+// throw new ResourceInitializationException();
+// }
+// }
+// }
}
@@ -861,18 +885,12 @@
if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "notifyOnInitializationFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_container_init_exception__WARNING", new Object[] {e});
}
- synchronized(serviceMonitor)
- {
- serviceMonitor.notifyAll();
- }
+ serviceSemaphore.release();
}
public void notifyOnInitializationSuccess(AnalysisEngineController aController) {
serviceInitializationCompleted = true;
- synchronized(serviceMonitor)
- {
- serviceMonitor.notifyAll();
- }
+ serviceSemaphore.release();
}
public void notifyOnTermination(String message) {