You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by an...@apache.org on 2015/11/18 19:53:43 UTC

[6/6] tomee git commit: JMS Connection wrapper & close dangling on destroy.

Add a JMS Connection wrapper and close dangling connections on destroy.


Project: http://git-wip-us.apache.org/repos/asf/tomee/repo
Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/43dd894b
Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/43dd894b
Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/43dd894b

Branch: refs/heads/tomee-1.7.x
Commit: 43dd894b04fdb26d705113ace4a93aa294699cea
Parents: a4330bf
Author: AndyGee <an...@gmx.de>
Authored: Wed Nov 18 19:52:59 2015 +0100
Committer: AndyGee <an...@gmx.de>
Committed: Wed Nov 18 19:52:59 2015 +0100

----------------------------------------------------------------------
 .../org/apache/openejb/InjectionProcessor.java  |  20 +-
 .../main/java/org/apache/openejb/OpenEJB.java   |   6 +
 .../resource/activemq/ActiveMQ5Factory.java     |   8 +-
 .../activemq/ConnectionFactoryWrapper.java      |  66 +++++++
 .../resource/activemq/ConnectionWrapper.java    | 124 ++++++++++++
 .../resource/activemq/SessionWrapper.java       | 197 +++++++++++++++++++
 .../injection/jms/MessagingBeanTest.java        |   8 +-
 7 files changed, 424 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tomee/blob/43dd894b/container/openejb-core/src/main/java/org/apache/openejb/InjectionProcessor.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/InjectionProcessor.java b/container/openejb-core/src/main/java/org/apache/openejb/InjectionProcessor.java
index 054d0c1..5c64757 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/InjectionProcessor.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/InjectionProcessor.java
@@ -20,6 +20,7 @@ package org.apache.openejb;
 import org.apache.openejb.core.ivm.naming.JndiUrlReference;
 import org.apache.openejb.injection.FallbackPropertyInjector;
 import org.apache.openejb.loader.SystemInstance;
+import org.apache.openejb.resource.activemq.ConnectionFactoryWrapper;
 import org.apache.openejb.spi.ContainerSystem;
 import org.apache.openejb.util.LogCategory;
 import org.apache.openejb.util.Logger;
@@ -28,6 +29,7 @@ import org.apache.xbean.naming.reference.SimpleReference;
 import org.apache.xbean.recipe.ObjectRecipe;
 import org.apache.xbean.recipe.Option;
 
+import javax.jms.ConnectionFactory;
 import javax.naming.Context;
 import javax.naming.NamingException;
 import java.lang.reflect.InvocationTargetException;
@@ -97,6 +99,7 @@ public class InjectionProcessor<T> {
         return instance;
     }
 
