You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by an...@apache.org on 2010/10/06 17:00:19 UTC
svn commit: r1005070 - in /openejb/trunk/openejb3/container:
openejb-activemq4/src/main/java/org/apache/openejb/resource/activemq/
openejb-core/src/main/java/org/apache/openejb/resource/activemq/
Author: andygumbrecht
Date: Wed Oct 6 15:00:18 2010
New Revision: 1005070
URL: http://svn.apache.org/viewvc?rev=1005070&view=rev
Log:
The MQ RA may be called upon to start more than one broker, but only ever one for a given URI - Added this support to start and stop.
Modified:
openejb/trunk/openejb3/container/openejb-activemq4/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ4Factory.java
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQFactory.java
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
Modified: openejb/trunk/openejb3/container/openejb-activemq4/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ4Factory.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/container/openejb-activemq4/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ4Factory.java?rev=1005070&r1=1005069&r2=1005070&view=diff
==============================================================================
--- openejb/trunk/openejb3/container/openejb-activemq4/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ4Factory.java (original)
+++ openejb/trunk/openejb3/container/openejb-activemq4/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ4Factory.java Wed Oct 6 15:00:18 2010
@@ -28,60 +28,107 @@ import javax.naming.Context;
import javax.naming.NamingException;
import javax.sql.DataSource;
import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import org.apache.openejb.util.LogCategory;
public class ActiveMQ4Factory implements BrokerFactory.BrokerFactoryHandler {
+
private static final ThreadLocal<Properties> threadProperties = new ThreadLocal<Properties>();
+ private static final Map<URI, BrokerService> brokers = new HashMap<URI, BrokerService>();
+ private static Throwable throwable = null;
public static void setThreadProperties(Properties value) {
threadProperties.set(value);
}
- public BrokerService createBroker(URI brokerURI) throws Exception {
- URI uri = new URI(brokerURI.getRawSchemeSpecificPart());
- BrokerService broker = BrokerFactory.createBroker(uri);
-
- Properties properties = getLowerCaseProperties();
-
- Object value = properties.get("datasource");
- if (value instanceof String && value.toString().length() == 0) {
- value = null;
- }
+ @Override
+ public synchronized BrokerService createBroker(URI brokerURI) throws Exception {
- if (value != null) {
- DataSource dataSource;
- if (value instanceof DataSource) {
- dataSource = (DataSource) value;
- } else {
- String resouceId = (String) value;
+ BrokerService broker = brokers.get(brokerURI);
- try {
- ContainerSystem containerSystem = SystemInstance.get().getComponent(ContainerSystem.class);
- Context context = containerSystem.getJNDIContext();
- Object obj = context.lookup("openejb/Resource/" + resouceId);
- if (!(obj instanceof DataSource)) {
- throw new IllegalArgumentException("Resource with id " + resouceId +
- " is not a DataSource, but is " + obj.getClass().getName());
+ if (null == broker) {
+
+ URI uri = new URI(brokerURI.getRawSchemeSpecificPart());
+ broker = BrokerFactory.createBroker(uri);
+ brokers.put(brokerURI, broker);
+
+ if (!uri.getScheme().toLowerCase().startsWith("xbean")) {
+
+ Properties properties = getLowerCaseProperties();
+
+ Object value = properties.get("datasource");
+ if (value instanceof String && value.toString().length() == 0) {
+ value = null;
+ }
+
+ if (value != null) {
+ DataSource dataSource;
+ if (value instanceof DataSource) {
+ dataSource = (DataSource) value;
+ } else {
+ String resouceId = (String) value;
+
+ try {
+ ContainerSystem containerSystem = SystemInstance.get().getComponent(ContainerSystem.class);
+ Context context = containerSystem.getJNDIContext();
+ Object obj = context.lookup("openejb/Resource/" + resouceId);
+ if (!(obj instanceof DataSource)) {
+ throw new IllegalArgumentException("Resource with id " + resouceId
+ + " is not a DataSource, but is " + obj.getClass().getName());
+ }
+ dataSource = (DataSource) obj;
+ } catch (NamingException e) {
+ throw new IllegalArgumentException("Unknown datasource " + resouceId);
+ }
}
- dataSource = (DataSource) obj;
- } catch (NamingException e) {
- throw new IllegalArgumentException("Unknown datasource " + resouceId);
+
+ JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
+ persistenceAdapter.setDataSource(dataSource);
+ broker.setPersistenceAdapter(persistenceAdapter);
+ } else {
+ MemoryPersistenceAdapter persistenceAdapter = new MemoryPersistenceAdapter();
+ broker.setPersistenceAdapter(persistenceAdapter);
}
}
- JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
- persistenceAdapter.setDataSource(dataSource);
- broker.setPersistenceAdapter(persistenceAdapter);
- } else {
- MemoryPersistenceAdapter persistenceAdapter = new MemoryPersistenceAdapter();
- broker.setPersistenceAdapter(persistenceAdapter);
+ if (!broker.isStarted()) {
+
+ final BrokerService bs = broker;
+
+ final Thread start = new Thread("ActiveMQFactory start and checkpoint") {
+
+ @Override
+ public void run() {
+ try {
+ //Start before returning - this is known to be safe.
+ bs.start();
+ } catch (Throwable t) {
+ throwable = t;
+ }
+ }
+ };
+
+ start.setDaemon(true);
+ start.start();
+
+ try {
+ start.join(5000);
+ } catch (InterruptedException e) {
+ //Ignore
+ }
+
+ if (null != throwable) {
+ org.apache.openejb.util.Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources").error("ActiveMQ failed to start within 5 seconds - It may not be usable", throwable);
+ }
+ }
}
return broker;
}
-
private Properties getLowerCaseProperties() {
Properties properties = threadProperties.get();
Properties newProperties = new Properties();
@@ -96,4 +143,8 @@ public class ActiveMQ4Factory implements
}
return newProperties;
}
+
+ public Collection<BrokerService> getBrokers() {
+ return brokers.values();
+ }
}
Modified: openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java?rev=1005070&r1=1005069&r2=1005070&view=diff
==============================================================================
--- openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java (original)
+++ openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java Wed Oct 6 15:00:18 2010
@@ -30,14 +30,15 @@ import javax.naming.Context;
import javax.naming.NamingException;
import javax.sql.DataSource;
import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class ActiveMQ5Factory implements BrokerFactoryHandler {
- public static final String XBEAN = "xbean";
private static final ThreadLocal<Properties> threadProperties = new ThreadLocal<Properties>();
- private static BrokerService broker = null;
+ private static final Map<URI,BrokerService> brokers = new HashMap<URI, BrokerService>();
private static Throwable throwable = null;
public static void setThreadProperties(Properties value) {
@@ -45,106 +46,108 @@ public class ActiveMQ5Factory implements
}
@Override
- public BrokerService createBroker(final URI brokerURI) throws Exception {
+ public synchronized BrokerService createBroker(final URI brokerURI) throws Exception {
- final URI uri = new URI(brokerURI.getRawSchemeSpecificPart());
- broker = BrokerFactory.createBroker(uri);
+ BrokerService broker = brokers.get(brokerURI);
- if (!uri.getScheme().toLowerCase().startsWith(XBEAN)) {
-
- Properties properties = getLowerCaseProperties();
-
- Object value = properties.get("datasource");
- if (value instanceof String && value.toString().length() == 0) {
- value = null;
- }
+ if (null == broker) {
- if (value != null) {
- DataSource dataSource;
- if (value instanceof DataSource) {
- dataSource = (DataSource) value;
- } else {
- String resouceId = (String) value;
+ final URI uri = new URI(brokerURI.getRawSchemeSpecificPart());
+ broker = BrokerFactory.createBroker(uri);
+ brokers.put(brokerURI, broker);
+
+ if (!uri.getScheme().toLowerCase().startsWith("xbean")) {
+
+ Properties properties = getLowerCaseProperties();
+
+ Object value = properties.get("datasource");
+ if (value instanceof String && value.toString().length() == 0) {
+ value = null;
+ }
+
+ if (value != null) {
+ DataSource dataSource;
+ if (value instanceof DataSource) {
+ dataSource = (DataSource) value;
+ } else {
+ String resouceId = (String) value;
- try {
- ContainerSystem containerSystem = SystemInstance.get().getComponent(ContainerSystem.class);
- Context context = containerSystem.getJNDIContext();
- Object obj = context.lookup("openejb/Resource/" + resouceId);
- if (!(obj instanceof DataSource)) {
- throw new IllegalArgumentException("Resource with id " + resouceId
- + " is not a DataSource, but is " + obj.getClass().getName());
+ try {
+ ContainerSystem containerSystem = SystemInstance.get().getComponent(ContainerSystem.class);
+ Context context = containerSystem.getJNDIContext();
+ Object obj = context.lookup("openejb/Resource/" + resouceId);
+ if (!(obj instanceof DataSource)) {
+ throw new IllegalArgumentException("Resource with id " + resouceId
+ + " is not a DataSource, but is " + obj.getClass().getName());
+ }
+ dataSource = (DataSource) obj;
+ } catch (NamingException e) {
+ throw new IllegalArgumentException("Unknown datasource " + resouceId);
}
- dataSource = (DataSource) obj;
- } catch (NamingException e) {
- throw new IllegalArgumentException("Unknown datasource " + resouceId);
}
+
+ JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
+
+ if (properties.containsKey("usedatabaselock")) {
+ //This must be false for hsqldb
+ persistenceAdapter.setUseDatabaseLock(Boolean.parseBoolean(properties.getProperty("usedatabaselock", "true")));
+ }
+
+ persistenceAdapter.setDataSource(dataSource);
+ broker.setPersistenceAdapter(persistenceAdapter);
+ } else {
+ MemoryPersistenceAdapter persistenceAdapter = new MemoryPersistenceAdapter();
+ broker.setPersistenceAdapter(persistenceAdapter);
}
- JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
+ try {
+ //TODO - New in 5.4.x
+ broker.setSchedulerSupport(false);
+ } catch (Throwable t) {
+ //Ignore
+ }
- if (properties.containsKey("usedatabaselock")) {
- //This must be false for hsqldb
- persistenceAdapter.setUseDatabaseLock(Boolean.parseBoolean(properties.getProperty("usedatabaselock", "true")));
- }
-
- persistenceAdapter.setDataSource(dataSource);
- broker.setPersistenceAdapter(persistenceAdapter);
- } else {
- MemoryPersistenceAdapter persistenceAdapter = new MemoryPersistenceAdapter();
- broker.setPersistenceAdapter(persistenceAdapter);
- }
-
- try {
- //TODO - New in 5.4.x
- broker.setSchedulerSupport(false);
- } catch (Throwable t) {
- //Ignore
+ //Notify when an error occurs on shutdown.
+ broker.setUseLoggingForShutdownErrors(org.apache.openejb.util.Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources").isErrorEnabled());
}
- //Notify when an error occurs on shutdown.
- broker.setUseLoggingForShutdownErrors(org.apache.openejb.util.Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources").isErrorEnabled());
- }
+ //We must close the broker
+ broker.setUseShutdownHook(false);
+ broker.setSystemExitOnShutdown(false);
+
+ if (!broker.isStarted()) {
- //We must close the broker
- broker.setUseShutdownHook(false);
- broker.setSystemExitOnShutdown(false);
-
- if (!broker.isStarted()) {
-
- final Thread start = new Thread("ActiveMQFactory start and checkpoint") {
-
- @Override
- public void run() {
- try {
- //Start before returning - this is known to be safe.
- broker.start();
+ final BrokerService bs = broker;
+ final Thread start = new Thread("ActiveMQFactory start and checkpoint") {
+
+ @Override
+ public void run() {
try {
- //This is no longer available from AMQ5.4
- broker.waitUntilStarted();
+ //Start before returning - this is known to be safe.
+ bs.start();
+ bs.waitUntilStarted();
+
+ //Force a checkpoint to initialize pools
+ bs.getPersistenceAdapter().checkpoint(true);
} catch (Throwable t) {
- org.apache.openejb.util.Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources").warning("ActiveMQ waitUntilStarted failed", t);
+ throwable = t;
}
-
- //Force a checkpoint to initialize pools
- broker.getPersistenceAdapter().checkpoint(true);
- } catch (Throwable t) {
- throwable = t;
}
- }
- };
+ };
- start.setDaemon(true);
- start.start();
+ start.setDaemon(true);
+ start.start();
- try {
- start.join(5000);
- } catch (InterruptedException e) {
- //Ignore
- }
+ try {
+ start.join(5000);
+ } catch (InterruptedException e) {
+ //Ignore
+ }
- if (null != throwable) {
- org.apache.openejb.util.Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources").error("ActiveMQ failed to start within 5 seconds - It may not be usable", throwable);
+ if (null != throwable) {
+ org.apache.openejb.util.Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources").error("ActiveMQ failed to start within 5 seconds - It may not be usable", throwable);
+ }
}
}
@@ -166,4 +169,8 @@ public class ActiveMQ5Factory implements
}
return newProperties;
}
+
+ public Collection<BrokerService> getBrokers() {
+ return brokers.values();
+ }
}
Modified: openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQFactory.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQFactory.java?rev=1005070&r1=1005069&r2=1005070&view=diff
==============================================================================
--- openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQFactory.java (original)
+++ openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQFactory.java Wed Oct 6 15:00:18 2010
@@ -22,16 +22,17 @@ import org.apache.activemq.broker.Broker
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
+import java.util.Collection;
import java.util.Properties;
public class ActiveMQFactory {
private static final Method setThreadProperties;
private static final Method createBroker;
+ private static final Method getBrokers;
private static final Object instance;
private static final Class clazz;
private static String brokerPrefix;
- private static BrokerService broker;
static {
@@ -70,6 +71,12 @@ public class ActiveMQFactory {
} catch (NoSuchMethodException e) {
throw new RuntimeException("Unable to create ActiveMQFactory createBroker method", e);
}
+
+ try {
+ getBrokers = clazz.getDeclaredMethod("getBrokers", (Class[]) null);
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException("Unable to create ActiveMQFactory createBroker method", e);
+ }
}
/**
@@ -96,8 +103,7 @@ public class ActiveMQFactory {
public BrokerService createBroker(final URI brokerURI) throws Exception {
try {
- broker = (BrokerService) createBroker.invoke(instance, brokerURI);
- return broker;
+ return (BrokerService) createBroker.invoke(instance, brokerURI);
} catch (IllegalAccessException e) {
throw new Exception("ActiveMQFactory.createBroker.IllegalAccessException", e);
} catch (IllegalArgumentException e) {
@@ -108,11 +114,19 @@ public class ActiveMQFactory {
}
/**
- * Returns either the configured broker, or null if it has not yet been created.
- * This intended for access upon RA shutdown in order to wait for the broker to finish.
- * @return BrokerService or null
+ * Returns a map of configured brokers.
+ * This intended for access upon RA shutdown in order to wait for the brokers to finish.
+ * @return Map<URI, BrokerService>
*/
- public static BrokerService getBroker() {
- return broker;
+ public static Collection<BrokerService> getBrokers() throws Exception {
+ try {
+ return (Collection<BrokerService>) getBrokers.invoke(instance, (Object[]) null);
+ } catch (IllegalAccessException e) {
+ throw new Exception("ActiveMQFactory.createBroker.IllegalAccessException", e);
+ } catch (IllegalArgumentException e) {
+ throw new Exception("ActiveMQFactory.createBroker.IllegalArgumentException", e);
+ } catch (InvocationTargetException e) {
+ throw new Exception("ActiveMQFactory.createBroker.InvocationTargetException", e);
+ }
}
}
Modified: openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java?rev=1005070&r1=1005069&r2=1005070&view=diff
==============================================================================
--- openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java (original)
+++ openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java Wed Oct 6 15:00:18 2010
@@ -23,7 +23,9 @@ import javax.resource.spi.BootstrapConte
import javax.resource.spi.ResourceAdapterInternalException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Collection;
import java.util.Properties;
+import org.apache.activemq.broker.BrokerService;
import org.apache.openejb.util.LogCategory;
public class ActiveMQResourceAdapter extends org.apache.activemq.ra.ActiveMQResourceAdapter {
@@ -189,14 +191,15 @@ public class ActiveMQResourceAdapter ext
}
}
- private void stopImpl() {
+ private void stopImpl() throws Exception {
super.stop();
- final org.apache.activemq.broker.BrokerService br = ActiveMQFactory.getBroker();
- if (null != br) {
+ final Collection<BrokerService> brokers = ActiveMQFactory.getBrokers();
+
+ for(final BrokerService bs : brokers){
try {
- br.waitUntilStopped();
+ bs.waitUntilStopped();
} catch (Throwable t) {
//Ignore
}