You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/09/02 17:16:24 UTC

svn commit: r810547 [1/5] - /incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/

Author: cwiklik
Date: Wed Sep  2 15:16:23 2009
New Revision: 810547

URL: http://svn.apache.org/viewvc?rev=810547&view=rev
Log:
UIMA-1541 Reformatted to conform to UIMA formatting guidelines. No other changes included.

Modified:
    incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java
    incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java
    incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
    incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
    incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannelMBean.java
    incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
    incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ModifiableListener.java
    incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java
    incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java
    incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
    incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaEEAdminSpringContext.java

Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java?rev=810547&r1=810546&r2=810547&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java Wed Sep  2 15:16:23 2009
@@ -29,8 +29,7 @@
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.jmx.ManagementContext;
-//import org.apache.activemq.memory.UsageListener;
+import org.apache.activemq.broker.jmx.ManagementContext; //import org.apache.activemq.memory.UsageListener;
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.util.Level;
@@ -38,353 +37,322 @@
 import org.springframework.context.ApplicationListener;
 import org.springframework.context.event.ContextClosedEvent;
 
-public class BrokerDeployer implements ApplicationListener
-{
-	private static final Class CLASS_NAME = BrokerDeployer.class;
-    private static final int BASE_JMX_PORT = 1200;
-    private static final int MAX_PORT_THRESHOLD = 200;
-    
-	private static BrokerService service = new BrokerService();
-	private Object semaphore = new Object();
-	private long maxBrokerMemory=0;
-	private String brokerURI;
-	private TransportConnector  tcpConnector = null;
-	private TransportConnector  httpConnector = null;
-
-	public BrokerDeployer(long maxMemoryinBytes) throws Exception
-	{
-		maxBrokerMemory = maxMemoryinBytes;
-		startInternalBroker();
-	}
-	
-	
-	public BrokerDeployer() throws Exception
-	{
-		startInternalBroker();
-	}
-	public BrokerService getBroker()
-	{
-		return service;
-	}
-	public void startInternalBroker() throws Exception
-	{
-		TransportConnector  connector = null;
-		
-		if (maxBrokerMemory > 0 )
-		{
-			System.out.println("Configuring Internal Broker With Max Memory Of:"+maxBrokerMemory);
+public class BrokerDeployer implements ApplicationListener {
+  private static final Class CLASS_NAME = BrokerDeployer.class;
+
+  private static final int BASE_JMX_PORT = 1200;
+
+  private static final int MAX_PORT_THRESHOLD = 200;
+
+  private static BrokerService service = new BrokerService();
+
+  private Object semaphore = new Object();
+
+  private long maxBrokerMemory = 0;
+
+  private String brokerURI;
+
+  private TransportConnector tcpConnector = null;
+
+  private TransportConnector httpConnector = null;
+
+  public BrokerDeployer(long maxMemoryinBytes) throws Exception {
+    maxBrokerMemory = maxMemoryinBytes;
+    startInternalBroker();
+  }
+
+  public BrokerDeployer() throws Exception {
+    startInternalBroker();
+  }
+
+  public BrokerService getBroker() {
+    return service;
+  }
+
+  public void startInternalBroker() throws Exception {
+    TransportConnector connector = null;
+
+    if (maxBrokerMemory > 0) {
+      System.out.println("Configuring Internal Broker With Max Memory Of:" + maxBrokerMemory);
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
-                    "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_broker_memory__CONFIG",
-                    new Object[] {maxBrokerMemory});
+                "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAJMS_broker_memory__CONFIG", new Object[] { maxBrokerMemory });
       }
-		}
-		String[] connectors = service.getNetworkConnectorURIs();
-		if ( connectors != null  )
-		{
-			for( int i=0; i < connectors.length; i++)
-			{
-				System.out.println("ActiveMQ Broker Started With Connector:"+connectors[i]);
-			}
-			brokerURI = service.getMasterConnectorURI();
-		}
-		else
-		{
-			
-			String connectorList = "";
-			service.setPersistent(false);
-			int startPort = BASE_JMX_PORT;
-			if ( System.getProperties().containsKey("com.sun.management.jmxremote.port") )
-			{
-				startPort = Integer.parseInt(System.getProperty("com.sun.management.jmxremote.port"));
-				
-			}
-			while( startPort < MAX_PORT_THRESHOLD && !openPort(startPort))
-			{
-				startPort++;
-			}
-			if ( startPort < (startPort+MAX_PORT_THRESHOLD ) )
-			{
-				MBeanServer jmxServer = null;
-				//	Check if the MBeanServer is available. If it is, plug it into the
-				//	local Broker. We only need one MBeanServer in the JVM
-				if ( (jmxServer = ManagementContext.findTigerMBeanServer()) != null)
-				{
-					System.out.println(">>> Found TigerMBeanServer Running. Attaching Broker to Tiger.");
-					service.getManagementContext().setMBeanServer(jmxServer);
-					//	Specify JMX Port
-					service.getManagementContext().setConnectorPort(startPort);
-				}
-				else
-				{
-					service.getManagementContext().setConnectorPort(startPort);
-					service.setUseJmx(true);
-				}
-				
-				System.setProperty("com.sun.management.jmxremote.port", String.valueOf(startPort)); 
+    }
+    String[] connectors = service.getNetworkConnectorURIs();
+    if (connectors != null) {
+      for (int i = 0; i < connectors.length; i++) {
+        System.out.println("ActiveMQ Broker Started With Connector:" + connectors[i]);
+      }
+      brokerURI = service.getMasterConnectorURI();
+    } else {
+
+      String connectorList = "";
+      service.setPersistent(false);
+      int startPort = BASE_JMX_PORT;
+      if (System.getProperties().containsKey("com.sun.management.jmxremote.port")) {
+        startPort = Integer.parseInt(System.getProperty("com.sun.management.jmxremote.port"));
+
+      }
+      while (startPort < MAX_PORT_THRESHOLD && !openPort(startPort)) {
+        startPort++;
+      }
+      if (startPort < (startPort + MAX_PORT_THRESHOLD)) {
+        MBeanServer jmxServer = null;
+        // Check if the MBeanServer is available. If it is, plug it into the
+        // local Broker. We only need one MBeanServer in the JVM
+        if ((jmxServer = ManagementContext.findTigerMBeanServer()) != null) {
+          System.out.println(">>> Found TigerMBeanServer Running. Attaching Broker to Tiger.");
+          service.getManagementContext().setMBeanServer(jmxServer);
+          // Specify JMX Port
+          service.getManagementContext().setConnectorPort(startPort);
+        } else {
+          service.getManagementContext().setConnectorPort(startPort);
+          service.setUseJmx(true);
+        }
+
+        System.setProperty("com.sun.management.jmxremote.port", String.valueOf(startPort));
 
-				System.out.println("JMX Console connect URI:  service:jmx:rmi:///jndi/rmi://localhost:"+startPort+"/jmxrmi");
+        System.out.println("JMX Console connect URI:  service:jmx:rmi:///jndi/rmi://localhost:"
+                + startPort + "/jmxrmi");
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
-	                    "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jmx_uri__CONFIG",
-	                    new Object[] {"service:jmx:rmi:///jndi/rmi://localhost:"+startPort+"/jmxrmi" });
+          UIMAFramework.getLogger(CLASS_NAME)
+                  .logrb(
+                          Level.CONFIG,
+                          CLASS_NAME.getName(),
+                          "startInternalBroker",
+                          JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                          "UIMAJMS_jmx_uri__CONFIG",
+                          new Object[] { "service:jmx:rmi:///jndi/rmi://localhost:" + startPort
+                                  + "/jmxrmi" });
         }
-			}
+      }
+
+      brokerURI = generateInternalURI("tcp", 18810, true, false);
 
-			brokerURI = generateInternalURI("tcp", 18810, true, false);
-			
-			//	Wait until sucessfull adding connector to the broker
-			//	Sleeps for 1sec between retries until success
-			int timeBetweenRetries = 1000;
-			boolean tcpConnectorAcquiredValidPort = false;
-			while( !tcpConnectorAcquiredValidPort )
-			{
-				try
-				{
-					tcpConnector = service.addConnector(brokerURI);
-					tcpConnectorAcquiredValidPort = true;
-				}
-				catch( Exception e) 
-				{
-					synchronized(this)
-					{
-						wait(timeBetweenRetries);
-					}
-				} // silence InstanceAlreadyExistsException
-				
-			}
-			System.out.println("Adding TCP Connector:"+tcpConnector.getConnectUri());
+      // Wait until the connector is successfully added to the broker.
+      // Sleeps for 1 sec between retries until success.
+      int timeBetweenRetries = 1000;
+      boolean tcpConnectorAcquiredValidPort = false;
+      while (!tcpConnectorAcquiredValidPort) {
+        try {
+          tcpConnector = service.addConnector(brokerURI);
+          tcpConnectorAcquiredValidPort = true;
+        } catch (Exception e) {
+          synchronized (this) {
+            wait(timeBetweenRetries);
+          }
+        } // silence InstanceAlreadyExistsException
+
+      }
+      System.out.println("Adding TCP Connector:" + tcpConnector.getConnectUri());
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
-                    "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_adding_connector__CONFIG",
-                    new Object[] {"Adding TCP Connector",tcpConnector.getConnectUri() });
+                "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAJMS_adding_connector__CONFIG",
+                new Object[] { "Adding TCP Connector", tcpConnector.getConnectUri() });
       }
