You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/09/02 17:16:24 UTC
svn commit: r810547 [1/5] -
/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/
Author: cwiklik
Date: Wed Sep 2 15:16:23 2009
New Revision: 810547
URL: http://svn.apache.org/viewvc?rev=810547&view=rev
Log:
UIMA-1541 Reformatted to conform to UIMA formatting guidelines. No other changes included.
Modified:
incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java
incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java
incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannelMBean.java
incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ModifiableListener.java
incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java
incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java
incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaEEAdminSpringContext.java
Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java?rev=810547&r1=810546&r2=810547&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java Wed Sep 2 15:16:23 2009
@@ -29,8 +29,7 @@
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.jmx.ManagementContext;
-//import org.apache.activemq.memory.UsageListener;
+import org.apache.activemq.broker.jmx.ManagementContext; //import org.apache.activemq.memory.UsageListener;
import org.apache.uima.UIMAFramework;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.util.Level;
@@ -38,353 +37,322 @@
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
-public class BrokerDeployer implements ApplicationListener
-{
- private static final Class CLASS_NAME = BrokerDeployer.class;
- private static final int BASE_JMX_PORT = 1200;
- private static final int MAX_PORT_THRESHOLD = 200;
-
- private static BrokerService service = new BrokerService();
- private Object semaphore = new Object();
- private long maxBrokerMemory=0;
- private String brokerURI;
- private TransportConnector tcpConnector = null;
- private TransportConnector httpConnector = null;
-
- public BrokerDeployer(long maxMemoryinBytes) throws Exception
- {
- maxBrokerMemory = maxMemoryinBytes;
- startInternalBroker();
- }
-
-
- public BrokerDeployer() throws Exception
- {
- startInternalBroker();
- }
- public BrokerService getBroker()
- {
- return service;
- }
- public void startInternalBroker() throws Exception
- {
- TransportConnector connector = null;
-
- if (maxBrokerMemory > 0 )
- {
- System.out.println("Configuring Internal Broker With Max Memory Of:"+maxBrokerMemory);
+public class BrokerDeployer implements ApplicationListener {
+ private static final Class CLASS_NAME = BrokerDeployer.class;
+
+ private static final int BASE_JMX_PORT = 1200;
+
+ private static final int MAX_PORT_THRESHOLD = 200;
+
+ private static BrokerService service = new BrokerService();
+
+ private Object semaphore = new Object();
+
+ private long maxBrokerMemory = 0;
+
+ private String brokerURI;
+
+ private TransportConnector tcpConnector = null;
+
+ private TransportConnector httpConnector = null;
+
+ public BrokerDeployer(long maxMemoryinBytes) throws Exception {
+ maxBrokerMemory = maxMemoryinBytes;
+ startInternalBroker();
+ }
+
+ public BrokerDeployer() throws Exception {
+ startInternalBroker();
+ }
+
+ public BrokerService getBroker() {
+ return service;
+ }
+
+ public void startInternalBroker() throws Exception {
+ TransportConnector connector = null;
+
+ if (maxBrokerMemory > 0) {
+ System.out.println("Configuring Internal Broker With Max Memory Of:" + maxBrokerMemory);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
- "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_broker_memory__CONFIG",
- new Object[] {maxBrokerMemory});
+ "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_broker_memory__CONFIG", new Object[] { maxBrokerMemory });
}
- }
- String[] connectors = service.getNetworkConnectorURIs();
- if ( connectors != null )
- {
- for( int i=0; i < connectors.length; i++)
- {
- System.out.println("ActiveMQ Broker Started With Connector:"+connectors[i]);
- }
- brokerURI = service.getMasterConnectorURI();
- }
- else
- {
-
- String connectorList = "";
- service.setPersistent(false);
- int startPort = BASE_JMX_PORT;
- if ( System.getProperties().containsKey("com.sun.management.jmxremote.port") )
- {
- startPort = Integer.parseInt(System.getProperty("com.sun.management.jmxremote.port"));
-
- }
- while( startPort < MAX_PORT_THRESHOLD && !openPort(startPort))
- {
- startPort++;
- }
- if ( startPort < (startPort+MAX_PORT_THRESHOLD ) )
- {
- MBeanServer jmxServer = null;
- // Check if the MBeanServer is available. If it is, plug it into the
- // local Broker. We only need one MBeanServer in the JVM
- if ( (jmxServer = ManagementContext.findTigerMBeanServer()) != null)
- {
- System.out.println(">>> Found TigerMBeanServer Running. Attaching Broker to Tiger.");
- service.getManagementContext().setMBeanServer(jmxServer);
- // Specify JMX Port
- service.getManagementContext().setConnectorPort(startPort);
- }
- else
- {
- service.getManagementContext().setConnectorPort(startPort);
- service.setUseJmx(true);
- }
-
- System.setProperty("com.sun.management.jmxremote.port", String.valueOf(startPort));
+ }
+ String[] connectors = service.getNetworkConnectorURIs();
+ if (connectors != null) {
+ for (int i = 0; i < connectors.length; i++) {
+ System.out.println("ActiveMQ Broker Started With Connector:" + connectors[i]);
+ }
+ brokerURI = service.getMasterConnectorURI();
+ } else {
+
+ String connectorList = "";
+ service.setPersistent(false);
+ int startPort = BASE_JMX_PORT;
+ if (System.getProperties().containsKey("com.sun.management.jmxremote.port")) {
+ startPort = Integer.parseInt(System.getProperty("com.sun.management.jmxremote.port"));
+
+ }
+ while (startPort < MAX_PORT_THRESHOLD && !openPort(startPort)) {
+ startPort++;
+ }
+ if (startPort < (startPort + MAX_PORT_THRESHOLD)) {
+ MBeanServer jmxServer = null;
+ // Check if the MBeanServer is available. If it is, plug it into the
+ // local Broker. We only need one MBeanServer in the JVM
+ if ((jmxServer = ManagementContext.findTigerMBeanServer()) != null) {
+ System.out.println(">>> Found TigerMBeanServer Running. Attaching Broker to Tiger.");
+ service.getManagementContext().setMBeanServer(jmxServer);
+ // Specify JMX Port
+ service.getManagementContext().setConnectorPort(startPort);
+ } else {
+ service.getManagementContext().setConnectorPort(startPort);
+ service.setUseJmx(true);
+ }
+
+ System.setProperty("com.sun.management.jmxremote.port", String.valueOf(startPort));
- System.out.println("JMX Console connect URI: service:jmx:rmi:///jndi/rmi://localhost:"+startPort+"/jmxrmi");
+ System.out.println("JMX Console connect URI: service:jmx:rmi:///jndi/rmi://localhost:"
+ + startPort + "/jmxrmi");
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
- "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jmx_uri__CONFIG",
- new Object[] {"service:jmx:rmi:///jndi/rmi://localhost:"+startPort+"/jmxrmi" });
+ UIMAFramework.getLogger(CLASS_NAME)
+ .logrb(
+ Level.CONFIG,
+ CLASS_NAME.getName(),
+ "startInternalBroker",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_jmx_uri__CONFIG",
+ new Object[] { "service:jmx:rmi:///jndi/rmi://localhost:" + startPort
+ + "/jmxrmi" });
}
- }
+ }
+
+ brokerURI = generateInternalURI("tcp", 18810, true, false);
- brokerURI = generateInternalURI("tcp", 18810, true, false);
-
- // Wait until sucessfull adding connector to the broker
- // Sleeps for 1sec between retries until success
- int timeBetweenRetries = 1000;
- boolean tcpConnectorAcquiredValidPort = false;
- while( !tcpConnectorAcquiredValidPort )
- {
- try
- {
- tcpConnector = service.addConnector(brokerURI);
- tcpConnectorAcquiredValidPort = true;
- }
- catch( Exception e)
- {
- synchronized(this)
- {
- wait(timeBetweenRetries);
- }
- } // silence InstanceAlreadyExistsException
-
- }
- System.out.println("Adding TCP Connector:"+tcpConnector.getConnectUri());
+ // Wait until the connector is successfully added to the broker
+ // Sleeps for 1sec between retries until success
+ int timeBetweenRetries = 1000;
+ boolean tcpConnectorAcquiredValidPort = false;
+ while (!tcpConnectorAcquiredValidPort) {
+ try {
+ tcpConnector = service.addConnector(brokerURI);
+ tcpConnectorAcquiredValidPort = true;
+ } catch (Exception e) {
+ synchronized (this) {
+ wait(timeBetweenRetries);
+ }
+ } // silence InstanceAlreadyExistsException
+
+ }
+ System.out.println("Adding TCP Connector:" + tcpConnector.getConnectUri());
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
- "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_adding_connector__CONFIG",
- new Object[] {"Adding TCP Connector",tcpConnector.getConnectUri() });
+ "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_adding_connector__CONFIG",
+ new Object[] { "Adding TCP Connector", tcpConnector.getConnectUri() });
}
- connectorList = tcpConnector.getName();
- if ( System.getProperty("StompSupport") != null )
- {
- String stompURI = generateInternalURI("stomp", 61613, false, false);
- connector = service.addConnector(stompURI);
- System.out.println("Adding STOMP Connector:"+connector.getConnectUri());
+ connectorList = tcpConnector.getName();
+ if (System.getProperty("StompSupport") != null) {
+ String stompURI = generateInternalURI("stomp", 61613, false, false);
+ connector = service.addConnector(stompURI);
+ System.out.println("Adding STOMP Connector:" + connector.getConnectUri());
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
- "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_adding_connector__CONFIG",
- new Object[] {"Adding STOMP Connector",connector.getConnectUri() });
+ "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_adding_connector__CONFIG",
+ new Object[] { "Adding STOMP Connector", connector.getConnectUri() });
}
- connectorList += ","+connector.getName();
- }
- if ( System.getProperty("HTTP") != null )
- {
- String stringPort = System.getProperty("HTTP");
- int p = Integer.parseInt(stringPort);
- String httpURI = generateInternalURI("http", p, false, true);
- httpConnector = service.addConnector(httpURI);
- System.out.println("Adding HTTP Connector:"+httpConnector.getConnectUri());
+ connectorList += "," + connector.getName();
+ }
+ if (System.getProperty("HTTP") != null) {
+ String stringPort = System.getProperty("HTTP");
+ int p = Integer.parseInt(stringPort);
+ String httpURI = generateInternalURI("http", p, false, true);
+ httpConnector = service.addConnector(httpURI);
+ System.out.println("Adding HTTP Connector:" + httpConnector.getConnectUri());
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
- "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_adding_connector__CONFIG",
- new Object[] {"Adding HTTP Connector",httpConnector.getConnectUri() });
+ "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_adding_connector__CONFIG",
+ new Object[] { "Adding HTTP Connector", httpConnector.getConnectUri() });
}
- connectorList += ","+httpConnector.getName();
- }
- service.start();
- System.setProperty("ActiveMQConnectors", connectorList);
- System.out.println("Broker Service Started - URL:"+service.getVmConnectorURI());
+ connectorList += "," + httpConnector.getName();
+ }
+ service.start();
+ System.setProperty("ActiveMQConnectors", connectorList);
+ System.out.println("Broker Service Started - URL:" + service.getVmConnectorURI());
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
- "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_broker_started__CONFIG",
- new Object[] {service.getVmConnectorURI() });
+ "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_broker_started__CONFIG", new Object[] { service.getVmConnectorURI() });
}
- }
-
- // Allow the connectors some time to start
- synchronized( semaphore )
- {
- semaphore.wait(1000);
- }
- //System.out.println("JMX Server Port:"+service.getManagementContext().getRmiServerPort());
- //setConnectorPort(startPort);
-
-
- }
- private boolean openPort( int aPort )
- {
- ServerSocket ssocket= null;
- try
- {
- ssocket = new ServerSocket(aPort);
- return true;
- }
- catch( Exception e)
- {
- return false;
- }
- finally
- {
- try
- {
- if ( ssocket != null )
- {
- ssocket.close();
- }
- }
- catch (IOException ioe)
- {
- }
- }
-
- }
-
- /**
- * Generates a unique port for the Network Connector that will be plugged into the internal Broker.
- * This connector externalizes the internal broker so that remote delegates can reply back to the
- * Aggregate. This method tests port 18810 for availability and it fails increments the port by one
- * until a port is valid.
- *
- * @return - Broker URI with a unique port
- */
- private String generateInternalURI(String aProtocol, int aDefaultPort, boolean cacheURL, boolean oneTry)
- throws Exception
- {
- boolean success = false;
- int openPort=aDefaultPort;
- ServerSocket ssocket= null;
-
- while( !success )
- {
- try
- {
- ssocket = new ServerSocket(openPort);
+ }
+
+ // Allow the connectors some time to start
+ synchronized (semaphore) {
+ semaphore.wait(1000);
+ }
+ // System.out.println("JMX Server Port:"+service.getManagementContext().getRmiServerPort());
+ // setConnectorPort(startPort);
+
+ }
+
+ private boolean openPort(int aPort) {
+ ServerSocket ssocket = null;
+ try {
+ ssocket = new ServerSocket(aPort);
+ return true;
+ } catch (Exception e) {
+ return false;
+ } finally {
+ try {
+ if (ssocket != null) {
+ ssocket.close();
+ }
+ } catch (IOException ioe) {
+ }
+ }
+
+ }
+
+ /**
+ * Generates a unique port for the Network Connector that will be plugged into the internal
+ * Broker. This connector externalizes the internal broker so that remote delegates can reply back
+ * to the Aggregate. This method tests port 18810 for availability and, if that fails, increments the
+ * port by one until a port is valid.
+ *
+ * @return - Broker URI with a unique port
+ */
+ private String generateInternalURI(String aProtocol, int aDefaultPort, boolean cacheURL,
+ boolean oneTry) throws Exception {
+ boolean success = false;
+ int openPort = aDefaultPort;
+ ServerSocket ssocket = null;
+
+ while (!success) {
+ try {
+ ssocket = new ServerSocket(openPort);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
- "generateInternalURI", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_port_available__CONFIG",
- new Object[] {openPort });
+ "generateInternalURI", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_port_available__CONFIG", new Object[] { openPort });
+ }
+ String uri = aProtocol + "://"
+ + ssocket.getInetAddress().getLocalHost().getCanonicalHostName() + ":" + openPort;
+ success = true;
+ if (cacheURL) {
+ System.setProperty("BrokerURI", uri);
+
}
- String uri = aProtocol+"://"+ssocket.getInetAddress().getLocalHost().getCanonicalHostName()+":"+openPort;
- success = true;
- if ( cacheURL )
- {
- System.setProperty("BrokerURI", uri);
-
- }
- return uri;
- }
- catch( BindException e)
- {
+ return uri;
+ } catch (BindException e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
- "generateInternalURI", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_port_not_available__CONFIG",
- new Object[] {openPort });
+ "generateInternalURI", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_port_not_available__CONFIG", new Object[] { openPort });
+ }
+ if (oneTry) {
+ System.out.println("Given port:" + openPort + " is not available for " + aProtocol);
+ throw e;
}
- if ( oneTry )
- {
- System.out.println("Given port:"+openPort+" is not available for "+aProtocol);
- throw e;
- }
- openPort++;
- }
- catch( Exception e)
- {
- e.printStackTrace();
+ openPort++;
+ } catch (Exception e) {
+ e.printStackTrace();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "generateInternalURI", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING",
- new Object[] {JmsConstants.threadName(), e });
+ "generateInternalURI", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
+ }
+ if (oneTry) {
+ throw e;
}
- if ( oneTry )
- {
- throw e;
- }
- }
- finally
- {
- try
- {
- if ( ssocket != null )
- {
- ssocket.close();
- }
- }
- catch (IOException ioe)
- {
- }
- }
- }
- return null;
-
- }
- /**
- * Stops the ActiveMQ broker. This method waits for 1 second to allow the broker to
- * cleanup objects from JMX Server.
- *
- */
- public void stop()
- {
- Object monitor = new Object();
- if ( service != null )
- {
- try
- {
-// if ( usageListener != null )
-// {
-// service.getMemoryManager().removeUsageListener(usageListener);
-// }
- if ( tcpConnector != null )
- {
- tcpConnector.stop();
- System.out.println("Broker Connector:"+tcpConnector.getUri().toString()+ " is stopped");
- }
- if ( httpConnector != null )
- {
- System.out.println("Broker Stopping HTTP Connector:"+httpConnector.getUri().toString());
- httpConnector.stop();
- System.out.println("Broker Connector:"+httpConnector.getUri().toString()+ " is stopped");
- }
- service.getManagementContext().stop();
- service.stop();
- Broker broker = service.getBroker();
- while( !broker.isStopped() )
- {
- synchronized( monitor )
- {
- try
- {
- monitor.wait(20); // wait for the broker to terminate
- }
- catch( Exception e) { }
- }
-
- }
- System.out.println("Broker is stopped");
- broker = null;
- service = null;
- }
- catch( Exception e) { e.printStackTrace();}
- }
- }
- /**
- * Callback method invoked by Spring container during its lifecycle changes
- * Ignore all events except for ContextClosedEvent which indicates the container
- * has shutdown. In this case, stop the internal ActiveMQ broker.
- *
- * @param anEvent - an event object
- */
- public void onApplicationEvent(ApplicationEvent anEvent)
- {
- if ( anEvent instanceof ContextClosedEvent)
- {
+ } finally {
+ try {
+ if (ssocket != null) {
+ ssocket.close();
+ }
+ } catch (IOException ioe) {
+ }
+ }
+ }
+ return null;
+
+ }
+
+ /**
+ * Stops the ActiveMQ broker. This method waits for 1 second to allow the broker to cleanup
+ * objects from JMX Server.
+ *
+ */
+ public void stop() {
+ Object monitor = new Object();
+ if (service != null) {
+ try {
+ // if ( usageListener != null )
+ // {
+ // service.getMemoryManager().removeUsageListener(usageListener);
+ // }
+ if (tcpConnector != null) {
+ tcpConnector.stop();
+ System.out
+ .println("Broker Connector:" + tcpConnector.getUri().toString() + " is stopped");
+ }
+ if (httpConnector != null) {
+ System.out.println("Broker Stopping HTTP Connector:" + httpConnector.getUri().toString());
+ httpConnector.stop();
+ System.out.println("Broker Connector:" + httpConnector.getUri().toString()
+ + " is stopped");
+ }
+ service.getManagementContext().stop();
+ service.stop();
+ Broker broker = service.getBroker();
+ while (!broker.isStopped()) {
+ synchronized (monitor) {
+ try {
+ monitor.wait(20); // wait for the broker to terminate
+ } catch (Exception e) {
+ }
+ }
+
+ }
+ System.out.println("Broker is stopped");
+ broker = null;
+ service = null;
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+ * Callback method invoked by Spring container during its lifecycle changes. Ignore all events
+ * except for ContextClosedEvent which indicates the container has shutdown. In this case, stop
+ * the internal ActiveMQ broker.
+ *
+ * @param anEvent
+ * - an event object
+ */
+ public void onApplicationEvent(ApplicationEvent anEvent) {
+ if (anEvent instanceof ContextClosedEvent) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "onApplicationEvent", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_container_terminated__INFO",
- new Object[] {(( ContextClosedEvent)anEvent).getApplicationContext().getDisplayName()});
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.INFO,
+ CLASS_NAME.getName(),
+ "onApplicationEvent",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_container_terminated__INFO",
+ new Object[] { ((ContextClosedEvent) anEvent).getApplicationContext()
+ .getDisplayName() });
}
- stop();
+ stop();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "onApplicationEvent", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_broker_stopped__INFO",
- new Object[] {brokerURI});
+ "onApplicationEvent", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_broker_stopped__INFO", new Object[] { brokerURI });
}
- }
- }
-
+ }
+ }
}
Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java?rev=810547&r1=810546&r2=810547&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java Wed Sep 2 15:16:23 2009
@@ -41,151 +41,164 @@
import org.springframework.jms.listener.SessionAwareMessageListener;
/**
- * Message listener injected at runtime into Aggregate to handle a race condition
- * when multiple threads simultaneously process messages from a Cas Multiplier.
- * It is only used to process messages from a Cas Multiplier and only if the reply
- * queue has more than one consumer thread configured in a deployment descriptor.
- * The listener creates a pool of threads equal to the number of concurrent consumers
- * defined in the DD for the listener on the reply queue. Once the message is handled
- * in onMessage(), it is than delegated for processing to one of the available threads
- * from the pool.
- *
- * This listener guarantees processing order. It receives messages from Spring
- * in a single thread and if it finds a child CAS in the message, it increments the
- * parent (input) CAS child count and delegates processing to the InputChannel instance.
+ * Message listener injected at runtime into Aggregate to handle a race condition when multiple
+ * threads simultaneously process messages from a Cas Multiplier. It is only used to process
+ * messages from a Cas Multiplier and only if the reply queue has more than one consumer thread
+ * configured in a deployment descriptor. The listener creates a pool of threads equal to the number
+ * of concurrent consumers defined in the DD for the listener on the reply queue. Once the message
+ * is handled in onMessage(), it is then delegated for processing to one of the available threads
+ * from the pool.
+ *
+ * This listener guarantees processing order. It receives messages from Spring in a single thread
+ * and if it finds a child CAS in the message, it increments the parent (input) CAS child count and
+ * delegates processing to the InputChannel instance.
+ *
+ * The race condition: The Cas Multiplier sends the last child and the parent almost at the same
+ * time. Both are received by the aggregate and are processed in different threads, if a scaleout is
+ * used on the reply queue. One thread may start processing the input CAS while the other thread
+ * (with the last child) is not yet allowed to run. The first thread takes the input CAS all the way
+ * to the final step and since at this time, the input CAS has no children ( the thread processing
+ * the last child has not updated the child count yet), it will be prematurely released. When the
+ * thread with the last child is allowed to run, it finds that the parent no longer exists in the
+ * cache.
*
- * The race condition:
- * The Cas Multiplier sends the last child and the parent almost at the same time.
- * Both are received by the aggregate and are processed in different threads, if a
- * scaleout is used on the reply queue. One thread may start processing the input CAS
- * while the other thread (with the last child) is not yet allowed to run. The first
- * thread takes the input CAS all the way to the final step and since at this time,
- * the input CAS has no children ( the thread processing the last child has not updated
- * the child count yet), it will be prematurely released. When the thread with the last
- * child is allowed to run, it finds that the parent no longer exists in the cache.
*
- *
*/
public class ConcurrentMessageListener implements SessionAwareMessageListener {
private static final Class CLASS_NAME = ConcurrentMessageListener.class;
private SessionAwareMessageListener delegateListener;
- private int concurrentThreadCount=0;
+
+ private int concurrentThreadCount = 0;
+
private AnalysisEngineController controller;
+
private ThreadPoolExecutor executor = null;
+
private LinkedBlockingQueue<Runnable> workQueue;
+
private CountDownLatch controllerLatch = new CountDownLatch(1);
-
+
/**
* Creates a listener with a given number of process threads. This listener is injected between
- * Spring and JmsInputChannel to enable orderly processing of CASes. This listener is only
- * used on reply queues that have scale out attribute in DD greater than 1. Its main job is
- * to increment number of child CASes for a given input CAS. It does so in a single thread, and
- * once it completes the update this listener submits the CAS for further processing up to the
+ * Spring and JmsInputChannel to enable orderly processing of CASes. This listener is only used on
+ * reply queues that have scale out attribute in DD greater than 1. Its main job is to increment
+ * number of child CASes for a given input CAS. It does so in a single thread, and once it
+ * completes the update this listener submits the CAS for further processing up to the
* JmsInputChannel. The CAS is submitted to a queue where the executor assigns a free thread to
- * process the CAS.
+ * process the CAS.
*
- * @param concurrentThreads - number of threads to use to process CASes
- * @param delegateListener - JmsInputChannel instance to delegate CAS to
+ * @param concurrentThreads
+ * - number of threads to use to process CASes
+ * @param delegateListener
+ * - JmsInputChannel instance to delegate CAS to
* @throws InvalidClassException
*/
- public ConcurrentMessageListener( int concurrentThreads, Object delegateListener) throws InvalidClassException {
- if ( !(delegateListener instanceof SessionAwareMessageListener) ) {
- throw new InvalidClassException("Invalid Delegate Listener. Expected Object of Type:"+SessionAwareMessageListener.class+" Received:"+delegateListener.getClass());
+ public ConcurrentMessageListener(int concurrentThreads, Object delegateListener)
+ throws InvalidClassException {
+ if (!(delegateListener instanceof SessionAwareMessageListener)) {
+ throw new InvalidClassException("Invalid Delegate Listener. Expected Object of Type:"
+ + SessionAwareMessageListener.class + " Received:" + delegateListener.getClass());
}
concurrentThreadCount = concurrentThreads;
- this.delegateListener = (SessionAwareMessageListener)delegateListener;
- if ( concurrentThreads > 1 ) {
- workQueue = new LinkedBlockingQueue<Runnable>(concurrentThreadCount);
+ this.delegateListener = (SessionAwareMessageListener) delegateListener;
+ if (concurrentThreads > 1) {
+ workQueue = new LinkedBlockingQueue<Runnable>(concurrentThreadCount);
executor = new ThreadPoolExecutor(concurrentThreads, concurrentThreads, Long.MAX_VALUE,
TimeUnit.NANOSECONDS, workQueue);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.prestartAllCoreThreads();
}
}
-
+
public void stop() {
- if ( executor != null ) {
+ if (executor != null) {
executor.shutdownNow();
- while( !executor.isTerminated()) {
+ while (!executor.isTerminated()) {
try {
- executor.awaitTermination(200,TimeUnit.MILLISECONDS);
- } catch( InterruptedException e) {
+ executor.awaitTermination(200, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
break;
}
}
}
}
+
public void setAnalysisEngineController(AnalysisEngineController controller) {
this.controller = controller;
controllerLatch.countDown();
}
- private boolean isMessageFromCasMultiplier(final Message message ) throws JMSException {
+
+ private boolean isMessageFromCasMultiplier(final Message message) throws JMSException {
return message.propertyExists(AsynchAEMessage.CasSequence);
}
+
/**
- * Intercept a message to increment a child count of the input CAS.
- * This method is always called in a single thread, guaranteeing order of processing.
- * The child CAS will always come here first. Once the count is updated, or this
- * CAS is not an child, the message will be delegated to one of the threads in the pool
- * that will eventually call InputChannel object where the actual processing of the
- * message begins.
+ * Intercept a message to increment a child count of the input CAS. This method is always called
+ * in a single thread, guaranteeing order of processing. The child CAS will always come here
+ * first. Once the count is updated, or this CAS is not a child, the message will be delegated to
+ * one of the threads in the pool that will eventually call InputChannel object where the actual
+ * processing of the message begins.
*
*/
public void onMessage(final Message message, final Session session) throws JMSException {
try {
- // Wait until the controller is plugged in
+ // Wait until the controller is plugged in
controllerLatch.await();
- } catch( InterruptedException e) {}
- if ( isMessageFromCasMultiplier(message)) {
- // Check if the message came from a Cas Multiplier and it contains a new Process Request
+ } catch (InterruptedException e) {
+ }
+ if (isMessageFromCasMultiplier(message)) {
+ // Check if the message came from a Cas Multiplier and it contains a new Process Request
int command = message.getIntProperty(AsynchAEMessage.Command);
int messageType = message.getIntProperty(AsynchAEMessage.MessageType);
- // Intercept Cas Process Request from a Cas Multiplier
- if (command == AsynchAEMessage.Process && messageType == AsynchAEMessage.Request ) {
- String msgFrom = (String)message.getStringProperty(AsynchAEMessage.MessageFrom);
- if ( msgFrom != null && controller instanceof AggregateAnalysisEngineController ) {
- String delegateKey =((AggregateAnalysisEngineController)controller).lookUpDelegateKey(msgFrom);
- if ( delegateKey != null ) {
- Delegate delegate = ((AggregateAnalysisEngineController)controller).lookupDelegate(delegateKey);
+ // Intercept Cas Process Request from a Cas Multiplier
+ if (command == AsynchAEMessage.Process && messageType == AsynchAEMessage.Request) {
+ String msgFrom = (String) message.getStringProperty(AsynchAEMessage.MessageFrom);
+ if (msgFrom != null && controller instanceof AggregateAnalysisEngineController) {
+ String delegateKey = ((AggregateAnalysisEngineController) controller)
+ .lookUpDelegateKey(msgFrom);
+ if (delegateKey != null) {
+ Delegate delegate = ((AggregateAnalysisEngineController) controller)
+ .lookupDelegate(delegateKey);
delegate.setConcurrentConsumersOnReplyQueue();
}
}
try {
- String parentCasReferenceId = message.getStringProperty(AsynchAEMessage.InputCasReference);
- // Fetch parent CAS entry from the local cache
+ String parentCasReferenceId = message
+ .getStringProperty(AsynchAEMessage.InputCasReference);
+ // Fetch parent CAS entry from the local cache
CasStateEntry parentEntry = controller.getLocalCache().lookupEntry(parentCasReferenceId);
- // increment number of child CASes this parent has in play
+ // increment number of child CASes this parent has in play
parentEntry.incrementSubordinateCasInPlayCount();
} catch (Exception e) {
e.printStackTrace();
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
"onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
- new Object[] { e });
+ new Object[] { e });
}
}
}
-
+
}
- if ( concurrentThreadCount > 1 ) {
- // Delegate meesage to the JmsInputChannel
+ if (concurrentThreadCount > 1) {
+ // Delegate meesage to the JmsInputChannel
executor.execute(new Runnable() {
public void run() {
try {
delegateListener.onMessage(message, session);
- } catch( Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
- "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
- new Object[] { e });
+ "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_exception__WARNING", new Object[] { e });
}
}
}
});
} else {
- // Just handoff the message to the InputChannel object
+ // Just handoff the message to the InputChannel object
delegateListener.onMessage(message, session);
}
}
Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=810547&r1=810546&r2=810547&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Wed Sep 2 15:16:23 2009
@@ -52,120 +52,116 @@
import org.apache.uima.util.Level;
import org.springframework.util.Assert;
-public class JmsEndpointConnection_impl implements ConsumerListener
-{
- private static final Class CLASS_NAME = JmsEndpointConnection_impl.class;
+public class JmsEndpointConnection_impl implements ConsumerListener {
+ private static final Class CLASS_NAME = JmsEndpointConnection_impl.class;
- private Destination destination;
+ private Destination destination;
- private Session producerSession;
+ private Session producerSession;
- private MessageProducer producer;
+ private MessageProducer producer;
- private BrokerConnectionEntry brokerDestinations;
-
- private String serverUri;
+ private BrokerConnectionEntry brokerDestinations;
- private String endpoint;
+ private String serverUri;
- private String endpointName;
+ private String endpoint;
+
+ private String endpointName;
private Endpoint delegateEndpoint;
- private volatile boolean retryEnabled;
+ private volatile boolean retryEnabled;
+
+ private AnalysisEngineController controller = null;
+
+ private volatile boolean connectionAborted = false;
+
+ protected static long connectionCreationTimestamp = 0L;
+
+ private Object semaphore = new Object();
- private AnalysisEngineController controller = null;
+ private boolean isReplyEndpoint;
- private volatile boolean connectionAborted = false;
+ private volatile boolean failed = false;
- protected static long connectionCreationTimestamp = 0L;
+ private Object recoveryMux = new Object();
- private Object semaphore = new Object();
-
- private boolean isReplyEndpoint;
-
- private volatile boolean failed = false;
-
- private Object recoveryMux = new Object();
-
- private final String componentName;
-
- public JmsEndpointConnection_impl(BrokerConnectionEntry aBrokerDestinationMap, Endpoint anEndpoint, AnalysisEngineController aController)
- {
+ private final String componentName;
+
+ public JmsEndpointConnection_impl(BrokerConnectionEntry aBrokerDestinationMap,
+ Endpoint anEndpoint, AnalysisEngineController aController) {
brokerDestinations = aBrokerDestinationMap;
- serverUri = anEndpoint.getServerURI();
+ serverUri = anEndpoint.getServerURI();
isReplyEndpoint = anEndpoint.isReplyEndpoint();
controller = aController;
- if ( ( anEndpoint.getCommand() == AsynchAEMessage.Stop || isReplyEndpoint ) &&
- anEndpoint.getDestination() != null &&
- anEndpoint.getDestination() instanceof ActiveMQDestination )
- {
- endpoint = ((ActiveMQDestination)anEndpoint.getDestination()).getPhysicalName();
- }
- else
- {
- endpoint = anEndpoint.getEndpoint();
- }
- anEndpoint.remove();
- componentName = controller.getComponentName();
- delegateEndpoint = anEndpoint;
- }
-
- public boolean isRetryEnabled()
- {
- return retryEnabled;
- }
-
- public void setRetryEnabled(boolean retryEnabled)
- {
- this.retryEnabled = retryEnabled;
- }
-
-
- public boolean isOpen()
- {
- if ( failed ||
- producerSession == null ||
- brokerDestinations.getConnection() == null ||
- ((ActiveMQConnection)brokerDestinations.getConnection()).isClosed() ||
- ((ActiveMQConnection)brokerDestinations.getConnection()).isClosing() ||
- ((ActiveMQConnection)brokerDestinations.getConnection()).isTransportFailed())
- {
- return false;
- }
- return ((ActiveMQSession) producerSession).isRunning();
- }
+ if ((anEndpoint.getCommand() == AsynchAEMessage.Stop || isReplyEndpoint)
+ && anEndpoint.getDestination() != null
+ && anEndpoint.getDestination() instanceof ActiveMQDestination) {
+ endpoint = ((ActiveMQDestination) anEndpoint.getDestination()).getPhysicalName();
+ } else {
+ endpoint = anEndpoint.getEndpoint();
+ }
+ anEndpoint.remove();
+ componentName = controller.getComponentName();
+ delegateEndpoint = anEndpoint;
+ }
+
+ public boolean isRetryEnabled() {
+ return retryEnabled;
+ }
+
+ public void setRetryEnabled(boolean retryEnabled) {
+ this.retryEnabled = retryEnabled;
+ }
+
+ public boolean isOpen() {
+ if (failed || producerSession == null || brokerDestinations.getConnection() == null
+ || ((ActiveMQConnection) brokerDestinations.getConnection()).isClosed()
+ || ((ActiveMQConnection) brokerDestinations.getConnection()).isClosing()
+ || ((ActiveMQConnection) brokerDestinations.getConnection()).isTransportFailed()) {
+ return false;
+ }
+ return ((ActiveMQSession) producerSession).isRunning();
+ }
+
private void openChannel() throws AsynchAEException, ServiceShutdownException {
openChannel(getServerUri(), componentName, endpoint, controller);
}
- private synchronized void openChannel(String brokerUri, String aComponentName, String anEndpointName, AnalysisEngineController aController) throws AsynchAEException, ServiceShutdownException
- {
- try
- {
- // If replying to http request, reply to a queue managed by this service broker using tcp protocol
- if ( isReplyEndpoint && brokerUri.startsWith("http") && aController != null && aController.getInputChannel() != null )
- {
- org.apache.uima.aae.InputChannel iC = aController.getInputChannel(aController.getName());
- if ( ( brokerUri.trim().length() == 0 ) && iC != null )
- {
- brokerUri = iC.getServiceInfo().getBrokerURL();
- }
-
+ private synchronized void openChannel(String brokerUri, String aComponentName,
+ String anEndpointName, AnalysisEngineController aController) throws AsynchAEException,
+ ServiceShutdownException {
+ try {
+ // If replying to http request, reply to a queue managed by this service broker using tcp
+ // protocol
+ if (isReplyEndpoint && brokerUri.startsWith("http") && aController != null
+ && aController.getInputChannel() != null) {
+ org.apache.uima.aae.InputChannel iC = aController.getInputChannel(aController.getName());
+ if ((brokerUri.trim().length() == 0) && iC != null) {
+ brokerUri = iC.getServiceInfo().getBrokerURL();
+ }
+
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "open", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_override_connection_to_endpoint__FINE", new Object[] { aComponentName, getEndpoint(), aController.getInputChannel().getServerUri() });
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "open",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_override_connection_to_endpoint__FINE",
+ new Object[] { aComponentName, getEndpoint(),
+ aController.getInputChannel().getServerUri() });
}
- }
+ }
-
- if ( !isOpen() ) {
+ if (!isOpen()) {
Connection conn = null;
- if (brokerDestinations.getConnection() == null ) {
+ if (brokerDestinations.getConnection() == null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_activemq_open__FINE",
- new Object[] { anEndpointName, brokerUri });
+ "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_activemq_open__FINE", new Object[] { anEndpointName, brokerUri });
}
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
conn = factory.createConnection();
@@ -178,236 +174,203 @@
failed = false;
}
Connection conn = brokerDestinations.getConnection();
- if ( failed ) {
- // Unable to create a connection
+ if (failed) {
+ // Unable to create a connection
return;
}
producerSession = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-
- if ( (delegateEndpoint.getCommand() == AsynchAEMessage.Stop || isReplyEndpoint ) && delegateEndpoint.getDestination() != null )
- {
- producer = producerSession.createProducer(null);
- if ( aController != null )
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_temp_conn_starting__FINE",
- new Object[] { aComponentName, anEndpointName, brokerUri });
- }
- }
- }
- else
- {
- destination = producerSession.createQueue(getEndpoint());
- producer = producerSession.createProducer(destination);
- if ( controller != null )
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_conn_starting__FINE",
- new Object[] { aComponentName, anEndpointName, brokerUri });
- }
- }
- }
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- // Since the connection is shared, start it only once
- if ( !((ActiveMQConnection)brokerDestinations.getConnection()).isStarted() ) {
- brokerDestinations.getConnection().start();
- }
- if ( controller != null )
- {
+
+ if ((delegateEndpoint.getCommand() == AsynchAEMessage.Stop || isReplyEndpoint)
+ && delegateEndpoint.getDestination() != null) {
+ producer = producerSession.createProducer(null);
+ if (aController != null) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_temp_conn_starting__FINE",
+ new Object[] { aComponentName, anEndpointName, brokerUri });
+ }
+ }
+ } else {
+ destination = producerSession.createQueue(getEndpoint());
+ producer = producerSession.createProducer(destination);
+ if (controller != null) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_conn_starting__FINE",
+ new Object[] { aComponentName, anEndpointName, brokerUri });
+ }
+ }
+ }
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ // Since the connection is shared, start it only once
+ if (!((ActiveMQConnection) brokerDestinations.getConnection()).isStarted()) {
+ brokerDestinations.getConnection().start();
+ }
+ if (controller != null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_conn_started__FINE",
- new Object[] { endpoint, brokerUri });
- if ( controller.getInputChannel() != null ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_connection_open_to_endpoint__FINE", new Object[] { aComponentName, getEndpoint(), brokerUri });
+ "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_conn_started__FINE", new Object[] { endpoint, brokerUri });
+ if (controller.getInputChannel() != null) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_connection_open_to_endpoint__FINE",
+ new Object[] { aComponentName, getEndpoint(), brokerUri });
}
}
- }
- failed = false;
- }
- catch ( Exception e)
- {
- boolean rethrow = true;
- e.printStackTrace();
-
- if ( e instanceof JMSException ) {
- rethrow = handleJmsException( (JMSException)e );
-
- }
- if ( rethrow ) {
+ }
+ failed = false;
+ } catch (Exception e) {
+ boolean rethrow = true;
+ e.printStackTrace();
+
+ if (e instanceof JMSException) {
+ rethrow = handleJmsException((JMSException) e);
+
+ }
+ if (rethrow) {
throw new AsynchAEException(e);
}
- }
- }
- public synchronized void open() throws AsynchAEException, ServiceShutdownException {
- open( delegateEndpoint.getEndpoint(), serverUri);
- }
- public synchronized void open(String brokerUri, String anEndpointName) throws AsynchAEException, ServiceShutdownException
- {
+ }
+ }
+
+ public synchronized void open() throws AsynchAEException, ServiceShutdownException {
+ open(delegateEndpoint.getEndpoint(), serverUri);
+ }
+
+ public synchronized void open(String brokerUri, String anEndpointName) throws AsynchAEException,
+ ServiceShutdownException {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "open", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_open__FINE",
- new Object[] { anEndpointName, brokerUri });
- }
- if ( !connectionAborted )
- {
- openChannel();
- }
-
- }
-
- public synchronized void abort()
- {
- connectionAborted = true;
- brokerDestinations.getConnectionTimer().stopTimer();
- try {
- this.close();
- } catch( Exception e) {
- }
- }
-
- public synchronized void close() throws Exception
- {
- if (producer != null)
- {
- try
- {
- producer.close();
- }
- catch ( Exception e)
- {
- // Ignore we are shutting down
- }
- }
- if (producerSession != null)
- {
- try
- {
- producerSession.close();
- }
- catch ( Exception e)
- {
- // Ignore we are shutting down
- }
- producerSession = null;
- }
- if (destination != null)
- {
- destination = null;
- }
- }
-
- protected String getEndpoint()
- {
- return endpoint;
- }
-
- protected void setEndpoint(String endpoint)
- {
- this.endpoint = endpoint;
- }
-
- protected synchronized String getServerUri()
- {
- return serverUri;
- }
-
- protected synchronized void setServerUri(String serverUri)
- {
- this.serverUri = serverUri;
- }
-
- public TextMessage produceTextMessage(String aTextMessage) throws AsynchAEException
- {
- Assert.notNull(producerSession);
- boolean done = false;
- int retryCount = 4;
- while (retryCount > 0)
- {
- try {
- retryCount--;
-
- if (aTextMessage == null)
- {
- return producerSession.createTextMessage();
- }
- else
- {
- return producerSession.createTextMessage(aTextMessage);
- }
-
- } catch ( javax.jms.IllegalStateException e) {
- try {
- open();
- } catch ( ServiceShutdownException ex) {
- ex.printStackTrace();
- } catch( AsynchAEException ex) {
-
- throw ex;
- }
- }
- catch ( Exception e) {
- throw new AsynchAEException(e);
- }
- }
- throw new AsynchAEException(new InvalidMessageException("Unable to produce Message Object"));
- }
- public BytesMessage produceByteMessage() throws AsynchAEException
- {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "open",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_open__FINE",
+ new Object[] { anEndpointName, brokerUri });
+ }
+ if (!connectionAborted) {
+ openChannel();
+ }
+
+ }
+
+ public synchronized void abort() {
+ connectionAborted = true;
+ brokerDestinations.getConnectionTimer().stopTimer();
+ try {
+ this.close();
+ } catch (Exception e) {
+ }
+ }
+
+ public synchronized void close() throws Exception {
+ if (producer != null) {
+ try {
+ producer.close();
+ } catch (Exception e) {
+ // Ignore we are shutting down
+ }
+ }
+ if (producerSession != null) {
+ try {
+ producerSession.close();
+ } catch (Exception e) {
+ // Ignore we are shutting down
+ }
+ producerSession = null;
+ }
+ if (destination != null) {
+ destination = null;
+ }
+ }
+
+ protected String getEndpoint() {
+ return endpoint;
+ }
+
+ protected void setEndpoint(String endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ protected synchronized String getServerUri() {
+ return serverUri;
+ }
+
+ protected synchronized void setServerUri(String serverUri) {
+ this.serverUri = serverUri;
+ }
+
+ public TextMessage produceTextMessage(String aTextMessage) throws AsynchAEException {
Assert.notNull(producerSession);
boolean done = false;
int retryCount = 4;
- while (retryCount > 0)
- {
- try
- {
+ while (retryCount > 0) {
+ try {
retryCount--;
- return producerSession.createBytesMessage();
- }
- catch ( javax.jms.IllegalStateException e)
- {
- try
- {
+
+ if (aTextMessage == null) {
+ return producerSession.createTextMessage();
+ } else {
+ return producerSession.createTextMessage(aTextMessage);
+ }
+
+ } catch (javax.jms.IllegalStateException e) {
+ try {
open();
+ } catch (ServiceShutdownException ex) {
+ ex.printStackTrace();
+ } catch (AsynchAEException ex) {
+
+ throw ex;
}
- catch ( ServiceShutdownException ex)
- {
+ } catch (Exception e) {
+ throw new AsynchAEException(e);
+ }
+ }
+ throw new AsynchAEException(new InvalidMessageException("Unable to produce Message Object"));
+ }
+
+ public BytesMessage produceByteMessage() throws AsynchAEException {
+ Assert.notNull(producerSession);
+ boolean done = false;
+ int retryCount = 4;
+ while (retryCount > 0) {
+ try {
+ retryCount--;
+ return producerSession.createBytesMessage();
+ } catch (javax.jms.IllegalStateException e) {
+ try {
+ open();
+ } catch (ServiceShutdownException ex) {
ex.printStackTrace();
}
- }
- catch ( Exception e)
- {
+ } catch (Exception e) {
throw new AsynchAEException(e);
}
}
- throw new AsynchAEException(new InvalidMessageException("Unable to produce BytesMessage Object"));
+ throw new AsynchAEException(
+ new InvalidMessageException("Unable to produce BytesMessage Object"));
+ }
+
+ public ObjectMessage produceObjectMessage() throws AsynchAEException {
+ Assert.notNull(producerSession);
+
+ try {
+ if (!((ActiveMQSession) producerSession).isRunning()) {
+ open();
+ }
+ return producerSession.createObjectMessage();
+ } catch (Exception e) {
+ throw new AsynchAEException(e);
+ }
}
- public ObjectMessage produceObjectMessage() throws AsynchAEException
- {
- Assert.notNull(producerSession);
-
- try
- {
- if (!((ActiveMQSession) producerSession).isRunning())
- {
- open();
- }
- return producerSession.createObjectMessage();
- }
- catch ( Exception e)
- {
- throw new AsynchAEException(e);
- }
- }
-
- private boolean delayCasDelivery(int msgType, Message aMessage, int command) throws Exception {
-
+ private boolean delayCasDelivery(int msgType, Message aMessage, int command) throws Exception {
+
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "recoverSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_open_connection_to_endpoint__FINE", new Object[] { getEndpoint() });
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "recoverSession",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_open_connection_to_endpoint__FINE",
+ new Object[] { getEndpoint() });
}
openChannel();
// The connection has been successful. Now check if we need to create a new listener
@@ -415,156 +378,172 @@
// endpoint for the delegate is marked as FAILED. This will be the case if the listener
// on the reply queue for the endpoint has failed.
String endpointName = delegateEndpoint.getEndpoint();
- synchronized( recoveryMux ) {
- if ( controller instanceof AggregateAnalysisEngineController ) {
+ synchronized (recoveryMux) {
+ if (controller instanceof AggregateAnalysisEngineController) {
// Using the queue name lookup the delegate key
- String key = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(endpointName);
- if ( key != null && destination != null && !isReplyEndpoint ) {
+ String key = ((AggregateAnalysisEngineController) controller)
+ .lookUpDelegateKey(endpointName);
+ if (key != null && destination != null && !isReplyEndpoint) {
// For Process Requests check the state of the delegate that is to receive
// the CAS. If the delegate state = TIMEOUT_STATE, push the CAS id onto
- // delegate's list of delayed CASes. The state of the delegate was
+ // delegate's list of delayed CASes. The state of the delegate was
// changed to TIMEOUT when a previous CAS timed out.
- if (msgType != AsynchAEMessage.Request && command == AsynchAEMessage.Process ) {
+ if (msgType != AsynchAEMessage.Request && command == AsynchAEMessage.Process) {
String casReferenceId = aMessage.getStringProperty(AsynchAEMessage.CasReference);
- if ( casReferenceId != null &&
- ((AggregateAnalysisEngineController)controller).delayCasIfDelegateInTimedOutState(casReferenceId, endpointName) ) {
+ if (casReferenceId != null
+ && ((AggregateAnalysisEngineController) controller)
+ .delayCasIfDelegateInTimedOutState(casReferenceId, endpointName)) {
return true;
}
}
- // The aggregate has a master list of endpoints which are typically cloned during processing
+ // The aggregate has a master list of endpoints which are typically cloned during
+ // processing
// This object uses a copy of the master. When a listener fails, the status of the master
// endpoint is changed. To check the status, fetch the master endpoint, check its status
// and if marked as FAILED, create a new listener, a new temp queue and override this
// object endpoint copy destination property. It *IS* a new replyTo temp queue.
- Endpoint masterEndpoint = ((AggregateAnalysisEngineController)controller).lookUpEndpoint(key, false);
- if ( masterEndpoint.getStatus() == Endpoint.FAILED ) {
+ Endpoint masterEndpoint = ((AggregateAnalysisEngineController) controller)
+ .lookUpEndpoint(key, false);
+ if (masterEndpoint.getStatus() == Endpoint.FAILED) {
// Create a new Listener Object to receive replies
createListener(key);
- destination = (Destination)masterEndpoint.getDestination();
- delegateEndpoint.setDestination(destination);
- // Override the reply destination. A new listener has been created along with a new temp queue for replies.
+ destination = (Destination) masterEndpoint.getDestination();
+ delegateEndpoint.setDestination(destination);
+ // Override the reply destination. A new listener has been created along with a new temp
+ // queue for replies.
aMessage.setJMSReplyTo(destination);
}
}
}
}
return false;
-
- }
- public boolean send(final Message aMessage, long msgSize, boolean startTimer)
- {
- String destinationName = "";
-
- try
- {
- int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
- int command = aMessage.getIntProperty(AsynchAEMessage.Command);
-
- if ( failed || brokerDestinations.getConnection() == null || producerSession == null || !((ActiveMQSession) producerSession).isRunning())
- {
- if ( delayCasDelivery(msgType, aMessage, command) ) {
- // Return true as if the CAS was sent
- return true;
- }
- }
-
- // Send a reply to a queue provided by the client
-
- // Stop messages and replies are sent to the endpoint provided in the destination object
- if ( (command == AsynchAEMessage.Stop || command == AsynchAEMessage.ReleaseCAS || isReplyEndpoint) && delegateEndpoint.getDestination() != null )
- {
- destinationName = ((ActiveMQDestination)delegateEndpoint.getDestination()).getPhysicalName();
- if ( UIMAFramework.getLogger().isLoggable(Level.FINE))
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", new Object[] {destinationName });
- }
- logMessageSize(aMessage, msgSize, destinationName);
- synchronized(producer)
- {
- producer.send((Destination)delegateEndpoint.getDestination(), aMessage);
- }
- }
- else
- {
- destinationName = ((ActiveMQQueue) producer.getDestination()).getPhysicalName();
- if ( UIMAFramework.getLogger().isLoggable(Level.FINE))
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", new Object[] {destinationName });
- }
- logMessageSize(aMessage, msgSize, destinationName);
- synchronized(producer)
- {
- producer.send(aMessage);
- }
- }
- // Starts a timer on a broker connection. Every time a new message
- // is sent to a destination managed by the broker the timer is
- // restarted. The main purpose of the timer is to close connections
- // that are not used.
- if (startTimer) {
- brokerDestinations.getConnectionTimer().startTimer(connectionCreationTimestamp, delegateEndpoint);
- }
- // Succeeded sending the CAS
- return true;
- }
- catch ( Exception e)
- {
- // If the controller has been stopped no need to send messages
- if ( controller.isStopped())
- {
- return true;
- }
- else
- {
- if ( e instanceof JMSException ) {
- handleJmsException( (JMSException)e );
- } else {
- e.printStackTrace();
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", new Object[] { componentName, e});
- }
- }
-
- }
- }
+
+ }
+
+ public boolean send(final Message aMessage, long msgSize, boolean startTimer) {
+ String destinationName = "";
+
+ try {
+ int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
+ int command = aMessage.getIntProperty(AsynchAEMessage.Command);
+
+ if (failed || brokerDestinations.getConnection() == null || producerSession == null
+ || !((ActiveMQSession) producerSession).isRunning()) {
+ if (delayCasDelivery(msgType, aMessage, command)) {
+ // Return true as if the CAS was sent
+ return true;
+ }
+ }
+
+ // Send a reply to a queue provided by the client
+
+ // Stop messages and replies are sent to the endpoint provided in the destination object
+ if ((command == AsynchAEMessage.Stop || command == AsynchAEMessage.ReleaseCAS || isReplyEndpoint)
+ && delegateEndpoint.getDestination() != null) {
+ destinationName = ((ActiveMQDestination) delegateEndpoint.getDestination())
+ .getPhysicalName();
+ if (UIMAFramework.getLogger().isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE",
+ new Object[] { destinationName });
+ }
+ logMessageSize(aMessage, msgSize, destinationName);
+ synchronized (producer) {
+ producer.send((Destination) delegateEndpoint.getDestination(), aMessage);
+ }
+ } else {
+ destinationName = ((ActiveMQQueue) producer.getDestination()).getPhysicalName();
+ if (UIMAFramework.getLogger().isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE",
+ new Object[] { destinationName });
+ }
+ logMessageSize(aMessage, msgSize, destinationName);
+ synchronized (producer) {
+ producer.send(aMessage);
+ }
+ }
+ // Starts a timer on a broker connection. Every time a new message
+ // is sent to a destination managed by the broker the timer is
+ // restarted. The main purpose of the timer is to close connections
+ // that are not used.
+ if (startTimer) {
+ brokerDestinations.getConnectionTimer().startTimer(connectionCreationTimestamp,
+ delegateEndpoint);
+ }
+ // Succeeded sending the CAS
+ return true;
+ } catch (Exception e) {
+ // If the controller has been stopped no need to send messages
+ if (controller.isStopped()) {
+ return true;
+ } else {
+ if (e instanceof JMSException) {
+ handleJmsException((JMSException) e);
+ } else {
+ e.printStackTrace();
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "send",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING",
+ new Object[] { componentName, e });
+ }
+ }
+
+ }
+ }
brokerDestinations.getConnectionTimer().stopTimer();
- // Failed here
- return false;
- }
+ // Failed here
+ return false;
+ }
- private void logMessageSize( Message aMessage, long msgSize, String destinationName ) {
- if ( UIMAFramework.getLogger().isLoggable(Level.FINE)) {
+ private void logMessageSize(Message aMessage, long msgSize, String destinationName) {
+ if (UIMAFramework.getLogger().isLoggable(Level.FINE)) {
boolean isReply = false;
- if ( isReplyEndpoint ) {
+ if (isReplyEndpoint) {
isReply = true;
}
- String type="Text";
- if ( aMessage instanceof BytesMessage ) {
+ String type = "Text";
+ if (aMessage instanceof BytesMessage) {
type = "Binary";
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "logMessageSize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_msg_size__FINE", new Object[] { componentName, isReply==true?"Reply":"Request", "Binary", destinationName,msgSize});
- } else if ( aMessage instanceof TextMessage ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "logMessageSize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_msg_size__FINE", new Object[] { componentName, isReply==true?"Reply":"Request", "XMI", destinationName,msgSize});
- }
- }
- }
- /**
- * This method is called during recovery of failed connection. It is only called if the endpoint
- * associated with a given delegate is marked as FAILED. It is marked that way when a listener
- * attached to the reply queue fails. This method creates a new listener and a new temp queue.
- *
- * @param delegateKey
- * @throws Exception
- */
- private void createListener(String delegateKey) throws Exception {
- if ( controller instanceof AggregateAnalysisEngineController ) {
- // Fetch an InputChannel that handles messages for a given delegate
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "logMessageSize",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_msg_size__FINE",
+ new Object[] { componentName, isReply == true ? "Reply" : "Request", "Binary",
+ destinationName, msgSize });
+ } else if (aMessage instanceof TextMessage) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "logMessageSize",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_msg_size__FINE",
+ new Object[] { componentName, isReply == true ? "Reply" : "Request", "XMI",
+ destinationName, msgSize });
+ }
+ }
+ }
+
+ /**
+ * This method is called during recovery of failed connection. It is only called if the endpoint
+ * associated with a given delegate is marked as FAILED. It is marked that way when a listener
+ * attached to the reply queue fails. This method creates a new listener and a new temp queue.
+ *
+ * @param delegateKey
+ * @throws Exception
+ */
+ private void createListener(String delegateKey) throws Exception {
+ if (controller instanceof AggregateAnalysisEngineController) {
+ // Fetch an InputChannel that handles messages for a given delegate
InputChannel iC = controller.getReplyInputChannel(delegateKey);
- // Create a new Listener, new Temp Queue and associate the listener with the Input Channel
+ // Create a new Listener, new Temp Queue and associate the listener with the Input Channel
iC.createListener(delegateKey);
}
- }
+ }
- private synchronized boolean handleJmsException( JMSException ex) {
+ private synchronized boolean handleJmsException(JMSException ex) {
if (!failed) {
failed = true;
}
@@ -583,7 +562,8 @@
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"handleJmsException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_send_failed_deleted_queue_INFO", new Object[] { componentName, destName, serverUri });
+ "UIMAJMS_send_failed_deleted_queue_INFO",
+ new Object[] { componentName, destName, serverUri });
}
controller.addEndpointToDoNotProcessList(delegateEndpoint.getDestination().toString());
return false;
@@ -605,24 +585,21 @@
}
ex.printStackTrace();
}
- } catch ( Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
}
return true;
- }
- public void onConsumerEvent(ConsumerEvent arg0)
- {
- if (controller != null)
- {
- controller.handleDelegateLifeCycleEvent(getEndpoint(), arg0.getConsumerCount());
- }
- }
+ }
+
+ public void onConsumerEvent(ConsumerEvent arg0) {
+ if (controller != null) {
+ controller.handleDelegateLifeCycleEvent(getEndpoint(), arg0.getConsumerCount());
+ }
+ }
- protected synchronized void finalize() throws Throwable
- {
+ protected synchronized void finalize() throws Throwable {
brokerDestinations.getConnectionTimer().stopTimer();
- }
-
-
+ }
+
}