You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/07/24 16:06:50 UTC

svn commit: r797466 - /incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java

Author: cwiklik
Date: Fri Jul 24 14:06:50 2009
New Revision: 797466

URL: http://svn.apache.org/viewvc?rev=797466&view=rev
Log:
UIMA-1436 Modified to use semaphore instead of wait-notify when initializing JMS producer

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=797466&r1=797465&r2=797466&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Fri Jul 24 14:06:50 2009
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Semaphore;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
@@ -88,17 +89,15 @@
 	private Session session = null;
 	private Session consumerSession = null;
 
-	//private Connection connection = null;
 	private volatile boolean serviceInitializationException;
 	private volatile boolean serviceInitializationCompleted;
 	
-	private Object serviceMonitor = new Object();
+	private Semaphore serviceSemaphore = new Semaphore(1);
 	
 	private Queue consumerDestination = null;
 	private Session producerSession = null;
 	private JmxManager jmxManager = null;
 	private String applicationName = "UimaASClient";
-	//private volatile boolean usesSharedConnection = false;
 	private static SharedConnection sharedConnection = null;
 	private Object stopMux = new Object();
 	
@@ -352,9 +351,12 @@
 		//  jms queue) and the worker thread consumes them. The worker thread is not 
 		//	serialializing CASes. This work is done in application threads. 
 
-		//	create a worker object. This doesnt start the thread yet
+		//	create a Message Dispatcher object. In its constructor it acquires a shared
+		//  semaphore producerSemaphore and holds it until the producer is created an
+		//  and initialized. Once this happens or there is an error, the semaphore is
+		//  released.
 		sender = 
-			new ActiveMQMessageSender( aConnection, super.pendingMessageList, aQueueName, this);
+			new ActiveMQMessageSender( aConnection, aQueueName, this);
 		producerInitialized = false;
 		Thread t = new Thread( (BaseMessageSender) sender);
 		//	Start the worker thread. The jms session and message producer are created. Once
@@ -366,16 +368,18 @@
 		//	pendingMessageList and it is sent to a destination.
 		
 		t.start();
-		//	Wait until the worker thread is fully initialized
-			synchronized( sender )
-			{
-				while( !producerInitialized )
-				{
-					//	blocks here. The worker thread will signal when it is fully initialized 
-					sender.wait();
-				}
-			}
-		//	Check if the worker thread failed to initialize
+
+		try {
+	    //  Block waiting for the Sender to complete initializing the Producer.
+		  //  The sender will release the lock once it instantiates and initializes
+		  //  the Producer object or if there is an error
+	    producerSemaphore.acquire();
+		} catch ( InterruptedException ex ) {
+		  
+		} finally {
+		  producerSemaphore.release();
+		}
+		//	Check if the worker thread failed to initialize.
 		if ( sender.failed())
 		{
 			//	Worker thread failed to initialize. Log the reason and stop the uima ee client
@@ -439,7 +443,6 @@
 		{
 			rm = UIMAFramework.newDefaultResourceManager();
 		}
-
 		if (anApplicationContext.containsKey(UIMAFramework.CAS_INITIAL_HEAP_SIZE))
 		{
 			String cas_initial_heap_size = (String) anApplicationContext.get(UIMAFramework.CAS_INITIAL_HEAP_SIZE);
@@ -488,7 +491,7 @@
     super.serviceDelegate = new ClientServiceDelegate(endpoint,applicationName,this);
     super.serviceDelegate.setCasProcessTimeout(processTimeout);
     super.serviceDelegate.setGetMetaTimeout(metadataTimeout);
-		try
+    try
 		{
 		  //  Generate unique identifier
 		  String uuid = UUIDGenerator.generate();
@@ -510,10 +513,13 @@
 	      initializeProducer(brokerURI, endpoint);
 	      initializeConsumer(brokerURI);
 	    }
+
 	    // Increment number of client instances. SharedConnection object is a static
 	    // and is used to share a single JMS connection. The connection is closed 
 	    // when the last client finishes processing and calls stop().
-	    sharedConnection.incrementClientCount();
+	    if ( sharedConnection != null ) {
+	      sharedConnection.incrementClientCount();
+	    }
 			running = true;
 			sendMetaRequest();
 			waitForMetadataReply();
@@ -788,26 +794,44 @@
 
 	protected void waitForServiceNotification() throws Exception
 	{
-	  
-	    synchronized( serviceMonitor )
-	    {
-	  	  while( !serviceInitializationCompleted )
-		  {
-		    if ( serviceInitializationException )
-		    {
-		      throw new ResourceInitializationException();
-		    }
-		    if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
-		      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "waitForServiceNotification", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_awaiting_container_init__INFO", new Object[] {});
-		    }
-
-	      serviceMonitor.wait();
-	      if ( serviceInitializationException )
-	      {
-	        throw new ResourceInitializationException();
-	      }
-	    }
-	  }
+    while( !serviceInitializationCompleted ) {
+     if ( serviceInitializationException ) {
+       throw new ResourceInitializationException();
+     }
+     if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
+       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "waitForServiceNotification", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_awaiting_container_init__INFO", new Object[] {});
+     }
+     try {
+       serviceSemaphore.acquire();
+     } catch( InterruptedException e) {
+     } finally {
+       serviceSemaphore.release();
+     }
+     if ( serviceInitializationException ) {
+       throw new ResourceInitializationException();
+     }
+   }
+//	  
+//	  
+//	    synchronized( serviceMonitor )
+//	    {
+//	  	  while( !serviceInitializationCompleted )
+//		   {
+//		    if ( serviceInitializationException )
+//		    {
+//		      throw new ResourceInitializationException();
+//		    }
+//		    if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
+//		      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "waitForServiceNotification", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_awaiting_container_init__INFO", new Object[] {});
+//		    }
+//
+//	      serviceMonitor.wait();
+//	      if ( serviceInitializationException )
+//	      {
+//	        throw new ResourceInitializationException();
+//	      }
+//	    }
+//	  }
 	}
 	
 	
@@ -861,18 +885,12 @@
       if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "notifyOnInitializationFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_container_init_exception__WARNING", new Object[] {e});
       }
-	    synchronized(serviceMonitor)
-	    {
-	      serviceMonitor.notifyAll();
-	    }
+      serviceSemaphore.release();
 	  }
 
 	  public void notifyOnInitializationSuccess(AnalysisEngineController aController) {
 	    serviceInitializationCompleted =  true;
-	    synchronized(serviceMonitor)
-	    {
-	      serviceMonitor.notifyAll();
-	    }
+      serviceSemaphore.release();
 	  }
 
 	  public void notifyOnTermination(String message) {