-			connectorList = tcpConnector.getName();
-			if ( System.getProperty("StompSupport") != null )
-			{
-				String stompURI = generateInternalURI("stomp", 61613, false, false);
-				connector = service.addConnector(stompURI);
-				System.out.println("Adding STOMP Connector:"+connector.getConnectUri());
+      connectorList = tcpConnector.getName();
+      if (System.getProperty("StompSupport") != null) {
+        String stompURI = generateInternalURI("stomp", 61613, false, false);
+        connector = service.addConnector(stompURI);
+        System.out.println("Adding STOMP Connector:" + connector.getConnectUri());
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
-	                    "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_adding_connector__CONFIG",
-	                    new Object[] {"Adding STOMP Connector",connector.getConnectUri() });
+                  "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                  "UIMAJMS_adding_connector__CONFIG",
+                  new Object[] { "Adding STOMP Connector", connector.getConnectUri() });
         }
-				connectorList += ","+connector.getName();
-			}
-			if ( System.getProperty("HTTP") != null )
-			{
-				String stringPort = System.getProperty("HTTP");
-				int p = Integer.parseInt(stringPort);
-				String httpURI = generateInternalURI("http", p, false, true);
-				httpConnector = service.addConnector(httpURI);
-				System.out.println("Adding HTTP Connector:"+httpConnector.getConnectUri());
+        connectorList += "," + connector.getName();
+      }
+      if (System.getProperty("HTTP") != null) {
+        String stringPort = System.getProperty("HTTP");
+        int p = Integer.parseInt(stringPort);
+        String httpURI = generateInternalURI("http", p, false, true);
+        httpConnector = service.addConnector(httpURI);
+        System.out.println("Adding HTTP Connector:" + httpConnector.getConnectUri());
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
-	                    "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_adding_connector__CONFIG",
-	                    new Object[] {"Adding HTTP Connector",httpConnector.getConnectUri() });
+                  "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                  "UIMAJMS_adding_connector__CONFIG",
+                  new Object[] { "Adding HTTP Connector", httpConnector.getConnectUri() });
         }
-				connectorList += ","+httpConnector.getName();
-			}
-			service.start();
-			System.setProperty("ActiveMQConnectors", connectorList);
-			System.out.println("Broker Service Started - URL:"+service.getVmConnectorURI());
+        connectorList += "," + httpConnector.getName();
+      }
+      service.start();
+      System.setProperty("ActiveMQConnectors", connectorList);
+      System.out.println("Broker Service Started - URL:" + service.getVmConnectorURI());
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
-                    "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_broker_started__CONFIG",
-                    new Object[] {service.getVmConnectorURI() });
+                "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAJMS_broker_started__CONFIG", new Object[] { service.getVmConnectorURI() });
       }
-		}
-			
-		// Allow the connectors some time to start
-		synchronized( semaphore )
-		{
-			semaphore.wait(1000);
-		}
-		//System.out.println("JMX Server Port:"+service.getManagementContext().getRmiServerPort());
-				//setConnectorPort(startPort);
-
-	
-	}
-	private boolean openPort( int aPort )
-	{
-		ServerSocket ssocket= null;
-		try
-		{
-			ssocket = new ServerSocket(aPort);
-			return true;
-		}
-		catch( Exception e)
-		{
-			return false;
-		}
-		finally
-		{
-			try
-			{
-				if ( ssocket != null )
-				{
-					ssocket.close();
-				}
-			}
-			catch (IOException ioe)
-			{
-			}
-		}
-		
-	}
-	
-	/**
-	 * Generates a unique port for the Network Connector that will be plugged into the internal Broker.
-	 * This connector externalizes the internal broker so that remote delegates can reply back to the
-	 * Aggregate. This method tests port 18810 for availability and it fails increments the port by one
-	 * until a port is valid. 
-	 *  
-	 * @return - Broker URI with a unique port
-	 */
-	private String generateInternalURI(String aProtocol, int aDefaultPort, boolean cacheURL, boolean oneTry) 
-	throws Exception
-	{
-		boolean success = false;
-		int openPort=aDefaultPort;
-		ServerSocket ssocket= null;
-		
-		while( !success )
-		{
-			try
-			{
-				ssocket = new ServerSocket(openPort);
+    }
+
+    // Allow the connectors some time to start
+    synchronized (semaphore) {
+      semaphore.wait(1000);
+    }
+    // System.out.println("JMX Server Port:"+service.getManagementContext().getRmiServerPort());
+    // setConnectorPort(startPort);
+
+  }
+
+  private boolean openPort(int aPort) {
+    ServerSocket ssocket = null;
+    try {
+      ssocket = new ServerSocket(aPort);
+      return true;
+    } catch (Exception e) {
+      return false;
+    } finally {
+      try {
+        if (ssocket != null) {
+          ssocket.close();
+        }
+      } catch (IOException ioe) {
+      }
+    }
+
+  }
+
+  /**
+   * Generates a unique port for the Network Connector that will be plugged into the internal
+   * Broker. This connector externalizes the internal broker so that remote delegates can reply back
+   * to the Aggregate. This method tests the default port (e.g. 18810) for availability and, if
+   * that fails, increments the port by one until a port is valid.
+   * 
+   * @return - Broker URI with a unique port
+   */
+  private String generateInternalURI(String aProtocol, int aDefaultPort, boolean cacheURL,
+          boolean oneTry) throws Exception {
+    boolean success = false;
+    int openPort = aDefaultPort;
+    ServerSocket ssocket = null;
+
+    while (!success) {
+      try {
+        ssocket = new ServerSocket(openPort);
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
-	                    "generateInternalURI", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_port_available__CONFIG",
-	                    new Object[] {openPort });
+                  "generateInternalURI", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                  "UIMAJMS_port_available__CONFIG", new Object[] { openPort });
+        }
+        String uri = aProtocol + "://"
+                + ssocket.getInetAddress().getLocalHost().getCanonicalHostName() + ":" + openPort;
+        success = true;
+        if (cacheURL) {
+          System.setProperty("BrokerURI", uri);
+
         }
-				String uri = aProtocol+"://"+ssocket.getInetAddress().getLocalHost().getCanonicalHostName()+":"+openPort;
-				success = true;
-				if ( cacheURL )
-				{
-					System.setProperty("BrokerURI", uri);
-		
-				}
-				return uri;
-			}
-			catch( BindException e)
-			{
+        return uri;
+      } catch (BindException e) {
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
-	                    "generateInternalURI", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_port_not_available__CONFIG",
-	                    new Object[] {openPort });
+                  "generateInternalURI", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                  "UIMAJMS_port_not_available__CONFIG", new Object[] { openPort });
+        }
+        if (oneTry) {
+          System.out.println("Given port:" + openPort + " is not available for " + aProtocol);
+          throw e;
         }