+    @SuppressWarnings("unchecked")
     private void construct() throws OpenEJBException {
         if (instance != null) {
             throw new IllegalStateException("Instance already constructed");
@@ -179,6 +182,7 @@ public class InjectionProcessor<T> {
         }
     }
 
+    @SuppressWarnings("unchecked")
     private void fillInjectionProperties(final ObjectRecipe objectRecipe) {
         if (injections == null) {
             return;
@@ -201,7 +205,7 @@ public class InjectionProcessor<T> {
             clazz = suppliedInstance.getClass();
         }
 
-        if (context != null) {
+        if (null != context && null != clazz) {
             for (final Injection injection : injections) {
                 if (injection.getTarget() == null) {
                     continue;
@@ -222,7 +226,13 @@ public class InjectionProcessor<T> {
                         } catch (final NamingException e) {
                             if (value instanceof JndiUrlReference) {
                                 try {
-                                    value = SystemInstance.get().getComponent(ContainerSystem.class).getJNDIContext()
+                                    final ContainerSystem containerSystem = SystemInstance.get().getComponent(ContainerSystem.class);
+
+                                    if(null == containerSystem){
+                                        throw new IllegalStateException("ContainerSystem has not been initialized");
+                                    }
+
+                                    value = containerSystem.getJNDIContext()
                                         .lookup(((JndiUrlReference) value).getJndiName());
                                 } catch (final NamingException e1) {
                                     value = null;
@@ -240,6 +250,12 @@ public class InjectionProcessor<T> {
                 }
 
                 if (value != null) {
+
+                    if(ConnectionFactory.class.isInstance(value)){
+                        //Wrap
+                        value = new ConnectionFactoryWrapper(ConnectionFactory.class.cast(value));
+                    }
+
                     final String prefix;
                     if (usePrefix) {
                         prefix = injection.getTarget().getName() + "/";

http://git-wip-us.apache.org/repos/asf/tomee/blob/43dd894b/container/openejb-core/src/main/java/org/apache/openejb/OpenEJB.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/OpenEJB.java b/container/openejb-core/src/main/java/org/apache/openejb/OpenEJB.java
index 32c761e..a37ed38 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/OpenEJB.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/OpenEJB.java
@@ -21,6 +21,7 @@ import org.apache.openejb.assembler.classic.DeploymentExceptionManager;
 import org.apache.openejb.cdi.CdiBuilder;
 import org.apache.openejb.core.ServerFederation;
 import org.apache.openejb.loader.SystemInstance;
+import org.apache.openejb.resource.activemq.ConnectionFactoryWrapper;
 import org.apache.openejb.spi.ApplicationServer;
 import org.apache.openejb.spi.Assembler;
 import org.apache.openejb.spi.ContainerSystem;
@@ -261,12 +262,17 @@ public final class OpenEJB {
     }
 
     public static void destroy() {
+
         final Assembler assembler = SystemInstance.get().getComponent(Assembler.class);
+
         if (assembler != null) {
             assembler.destroy();
         } else {
             SystemInstance.reset();
         }
+
+        ConnectionFactoryWrapper.closeConnections();
+
         instance = null;
     }
 

http://git-wip-us.apache.org/repos/asf/tomee/blob/43dd894b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java
index eb34606..177c1ff 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java
@@ -87,6 +87,7 @@ public class ActiveMQ5Factory implements BrokerFactoryHandler {
 
             final URI uri = new URI(cleanUpUri(brokerURI.getSchemeSpecificPart(), compositeData.getParameters(), params));
             broker = BrokerFactory.createBroker(uri);
+            broker.setUseShutdownHook(false);
             brokers.put(brokerURI, broker);
 
             if (persistenceAdapter != null) {
@@ -116,9 +117,14 @@ public class ActiveMQ5Factory implements BrokerFactoryHandler {
 
                             try {
                                 final ContainerSystem containerSystem = SystemInstance.get().getComponent(ContainerSystem.class);
+
+                                if (null == containerSystem) {
+                                    throw new IllegalArgumentException("ContainerSystem has not been initialized");
+                                }
+
                                 final Context context = containerSystem.getJNDIContext();
                                 final Object obj = context.lookup("openejb/Resource/" + resouceId);
-                                if (!(obj instanceof DataSource)) {
+                                if (!DataSource.class.isInstance(obj)) {
                                     throw new IllegalArgumentException("Resource with id " + resouceId
                                             + " is not a DataSource, but is " + obj.getClass().getName());
                                 }

http://git-wip-us.apache.org/repos/asf/tomee/blob/43dd894b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionFactoryWrapper.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionFactoryWrapper.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionFactoryWrapper.java
new file mode 100644
index 0000000..731f286
--- /dev/null
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionFactoryWrapper.java
@@ -0,0 +1,66 @@
+/**
+ * Tomitribe Confidential
+ * <p/>
+ * Copyright(c) Tomitribe Corporation. 2014
+ * <p/>
+ * The source code for this program is not published or otherwise divested
+ * of its trade secrets, irrespective of what has been deposited with the
+ * U.S. Copyright Office.
+ * <p/>
+ */
+package org.apache.openejb.resource.activemq;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ConnectionFactoryWrapper implements ConnectionFactory {
+
+    private static final ArrayList<ConnectionWrapper> connections = new ArrayList<ConnectionWrapper>();
+
+    private final ConnectionFactory factory;
+
+    public ConnectionFactoryWrapper(final ConnectionFactory factory) {
+        this.factory = factory;
+    }
+
+    @Override
+    public Connection createConnection() throws JMSException {
+        return getConnection(factory.createConnection());
+    }
+
+    @Override
+    public Connection createConnection(final String userName, final String password) throws JMSException {
+        return getConnection(factory.createConnection(userName, password));
+    }
+
+    private static Connection getConnection(final Connection connection) {
+        final ConnectionWrapper wrapper = new ConnectionWrapper(connection);
+        connections.add(wrapper);
+        return wrapper;
+    }
+
+    protected static void remove(final ConnectionWrapper connectionWrapper) {
+        connections.remove(connectionWrapper);
+    }
+
+    public static void closeConnections() {
+        final Iterator<ConnectionWrapper> iterator = connections.iterator();
+
+        while (iterator.hasNext()) {
+            final ConnectionWrapper next = iterator.next();
+            iterator.remove();
+            try {
+                next.close();
+            } catch (final Exception e) {
+                //no-op
+            } finally {
+                Logger.getLogger(ConnectionFactoryWrapper.class.getName()).log(Level.SEVERE, "Closed a JMS connection. You have an application that fails to close this connection");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/tomee/blob/43dd894b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionWrapper.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionWrapper.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionWrapper.java
new file mode 100644
index 0000000..6dd90ef
--- /dev/null
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionWrapper.java
@@ -0,0 +1,124 @@
+/**
+ * Tomitribe Confidential
+ * <p/>
+ * Copyright(c) Tomitribe Corporation. 2014
+ * <p/>
+ * The source code for this program is not published or otherwise divested
+ * of its trade secrets, irrespective of what has been deposited with the
+ * U.S. Copyright Office.
+ * <p/>
+ */
+package org.apache.openejb.resource.activemq;
+
+import javax.jms.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+public class ConnectionWrapper implements Connection {
+
+    private final ArrayList<SessionWrapper> sessions = new ArrayList<SessionWrapper>();
+
+    private final Connection con;
+
+    public ConnectionWrapper(final Connection con) {
+        this.con = con;
+    }
+
+    @Override
+    public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
+        return getSession(con.createSession(transacted, acknowledgeMode));
+    }
+
+    private Session getSession(final Session session) {
+        final SessionWrapper wrapper = new SessionWrapper(this, session);
+        sessions.add(wrapper);
+        return wrapper;
+    }
+
+    protected void remove(final SessionWrapper wrapper) {
+        sessions.remove(wrapper);
+    }
+
+    @Override
+    public String getClientID() throws JMSException {
+        return con.getClientID();
+    }
+
+    @Override
+    public ConnectionConsumer createDurableConnectionConsumer(final Topic topic, final String subscriptionName, final String messageSelector, final ServerSessionPool sessionPool, final int maxMessages) throws JMSException {
+        return con.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages);
+    }
+
+    @Override
+    public ExceptionListener getExceptionListener() throws JMSException {
+        return con.getExceptionListener();
+    }
+
+    @Override
+    public void setClientID(final String clientID) throws JMSException {
+        con.setClientID(clientID);
+    }
+
+    @Override
+    public ConnectionConsumer createConnectionConsumer(final Destination destination, final String messageSelector, final ServerSessionPool sessionPool, final int maxMessages) throws JMSException {
+        return con.createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages);
+    }
+
+    @Override
+    public ConnectionMetaData getMetaData() throws JMSException {
+        return con.getMetaData();
+    }
+
+    @Override
+    public void close() throws JMSException {
+
+        final Iterator<SessionWrapper> iterator = sessions.iterator();
+
+        while (iterator.hasNext()) {
+            final SessionWrapper next = iterator.next();
+            iterator.remove();
+            try {
+                next.close();
+            } catch (final Exception e) {
+                //no-op
+            }
+        }
+
+        try {
+            con.close();
+        } finally {
+            ConnectionFactoryWrapper.remove(this);
+        }
+    }
+
+    @Override
+    public void stop() throws JMSException {
+        con.stop();
+    }
+
+    @Override
+    public void setExceptionListener(final ExceptionListener listener) throws JMSException {
+        con.setExceptionListener(listener);
+    }
+
+    @Override
+    public void start() throws JMSException {
+        con.start();
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        final ConnectionWrapper that = (ConnectionWrapper) o;
+
+        return con.equals(that.con);
+
+    }
+
+    @Override
+    public int hashCode() {
+        return con.hashCode();
+    }
+}

http://git-wip-us.apache.org/repos/asf/tomee/blob/43dd894b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/SessionWrapper.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/SessionWrapper.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/SessionWrapper.java
new file mode 100644
index 0000000..f7f5ef8
--- /dev/null
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/SessionWrapper.java
@@ -0,0 +1,197 @@
+/**
+ * Tomitribe Confidential
+ * <p/>
+ * Copyright(c) Tomitribe Corporation. 2014
+ * <p/>
+ * The source code for this program is not published or otherwise divested
+ * of its trade secrets, irrespective of what has been deposited with the
+ * U.S. Copyright Office.
+ * <p/>
+ */
+package org.apache.openejb.resource.activemq;
+
+import javax.jms.*;
+import java.io.Serializable;
+
+public class SessionWrapper implements Session {
+
+    private final ConnectionWrapper connectionWrapper;
+    private final Session session;
+
+    public SessionWrapper(final ConnectionWrapper connectionWrapper, final Session session) {
+        this.connectionWrapper = connectionWrapper;
+        this.session = session;
+    }
+
+    @Override
+    public BytesMessage createBytesMessage() throws JMSException {
+        return session.createBytesMessage();
+    }
+
+    @Override
+    public TextMessage createTextMessage() throws JMSException {
+        return session.createTextMessage();
+    }
+
+    @Override
+    public Message createMessage() throws JMSException {
+        return session.createMessage();
+    }
+
+    @Override
+    public boolean getTransacted() throws JMSException {
+        return session.getTransacted();
+    }
+
+    @Override
+    public void rollback() throws JMSException {
+        session.rollback();
+    }
+
+    @Override
+    public MessageConsumer createConsumer(final Destination destination, final String messageSelector, final boolean NoLocal) throws JMSException {
+        return session.createConsumer(destination, messageSelector, NoLocal);
+    }
+
+    @Override
+    public QueueBrowser createBrowser(final Queue queue) throws JMSException {
+        return session.createBrowser(queue);
+    }
+
+    @Override
+    public TemporaryQueue createTemporaryQueue() throws JMSException {
+        return session.createTemporaryQueue();
+    }
+
+    @Override
+    public MapMessage createMapMessage() throws JMSException {
+        return session.createMapMessage();
+    }
+
+    @Override
+    public MessageConsumer createConsumer(final Destination destination) throws JMSException {
+        return session.createConsumer(destination);
+    }
+
+    @Override
+    public void close() throws JMSException {
+        try {
+            session.close();
+        } finally {
+            this.connectionWrapper.remove(this);
+        }
+    }
+
+    @Override
+    public void unsubscribe(final String name) throws JMSException {
+        session.unsubscribe(name);
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage(final Serializable object) throws JMSException {
+        return session.createObjectMessage(object);
+    }
+
+    @Override
+    public void run() {
+        session.run();
+    }
+
+    @Override
+    public void recover() throws JMSException {
+        session.recover();
+    }
+
+    @Override
+    public void commit() throws JMSException {
+        session.commit();
+    }
+
+    @Override
+    public int getAcknowledgeMode() throws JMSException {
+        return session.getAcknowledgeMode();
+    }
+
+    @Override
+    public TextMessage createTextMessage(final String text) throws JMSException {
+        return session.createTextMessage(text);
+    }
+
+    @Override
+    public TopicSubscriber createDurableSubscriber(final Topic topic, final String name, final String messageSelector, final boolean noLocal) throws JMSException {
+        return session.createDurableSubscriber(topic, name, messageSelector, noLocal);
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage() throws JMSException {
+        return session.createObjectMessage();
+    }
+
+    @Override
+    public Topic createTopic(final String topicName) throws JMSException {
+        return session.createTopic(topicName);
+    }
+
+    @Override
+    public void setMessageListener(final MessageListener listener) throws JMSException {
+        session.setMessageListener(listener);
+    }
+
+    @Override
+    public QueueBrowser createBrowser(final Queue queue, final String messageSelector) throws JMSException {
+        return session.createBrowser(queue, messageSelector);
+    }
+
+    @Override
+    public MessageProducer createProducer(final Destination destination) throws JMSException {
+        return session.createProducer(destination);
+    }
+
+    @Override
+    public Queue createQueue(final String queueName) throws JMSException {
+        return session.createQueue(queueName);
+    }
+
+    @Override
+    public TopicSubscriber createDurableSubscriber(final Topic topic, final String name) throws JMSException {
+        return session.createDurableSubscriber(topic, name);
+    }
+
+    @Override
+    public MessageConsumer createConsumer(final Destination destination, final String messageSelector) throws JMSException {
+        return session.createConsumer(destination, messageSelector);
+    }
+
+    @Override
+    public StreamMessage createStreamMessage() throws JMSException {
+        return session.createStreamMessage();
+    }
+
+    @Override
+    public MessageListener getMessageListener() throws JMSException {
+        return session.getMessageListener();
+    }
+
+    @Override
+    public TemporaryTopic createTemporaryTopic() throws JMSException {
+        return session.createTemporaryTopic();
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        final SessionWrapper that = (SessionWrapper) o;
+
+        return connectionWrapper.equals(that.connectionWrapper) && session.equals(that.session);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = connectionWrapper.hashCode();
+        result = 31 * result + session.hashCode();
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/tomee/blob/43dd894b/examples/injection-of-connectionfactory/src/test/java/org/superbiz/injection/jms/MessagingBeanTest.java
----------------------------------------------------------------------
diff --git a/examples/injection-of-connectionfactory/src/test/java/org/superbiz/injection/jms/MessagingBeanTest.java b/examples/injection-of-connectionfactory/src/test/java/org/superbiz/injection/jms/MessagingBeanTest.java
index 2726530..1509541 100644
--- a/examples/injection-of-connectionfactory/src/test/java/org/superbiz/injection/jms/MessagingBeanTest.java
+++ b/examples/injection-of-connectionfactory/src/test/java/org/superbiz/injection/jms/MessagingBeanTest.java
@@ -26,9 +26,10 @@ public class MessagingBeanTest extends TestCase {
 
     public void test() throws Exception {
 
-        final Context context = EJBContainer.createEJBContainer().getContext();
+        final EJBContainer ejbContainer = EJBContainer.createEJBContainer();
+        final Context context = ejbContainer.getContext();
 
-        Messages messages = (Messages) context.lookup("java:global/injection-of-connectionfactory/Messages");
+        final Messages messages = (Messages) context.lookup("java:global/injection-of-connectionfactory/Messages");
 
         messages.sendMessage("Hello World!");
         messages.sendMessage("How are you?");
@@ -37,6 +38,9 @@ public class MessagingBeanTest extends TestCase {
         assertEquals(messages.receiveMessage(), "Hello World!");
         assertEquals(messages.receiveMessage(), "How are you?");
         assertEquals(messages.receiveMessage(), "Still spinning?");
+
+        context.close();
+        ejbContainer.close();
     }
 }
 //END SNIPPET: code