You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2010/08/09 17:47:08 UTC
svn commit: r983692 -
/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java
Author: cwiklik
Date: Mon Aug 9 15:47:08 2010
New Revision: 983692
URL: http://svn.apache.org/viewvc?rev=983692&view=rev
Log:
UIMA-1855 Fixes lazy initialization problem reported by Findbugs
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java?rev=983692&r1=983691&r2=983692&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java Mon Aug 9 15:47:08 2010
@@ -56,6 +56,8 @@ public class BrokerDeployer implements A
private TransportConnector httpConnector = null;
+ private Object brokerInstanceMux = new Object();
+
public BrokerDeployer(long maxMemoryinBytes) throws Exception {
maxBrokerMemory = maxMemoryinBytes;
startInternalBroker();
@@ -66,138 +68,182 @@ public class BrokerDeployer implements A
}
public BrokerService getBroker() {
- return service;
+ synchronized( brokerInstanceMux ) {
+ return service;
+ }
}
- public void startInternalBroker() throws Exception {
- TransportConnector connector = null;
+ public void startInternalBroker() throws Exception {
+ TransportConnector connector = null;
+ synchronized (brokerInstanceMux) {
+ if (maxBrokerMemory > 0) {
+ System.out
+ .println("Configuring Internal Broker With Max Memory Of:"
+ + maxBrokerMemory);
+ if (UIMAFramework.getLogger(CLASS_NAME)
+ .isLoggable(Level.CONFIG)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG,
+ CLASS_NAME.getName(), "startInternalBroker",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_broker_memory__CONFIG",
+ new Object[] { maxBrokerMemory });
+ }
+ }
+ String[] connectors = service.getNetworkConnectorURIs();
+ if (connectors != null) {
+ for (int i = 0; i < connectors.length; i++) {
+ System.out
+ .println("ActiveMQ Broker Started With Connector:"
+ + connectors[i]);
+ }
+ brokerURI = service.getMasterConnectorURI();
+ } else {
+
+ String connectorList = "";
+ service.setPersistent(false);
+ int startPort = BASE_JMX_PORT;
+ if (System.getProperties().containsKey(
+ "com.sun.management.jmxremote.port")) {
+ startPort = Integer.parseInt(System
+ .getProperty("com.sun.management.jmxremote.port"));
+
+ }
+ while (startPort < MAX_PORT_THRESHOLD && !openPort(startPort)) {
+ startPort++;
+ }
+ if (startPort < (startPort + MAX_PORT_THRESHOLD)) {
+ MBeanServer jmxServer = null;
+ // Check if the MBeanServer is available. If it is, plug it
+ // into the
+ // local Broker. We only need one MBeanServer in the JVM
+ if ((jmxServer = ManagementContext.findTigerMBeanServer()) != null) {
+ System.out
+ .println(">>> Found TigerMBeanServer Running. Attaching Broker to Tiger.");
+ service.getManagementContext()
+ .setMBeanServer(jmxServer);
+ // Specify JMX Port
+ service.getManagementContext().setConnectorPort(
+ startPort);
+ } else {
+ service.getManagementContext().setConnectorPort(
+ startPort);
+ service.setUseJmx(true);
+ }
+
+ System.setProperty("com.sun.management.jmxremote.port",
+ String.valueOf(startPort));
+
+ System.out
+ .println("JMX Console connect URI: service:jmx:rmi:///jndi/rmi://localhost:"
+ + startPort + "/jmxrmi");
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(
+ Level.CONFIG)) {
+ UIMAFramework
+ .getLogger(CLASS_NAME)
+ .logrb(
+ Level.CONFIG,
+ CLASS_NAME.getName(),
+ "startInternalBroker",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_jmx_uri__CONFIG",
+ new Object[] { "service:jmx:rmi:///jndi/rmi://localhost:"
+ + startPort + "/jmxrmi" });
+ }
+ }
+
+ brokerURI = generateInternalURI("tcp", 18810, true, false);
+
+ // Wait until the connector has been successfully added to the broker.
+ // Sleeps for 1 sec between retries until success.
+ int timeBetweenRetries = 1000;
+ boolean tcpConnectorAcquiredValidPort = false;
+ while (!tcpConnectorAcquiredValidPort) {
+ try {
+ tcpConnector = service.addConnector(brokerURI);
+ tcpConnectorAcquiredValidPort = true;
+ } catch (Exception e) {
+ synchronized (this) {
+ wait(timeBetweenRetries);
+ }
+ } // silence InstanceAlreadyExistsException
+
+ }
+ System.out.println("Adding TCP Connector:"
+ + tcpConnector.getConnectUri());
+ if (UIMAFramework.getLogger(CLASS_NAME)
+ .isLoggable(Level.CONFIG)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.CONFIG,
+ CLASS_NAME.getName(),
+ "startInternalBroker",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_adding_connector__CONFIG",
+ new Object[] { "Adding TCP Connector",
+ tcpConnector.getConnectUri() });
+ }
+ connectorList = tcpConnector.getName();
+ if (System.getProperty("StompSupport") != null) {
+ String stompURI = generateInternalURI("stomp", 61613,
+ false, false);
+ connector = service.addConnector(stompURI);
+ System.out.println("Adding STOMP Connector:"
+ + connector.getConnectUri());
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(
+ Level.CONFIG)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.CONFIG,
+ CLASS_NAME.getName(),
+ "startInternalBroker",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_adding_connector__CONFIG",
+ new Object[] { "Adding STOMP Connector",
+ connector.getConnectUri() });
+ }
+ connectorList += "," + connector.getName();
+ }
+ if (System.getProperty("HTTP") != null) {
+ String stringPort = System.getProperty("HTTP");
+ int p = Integer.parseInt(stringPort);
+ String httpURI = generateInternalURI("http", p, false, true);
+ httpConnector = service.addConnector(httpURI);
+ System.out.println("Adding HTTP Connector:"
+ + httpConnector.getConnectUri());
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(
+ Level.CONFIG)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.CONFIG,
+ CLASS_NAME.getName(),
+ "startInternalBroker",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_adding_connector__CONFIG",
+ new Object[] { "Adding HTTP Connector",
+ httpConnector.getConnectUri() });
+ }
+ connectorList += "," + httpConnector.getName();
+ }
+ service.start();
+ System.setProperty("ActiveMQConnectors", connectorList);
+ System.out.println("Broker Service Started - URL:"
+ + service.getVmConnectorURI());
+ if (UIMAFramework.getLogger(CLASS_NAME)
+ .isLoggable(Level.CONFIG)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG,
+ CLASS_NAME.getName(), "startInternalBroker",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_broker_started__CONFIG",
+ new Object[] { service.getVmConnectorURI() });
+ }
+ }
+ }
+
+ // Allow the connectors some time to start
+ synchronized (semaphore) {
+ semaphore.wait(1000);
+ }
+ // System.out.println("JMX Server Port:"+service.getManagementContext().getRmiServerPort());
+ // setConnectorPort(startPort);
- if (maxBrokerMemory > 0) {
- System.out.println("Configuring Internal Broker With Max Memory Of:" + maxBrokerMemory);
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
- "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_broker_memory__CONFIG", new Object[] { maxBrokerMemory });
- }
- }
- String[] connectors = service.getNetworkConnectorURIs();
- if (connectors != null) {
- for (int i = 0; i < connectors.length; i++) {
- System.out.println("ActiveMQ Broker Started With Connector:" + connectors[i]);
- }
- brokerURI = service.getMasterConnectorURI();
- } else {
-
- String connectorList = "";
- service.setPersistent(false);
- int startPort = BASE_JMX_PORT;
- if (System.getProperties().containsKey("com.sun.management.jmxremote.port")) {
- startPort = Integer.parseInt(System.getProperty("com.sun.management.jmxremote.port"));
-
- }
- while (startPort < MAX_PORT_THRESHOLD && !openPort(startPort)) {
- startPort++;
- }
- if (startPort < (startPort + MAX_PORT_THRESHOLD)) {
- MBeanServer jmxServer = null;
- // Check if the MBeanServer is available. If it is, plug it into the
- // local Broker. We only need one MBeanServer in the JVM
- if ((jmxServer = ManagementContext.findTigerMBeanServer()) != null) {
- System.out.println(">>> Found TigerMBeanServer Running. Attaching Broker to Tiger.");
- service.getManagementContext().setMBeanServer(jmxServer);
- // Specify JMX Port
- service.getManagementContext().setConnectorPort(startPort);
- } else {
- service.getManagementContext().setConnectorPort(startPort);
- service.setUseJmx(true);
- }
-
- System.setProperty("com.sun.management.jmxremote.port", String.valueOf(startPort));
-
- System.out.println("JMX Console connect URI: service:jmx:rmi:///jndi/rmi://localhost:"
- + startPort + "/jmxrmi");
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
- UIMAFramework.getLogger(CLASS_NAME)
- .logrb(
- Level.CONFIG,
- CLASS_NAME.getName(),
- "startInternalBroker",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_jmx_uri__CONFIG",
- new Object[] { "service:jmx:rmi:///jndi/rmi://localhost:" + startPort
- + "/jmxrmi" });
- }
- }
-
- brokerURI = generateInternalURI("tcp", 18810, true, false);
-
- // Wait until sucessfull adding connector to the broker
- // Sleeps for 1sec between retries until success
- int timeBetweenRetries = 1000;
- boolean tcpConnectorAcquiredValidPort = false;
- while (!tcpConnectorAcquiredValidPort) {
- try {
- tcpConnector = service.addConnector(brokerURI);
- tcpConnectorAcquiredValidPort = true;
- } catch (Exception e) {
- synchronized (this) {
- wait(timeBetweenRetries);
- }
- } // silence InstanceAlreadyExistsException
-
- }
- System.out.println("Adding TCP Connector:" + tcpConnector.getConnectUri());
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
- "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_adding_connector__CONFIG",
- new Object[] { "Adding TCP Connector", tcpConnector.getConnectUri() });
- }
- connectorList = tcpConnector.getName();
- if (System.getProperty("StompSupport") != null) {
- String stompURI = generateInternalURI("stomp", 61613, false, false);
- connector = service.addConnector(stompURI);
- System.out.println("Adding STOMP Connector:" + connector.getConnectUri());
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
- "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_adding_connector__CONFIG",
- new Object[] { "Adding STOMP Connector", connector.getConnectUri() });
- }
- connectorList += "," + connector.getName();
- }
- if (System.getProperty("HTTP") != null) {
- String stringPort = System.getProperty("HTTP");
- int p = Integer.parseInt(stringPort);
- String httpURI = generateInternalURI("http", p, false, true);
- httpConnector = service.addConnector(httpURI);
- System.out.println("Adding HTTP Connector:" + httpConnector.getConnectUri());
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
- "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_adding_connector__CONFIG",
- new Object[] { "Adding HTTP Connector", httpConnector.getConnectUri() });
- }
- connectorList += "," + httpConnector.getName();
- }
- service.start();
- System.setProperty("ActiveMQConnectors", connectorList);
- System.out.println("Broker Service Started - URL:" + service.getVmConnectorURI());
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
- "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_broker_started__CONFIG", new Object[] { service.getVmConnectorURI() });
- }
- }
-
- // Allow the connectors some time to start
- synchronized (semaphore) {
- semaphore.wait(1000);
- }
- // System.out.println("JMX Server Port:"+service.getManagementContext().getRmiServerPort());
- // setConnectorPort(startPort);
-
- }
+ }
private boolean openPort(int aPort) {
ServerSocket ssocket = null;
@@ -285,50 +331,59 @@ public class BrokerDeployer implements A
* objects from JMX Server.
*
*/
- public void stop() {
- Object monitor = new Object();
- if (service != null) {
- try {
- // if ( usageListener != null )
- // {
- // service.getMemoryManager().removeUsageListener(usageListener);
- // }
- if (tcpConnector != null) {
- tcpConnector.stop();
- System.out
- .println("Broker Connector:" + tcpConnector.getUri().toString() + " is stopped");
- }
- if (httpConnector != null) {
- System.out.println("Broker Stopping HTTP Connector:" + httpConnector.getUri().toString());
- httpConnector.stop();
- System.out.println("Broker Connector:" + httpConnector.getUri().toString()
- + " is stopped");
- }
- service.getManagementContext().stop();
- service.stop();
- Broker broker = service.getBroker();
- while (!broker.isStopped()) {
- synchronized (monitor) {
- try {
- monitor.wait(20); // wait for the broker to terminate
- } catch (Exception e) {
- }
- }
+ public void stop() {
+ Object monitor = new Object();
+ synchronized (brokerInstanceMux) {
+ if (service != null) {
+ try {
+ // if ( usageListener != null )
+ // {
+ // service.getMemoryManager().removeUsageListener(usageListener);
+ // }
+ if (tcpConnector != null) {
+ tcpConnector.stop();
+ System.out.println("Broker Connector:"
+ + tcpConnector.getUri().toString()
+ + " is stopped");
+ }
+ if (httpConnector != null) {
+ System.out.println("Broker Stopping HTTP Connector:"
+ + httpConnector.getUri().toString());
+ httpConnector.stop();
+ System.out.println("Broker Connector:"
+ + httpConnector.getUri().toString()
+ + " is stopped");
+ }
+ service.getManagementContext().stop();
+ service.stop();
+ Broker broker = service.getBroker();
+ while (!broker.isStopped()) {
+ synchronized (monitor) {
+ try {
+ monitor.wait(20); // wait for the broker to
+ // terminate
+ } catch (Exception e) {
+ }
+ }
+
+ }
+ System.out.println("Broker is stopped");
+ broker = null;
+ service = null;
+ } catch (Exception e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(
+ Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.WARNING, CLASS_NAME.getName(), "stop",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_exception__WARNING", e);
+ }
- }
- System.out.println("Broker is stopped");
- broker = null;
- service = null;
- } catch (Exception e) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "stop", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_exception__WARNING", e);
- }
+ }
+ }
- }
- }
- }
+ }
+ }
/**
* Callback method invoked by Spring container during its lifecycle changes Ignore all events