You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by an...@apache.org on 2015/11/18 19:53:43 UTC
[6/6] tomee git commit: JMS Connection wrapper & close dangling on
destroy.
JMS Connection wrapper & close dangling on destroy.
Project: http://git-wip-us.apache.org/repos/asf/tomee/repo
Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/43dd894b
Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/43dd894b
Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/43dd894b
Branch: refs/heads/tomee-1.7.x
Commit: 43dd894b04fdb26d705113ace4a93aa294699cea
Parents: a4330bf
Author: AndyGee <an...@gmx.de>
Authored: Wed Nov 18 19:52:59 2015 +0100
Committer: AndyGee <an...@gmx.de>
Committed: Wed Nov 18 19:52:59 2015 +0100
----------------------------------------------------------------------
.../org/apache/openejb/InjectionProcessor.java | 20 +-
.../main/java/org/apache/openejb/OpenEJB.java | 6 +
.../resource/activemq/ActiveMQ5Factory.java | 8 +-
.../activemq/ConnectionFactoryWrapper.java | 66 +++++++
.../resource/activemq/ConnectionWrapper.java | 124 ++++++++++++
.../resource/activemq/SessionWrapper.java | 197 +++++++++++++++++++
.../injection/jms/MessagingBeanTest.java | 8 +-
7 files changed, 424 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tomee/blob/43dd894b/container/openejb-core/src/main/java/org/apache/openejb/InjectionProcessor.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/InjectionProcessor.java b/container/openejb-core/src/main/java/org/apache/openejb/InjectionProcessor.java
index 054d0c1..5c64757 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/InjectionProcessor.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/InjectionProcessor.java
@@ -20,6 +20,7 @@ package org.apache.openejb;
import org.apache.openejb.core.ivm.naming.JndiUrlReference;
import org.apache.openejb.injection.FallbackPropertyInjector;
import org.apache.openejb.loader.SystemInstance;
+import org.apache.openejb.resource.activemq.ConnectionFactoryWrapper;
import org.apache.openejb.spi.ContainerSystem;
import org.apache.openejb.util.LogCategory;
import org.apache.openejb.util.Logger;
@@ -28,6 +29,7 @@ import org.apache.xbean.naming.reference.SimpleReference;
import org.apache.xbean.recipe.ObjectRecipe;
import org.apache.xbean.recipe.Option;
+import javax.jms.ConnectionFactory;
import javax.naming.Context;
import javax.naming.NamingException;
import java.lang.reflect.InvocationTargetException;
@@ -97,6 +99,7 @@ public class InjectionProcessor<T> {
return instance;
}
+ @SuppressWarnings("unchecked")
private void construct() throws OpenEJBException {
if (instance != null) {
throw new IllegalStateException("Instance already constructed");
@@ -179,6 +182,7 @@ public class InjectionProcessor<T> {
}
}
+ @SuppressWarnings("unchecked")
private void fillInjectionProperties(final ObjectRecipe objectRecipe) {
if (injections == null) {
return;
@@ -201,7 +205,7 @@ public class InjectionProcessor<T> {
clazz = suppliedInstance.getClass();
}
- if (context != null) {
+ if (null != context && null != clazz) {
for (final Injection injection : injections) {
if (injection.getTarget() == null) {
continue;
@@ -222,7 +226,13 @@ public class InjectionProcessor<T> {
} catch (final NamingException e) {
if (value instanceof JndiUrlReference) {
try {
- value = SystemInstance.get().getComponent(ContainerSystem.class).getJNDIContext()
+ final ContainerSystem containerSystem = SystemInstance.get().getComponent(ContainerSystem.class);
+
+ if(null == containerSystem){
+ throw new IllegalStateException("ContainerSystem has not been initialized");
+ }
+
+ value = containerSystem.getJNDIContext()
.lookup(((JndiUrlReference) value).getJndiName());
} catch (final NamingException e1) {
value = null;
@@ -240,6 +250,12 @@ public class InjectionProcessor<T> {
}
if (value != null) {
+
+ if(ConnectionFactory.class.isInstance(value)){
+ //Wrap
+ value = new ConnectionFactoryWrapper(ConnectionFactory.class.cast(value));
+ }
+
final String prefix;
if (usePrefix) {
prefix = injection.getTarget().getName() + "/";
http://git-wip-us.apache.org/repos/asf/tomee/blob/43dd894b/container/openejb-core/src/main/java/org/apache/openejb/OpenEJB.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/OpenEJB.java b/container/openejb-core/src/main/java/org/apache/openejb/OpenEJB.java
index 32c761e..a37ed38 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/OpenEJB.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/OpenEJB.java
@@ -21,6 +21,7 @@ import org.apache.openejb.assembler.classic.DeploymentExceptionManager;
import org.apache.openejb.cdi.CdiBuilder;
import org.apache.openejb.core.ServerFederation;
import org.apache.openejb.loader.SystemInstance;
+import org.apache.openejb.resource.activemq.ConnectionFactoryWrapper;
import org.apache.openejb.spi.ApplicationServer;
import org.apache.openejb.spi.Assembler;
import org.apache.openejb.spi.ContainerSystem;
@@ -261,12 +262,17 @@ public final class OpenEJB {
}
public static void destroy() {
+
final Assembler assembler = SystemInstance.get().getComponent(Assembler.class);
+
if (assembler != null) {
assembler.destroy();
} else {
SystemInstance.reset();
}
+
+ ConnectionFactoryWrapper.closeConnections();
+
instance = null;
}
http://git-wip-us.apache.org/repos/asf/tomee/blob/43dd894b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java
index eb34606..177c1ff 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java
@@ -87,6 +87,7 @@ public class ActiveMQ5Factory implements BrokerFactoryHandler {
final URI uri = new URI(cleanUpUri(brokerURI.getSchemeSpecificPart(), compositeData.getParameters(), params));
broker = BrokerFactory.createBroker(uri);
+ broker.setUseShutdownHook(false);
brokers.put(brokerURI, broker);
if (persistenceAdapter != null) {
@@ -116,9 +117,14 @@ public class ActiveMQ5Factory implements BrokerFactoryHandler {
try {
final ContainerSystem containerSystem = SystemInstance.get().getComponent(ContainerSystem.class);
+
+ if (null == containerSystem) {
+ throw new IllegalArgumentException("ContainerSystem has not been initialized");
+ }
+
final Context context = containerSystem.getJNDIContext();
final Object obj = context.lookup("openejb/Resource/" + resouceId);
- if (!(obj instanceof DataSource)) {
+ if (!DataSource.class.isInstance(obj)) {
throw new IllegalArgumentException("Resource with id " + resouceId
+ " is not a DataSource, but is " + obj.getClass().getName());
}
http://git-wip-us.apache.org/repos/asf/tomee/blob/43dd894b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionFactoryWrapper.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionFactoryWrapper.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionFactoryWrapper.java
new file mode 100644
index 0000000..731f286
--- /dev/null
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionFactoryWrapper.java
@@ -0,0 +1,66 @@
+/**
+ * Tomitribe Confidential
+ * <p/>
+ * Copyright(c) Tomitribe Corporation. 2014
+ * <p/>
+ * The source code for this program is not published or otherwise divested
+ * of its trade secrets, irrespective of what has been deposited with the
+ * U.S. Copyright Office.
+ * <p/>
+ */
+package org.apache.openejb.resource.activemq;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * {@link ConnectionFactory} decorator that keeps track of every connection it
+ * hands out, so that connections an application forgot to close can be closed
+ * in one sweep via {@link #closeConnections()}.
+ * <p/>
+ * The registry is static (shared by all wrapper instances) and is guarded by
+ * its own monitor, because connections may be created and closed from any
+ * application thread while the container shuts down on another.
+ */
+public class ConnectionFactoryWrapper implements ConnectionFactory {
+
+    private static final Logger LOGGER = Logger.getLogger(ConnectionFactoryWrapper.class.getName());
+
+    /**
+     * Every connection created through any wrapper and not yet closed.
+     * All access must hold the monitor of this list.
+     */
+    private static final ArrayList<ConnectionWrapper> connections = new ArrayList<ConnectionWrapper>();
+
+    private final ConnectionFactory factory;
+
+    /**
+     * @param factory the factory to delegate to; if it is itself a
+     *                {@code ConnectionFactoryWrapper} its delegate is used
+     *                directly so connections are not wrapped and tracked twice
+     */
+    public ConnectionFactoryWrapper(final ConnectionFactory factory) {
+        if (factory instanceof ConnectionFactoryWrapper) {
+            this.factory = ((ConnectionFactoryWrapper) factory).factory;
+        } else {
+            this.factory = factory;
+        }
+    }
+
+    /**
+     * Creates a connection on the delegate and returns it wrapped and tracked.
+     *
+     * @throws JMSException if the delegate fails to create the connection
+     */
+    @Override
+    public Connection createConnection() throws JMSException {
+        return getConnection(factory.createConnection());
+    }
+
+    /**
+     * Creates a connection on the delegate with the given credentials and
+     * returns it wrapped and tracked.
+     *
+     * @throws JMSException if the delegate fails to create the connection
+     */
+    @Override
+    public Connection createConnection(final String userName, final String password) throws JMSException {
+        return getConnection(factory.createConnection(userName, password));
+    }
+
+    /** Wraps the raw connection and records the wrapper in the registry. */
+    private static Connection getConnection(final Connection connection) {
+        final ConnectionWrapper wrapper = new ConnectionWrapper(connection);
+        synchronized (connections) {
+            connections.add(wrapper);
+        }
+        return wrapper;
+    }
+
+    /**
+     * Drops a wrapper from the registry; called by the wrapper when it is closed.
+     *
+     * @param connectionWrapper the wrapper that has been closed
+     */
+    protected static void remove(final ConnectionWrapper connectionWrapper) {
+        synchronized (connections) {
+            connections.remove(connectionWrapper);
+        }
+    }
+
+    /**
+     * Closes every connection that is still registered. Each one found here
+     * was leaked by an application, which is why a SEVERE message is logged
+     * per connection.
+     */
+    public static void closeConnections() {
+        // Drain under the lock, close outside it: close() calls back into
+        // remove(), and a slow provider must not block threads that are
+        // creating or closing other connections.
+        final ArrayList<ConnectionWrapper> dangling;
+        synchronized (connections) {
+            dangling = new ArrayList<ConnectionWrapper>(connections);
+            connections.clear();
+        }
+
+        for (final ConnectionWrapper next : dangling) {
+            try {
+                next.close();
+            } catch (final Exception e) {
+                // Best effort: keep closing the remaining connections, but do not hide the cause.
+                LOGGER.log(Level.WARNING, "Failed to close a dangling JMS connection", e);
+            } finally {
+                LOGGER.log(Level.SEVERE, "Closed a JMS connection. You have an application that fails to close this connection");
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/tomee/blob/43dd894b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionWrapper.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionWrapper.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionWrapper.java
new file mode 100644
index 0000000..6dd90ef
--- /dev/null
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionWrapper.java
@@ -0,0 +1,124 @@
+/**
+ * Tomitribe Confidential
+ * <p/>
+ * Copyright(c) Tomitribe Corporation. 2014
+ * <p/>
+ * The source code for this program is not published or otherwise divested
+ * of its trade secrets, irrespective of what has been deposited with the
+ * U.S. Copyright Office.
+ * <p/>
+ */
+package org.apache.openejb.resource.activemq;
+
+import javax.jms.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * {@link Connection} decorator that remembers the sessions it created so that
+ * they can be closed together with the connection, and that deregisters
+ * itself from {@link ConnectionFactoryWrapper} once closed.
+ * <p/>
+ * A JMS connection may be used from several threads, so the session list is
+ * guarded by its own monitor.
+ */
+public class ConnectionWrapper implements Connection {
+
+    /** Sessions created by this connection and not yet closed; guarded by its own monitor. */
+    private final ArrayList<SessionWrapper> sessions = new ArrayList<SessionWrapper>();
+
+    private final Connection con;
+
+    /**
+     * @param con the provider connection every call is delegated to
+     */
+    public ConnectionWrapper(final Connection con) {
+        this.con = con;
+    }
+
+    /**
+     * Creates a session on the delegate and returns it wrapped and tracked.
+     *
+     * @throws JMSException if the delegate fails to create the session
+     */
+    @Override
+    public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
+        return getSession(con.createSession(transacted, acknowledgeMode));
+    }
+
+    /** Wraps the raw session and records the wrapper in the session list. */
+    private Session getSession(final Session session) {
+        final SessionWrapper wrapper = new SessionWrapper(this, session);
+        synchronized (sessions) {
+            sessions.add(wrapper);
+        }
+        return wrapper;
+    }
+
+    /**
+     * Drops a session from the tracked list; called by the session wrapper when it is closed.
+     *
+     * @param wrapper the session wrapper that has been closed
+     */
+    protected void remove(final SessionWrapper wrapper) {
+        synchronized (sessions) {
+            sessions.remove(wrapper);
+        }
+    }
+
+    @Override
+    public String getClientID() throws JMSException {
+        return con.getClientID();
+    }
+
+    @Override
+    public ConnectionConsumer createDurableConnectionConsumer(final Topic topic, final String subscriptionName, final String messageSelector, final ServerSessionPool sessionPool, final int maxMessages) throws JMSException {
+        return con.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages);
+    }
+
+    @Override
+    public ExceptionListener getExceptionListener() throws JMSException {
+        return con.getExceptionListener();
+    }
+
+    @Override
+    public void setClientID(final String clientID) throws JMSException {
+        con.setClientID(clientID);
+    }
+
+    @Override
+    public ConnectionConsumer createConnectionConsumer(final Destination destination, final String messageSelector, final ServerSessionPool sessionPool, final int maxMessages) throws JMSException {
+        return con.createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages);
+    }
+
+    @Override
+    public ConnectionMetaData getMetaData() throws JMSException {
+        return con.getMetaData();
+    }
+
+    /**
+     * Closes all tracked sessions (best effort), then the underlying
+     * connection, and finally deregisters this wrapper from
+     * {@link ConnectionFactoryWrapper} even if closing the connection failed.
+     *
+     * @throws JMSException if the delegate fails to close
+     */
+    @Override
+    public void close() throws JMSException {
+
+        // Drain under the lock, close outside it: SessionWrapper.close()
+        // calls back into remove(), and other threads may still be creating sessions.
+        final ArrayList<SessionWrapper> open;
+        synchronized (sessions) {
+            open = new ArrayList<SessionWrapper>(sessions);
+            sessions.clear();
+        }
+
+        for (final SessionWrapper next : open) {
+            try {
+                next.close();
+            } catch (final Exception ignored) {
+                // Best effort: a session that fails to close must not prevent
+                // the remaining sessions and the connection from being closed.
+            }
+        }
+
+        try {
+            con.close();
+        } finally {
+            ConnectionFactoryWrapper.remove(this);
+        }
+    }
+
+    @Override
+    public void stop() throws JMSException {
+        con.stop();
+    }
+
+    @Override
+    public void setExceptionListener(final ExceptionListener listener) throws JMSException {
+        con.setExceptionListener(listener);
+    }
+
+    @Override
+    public void start() throws JMSException {
+        con.start();
+    }
+
+    /** Two wrappers are equal when they wrap equal connections. */
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        final ConnectionWrapper that = (ConnectionWrapper) o;
+
+        return con.equals(that.con);
+    }
+
+    @Override
+    public int hashCode() {
+        return con.hashCode();
+    }
+}
http://git-wip-us.apache.org/repos/asf/tomee/blob/43dd894b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/SessionWrapper.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/SessionWrapper.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/SessionWrapper.java
new file mode 100644
index 0000000..f7f5ef8
--- /dev/null
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/SessionWrapper.java
@@ -0,0 +1,197 @@
+/**
+ * Tomitribe Confidential
+ * <p/>
+ * Copyright(c) Tomitribe Corporation. 2014
+ * <p/>
+ * The source code for this program is not published or otherwise divested
+ * of its trade secrets, irrespective of what has been deposited with the
+ * U.S. Copyright Office.
+ * <p/>
+ */
+package org.apache.openejb.resource.activemq;
+
+import javax.jms.*;
+import java.io.Serializable;
+
+/**
+ * {@link Session} decorator that forwards every call to the wrapped session
+ * and, on {@link #close()}, tells the owning {@link ConnectionWrapper} to
+ * stop tracking it.
+ */
+public class SessionWrapper implements Session {
+
+    private final ConnectionWrapper connectionWrapper;
+    private final Session session;
+
+    /**
+     * @param connectionWrapper the connection wrapper that created this session
+     * @param session           the provider session every call is delegated to
+     */
+    public SessionWrapper(final ConnectionWrapper connectionWrapper, final Session session) {
+        this.connectionWrapper = connectionWrapper;
+        this.session = session;
+    }
+
+    // ---- lifecycle and transaction control ---------------------------------
+
+    /**
+     * Closes the wrapped session and, whether or not that succeeds,
+     * deregisters this wrapper from its connection wrapper.
+     */
+    @Override
+    public void close() throws JMSException {
+        try {
+            session.close();
+        } finally {
+            this.connectionWrapper.remove(this);
+        }
+    }
+
+    @Override
+    public void commit() throws JMSException {
+        session.commit();
+    }
+
+    @Override
+    public void rollback() throws JMSException {
+        session.rollback();
+    }
+
+    @Override
+    public void recover() throws JMSException {
+        session.recover();
+    }
+
+    @Override
+    public void run() {
+        session.run();
+    }
+
+    @Override
+    public boolean getTransacted() throws JMSException {
+        return session.getTransacted();
+    }
+
+    @Override
+    public int getAcknowledgeMode() throws JMSException {
+        return session.getAcknowledgeMode();
+    }
+
+    @Override
+    public MessageListener getMessageListener() throws JMSException {
+        return session.getMessageListener();
+    }
+
+    @Override
+    public void setMessageListener(final MessageListener listener) throws JMSException {
+        session.setMessageListener(listener);
+    }
+
+    // ---- message factories -------------------------------------------------
+
+    @Override
+    public Message createMessage() throws JMSException {
+        return session.createMessage();
+    }
+
+    @Override
+    public BytesMessage createBytesMessage() throws JMSException {
+        return session.createBytesMessage();
+    }
+
+    @Override
+    public MapMessage createMapMessage() throws JMSException {
+        return session.createMapMessage();
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage() throws JMSException {
+        return session.createObjectMessage();
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage(final Serializable object) throws JMSException {
+        return session.createObjectMessage(object);
+    }
+
+    @Override
+    public StreamMessage createStreamMessage() throws JMSException {
+        return session.createStreamMessage();
+    }
+
+    @Override
+    public TextMessage createTextMessage() throws JMSException {
+        return session.createTextMessage();
+    }
+
+    @Override
+    public TextMessage createTextMessage(final String text) throws JMSException {
+        return session.createTextMessage(text);
+    }
+
+    // ---- destinations ------------------------------------------------------
+
+    @Override
+    public Queue createQueue(final String queueName) throws JMSException {
+        return session.createQueue(queueName);
+    }
+
+    @Override
+    public Topic createTopic(final String topicName) throws JMSException {
+        return session.createTopic(topicName);
+    }
+
+    @Override
+    public TemporaryQueue createTemporaryQueue() throws JMSException {
+        return session.createTemporaryQueue();
+    }
+
+    @Override
+    public TemporaryTopic createTemporaryTopic() throws JMSException {
+        return session.createTemporaryTopic();
+    }
+
+    // ---- producers, consumers, browsers and subscriptions ------------------
+
+    @Override
+    public MessageProducer createProducer(final Destination destination) throws JMSException {
+        return session.createProducer(destination);
+    }
+
+    @Override
+    public MessageConsumer createConsumer(final Destination destination) throws JMSException {
+        return session.createConsumer(destination);
+    }
+
+    @Override
+    public MessageConsumer createConsumer(final Destination destination, final String messageSelector) throws JMSException {
+        return session.createConsumer(destination, messageSelector);
+    }
+
+    @Override
+    public MessageConsumer createConsumer(final Destination destination, final String messageSelector, final boolean NoLocal) throws JMSException {
+        return session.createConsumer(destination, messageSelector, NoLocal);
+    }
+
+    @Override
+    public TopicSubscriber createDurableSubscriber(final Topic topic, final String name) throws JMSException {
+        return session.createDurableSubscriber(topic, name);
+    }
+
+    @Override
+    public TopicSubscriber createDurableSubscriber(final Topic topic, final String name, final String messageSelector, final boolean noLocal) throws JMSException {
+        return session.createDurableSubscriber(topic, name, messageSelector, noLocal);
+    }
+
+    @Override
+    public void unsubscribe(final String name) throws JMSException {
+        session.unsubscribe(name);
+    }
+
+    @Override
+    public QueueBrowser createBrowser(final Queue queue) throws JMSException {
+        return session.createBrowser(queue);
+    }
+
+    @Override
+    public QueueBrowser createBrowser(final Queue queue, final String messageSelector) throws JMSException {
+        return session.createBrowser(queue, messageSelector);
+    }
+
+    // ---- identity ----------------------------------------------------------
+
+    /**
+     * Two wrappers are equal when both their owning connection wrappers and
+     * their wrapped sessions are equal.
+     */
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        final SessionWrapper other = (SessionWrapper) o;
+        if (!connectionWrapper.equals(other.connectionWrapper)) {
+            return false;
+        }
+        return session.equals(other.session);
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * connectionWrapper.hashCode() + session.hashCode();
+    }
+}
http://git-wip-us.apache.org/repos/asf/tomee/blob/43dd894b/examples/injection-of-connectionfactory/src/test/java/org/superbiz/injection/jms/MessagingBeanTest.java
----------------------------------------------------------------------
diff --git a/examples/injection-of-connectionfactory/src/test/java/org/superbiz/injection/jms/MessagingBeanTest.java b/examples/injection-of-connectionfactory/src/test/java/org/superbiz/injection/jms/MessagingBeanTest.java
index 2726530..1509541 100644
--- a/examples/injection-of-connectionfactory/src/test/java/org/superbiz/injection/jms/MessagingBeanTest.java
+++ b/examples/injection-of-connectionfactory/src/test/java/org/superbiz/injection/jms/MessagingBeanTest.java
@@ -26,9 +26,10 @@ public class MessagingBeanTest extends TestCase {
public void test() throws Exception {
- final Context context = EJBContainer.createEJBContainer().getContext();
+ final EJBContainer ejbContainer = EJBContainer.createEJBContainer();
+ final Context context = ejbContainer.getContext();
- Messages messages = (Messages) context.lookup("java:global/injection-of-connectionfactory/Messages");
+ final Messages messages = (Messages) context.lookup("java:global/injection-of-connectionfactory/Messages");
messages.sendMessage("Hello World!");
messages.sendMessage("How are you?");
@@ -37,6 +38,9 @@ public class MessagingBeanTest extends TestCase {
assertEquals(messages.receiveMessage(), "Hello World!");
assertEquals(messages.receiveMessage(), "How are you?");
assertEquals(messages.receiveMessage(), "Still spinning?");
+
+ context.close();
+ ejbContainer.close();
}
}
//END SNIPPET: code