You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2010/12/22 22:19:25 UTC

svn commit: r1052071 - /uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java

Author: cwiklik
Date: Wed Dec 22 21:19:24 2010
New Revision: 1052071

URL: http://svn.apache.org/viewvc?rev=1052071&view=rev
Log:
UIMA-1979 Synchronized access to JMS Session instance. Renamed the lock object from recoveryMux to lock.

Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=1052071&r1=1052070&r2=1052071&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Wed Dec 22 21:19:24 2010
@@ -28,7 +28,6 @@ import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
-import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
@@ -38,7 +37,6 @@ import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.ConnectionFailedException;
 import org.apache.activemq.advisory.ConsumerEvent;
@@ -61,7 +59,6 @@ import org.apache.uima.aae.message.Async
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.activemq.JmsOutputChannel.BrokerConnectionEntry;
 import org.apache.uima.util.Level;
-import org.springframework.util.Assert;
 
 
 public class JmsEndpointConnection_impl implements ConsumerListener {
@@ -97,7 +94,7 @@ public class JmsEndpointConnection_impl 
 
   private volatile boolean failed = false;
 
-  private Object recoveryMux = new Object();
+  private Object lock = new Object();
 
   private final String componentName;
 
@@ -140,10 +137,12 @@ public class JmsEndpointConnection_impl 
   }
 
   public boolean isOpen() {
-    if (failed || producerSession == null || connectionClosedOrFailed(brokerDestinations)) {
-      return false;
-    }
-    return ((ActiveMQSession) producerSession).isRunning();
+	synchronized (lock) {
+	    if (failed || producerSession == null || connectionClosedOrFailed(brokerDestinations)) {
+	        return false;
+	      }
+	      return ((ActiveMQSession) producerSession).isRunning();
+	}
   }
 
   protected static boolean connectionClosedOrFailed(BrokerConnectionEntry aBrokerDestinationMap) {
@@ -164,7 +163,7 @@ public class JmsEndpointConnection_impl 
   private void openChannel(String brokerUri, String aComponentName,
           String anEndpointName, AnalysisEngineController aController) throws AsynchAEException,
           ServiceShutdownException {
-	  synchronized (recoveryMux) {
+	  synchronized (lock) {
 		    try {
 
 		        // If replying to http request, reply to a queue managed by this service broker using tcp
@@ -338,7 +337,7 @@ public class JmsEndpointConnection_impl 
   }
 
 	public void close() throws Exception {
-		synchronized (recoveryMux) {
+		synchronized (lock) {
 			if (producer != null) {
 				try {
 					producer.close();
@@ -380,81 +379,87 @@ public class JmsEndpointConnection_impl 
   }
 
   public TextMessage produceTextMessage(String aTextMessage) throws AsynchAEException {
-    if ( producerSession == null ) {
-      throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");
-    }
-    try {
-       if (aTextMessage == null) {
-          return producerSession.createTextMessage();
-       } else {
-          return producerSession.createTextMessage(aTextMessage);
-       }
-     } catch (javax.jms.IllegalStateException e) {
-        try {
-          open();
-        } catch (ServiceShutdownException ex) {
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                    "produceTextMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAEE_service_exception_WARNING", controller.getComponentName());
-            
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                    "produceTextMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_exception__WARNING", ex);
-          }
-        } catch (AsynchAEException ex) {
-          throw ex;
-        }
-      } catch (Exception e) {
-        throw new AsynchAEException(e);
-    }
-    throw new AsynchAEException(new InvalidMessageException("Unable to produce Message Object"));
+	synchronized( lock ) {
+		  if ( producerSession == null ) {
+		      throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");
+		    }
+		    try {
+		       if (aTextMessage == null) {
+		          return producerSession.createTextMessage();
+		       } else {
+		          return producerSession.createTextMessage(aTextMessage);
+		       }
+		     } catch (javax.jms.IllegalStateException e) {
+		        try {
+		          open();
+		        } catch (ServiceShutdownException ex) {
+		          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+		            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+		                    "produceTextMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+		                    "UIMAEE_service_exception_WARNING", controller.getComponentName());
+		            
+		            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+		                    "produceTextMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+		                    "UIMAJMS_exception__WARNING", ex);
+		          }
+		        } catch (AsynchAEException ex) {
+		          throw ex;
+		        }
+		      } catch (Exception e) {
+		        throw new AsynchAEException(e);
+		    }
+		    throw new AsynchAEException(new InvalidMessageException("Unable to produce Message Object"));
+	}
   }
 
   public BytesMessage produceByteMessage() throws AsynchAEException {
-    if ( producerSession == null ) {
-      throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");
-    }
-    boolean done = false;
-    int retryCount = 4;
-    while (retryCount > 0) {
-      try {
-        retryCount--;
-        return producerSession.createBytesMessage();
-      } catch (javax.jms.IllegalStateException e) {
-        try {
-          open();
-        } catch (ServiceShutdownException ex) {
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                    "produceByteMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAEE_service_exception_WARNING", controller.getComponentName());
-            
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                    "produceByteMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_exception__WARNING", ex);
+    synchronized( lock ) {
+        if ( producerSession == null ) {
+            throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");
           }
-        }
+          boolean done = false;
+          int retryCount = 4;
+          while (retryCount > 0) {
+            try {
+              retryCount--;
+              return producerSession.createBytesMessage();
+            } catch (javax.jms.IllegalStateException e) {
+              try {
+                open();
+              } catch (ServiceShutdownException ex) {
+                if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+                  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+                          "produceByteMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                          "UIMAEE_service_exception_WARNING", controller.getComponentName());
+                  
+                  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+                          "produceByteMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                          "UIMAJMS_exception__WARNING", ex);
+                }
+              }
 
-      } catch (Exception e) {
-        throw new AsynchAEException(e);
-      }
+            } catch (Exception e) {
+              throw new AsynchAEException(e);
+            }
+          }
+          throw new AsynchAEException(
+                  new InvalidMessageException("Unable to produce BytesMessage Object"));
     }
-    throw new AsynchAEException(
-            new InvalidMessageException("Unable to produce BytesMessage Object"));
   }
 
   public ObjectMessage produceObjectMessage() throws AsynchAEException {
-    if ( producerSession == null ) {
-      throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");
-    }
-    try {
-      if (!((ActiveMQSession) producerSession).isRunning()) {
-        open();
-      }
-      return producerSession.createObjectMessage();
-    } catch (Exception e) {
-      throw new AsynchAEException(e);
+    synchronized( lock ) {
+  	  if ( producerSession == null ) {
+  	      throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");
+  	    }
+  	    try {
+  	      if (!((ActiveMQSession) producerSession).isRunning()) {
+  	        open();
+  	      }
+  	      return producerSession.createObjectMessage();
+  	    } catch (Exception e) {
+  	      throw new AsynchAEException(e);
+  	    }
     }
   }
 
@@ -471,7 +476,7 @@ public class JmsEndpointConnection_impl 
     // endpoint for the delegate is marked as FAILED. This will be the case if the listener
     // on the reply queue for the endpoint has failed.
     String endpointName = delegateEndpoint.getEndpoint();
-    synchronized (recoveryMux) {
+    synchronized (lock) {
       if (controller instanceof AggregateAnalysisEngineController) {
         // Using the queue name lookup the delegate key
         String key = ((AggregateAnalysisEngineController) controller)