You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by an...@apache.org on 2010/10/06 17:00:19 UTC

svn commit: r1005070 - in /openejb/trunk/openejb3/container: openejb-activemq4/src/main/java/org/apache/openejb/resource/activemq/ openejb-core/src/main/java/org/apache/openejb/resource/activemq/

Author: andygumbrecht
Date: Wed Oct  6 15:00:18 2010
New Revision: 1005070

URL: http://svn.apache.org/viewvc?rev=1005070&view=rev
Log:
The MQ RA may be called upon to start more than one broker, but only ever one for a given URI - Added this support to start and stop.

Modified:
    openejb/trunk/openejb3/container/openejb-activemq4/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ4Factory.java
    openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java
    openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQFactory.java
    openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java

Modified: openejb/trunk/openejb3/container/openejb-activemq4/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ4Factory.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/container/openejb-activemq4/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ4Factory.java?rev=1005070&r1=1005069&r2=1005070&view=diff
==============================================================================
--- openejb/trunk/openejb3/container/openejb-activemq4/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ4Factory.java (original)
+++ openejb/trunk/openejb3/container/openejb-activemq4/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ4Factory.java Wed Oct  6 15:00:18 2010
@@ -28,60 +28,107 @@ import javax.naming.Context;
 import javax.naming.NamingException;
 import javax.sql.DataSource;
 import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import org.apache.openejb.util.LogCategory;
 
 public class ActiveMQ4Factory implements BrokerFactory.BrokerFactoryHandler {
+
     private static final ThreadLocal<Properties> threadProperties = new ThreadLocal<Properties>();
+    private static final Map<URI, BrokerService> brokers = new HashMap<URI, BrokerService>();
+    private static Throwable throwable = null;
 
     public static void setThreadProperties(Properties value) {
         threadProperties.set(value);
     }
 
-    public BrokerService createBroker(URI brokerURI) throws Exception {
-        URI uri = new URI(brokerURI.getRawSchemeSpecificPart());
-        BrokerService broker = BrokerFactory.createBroker(uri);
-
-        Properties properties = getLowerCaseProperties();
-
-        Object value = properties.get("datasource");
-        if (value instanceof String && value.toString().length() == 0) {
-            value = null;
-        }
+    @Override
+    public synchronized BrokerService createBroker(URI brokerURI) throws Exception {
 
-        if (value != null) {
-            DataSource dataSource;
-            if (value instanceof DataSource) {
-                dataSource = (DataSource) value;
-            } else {
-                String resouceId = (String) value;
+        BrokerService broker = brokers.get(brokerURI);
 
-                try {
-                    ContainerSystem containerSystem = SystemInstance.get().getComponent(ContainerSystem.class);
-                    Context context = containerSystem.getJNDIContext();
-                    Object obj = context.lookup("openejb/Resource/" + resouceId);
-                    if (!(obj instanceof DataSource)) {
-                        throw new IllegalArgumentException("Resource with id " + resouceId +
-                            " is not a DataSource, but is " + obj.getClass().getName());
+        if (null == broker) {
+
+            URI uri = new URI(brokerURI.getRawSchemeSpecificPart());
+            broker = BrokerFactory.createBroker(uri);
+            brokers.put(brokerURI, broker);
+
+            if (!uri.getScheme().toLowerCase().startsWith("xbean")) {
+
+                Properties properties = getLowerCaseProperties();
+
+                Object value = properties.get("datasource");
+                if (value instanceof String && value.toString().length() == 0) {
+                    value = null;
+                }
+
+                if (value != null) {
+                    DataSource dataSource;
+                    if (value instanceof DataSource) {
+                        dataSource = (DataSource) value;
+                    } else {
+                        String resouceId = (String) value;
+
+                        try {
+                            ContainerSystem containerSystem = SystemInstance.get().getComponent(ContainerSystem.class);
+                            Context context = containerSystem.getJNDIContext();
+                            Object obj = context.lookup("openejb/Resource/" + resouceId);
+                            if (!(obj instanceof DataSource)) {
+                                throw new IllegalArgumentException("Resource with id " + resouceId
+                                    + " is not a DataSource, but is " + obj.getClass().getName());
+                            }
+                            dataSource = (DataSource) obj;
+                        } catch (NamingException e) {
+                            throw new IllegalArgumentException("Unknown datasource " + resouceId);
+                        }
                     }
-                    dataSource = (DataSource) obj;
-                } catch (NamingException e) {
-                    throw new IllegalArgumentException("Unknown datasource " + resouceId);
+
+                    JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
+                    persistenceAdapter.setDataSource(dataSource);
+                    broker.setPersistenceAdapter(persistenceAdapter);
+                } else {
+                    MemoryPersistenceAdapter persistenceAdapter = new MemoryPersistenceAdapter();
+                    broker.setPersistenceAdapter(persistenceAdapter);
                 }
             }
 
-            JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
-            persistenceAdapter.setDataSource(dataSource);
-            broker.setPersistenceAdapter(persistenceAdapter);
-        } else {
-            MemoryPersistenceAdapter persistenceAdapter = new MemoryPersistenceAdapter();
-            broker.setPersistenceAdapter(persistenceAdapter);
+            if (!broker.isStarted()) {
+
+                final BrokerService bs = broker;
+
+                final Thread start = new Thread("ActiveMQFactory start and checkpoint") {
+
+                    @Override
+                    public void run() {
+                        try {
+                            //Start before returning - this is known to be safe.
+                            bs.start();
+                        } catch (Throwable t) {
+                            throwable = t;
+                        }
+                    }
+                };
+
+                start.setDaemon(true);
+                start.start();
+
+                try {
+                    start.join(5000);
+                } catch (InterruptedException e) {
+                    //Ignore
+                }
+
+                if (null != throwable) {
+                    org.apache.openejb.util.Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources").error("ActiveMQ failed to start within 5 seconds - It may not be usable", throwable);
+                }
+            }
         }
 
         return broker;
     }
 
-
     private Properties getLowerCaseProperties() {
         Properties properties = threadProperties.get();
         Properties newProperties = new Properties();
@@ -96,4 +143,8 @@ public class ActiveMQ4Factory implements
         }
         return newProperties;
     }
+
+    public Collection<BrokerService> getBrokers() {
+        return brokers.values();
+    }
 }

Modified: openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java?rev=1005070&r1=1005069&r2=1005070&view=diff
==============================================================================
--- openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java (original)
+++ openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java Wed Oct  6 15:00:18 2010
@@ -30,14 +30,15 @@ import javax.naming.Context;
 import javax.naming.NamingException;
 import javax.sql.DataSource;
 import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
 public class ActiveMQ5Factory implements BrokerFactoryHandler {
 
-    public static final String XBEAN = "xbean";
     private static final ThreadLocal<Properties> threadProperties = new ThreadLocal<Properties>();
-    private static BrokerService broker = null;
+    private static final Map<URI,BrokerService> brokers = new HashMap<URI, BrokerService>();
     private static Throwable throwable = null;
 
     public static void setThreadProperties(Properties value) {
@@ -45,106 +46,108 @@ public class ActiveMQ5Factory implements
     }
 
     @Override
-    public BrokerService createBroker(final URI brokerURI) throws Exception {
+    public synchronized BrokerService createBroker(final URI brokerURI) throws Exception {
 
-        final URI uri = new URI(brokerURI.getRawSchemeSpecificPart());
-        broker = BrokerFactory.createBroker(uri);
+        BrokerService broker = brokers.get(brokerURI);
 
-        if (!uri.getScheme().toLowerCase().startsWith(XBEAN)) {
-      
-            Properties properties = getLowerCaseProperties();
-
-            Object value = properties.get("datasource");
-            if (value instanceof String && value.toString().length() == 0) {
-                value = null;
-            }
+        if (null == broker) {
 
-            if (value != null) {
-                DataSource dataSource;
-                if (value instanceof DataSource) {
-                    dataSource = (DataSource) value;
-                } else {
-                    String resouceId = (String) value;
+            final URI uri = new URI(brokerURI.getRawSchemeSpecificPart());
+            broker = BrokerFactory.createBroker(uri);
+            brokers.put(brokerURI, broker);
+
+            if (!uri.getScheme().toLowerCase().startsWith("xbean")) {
+
+                Properties properties = getLowerCaseProperties();
+
+                Object value = properties.get("datasource");
+                if (value instanceof String && value.toString().length() == 0) {
+                    value = null;
+                }
+
+                if (value != null) {
+                    DataSource dataSource;
+                    if (value instanceof DataSource) {
+                        dataSource = (DataSource) value;
+                    } else {
+                        String resouceId = (String) value;
 
-                    try {
-                        ContainerSystem containerSystem = SystemInstance.get().getComponent(ContainerSystem.class);
-                        Context context = containerSystem.getJNDIContext();
-                        Object obj = context.lookup("openejb/Resource/" + resouceId);
-                        if (!(obj instanceof DataSource)) {
-                            throw new IllegalArgumentException("Resource with id " + resouceId
-                                                               + " is not a DataSource, but is " + obj.getClass().getName());
+                        try {
+                            ContainerSystem containerSystem = SystemInstance.get().getComponent(ContainerSystem.class);
+                            Context context = containerSystem.getJNDIContext();
+                            Object obj = context.lookup("openejb/Resource/" + resouceId);
+                            if (!(obj instanceof DataSource)) {
+                                throw new IllegalArgumentException("Resource with id " + resouceId
+                                                                   + " is not a DataSource, but is " + obj.getClass().getName());
+                            }
+                            dataSource = (DataSource) obj;
+                        } catch (NamingException e) {
+                            throw new IllegalArgumentException("Unknown datasource " + resouceId);
                         }
-                        dataSource = (DataSource) obj;
-                    } catch (NamingException e) {
-                        throw new IllegalArgumentException("Unknown datasource " + resouceId);
                     }
+
+                    JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
+
+                    if (properties.containsKey("usedatabaselock")) {
+                        //This must be false for hsqldb
+                        persistenceAdapter.setUseDatabaseLock(Boolean.parseBoolean(properties.getProperty("usedatabaselock", "true")));
+                    }
+
+                    persistenceAdapter.setDataSource(dataSource);
+                    broker.setPersistenceAdapter(persistenceAdapter);
+                } else {
+                    MemoryPersistenceAdapter persistenceAdapter = new MemoryPersistenceAdapter();
+                    broker.setPersistenceAdapter(persistenceAdapter);
                 }
 
-                JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
+                try {
+                    //TODO - New in 5.4.x
+                    broker.setSchedulerSupport(false);
+                } catch (Throwable t) {
+                    //Ignore
+                }
 
-                if (properties.containsKey("usedatabaselock")) {
-                    //This must be false for hsqldb
-                    persistenceAdapter.setUseDatabaseLock(Boolean.parseBoolean(properties.getProperty("usedatabaselock", "true")));
-                }
-
-                persistenceAdapter.setDataSource(dataSource);
-                broker.setPersistenceAdapter(persistenceAdapter);
-            } else {
-                MemoryPersistenceAdapter persistenceAdapter = new MemoryPersistenceAdapter();
-                broker.setPersistenceAdapter(persistenceAdapter);
-            }            
-
-            try {
-                //TODO - New in 5.4.x
-                broker.setSchedulerSupport(false);
-            } catch (Throwable t) {
-                //Ignore
+                //Notify when an error occurs on shutdown.
+                broker.setUseLoggingForShutdownErrors(org.apache.openejb.util.Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources").isErrorEnabled());
             }
 
-            //Notify when an error occurs on shutdown.
-            broker.setUseLoggingForShutdownErrors(org.apache.openejb.util.Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources").isErrorEnabled());
-        }
+            //We must close the broker
+            broker.setUseShutdownHook(false);
+            broker.setSystemExitOnShutdown(false);
+
+            if (!broker.isStarted()) {
 
-        //We must close the broker
-        broker.setUseShutdownHook(false);
-        broker.setSystemExitOnShutdown(false);
-
-        if (!broker.isStarted()) {
-
-            final Thread start = new Thread("ActiveMQFactory start and checkpoint") {
-
-                @Override
-                public void run() {
-                    try {
-                        //Start before returning - this is known to be safe.
-                        broker.start();
+                final BrokerService bs = broker;
 
+                final Thread start = new Thread("ActiveMQFactory start and checkpoint") {
+
+                    @Override
+                    public void run() {
                         try {
-                            //This is no longer available from AMQ5.4
-                            broker.waitUntilStarted();
+                            //Start before returning - this is known to be safe.
+                            bs.start();
+                            bs.waitUntilStarted();
+
+                            //Force a checkpoint to initialize pools
+                            bs.getPersistenceAdapter().checkpoint(true);
                         } catch (Throwable t) {
-                            org.apache.openejb.util.Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources").warning("ActiveMQ waitUntilStarted failed", t);
+                            throwable = t;
                         }
-
-                        //Force a checkpoint to initialize pools
-                        broker.getPersistenceAdapter().checkpoint(true);
-                    } catch (Throwable t) {
-                        throwable = t;
                     }
-                }
-            };
+                };
 
-            start.setDaemon(true);
-            start.start();
+                start.setDaemon(true);
+                start.start();
 
-            try {
-                start.join(5000);
-            } catch (InterruptedException e) {
-                //Ignore
-            }
+                try {
+                    start.join(5000);
+                } catch (InterruptedException e) {
+                    //Ignore
+                }
 
-            if (null != throwable) {
-                org.apache.openejb.util.Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources").error("ActiveMQ failed to start within 5 seconds - It may not be usable", throwable);
+                if (null != throwable) {
+                    org.apache.openejb.util.Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources").error("ActiveMQ failed to start within 5 seconds - It may not be usable", throwable);
+                }
             }
         }
 
@@ -166,4 +169,8 @@ public class ActiveMQ5Factory implements
         }
         return newProperties;
     }
+
+    public Collection<BrokerService> getBrokers() {
+        return brokers.values();
+    }
 }

Modified: openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQFactory.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQFactory.java?rev=1005070&r1=1005069&r2=1005070&view=diff
==============================================================================
--- openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQFactory.java (original)
+++ openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQFactory.java Wed Oct  6 15:00:18 2010
@@ -22,16 +22,17 @@ import org.apache.activemq.broker.Broker
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URI;
+import java.util.Collection;
 import java.util.Properties;
 
 public class ActiveMQFactory {
 
     private static final Method setThreadProperties;
     private static final Method createBroker;
+    private static final Method getBrokers;
     private static final Object instance;
     private static final Class clazz;
     private static String brokerPrefix;
-    private static BrokerService broker;
 
     static {
 
@@ -70,6 +71,12 @@ public class ActiveMQFactory {
         } catch (NoSuchMethodException e) {
             throw new RuntimeException("Unable to create ActiveMQFactory createBroker method", e);
         }
+
+        try {
+            getBrokers = clazz.getDeclaredMethod("getBrokers", (Class[]) null);
+        } catch (NoSuchMethodException e) {
+            throw new RuntimeException("Unable to create ActiveMQFactory createBroker method", e);
+        }
     }
 
     /**
@@ -96,8 +103,7 @@ public class ActiveMQFactory {
 
     public BrokerService createBroker(final URI brokerURI) throws Exception {
         try {
-            broker = (BrokerService) createBroker.invoke(instance, brokerURI);
-            return broker;
+            return (BrokerService) createBroker.invoke(instance, brokerURI);
         } catch (IllegalAccessException e) {
             throw new Exception("ActiveMQFactory.createBroker.IllegalAccessException", e);
         } catch (IllegalArgumentException e) {
@@ -108,11 +114,19 @@ public class ActiveMQFactory {
     }
 
     /**
-     * Returns either the configured broker, or null if it has not yet been created.
-     * This intended for access upon RA shutdown in order to wait for the broker to finish.
-     * @return BrokerService or null
+     * Returns a map of configured brokers.
+     * This intended for access upon RA shutdown in order to wait for the brokers to finish.
+     * @return Map<URI, BrokerService>
      */
-    public static BrokerService getBroker() {
-        return broker;
+    public static Collection<BrokerService> getBrokers() throws Exception {
+        try {
+            return (Collection<BrokerService>) getBrokers.invoke(instance, (Object[]) null);
+        } catch (IllegalAccessException e) {
+            throw new Exception("ActiveMQFactory.createBroker.IllegalAccessException", e);
+        } catch (IllegalArgumentException e) {
+            throw new Exception("ActiveMQFactory.createBroker.IllegalArgumentException", e);
+        } catch (InvocationTargetException e) {
+            throw new Exception("ActiveMQFactory.createBroker.InvocationTargetException", e);
+        }
     }
 }

Modified: openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java?rev=1005070&r1=1005069&r2=1005070&view=diff
==============================================================================
--- openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java (original)
+++ openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java Wed Oct  6 15:00:18 2010
@@ -23,7 +23,9 @@ import javax.resource.spi.BootstrapConte
 import javax.resource.spi.ResourceAdapterInternalException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Collection;
 import java.util.Properties;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.openejb.util.LogCategory;
 
 public class ActiveMQResourceAdapter extends org.apache.activemq.ra.ActiveMQResourceAdapter {
@@ -189,14 +191,15 @@ public class ActiveMQResourceAdapter ext
         }
     }
 
-    private void stopImpl() {
+    private void stopImpl() throws Exception {
 
         super.stop();
-        final org.apache.activemq.broker.BrokerService br = ActiveMQFactory.getBroker();
 
-        if (null != br) {
+        final Collection<BrokerService> brokers = ActiveMQFactory.getBrokers();
+
+        for(final BrokerService bs : brokers){
             try {
-                br.waitUntilStopped();
+                bs.waitUntilStopped();
             } catch (Throwable t) {
                 //Ignore
             }