-				if ( oneTry )
-				{
-					System.out.println("Given port:"+openPort+" is not available for "+aProtocol);
-					throw e;
-				}
-				openPort++;
-			}
-			catch( Exception e)
-			{
-				e.printStackTrace();
+        openPort++;
+      } catch (Exception e) {
+        e.printStackTrace();
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-	                    "generateInternalURI", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING",
-	                    new Object[] {JmsConstants.threadName(), e });
+                  "generateInternalURI", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                  "UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
+        }
+        if (oneTry) {
+          throw e;
         }
-				if ( oneTry )
-				{
-					throw e;
-				}
-			}
-			finally
-			{
-				try
-				{
-					if ( ssocket != null )
-					{
-						ssocket.close();
-					}
-				}
-				catch (IOException ioe)
-				{
-				}
-			}
-		}
-		return null;
-
-	}
-	/**
-	 * Stops the ActiveMQ broker. This method waits for 1 second to allow the broker to
-	 * cleanup objects from JMX Server.
-	 *
-	 */
-	public void stop()
-	{
-		Object monitor = new Object();
-		if ( service != null )
-		{
-			try
-			{
-//				if ( usageListener != null )
-//				{
-//					service.getMemoryManager().removeUsageListener(usageListener);		
-//				}
-				if ( tcpConnector != null )
-				{
-					tcpConnector.stop();
-					System.out.println("Broker Connector:"+tcpConnector.getUri().toString()+ " is stopped");
-				}
-				if ( httpConnector != null )
-				{
-					System.out.println("Broker Stopping HTTP Connector:"+httpConnector.getUri().toString());
-					httpConnector.stop();
-					System.out.println("Broker Connector:"+httpConnector.getUri().toString()+ " is stopped");
-				}
-				service.getManagementContext().stop();
-				service.stop();
-				Broker broker =  service.getBroker();
-				while( !broker.isStopped() )
-				{
-					synchronized( monitor )
-					{
-						try
-						{
-							monitor.wait(20); // wait for the broker to terminate
-						}
-						catch( Exception e) { }
-					}
-					
-				}
-				System.out.println("Broker is stopped");
-				broker = null;
-				service = null;
-			}
-			catch( Exception e) { e.printStackTrace();}
-		}
-	}
-	/**
-	 * Callback method invoked by Spring container during its lifecycle changes 
-	 * Ignore all events except for ContextClosedEvent which indicates the container
-	 * has shutdown. In this case, stop the internal ActiveMQ broker. 
-	 * 
-	 * @param anEvent - an event object
-	 */
-	public void onApplicationEvent(ApplicationEvent anEvent)
-	{
-		if ( anEvent instanceof ContextClosedEvent)
-		{
+      } finally {
+        try {
+          if (ssocket != null) {
+            ssocket.close();
+          }
+        } catch (IOException ioe) {
+        }
+      }
+    }
+    return null;
+
+  }
+
+  /**
+   * Stops the ActiveMQ broker. This method waits for 1 second to allow the broker to cleanup
+   * objects from JMX Server.
+   * 
+   */
+  public void stop() {
+    Object monitor = new Object();
+    if (service != null) {
+      try {
+        // if ( usageListener != null )
+        // {
+        // service.getMemoryManager().removeUsageListener(usageListener);
+        // }
+        if (tcpConnector != null) {
+          tcpConnector.stop();
+          System.out
+                  .println("Broker Connector:" + tcpConnector.getUri().toString() + " is stopped");
+        }
+        if (httpConnector != null) {
+          System.out.println("Broker Stopping HTTP Connector:" + httpConnector.getUri().toString());
+          httpConnector.stop();
+          System.out.println("Broker Connector:" + httpConnector.getUri().toString()
+                  + " is stopped");
+        }
+        service.getManagementContext().stop();
+        service.stop();
+        Broker broker = service.getBroker();
+        while (!broker.isStopped()) {
+          synchronized (monitor) {
+            try {
+              monitor.wait(20); // wait for the broker to terminate
+            } catch (Exception e) {
+            }
+          }
+
+        }
+        System.out.println("Broker is stopped");
+        broker = null;
+        service = null;
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Callback method invoked by the Spring container during its lifecycle changes. Ignores all
+   * events except for ContextClosedEvent, which indicates the container has shut down. In this
+   * case, stops the internal ActiveMQ broker.
+   * 
+   * @param anEvent
+   *          - an event object
+   */
+  public void onApplicationEvent(ApplicationEvent anEvent) {
+    if (anEvent instanceof ContextClosedEvent) {
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-	                "onApplicationEvent", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_container_terminated__INFO",
-	                new Object[] {(( ContextClosedEvent)anEvent).getApplicationContext().getDisplayName()});
+        UIMAFramework.getLogger(CLASS_NAME).logrb(
+                Level.INFO,
+                CLASS_NAME.getName(),
+                "onApplicationEvent",
+                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAJMS_container_terminated__INFO",
+                new Object[] { ((ContextClosedEvent) anEvent).getApplicationContext()
+                        .getDisplayName() });
       }
-			stop();
+      stop();
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-	                "onApplicationEvent", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_broker_stopped__INFO",
-	                new Object[] {brokerURI});
+                "onApplicationEvent", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAJMS_broker_stopped__INFO", new Object[] { brokerURI });
       }
-		}
-	}
-
+    }
+  }
 
 }

Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java?rev=810547&r1=810546&r2=810547&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java Wed Sep  2 15:16:23 2009
@@ -41,151 +41,164 @@
 import org.springframework.jms.listener.SessionAwareMessageListener;
 
 /**
- * Message listener injected at runtime into Aggregate to handle a race condition
- * when multiple threads simultaneously process messages from a Cas Multiplier. 
- * It is only used to process messages from a Cas Multiplier and only if the reply
- * queue has more than one consumer thread configured in a deployment descriptor.
- * The listener creates a pool of threads equal to the number of concurrent consumers
- * defined in the DD for the listener on the reply queue. Once the message is handled
- * in onMessage(), it is than delegated for processing to one of the available threads
- * from the pool. 
- *  
- * This listener guarantees processing order. It receives messages from Spring
- * in a single thread and if it finds a child CAS in the message, it increments the
- * parent (input) CAS child count and delegates processing to the InputChannel instance.
+ * Message listener injected at runtime into Aggregate to handle a race condition when multiple
+ * threads simultaneously process messages from a Cas Multiplier. It is only used to process
+ * messages from a Cas Multiplier and only if the reply queue has more than one consumer thread
+ * configured in a deployment descriptor. The listener creates a pool of threads equal to the number
+ * of concurrent consumers defined in the DD for the listener on the reply queue. Once the message
+ * is handled in onMessage(), it is then delegated for processing to one of the available threads
+ * from the pool.
+ * 
+ * This listener guarantees processing order. It receives messages from Spring in a single thread
+ * and if it finds a child CAS in the message, it increments the parent (input) CAS child count and
+ * delegates processing to the InputChannel instance.
+ * 
+ * The race condition: The Cas Multiplier sends the last child and the parent almost at the same
+ * time. Both are received by the aggregate and are processed in different threads, if a scaleout is
+ * used on the reply queue. One thread may start processing the input CAS while the other thread
+ * (with the last child) is not yet allowed to run. The first thread takes the input CAS all the way
+ * to the final step and since at this time, the input CAS has no children ( the thread processing
+ * the last child has not updated the child count yet), it will be prematurely released. When the
+ * thread with the last child is allowed to run, it finds that the parent no longer exists in the
+ * cache.
  * 
- * The race condition:
- * The Cas Multiplier sends the last child and the parent almost at the same time.
- * Both are received by the aggregate and are processed in different threads, if a
- * scaleout is used on the reply queue. One thread may start processing the input CAS
- * while the other thread (with the last child) is not yet allowed to run. The first
- * thread takes the input CAS all the way to the final step and since at this time,
- * the input CAS has no children ( the thread processing the last child has not updated
- * the child count yet), it will be prematurely released. When the thread with the last
- * child is allowed to run, it finds that the parent no longer exists in the cache. 
  * 
- *    
  */
 public class ConcurrentMessageListener implements SessionAwareMessageListener {
   private static final Class CLASS_NAME = ConcurrentMessageListener.class;
 
   private SessionAwareMessageListener delegateListener;
-  private int concurrentThreadCount=0;
+
+  private int concurrentThreadCount = 0;
+
   private AnalysisEngineController controller;
+
   private ThreadPoolExecutor executor = null;
+
   private LinkedBlockingQueue<Runnable> workQueue;
+
   private CountDownLatch controllerLatch = new CountDownLatch(1);
-  
+
   /**
    * Creates a listener with a given number of process threads. This listener is injected between
-   * Spring and JmsInputChannel to enable orderly processing of CASes. This listener is only
-   * used on reply queues that have scale out attribute in DD greater than 1. Its main job is
-   * to increment number of child CASes for a given input CAS. It does so in a single thread, and
-   * once it completes the update this listener submits the CAS for further processing up to the
+   * Spring and JmsInputChannel to enable orderly processing of CASes. This listener is only used on
+   * reply queues that have scale out attribute in DD greater than 1. Its main job is to increment
+   * number of child CASes for a given input CAS. It does so in a single thread, and once it
+   * completes the update this listener submits the CAS for further processing up to the
    * JmsInputChannel. The CAS is submitted to a queue where the executor assigns a free thread to
-   * process the CAS.  
+   * process the CAS.
    * 
-   * @param concurrentThreads - number of threads to use to process CASes
-   * @param delegateListener - JmsInputChannel instance to delegate CAS to
+   * @param concurrentThreads
+   *          - number of threads to use to process CASes
+   * @param delegateListener
+   *          - JmsInputChannel instance to delegate CAS to
    * @throws InvalidClassException
    */
-  public ConcurrentMessageListener( int concurrentThreads, Object delegateListener) throws InvalidClassException {
-    if ( !(delegateListener instanceof SessionAwareMessageListener) ) {
-      throw new InvalidClassException("Invalid Delegate Listener. Expected Object of Type:"+SessionAwareMessageListener.class+" Received:"+delegateListener.getClass());
+  public ConcurrentMessageListener(int concurrentThreads, Object delegateListener)
+          throws InvalidClassException {
+    if (!(delegateListener instanceof SessionAwareMessageListener)) {
+      throw new InvalidClassException("Invalid Delegate Listener. Expected Object of Type:"
+              + SessionAwareMessageListener.class + " Received:" + delegateListener.getClass());
     }
     concurrentThreadCount = concurrentThreads;
-    this.delegateListener = (SessionAwareMessageListener)delegateListener;
-    if ( concurrentThreads > 1 ) {
-      workQueue = new LinkedBlockingQueue<Runnable>(concurrentThreadCount); 
+    this.delegateListener = (SessionAwareMessageListener) delegateListener;
+    if (concurrentThreads > 1) {
+      workQueue = new LinkedBlockingQueue<Runnable>(concurrentThreadCount);
       executor = new ThreadPoolExecutor(concurrentThreads, concurrentThreads, Long.MAX_VALUE,
               TimeUnit.NANOSECONDS, workQueue);
       executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
       executor.prestartAllCoreThreads();
     }
   }
-  
+
   public void stop() {
-    if ( executor != null ) {
+    if (executor != null) {
       executor.shutdownNow();
-      while( !executor.isTerminated()) {
+      while (!executor.isTerminated()) {
         try {
-          executor.awaitTermination(200,TimeUnit.MILLISECONDS);
-        } catch( InterruptedException e) {
+          executor.awaitTermination(200, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
           break;
         }
       }
     }
   }
+
   public void setAnalysisEngineController(AnalysisEngineController controller) {
     this.controller = controller;
     controllerLatch.countDown();
   }
-  private boolean isMessageFromCasMultiplier(final Message message ) throws JMSException {
+
+  private boolean isMessageFromCasMultiplier(final Message message) throws JMSException {
     return message.propertyExists(AsynchAEMessage.CasSequence);
   }
+
   /**
-   * Intercept a message to increment a child count of the input CAS. 
-   * This method is always called in a single thread, guaranteeing order of processing.
-   * The child CAS will always come here first. Once the count is updated, or this
-   * CAS is not an child, the message will be delegated to one of the threads in the pool
-   * that will eventually call InputChannel object where the actual processing of the
-   * message begins.
+   * Intercept a message to increment a child count of the input CAS. This method is always called
+   * in a single thread, guaranteeing order of processing. The child CAS will always come here
+   * first. Once the count is updated, or this CAS is not a child, the message will be delegated to
+   * one of the threads in the pool that will eventually call InputChannel object where the actual
+   * processing of the message begins.
    * 
    */
   public void onMessage(final Message message, final Session session) throws JMSException {
     try {
-      //  Wait until the controller is plugged in
+      // Wait until the controller is plugged in
       controllerLatch.await();
-    } catch( InterruptedException e) {}
-    if ( isMessageFromCasMultiplier(message)) {
-      //  Check if the message came from a Cas Multiplier and it contains a new Process Request
+    } catch (InterruptedException e) {
+    }
+    if (isMessageFromCasMultiplier(message)) {
+      // Check if the message came from a Cas Multiplier and it contains a new Process Request
       int command = message.getIntProperty(AsynchAEMessage.Command);
       int messageType = message.getIntProperty(AsynchAEMessage.MessageType);
-      //  Intercept Cas Process Request from a Cas Multiplier
-      if (command == AsynchAEMessage.Process && messageType == AsynchAEMessage.Request ) {
-        String msgFrom = (String)message.getStringProperty(AsynchAEMessage.MessageFrom); 
-        if ( msgFrom != null && controller instanceof AggregateAnalysisEngineController ) {
-          String delegateKey =((AggregateAnalysisEngineController)controller).lookUpDelegateKey(msgFrom);
-          if ( delegateKey != null ) {
-            Delegate delegate = ((AggregateAnalysisEngineController)controller).lookupDelegate(delegateKey);
+      // Intercept Cas Process Request from a Cas Multiplier
+      if (command == AsynchAEMessage.Process && messageType == AsynchAEMessage.Request) {
+        String msgFrom = (String) message.getStringProperty(AsynchAEMessage.MessageFrom);
+        if (msgFrom != null && controller instanceof AggregateAnalysisEngineController) {
+          String delegateKey = ((AggregateAnalysisEngineController) controller)
+                  .lookUpDelegateKey(msgFrom);
+          if (delegateKey != null) {
+            Delegate delegate = ((AggregateAnalysisEngineController) controller)
+                    .lookupDelegate(delegateKey);
             delegate.setConcurrentConsumersOnReplyQueue();
           }
         }
         try {
-          String parentCasReferenceId = message.getStringProperty(AsynchAEMessage.InputCasReference);
-          //  Fetch parent CAS entry from the local cache
+          String parentCasReferenceId = message
+                  .getStringProperty(AsynchAEMessage.InputCasReference);
+          // Fetch parent CAS entry from the local cache
           CasStateEntry parentEntry = controller.getLocalCache().lookupEntry(parentCasReferenceId);
-          //  increment number of child CASes this parent has in play
+          // increment number of child CASes this parent has in play
           parentEntry.incrementSubordinateCasInPlayCount();
         } catch (Exception e) {
           e.printStackTrace();
-          if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
+          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
             UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
                     "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
-                    new Object[] {  e });
+                    new Object[] { e });
           }
         }
       }
-      
+
     }
-    if ( concurrentThreadCount > 1 ) {
-      //  Delegate meesage to the JmsInputChannel 
+    if (concurrentThreadCount > 1) {
+      // Delegate message to the JmsInputChannel
       executor.execute(new Runnable() {
         public void run() {
           try {
             delegateListener.onMessage(message, session);
-          } catch( Exception e) {
+          } catch (Exception e) {
             e.printStackTrace();
-            if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
+            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
               UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
-                      "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
-                      new Object[] {  e });
+                      "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                      "UIMAEE_exception__WARNING", new Object[] { e });
             }
           }
         }
       });
     } else {
-      //  Just handoff the message to the InputChannel object
+      // Just handoff the message to the InputChannel object
       delegateListener.onMessage(message, session);
     }
   }

Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=810547&r1=810546&r2=810547&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Wed Sep  2 15:16:23 2009
@@ -52,120 +52,116 @@
 import org.apache.uima.util.Level;
 import org.springframework.util.Assert;
 
-public class JmsEndpointConnection_impl implements ConsumerListener
-{
-	private static final Class CLASS_NAME = JmsEndpointConnection_impl.class;
+public class JmsEndpointConnection_impl implements ConsumerListener {
+  private static final Class CLASS_NAME = JmsEndpointConnection_impl.class;
 
-	private Destination destination;
+  private Destination destination;
 
-	private Session producerSession;
+  private Session producerSession;
 
-	private MessageProducer producer;
+  private MessageProducer producer;
 
-	private BrokerConnectionEntry brokerDestinations;
-	
-	private String serverUri;
+  private BrokerConnectionEntry brokerDestinations;
 
-	private String endpoint;
+  private String serverUri;
 
-	private String endpointName;
+  private String endpoint;
+
+  private String endpointName;
 
   private Endpoint delegateEndpoint;
 
-	private volatile boolean retryEnabled;
+  private volatile boolean retryEnabled;
+
+  private AnalysisEngineController controller = null;
+
+  private volatile boolean connectionAborted = false;
+
+  protected static long connectionCreationTimestamp = 0L;
+
+  private Object semaphore = new Object();
 
-	private AnalysisEngineController controller = null;
+  private boolean isReplyEndpoint;
 
-	private volatile boolean connectionAborted = false;
+  private volatile boolean failed = false;
 
-	protected static long connectionCreationTimestamp = 0L;
+  private Object recoveryMux = new Object();
 
-	private Object semaphore = new Object();
-
-	private boolean isReplyEndpoint;
-	
-	private volatile boolean  failed = false;
-	
-	private Object recoveryMux = new Object();
-	
-	private final String componentName;
-	
-  public JmsEndpointConnection_impl(BrokerConnectionEntry aBrokerDestinationMap, Endpoint anEndpoint, AnalysisEngineController aController)
-	{
+  private final String componentName;
+
+  public JmsEndpointConnection_impl(BrokerConnectionEntry aBrokerDestinationMap,
+          Endpoint anEndpoint, AnalysisEngineController aController) {
     brokerDestinations = aBrokerDestinationMap;
-		serverUri = anEndpoint.getServerURI();
+    serverUri = anEndpoint.getServerURI();
     isReplyEndpoint = anEndpoint.isReplyEndpoint();
     controller = aController;
 
-    if ( ( anEndpoint.getCommand() == AsynchAEMessage.Stop || isReplyEndpoint ) && 
-			 anEndpoint.getDestination() != null && 
-			 anEndpoint.getDestination() instanceof ActiveMQDestination )
-		{
-			endpoint = ((ActiveMQDestination)anEndpoint.getDestination()).getPhysicalName();
-		}
-		else
-		{
-			endpoint = anEndpoint.getEndpoint();
-		}
-		anEndpoint.remove();
-		componentName = controller.getComponentName();
-		delegateEndpoint = anEndpoint;
-	}
-
-  public boolean isRetryEnabled()
-	{
-		return retryEnabled;
-	}
-
-	public void setRetryEnabled(boolean retryEnabled)
-	{
-		this.retryEnabled = retryEnabled;
-	}
-	
-
-	public boolean isOpen()
-	{
-		if ( failed ||
-		        producerSession == null || 
-		          brokerDestinations.getConnection() == null || 
-		            ((ActiveMQConnection)brokerDestinations.getConnection()).isClosed() ||
-		              ((ActiveMQConnection)brokerDestinations.getConnection()).isClosing() ||
-		                 ((ActiveMQConnection)brokerDestinations.getConnection()).isTransportFailed())
-		{
-			return false;
-		}
-		return ((ActiveMQSession) producerSession).isRunning();
-	}
+    if ((anEndpoint.getCommand() == AsynchAEMessage.Stop || isReplyEndpoint)
+            && anEndpoint.getDestination() != null
+            && anEndpoint.getDestination() instanceof ActiveMQDestination) {
+      endpoint = ((ActiveMQDestination) anEndpoint.getDestination()).getPhysicalName();
+    } else {
+      endpoint = anEndpoint.getEndpoint();
+    }
+    anEndpoint.remove();
+    componentName = controller.getComponentName();
+    delegateEndpoint = anEndpoint;
+  }
+
+  public boolean isRetryEnabled() {
+    return retryEnabled;
+  }
+
+  public void setRetryEnabled(boolean retryEnabled) {
+    this.retryEnabled = retryEnabled;
+  }
+
+  public boolean isOpen() {
+    if (failed || producerSession == null || brokerDestinations.getConnection() == null
+            || ((ActiveMQConnection) brokerDestinations.getConnection()).isClosed()
+            || ((ActiveMQConnection) brokerDestinations.getConnection()).isClosing()
+            || ((ActiveMQConnection) brokerDestinations.getConnection()).isTransportFailed()) {
+      return false;
+    }
+    return ((ActiveMQSession) producerSession).isRunning();
+  }
+
   private void openChannel() throws AsynchAEException, ServiceShutdownException {
     openChannel(getServerUri(), componentName, endpoint, controller);
   }
 
-	private synchronized void openChannel(String brokerUri, String aComponentName, String anEndpointName,  AnalysisEngineController aController) throws AsynchAEException, ServiceShutdownException
-	{
-		try
-		{
-			//	If replying to http request, reply to a queue managed by this service broker using tcp protocol
-			if ( isReplyEndpoint && brokerUri.startsWith("http") && aController != null && aController.getInputChannel() != null )
-			{
-				org.apache.uima.aae.InputChannel iC = aController.getInputChannel(aController.getName());
-				if ( ( brokerUri.trim().length() == 0 ) && iC != null )
-				{
-					brokerUri = iC.getServiceInfo().getBrokerURL();
-				}
-				
+  private synchronized void openChannel(String brokerUri, String aComponentName,
+          String anEndpointName, AnalysisEngineController aController) throws AsynchAEException,
+          ServiceShutdownException {
+    try {
+      // If replying to http request, reply to a queue managed by this service broker using tcp
+      // protocol
+      if (isReplyEndpoint && brokerUri.startsWith("http") && aController != null
+              && aController.getInputChannel() != null) {
+        org.apache.uima.aae.InputChannel iC = aController.getInputChannel(aController.getName());
+        if ((brokerUri.trim().length() == 0) && iC != null) {
+          brokerUri = iC.getServiceInfo().getBrokerURL();
+        }
+
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "open", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_override_connection_to_endpoint__FINE", new Object[] { aComponentName, getEndpoint(), aController.getInputChannel().getServerUri() });
+          UIMAFramework.getLogger(CLASS_NAME).logrb(
+                  Level.FINE,
+                  CLASS_NAME.getName(),
+                  "open",
+                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                  "UIMAJMS_override_connection_to_endpoint__FINE",
+                  new Object[] { aComponentName, getEndpoint(),
+                      aController.getInputChannel().getServerUri() });
         }
-			}
+      }
 
-      
-      if ( !isOpen() ) {  
+      if (!isOpen()) {
         Connection conn = null;
-        if (brokerDestinations.getConnection() == null ) {
+        if (brokerDestinations.getConnection() == null) {
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
             UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-                      "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_activemq_open__FINE",
-                      new Object[] { anEndpointName, brokerUri });
+                    "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                    "UIMAJMS_activemq_open__FINE", new Object[] { anEndpointName, brokerUri });
           }
           ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
           conn = factory.createConnection();
@@ -178,236 +174,203 @@
         failed = false;
       }
       Connection conn = brokerDestinations.getConnection();
-      if ( failed ) {
-        //  Unable to create a connection
+      if (failed) {
+        // Unable to create a connection
         return;
       }
       producerSession = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-			
-			if ( (delegateEndpoint.getCommand() == AsynchAEMessage.Stop || isReplyEndpoint ) && delegateEndpoint.getDestination() != null )
-			{
-				producer = producerSession.createProducer(null); 
-				if ( aController != null )
-				{
-	        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-	          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-			                "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_temp_conn_starting__FINE",
-			                new Object[] { aComponentName, anEndpointName, brokerUri });
-	        }
-				}
-			}
-			else
-			{
-				destination = producerSession.createQueue(getEndpoint());
-				producer = producerSession.createProducer(destination);
-				if ( controller != null )
-				{
-	        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-	          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-		                "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_conn_starting__FINE",
-		                new Object[] { aComponentName, anEndpointName, brokerUri });
-	        }
-				}
-			}
-			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-			// Since the connection is shared, start it only once 
-			if ( !((ActiveMQConnection)brokerDestinations.getConnection()).isStarted() ) {
-			  brokerDestinations.getConnection().start();
-			}
-			if ( controller != null )
-			{
+
+      if ((delegateEndpoint.getCommand() == AsynchAEMessage.Stop || isReplyEndpoint)
+              && delegateEndpoint.getDestination() != null) {
+        producer = producerSession.createProducer(null);
+        if (aController != null) {
+          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+                    "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                    "UIMAJMS_temp_conn_starting__FINE",
+                    new Object[] { aComponentName, anEndpointName, brokerUri });
+          }
+        }
+      } else {
+        destination = producerSession.createQueue(getEndpoint());
+        producer = producerSession.createProducer(destination);
+        if (controller != null) {
+          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+                    "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                    "UIMAJMS_conn_starting__FINE",
+                    new Object[] { aComponentName, anEndpointName, brokerUri });
+          }
+        }
+      }
+      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+      // Since the connection is shared, start it only once
+      if (!((ActiveMQConnection) brokerDestinations.getConnection()).isStarted()) {
+        brokerDestinations.getConnection().start();
+      }
+      if (controller != null) {
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-	                "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_conn_started__FINE",
-	                new Object[] { endpoint, brokerUri });
-          if ( controller.getInputChannel() != null ) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_connection_open_to_endpoint__FINE", new Object[] { aComponentName, getEndpoint(), brokerUri });
+                  "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                  "UIMAJMS_conn_started__FINE", new Object[] { endpoint, brokerUri });
+          if (controller.getInputChannel() != null) {
+            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+                    "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                    "UIMAJMS_connection_open_to_endpoint__FINE",
+                    new Object[] { aComponentName, getEndpoint(), brokerUri });
           }
         }
-			}
-			failed = false;
-		}
-		catch ( Exception e)
-		{
-		  boolean rethrow = true;
-		  e.printStackTrace();
-		  
-      if ( e instanceof JMSException ) {
-        rethrow = handleJmsException( (JMSException)e );
-        
-      } 
-      if ( rethrow ) {
+      }
+      failed = false;
+    } catch (Exception e) {
+      boolean rethrow = true;
+      e.printStackTrace();
+
+      if (e instanceof JMSException) {
+        rethrow = handleJmsException((JMSException) e);
+
+      }
+      if (rethrow) {
         throw new AsynchAEException(e);
       }
-		}
-	}
-	public synchronized void open() throws AsynchAEException, ServiceShutdownException {
-	  open( delegateEndpoint.getEndpoint(), serverUri);
-	}
-  public synchronized void open(String brokerUri, String anEndpointName) throws AsynchAEException, ServiceShutdownException
-	{
+    }
+  }
+
+  public synchronized void open() throws AsynchAEException, ServiceShutdownException {
+    open(delegateEndpoint.getEndpoint(), serverUri);
+  }
+
+  public synchronized void open(String brokerUri, String anEndpointName) throws AsynchAEException,
+          ServiceShutdownException {
     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-                "open", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_open__FINE",
-                new Object[] { anEndpointName, brokerUri });
-    }
-		if ( !connectionAborted )
-		{
-		    openChannel();
-		}
-
-	}
-
-	public synchronized void abort()
-	{
-		connectionAborted = true;
-		brokerDestinations.getConnectionTimer().stopTimer();
-		try {
-	    this.close();
-		} catch( Exception e) {
-		}
-	}
-
-	public synchronized void close() throws Exception
-	{
-			if (producer != null)
-			{
-				try
-				{
-					producer.close();
-				}
-				catch ( Exception e)
-				{
-					//	Ignore we are shutting down
-				}
-			}
-			if (producerSession != null)
-			{
-				try
-				{
-					producerSession.close();
-				}
-				catch ( Exception e)
-				{
-					//	Ignore we are shutting down
-				}
-				producerSession = null;
-			}
-			if (destination != null)
-			{
-				destination = null;
-			}
-	}
-
-	protected String getEndpoint()
-	{
-		return endpoint;
-	}
-
-	protected void setEndpoint(String endpoint)
-	{
-		this.endpoint = endpoint;
-	}
-
-	protected synchronized String getServerUri()
-	{
-		return serverUri;
-	}
-
-	protected synchronized void setServerUri(String serverUri)
-	{
-		this.serverUri = serverUri;
-	}
-
-	public TextMessage produceTextMessage(String aTextMessage) throws AsynchAEException
-	{
-		Assert.notNull(producerSession);
-		boolean done = false;
-		int retryCount = 4;
-		while (retryCount > 0)
-		{
-			try	{
-				retryCount--;
-				
-				if (aTextMessage == null)
-				{
-					return producerSession.createTextMessage();
-				}
-				else
-				{
-					return producerSession.createTextMessage(aTextMessage);
-				}
-
-			}	catch ( javax.jms.IllegalStateException e) {
-				try	{
-					open();
-				}	catch ( ServiceShutdownException ex) {
-					ex.printStackTrace();
-				} catch( AsynchAEException ex) {
-				  
-	        throw ex;
-	      }				
-			}	
-			catch ( Exception e) {
-				throw new AsynchAEException(e);
-			} 
-		}
-		throw new AsynchAEException(new InvalidMessageException("Unable to produce Message Object"));
-	}
-  public BytesMessage produceByteMessage() throws AsynchAEException
-  {
+      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "open",
+              JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_open__FINE",
+              new Object[] { anEndpointName, brokerUri });
+    }
+    if (!connectionAborted) {
+      openChannel();
+    }
+
+  }
+
+  public synchronized void abort() {
+    connectionAborted = true;
+    brokerDestinations.getConnectionTimer().stopTimer();
+    try {
+      this.close();
+    } catch (Exception e) {
+    }
+  }
+
+  public synchronized void close() throws Exception {
+    if (producer != null) {
+      try {
+        producer.close();
+      } catch (Exception e) {
+        // Ignore: we are shutting down
+      }
+    }
+    if (producerSession != null) {
+      try {
+        producerSession.close();
+      } catch (Exception e) {
+        // Ignore: we are shutting down
+      }
+      producerSession = null;
+    }
+    if (destination != null) {
+      destination = null;
+    }
+  }
+
+  protected String getEndpoint() {
+    return endpoint;
+  }
+
+  protected void setEndpoint(String endpoint) {
+    this.endpoint = endpoint;
+  }
+
+  protected synchronized String getServerUri() {
+    return serverUri;
+  }
+
+  protected synchronized void setServerUri(String serverUri) {
+    this.serverUri = serverUri;
+  }
+
+  public TextMessage produceTextMessage(String aTextMessage) throws AsynchAEException {
     Assert.notNull(producerSession);
     boolean done = false;
     int retryCount = 4;
-    while (retryCount > 0)
-    {
-      try
-      {
+    while (retryCount > 0) {
+      try {
         retryCount--;
-        return producerSession.createBytesMessage();
-      }
-      catch ( javax.jms.IllegalStateException e)
-      {
-        try
-        {
+
+        if (aTextMessage == null) {
+          return producerSession.createTextMessage();
+        } else {
+          return producerSession.createTextMessage(aTextMessage);
+        }
+
+      } catch (javax.jms.IllegalStateException e) {
+        try {
           open();
+        } catch (ServiceShutdownException ex) {
+          ex.printStackTrace();
+        } catch (AsynchAEException ex) {
+
+          throw ex;
         }
-        catch ( ServiceShutdownException ex)
-        {
+      } catch (Exception e) {
+        throw new AsynchAEException(e);
+      }
+    }
+    throw new AsynchAEException(new InvalidMessageException("Unable to produce Message Object"));
+  }
+
+  public BytesMessage produceByteMessage() throws AsynchAEException {
+    Assert.notNull(producerSession);
+    boolean done = false;
+    int retryCount = 4;
+    while (retryCount > 0) {
+      try {
+        retryCount--;
+        return producerSession.createBytesMessage();
+      } catch (javax.jms.IllegalStateException e) {
+        try {
+          open();
+        } catch (ServiceShutdownException ex) {
           ex.printStackTrace();
         }
 
-      }
-      catch ( Exception e)
-      {
+      } catch (Exception e) {
         throw new AsynchAEException(e);
       }
     }
-    throw new AsynchAEException(new InvalidMessageException("Unable to produce BytesMessage Object"));
+    throw new AsynchAEException(
+            new InvalidMessageException("Unable to produce BytesMessage Object"));
+  }
+
+  public ObjectMessage produceObjectMessage() throws AsynchAEException {
+    Assert.notNull(producerSession);
+
+    try {
+      if (!((ActiveMQSession) producerSession).isRunning()) {
+        open();
+      }
+      return producerSession.createObjectMessage();
+    } catch (Exception e) {
+      throw new AsynchAEException(e);
+    }
   }
 
-	public ObjectMessage produceObjectMessage() throws AsynchAEException
-	{
-		Assert.notNull(producerSession);
-
-		try
-		{
-			if (!((ActiveMQSession) producerSession).isRunning())
-			{
-				open();
-			}
-			return producerSession.createObjectMessage();
-		}
-		catch ( Exception e)
-		{
-			throw new AsynchAEException(e);
-		}
-	}
-	
-	private boolean delayCasDelivery(int msgType, Message aMessage, int command) throws Exception {
-    
+  private boolean delayCasDelivery(int msgType, Message aMessage, int command) throws Exception {
+
     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "recoverSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_open_connection_to_endpoint__FINE", new Object[] { getEndpoint() });
+      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "recoverSession",
+              JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_open_connection_to_endpoint__FINE",
+              new Object[] { getEndpoint() });
     }
     openChannel();
     // The connection has been successful. Now check if we need to create a new listener
@@ -415,156 +378,172 @@
     // endpoint for the delegate is marked as FAILED. This will be the case if the listener
     // on the reply queue for the endpoint has failed.
     String endpointName = delegateEndpoint.getEndpoint();
-    synchronized( recoveryMux ) {
-      if ( controller instanceof AggregateAnalysisEngineController ) {
+    synchronized (recoveryMux) {
+      if (controller instanceof AggregateAnalysisEngineController) {
         // Using the queue name lookup the delegate key
-        String key = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(endpointName);
-        if ( key != null && destination != null && !isReplyEndpoint ) {
+        String key = ((AggregateAnalysisEngineController) controller)
+                .lookUpDelegateKey(endpointName);
+        if (key != null && destination != null && !isReplyEndpoint) {
           // For Process Requests check the state of the delegate that is to receive
           // the CAS. If the delegate state = TIMEOUT_STATE, push the CAS id onto
-          // delegate's list of delayed CASes. The state of the delegate was 
+          // delegate's list of delayed CASes. The state of the delegate was
           // changed to TIMEOUT when a previous CAS timed out.
-          if (msgType != AsynchAEMessage.Request && command == AsynchAEMessage.Process ) {
+          if (msgType != AsynchAEMessage.Request && command == AsynchAEMessage.Process) {
             String casReferenceId = aMessage.getStringProperty(AsynchAEMessage.CasReference);
-            if ( casReferenceId != null && 
-                 ((AggregateAnalysisEngineController)controller).delayCasIfDelegateInTimedOutState(casReferenceId, endpointName) ) {
+            if (casReferenceId != null
+                    && ((AggregateAnalysisEngineController) controller)
+                            .delayCasIfDelegateInTimedOutState(casReferenceId, endpointName)) {
               return true;
             }
           }
-          // The aggregate has a master list of endpoints which are typically cloned during processing
+          // The aggregate has a master list of endpoints which are typically cloned during
+          // processing
           // This object uses a copy of the master. When a listener fails, the status of the master
           // endpoint is changed. To check the status, fetch the master endpoint, check its status
           // and if marked as FAILED, create a new listener, a new temp queue and override this
           // object endpoint copy destination property. It *IS* a new replyTo temp queue.
-          Endpoint masterEndpoint = ((AggregateAnalysisEngineController)controller).lookUpEndpoint(key, false);
-          if ( masterEndpoint.getStatus() == Endpoint.FAILED ) {
+          Endpoint masterEndpoint = ((AggregateAnalysisEngineController) controller)
+                  .lookUpEndpoint(key, false);
+          if (masterEndpoint.getStatus() == Endpoint.FAILED) {
             // Create a new Listener Object to receive replies
             createListener(key);
-            destination = (Destination)masterEndpoint.getDestination();
-            delegateEndpoint.setDestination(destination);              
-            // Override the reply destination. A new listener has been created along with a new temp queue for replies.
+            destination = (Destination) masterEndpoint.getDestination();
+            delegateEndpoint.setDestination(destination);
+            // Override the reply destination. A new listener has been created along with a new temp
+            // queue for replies.
             aMessage.setJMSReplyTo(destination);
           }
         }
       }
     }
     return false;
-	  
-	}
-	public boolean send(final Message aMessage, long msgSize, boolean startTimer) 
-	{
-		String destinationName = "";
-
-		try
-		{
-        int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
-        int command = aMessage.getIntProperty(AsynchAEMessage.Command);
-
-				if ( failed || brokerDestinations.getConnection() == null || producerSession == null || !((ActiveMQSession) producerSession).isRunning())
-				{
-				  if ( delayCasDelivery(msgType, aMessage, command) ) {
-				    //  Return true as if the CAS was sent
-				    return true;
-				  }
-				}
-
-				//	Send a reply to a queue provided by the client
-				
-				//  Stop messages and replies are sent to the endpoint provided in the destination object
-				if ( (command == AsynchAEMessage.Stop || command == AsynchAEMessage.ReleaseCAS || isReplyEndpoint) && delegateEndpoint.getDestination() != null  )
-				{
-					destinationName = ((ActiveMQDestination)delegateEndpoint.getDestination()).getPhysicalName();
-					if ( UIMAFramework.getLogger().isLoggable(Level.FINE))
-					{
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", new Object[] {destinationName });
-					}
-          logMessageSize(aMessage, msgSize, destinationName);
-					synchronized(producer)
-					{
-             producer.send((Destination)delegateEndpoint.getDestination(), aMessage);
-					}
-				}
-				else
-				{
-					destinationName = ((ActiveMQQueue) producer.getDestination()).getPhysicalName();
-           if ( UIMAFramework.getLogger().isLoggable(Level.FINE))
-           {
-             UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", new Object[] {destinationName });
-           }
-           logMessageSize(aMessage, msgSize, destinationName);
-           synchronized(producer)
-           {
-             producer.send(aMessage);
-           }
-				}
-				//  Starts a timer on a broker connection. Every time a new message
-				//  is sent to a destination managed by the broker the timer is 
-				//  restarted. The main purpose of the timer is to close connections
-				//  that are not used.
-				if (startTimer) {
-				  brokerDestinations.getConnectionTimer().startTimer(connectionCreationTimestamp, delegateEndpoint);
-				}
-				//  Succeeded sending the CAS
-				return true;
-		}
-		catch ( Exception e)
-		{
-			//	If the controller has been stopped no need to send messages
-			if ( controller.isStopped())
-			{
-				return true;
-			}
-			else
-			{
-			  if ( e instanceof JMSException ) {
-			    handleJmsException( (JMSException)e );
-			  } else {
-			    e.printStackTrace();
-	        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-	          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", new Object[] { componentName, e});
-	        }
-			  }
-			    
-			}
-		}
+
+  }
+
+  public boolean send(final Message aMessage, long msgSize, boolean startTimer) {
+    String destinationName = "";
+
+    try {
+      int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
+      int command = aMessage.getIntProperty(AsynchAEMessage.Command);
+
+      if (failed || brokerDestinations.getConnection() == null || producerSession == null
+              || !((ActiveMQSession) producerSession).isRunning()) {
+        if (delayCasDelivery(msgType, aMessage, command)) {
+          // Return true as if the CAS was sent
+          return true;
+        }
+      }
+
+      // Send a reply to a queue provided by the client
+
+      // Stop messages and replies are sent to the endpoint provided in the destination object
+      if ((command == AsynchAEMessage.Stop || command == AsynchAEMessage.ReleaseCAS || isReplyEndpoint)
+              && delegateEndpoint.getDestination() != null) {
+        destinationName = ((ActiveMQDestination) delegateEndpoint.getDestination())
+                .getPhysicalName();
+        if (UIMAFramework.getLogger().isLoggable(Level.FINE)) {
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send",
+                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE",
+                  new Object[] { destinationName });
+        }
+        logMessageSize(aMessage, msgSize, destinationName);
+        synchronized (producer) {
+          producer.send((Destination) delegateEndpoint.getDestination(), aMessage);
+        }
+      } else {
+        destinationName = ((ActiveMQQueue) producer.getDestination()).getPhysicalName();
+        if (UIMAFramework.getLogger().isLoggable(Level.FINE)) {
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send",
+                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE",
+                  new Object[] { destinationName });
+        }
+        logMessageSize(aMessage, msgSize, destinationName);
+        synchronized (producer) {
+          producer.send(aMessage);
+        }
+      }
+      // Starts a timer on a broker connection. Every time a new message
+      // is sent to a destination managed by the broker the timer is
+      // restarted. The main purpose of the timer is to close connections
+      // that are not used.
+      if (startTimer) {
+        brokerDestinations.getConnectionTimer().startTimer(connectionCreationTimestamp,
+                delegateEndpoint);
+      }
+      // Succeeded sending the CAS
+      return true;
+    } catch (Exception e) {
+      // If the controller has been stopped no need to send messages
+      if (controller.isStopped()) {
+        return true;
+      } else {
+        if (e instanceof JMSException) {
+          handleJmsException((JMSException) e);
+        } else {
+          e.printStackTrace();
+          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "send",
+                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING",
+                    new Object[] { componentName, e });
+          }
+        }
+
+      }
+    }
     brokerDestinations.getConnectionTimer().stopTimer();
-    //  Failed here
-		return false;
-	}
+    // Failed here
+    return false;
+  }
 
-	private void logMessageSize( Message aMessage, long msgSize, String destinationName ) {
-    if ( UIMAFramework.getLogger().isLoggable(Level.FINE)) {
+  private void logMessageSize(Message aMessage, long msgSize, String destinationName) {
+    if (UIMAFramework.getLogger().isLoggable(Level.FINE)) {
       boolean isReply = false;
-      if ( isReplyEndpoint  ) {
+      if (isReplyEndpoint) {
         isReply = true;
       }
-      String type="Text";
-      if ( aMessage instanceof BytesMessage ) {
+      String type = "Text";
+      if (aMessage instanceof BytesMessage) {
         type = "Binary";
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "logMessageSize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_msg_size__FINE", new Object[] { componentName, isReply==true?"Reply":"Request", "Binary", destinationName,msgSize});
-      } else if ( aMessage instanceof TextMessage ) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "logMessageSize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_msg_size__FINE", new Object[] { componentName, isReply==true?"Reply":"Request", "XMI", destinationName,msgSize});
-      }
-    }
-	}
-	/**
-	 * This method is called during recovery of failed connection. It is only called if the endpoint
-	 * associated with a given delegate is marked as FAILED. It is marked that way when a listener
-	 * attached to the reply queue fails. This method creates a new listener and a new temp queue.
-	 * 
-	 * @param delegateKey
-	 * @throws Exception
-	 */
-	private void createListener(String delegateKey) throws Exception {
-    if ( controller instanceof AggregateAnalysisEngineController ) {
-      //  Fetch an InputChannel that handles messages for a given delegate
+        UIMAFramework.getLogger(CLASS_NAME).logrb(
+                Level.FINE,
+                CLASS_NAME.getName(),
+                "logMessageSize",
+                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAJMS_msg_size__FINE",
+                new Object[] { componentName, isReply == true ? "Reply" : "Request", "Binary",
+                    destinationName, msgSize });
+      } else if (aMessage instanceof TextMessage) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(
+                Level.FINE,
+                CLASS_NAME.getName(),
+                "logMessageSize",
+                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAJMS_msg_size__FINE",
+                new Object[] { componentName, isReply == true ? "Reply" : "Request", "XMI",
+                    destinationName, msgSize });
+      }
+    }
+  }
+
+  /**
+   * This method is called during recovery of failed connection. It is only called if the endpoint
+   * associated with a given delegate is marked as FAILED. It is marked that way when a listener
+   * attached to the reply queue fails. This method creates a new listener and a new temp queue.
+   * 
+   * @param delegateKey
+   * @throws Exception
+   */
+  private void createListener(String delegateKey) throws Exception {
+    if (controller instanceof AggregateAnalysisEngineController) {
+      // Fetch an InputChannel that handles messages for a given delegate
       InputChannel iC = controller.getReplyInputChannel(delegateKey);
-      //  Create a new Listener, new Temp Queue and associate the listener with the Input Channel
+      // Create a new Listener, new Temp Queue and associate the listener with the Input Channel
       iC.createListener(delegateKey);
     }
-	}
+  }
 
-	private synchronized boolean handleJmsException( JMSException ex) {
+  private synchronized boolean handleJmsException(JMSException ex) {
     if (!failed) {
       failed = true;
     }
@@ -583,7 +562,8 @@
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
                   "handleJmsException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                  "UIMAJMS_send_failed_deleted_queue_INFO", new Object[] { componentName, destName, serverUri });
+                  "UIMAJMS_send_failed_deleted_queue_INFO",
+                  new Object[] { componentName, destName, serverUri });
         }
         controller.addEndpointToDoNotProcessList(delegateEndpoint.getDestination().toString());
         return false;
@@ -605,24 +585,21 @@
         }
         ex.printStackTrace();
       }
-    } catch ( Exception e) {
+    } catch (Exception e) {
       e.printStackTrace();
     }
 
     return true;
-	}
-	public void onConsumerEvent(ConsumerEvent arg0)
-	{
-		if (controller != null)
-		{
-			controller.handleDelegateLifeCycleEvent(getEndpoint(), arg0.getConsumerCount());
-		}
-	}
+  }
+
+  public void onConsumerEvent(ConsumerEvent arg0) {
+    if (controller != null) {
+      controller.handleDelegateLifeCycleEvent(getEndpoint(), arg0.getConsumerCount());
+    }
+  }
 
-	protected synchronized void finalize() throws Throwable
-	{
+  protected synchronized void finalize() throws Throwable {
     brokerDestinations.getConnectionTimer().stopTimer();
-	}
-	
-	
+  }
+
 }