You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by rm...@apache.org on 2017/03/09 19:09:57 UTC

tomee git commit: TOMEE-2021 allow to control through JMX MDB listening state

Repository: tomee
Updated Branches:
  refs/heads/master 933bd899d -> 9c661758a


TOMEE-2021 allow to control through JMX MDB listening state


Project: http://git-wip-us.apache.org/repos/asf/tomee/repo
Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/9c661758
Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/9c661758
Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/9c661758

Branch: refs/heads/master
Commit: 9c661758aeff813d05748e91d01c1e522c09c0a4
Parents: 933bd89
Author: rmannibucau <rm...@apache.org>
Authored: Thu Mar 9 20:09:50 2017 +0100
Committer: rmannibucau <rm...@apache.org>
Committed: Thu Mar 9 20:09:50 2017 +0100

----------------------------------------------------------------------
 .../apache/openejb/core/mdb/MdbContainer.java   |  21 ++-
 .../activemq/ActiveMQResourceAdapter.java       | 179 ++++++++++++++++++
 .../ActiveMQResourceAdapterControlTest.java     | 187 +++++++++++++++++++
 3 files changed, 386 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tomee/blob/9c661758/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
index 389d228..a17f342 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
@@ -71,6 +71,9 @@ import static org.apache.openejb.core.transaction.EjbTransactionUtil.handleSyste
 
 public class MdbContainer implements RpcContainer {
     private static final Logger logger = Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources");
+
+    private static final ThreadLocal<BeanContext> CURRENT = new ThreadLocal<>();
+
     private static final Object[] NO_ARGS = new Object[0];
 
     private final Object containerID;
@@ -183,6 +186,7 @@ public class MdbContainer implements RpcContainer {
         }
 
         // activate the endpoint
+        CURRENT.set(beanContext);
         try {
             resourceAdapter.endpointActivation(endpointFactory, activationSpec);
         } catch (final ResourceException e) {
@@ -192,6 +196,8 @@ public class MdbContainer implements RpcContainer {
             deployments.remove(deploymentId);
 
             throw new OpenEJBException(e);
+        } finally {
+            CURRENT.remove();
         }
     }
 
@@ -276,7 +282,12 @@ public class MdbContainer implements RpcContainer {
         try {
             final EndpointFactory endpointFactory = (EndpointFactory) beanContext.getContainerData();
             if (endpointFactory != null) {
-                resourceAdapter.endpointDeactivation(endpointFactory, endpointFactory.getActivationSpec());
+                CURRENT.set(beanContext);
+                try {
+                    resourceAdapter.endpointDeactivation(endpointFactory, endpointFactory.getActivationSpec());
+                } finally {
+                    CURRENT.remove();
+                }
 
                 final MBeanServer server = LocalMBeanServer.get();
                 for (final ObjectName objectName : endpointFactory.jmxNames) {
@@ -481,6 +492,14 @@ public class MdbContainer implements RpcContainer {
         }
     }
 
+    public static BeanContext current() {
+        final BeanContext beanContext = CURRENT.get();
+        if (beanContext == null) {
+            CURRENT.remove();
+        }
+        return beanContext;
+    }
+
     private static class MdbCallContext {
         private Method deliveryMethod;
         private TransactionPolicy txPolicy;

http://git-wip-us.apache.org/repos/asf/tomee/blob/9c661758/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
index dde1e2b..73762ce 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
@@ -22,9 +22,15 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.RedeliveryPolicy;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.ra.ActiveMQConnectionRequestInfo;
+import org.apache.activemq.ra.ActiveMQEndpointActivationKey;
+import org.apache.activemq.ra.ActiveMQEndpointWorker;
 import org.apache.activemq.ra.ActiveMQManagedConnection;
 import org.apache.activemq.ra.MessageActivationSpec;
+import org.apache.openejb.BeanContext;
+import org.apache.openejb.core.mdb.MdbContainer;
 import org.apache.openejb.loader.SystemInstance;
+import org.apache.openejb.monitoring.LocalMBeanServer;
+import org.apache.openejb.monitoring.ObjectNameBuilder;
 import org.apache.openejb.resource.AutoConnectionTracker;
 import org.apache.openejb.resource.activemq.jms2.TomEEConnectionFactory;
 import org.apache.openejb.resource.activemq.jms2.TomEEManagedConnectionProxy;
@@ -38,9 +44,28 @@ import org.apache.openejb.util.reflection.Reflections;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.AttributeNotFoundException;
+import javax.management.DynamicMBean;
+import javax.management.InvalidAttributeValueException;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanConstructorInfo;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanNotificationInfo;
+import javax.management.MBeanOperationInfo;
+import javax.management.MBeanParameterInfo;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
 import javax.naming.NamingException;
+import javax.resource.NotSupportedException;
+import javax.resource.ResourceException;
+import javax.resource.spi.ActivationSpec;
 import javax.resource.spi.BootstrapContext;
 import javax.resource.spi.ResourceAdapterInternalException;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
@@ -48,9 +73,13 @@ import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import static javax.management.MBeanOperationInfo.ACTION;
+
 @SuppressWarnings("UnusedDeclaration")
 public class ActiveMQResourceAdapter extends org.apache.activemq.ra.ActiveMQResourceAdapter {
 
@@ -58,6 +87,7 @@ public class ActiveMQResourceAdapter extends org.apache.activemq.ra.ActiveMQReso
     private String useDatabaseLock;
     private String startupTimeout = "60000";
     private BootstrapContext bootstrapContext;
+    private final Map<BeanContext, ObjectName> mbeanNames = new ConcurrentHashMap<>();
 
     public String getDataSource() {
         return dataSource;
@@ -161,6 +191,87 @@ public class ActiveMQResourceAdapter extends org.apache.activemq.ra.ActiveMQReso
     }
 
     @Override
+    public void endpointActivation(final MessageEndpointFactory endpointFactory, final ActivationSpec activationSpec) throws ResourceException {
+        final BeanContext current = MdbContainer.current();
+        if (current != null && "false".equalsIgnoreCase(current.getProperties().getProperty("MdbActiveOnStartup"))) {
+            if (!equals(activationSpec.getResourceAdapter())) {
+                throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance (" + activationSpec.getResourceAdapter() + " != " + this + ")");
+            }
+            if (!(activationSpec instanceof MessageActivationSpec)) {
+                throw new NotSupportedException("That type of ActivationSpec not supported: " + activationSpec.getClass());
+            }
+
+            final ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, MessageActivationSpec.class.cast(activationSpec));
+            Map.class.cast(Reflections.get(this, "endpointWorkers")).put(key, new ActiveMQEndpointWorker(this, key) {
+            });
+            // we dont want that worker.start();
+        } else {
+            super.endpointActivation(endpointFactory, activationSpec);
+        }
+
+        if (current != null) {
+            addJMxControl(current, current.getProperties().getProperty("MdbJMXControl"));
+        }
+    }
+
+    private void addJMxControl(final BeanContext current, final String name) throws ResourceException {
+        if (name == null || "false".equalsIgnoreCase(name)) {
+            return;
+        }
+
+        final ActiveMQEndpointWorker worker = getWorker(current);
+        final ObjectName jmxName;
+        try {
+            jmxName = "true".equalsIgnoreCase(name) ? new ObjectNameBuilder()
+                    .set("J2EEServer", "openejb")
+                    .set("J2EEApplication", null)
+                    .set("EJBModule", current.getModuleID())
+                    .set("StatelessSessionBean", current.getEjbName())
+                    .set("j2eeType", "control")
+                    .set("name", current.getEjbName())
+                    .build() : new ObjectName(name);
+        } catch (final MalformedObjectNameException e) {
+            throw new IllegalArgumentException(e);
+        }
+        mbeanNames.put(current, jmxName);
+
+        final MBeanInfo info = new MBeanInfo(
+                "com.tomitribe.tomee.mdb.MdbControl",
+                "Allows to control a MDB listener",
+                new MBeanAttributeInfo[0],
+                new MBeanConstructorInfo[0],
+                new MBeanOperationInfo[]{
+                        new MBeanOperationInfo("start", "Ensure the listener is active.", new MBeanParameterInfo[0], "void", ACTION),
+                        new MBeanOperationInfo("stop", "Ensure the listener is not active.", new MBeanParameterInfo[0], "void", ACTION)
+                },
+                new MBeanNotificationInfo[0]
+        );
+        LocalMBeanServer.registerSilently(new MdbJmxControl(worker), jmxName);
+        log.info("Deployed MDB control for " + current.getDeploymentID() + " on " + jmxName);
+    }
+
+    @Override
+    public void endpointDeactivation(final MessageEndpointFactory endpointFactory, final ActivationSpec activationSpec) {
+        final BeanContext current = MdbContainer.current();
+        if (current != null && "true".equalsIgnoreCase(current.getProperties().getProperty("MdbJMXControl"))) {
+            LocalMBeanServer.unregisterSilently(mbeanNames.remove(current));
+            log.info("Undeployed MDB control for " + current.getDeploymentID());
+        }
+        super.endpointDeactivation(endpointFactory, activationSpec);
+    }
+
+    private ActiveMQEndpointWorker getWorker(final BeanContext beanContext) throws ResourceException {
+        final Map<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> workers = Map.class.cast(Reflections.get(
+                MdbContainer.class.cast(beanContext.getContainer()).getResourceAdapter(), "endpointWorkers"));
+        for (final Map.Entry<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> entry : workers.entrySet()) {
+            if (entry.getKey().getMessageEndpointFactory() == beanContext.getContainerData()) {
+                return entry.getValue();
+            }
+        }
+        throw new IllegalStateException("No worker for " + beanContext.getDeploymentID());
+    }
+
+    @Override
     public BootstrapContext getBootstrapContext() {
         return this.bootstrapContext;
     }
@@ -296,4 +407,72 @@ public class ActiveMQResourceAdapter extends org.apache.activemq.ra.ActiveMQReso
             //Ignore
         }
     }
+
+    public static class MdbJmxControl implements DynamicMBean {
+        private static final AttributeList ATTRIBUTE_LIST = new AttributeList();
+        private static final MBeanInfo INFO = new MBeanInfo(
+                "org.apache.openejb.resource.activemq.ActiveMQResourceAdapter.MdbJmxControl",
+                "Allows to control a MDB (start/stop)",
+                new MBeanAttributeInfo[0],
+                new MBeanConstructorInfo[0],
+                new MBeanOperationInfo[]{
+                        new MBeanOperationInfo("start", "Ensure the listener is active.", new MBeanParameterInfo[0], "void", ACTION),
+                        new MBeanOperationInfo("stop", "Ensure the listener is not active.", new MBeanParameterInfo[0], "void", ACTION)
+                },
+                new MBeanNotificationInfo[0]);
+
+        private final ActiveMQEndpointWorker worker;
+
+        private MdbJmxControl(final ActiveMQEndpointWorker worker) {
+            this.worker = worker;
+        }
+
+        @Override
+        public Object invoke(final String actionName, final Object[] params, final String[] signature) throws MBeanException, ReflectionException {
+            switch (actionName) {
+                case "stop":
+                    try {
+                        worker.stop();
+                    } catch (final InterruptedException e) {
+                        Thread.interrupted();
+                    }
+                    break;
+                case "start":
+                    try {
+                        worker.start();
+                    } catch (ResourceException e) {
+                        throw new MBeanException(new IllegalStateException(e.getMessage()));
+                    }
+                    break;
+                default:
+                    throw new MBeanException(new IllegalStateException("unsupported operation: " + actionName));
+            }
+            return null;
+        }
+
+        @Override
+        public MBeanInfo getMBeanInfo() {
+            return INFO;
+        }
+
+        @Override
+        public Object getAttribute(final String attribute) throws AttributeNotFoundException, MBeanException, ReflectionException {
+            throw new AttributeNotFoundException();
+        }
+
+        @Override
+        public void setAttribute(final Attribute attribute) throws AttributeNotFoundException, InvalidAttributeValueException, MBeanException, ReflectionException {
+            throw new AttributeNotFoundException();
+        }
+
+        @Override
+        public AttributeList getAttributes(final String[] attributes) {
+            return ATTRIBUTE_LIST;
+        }
+
+        @Override
+        public AttributeList setAttributes(final AttributeList attributes) {
+            return ATTRIBUTE_LIST;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/tomee/blob/9c661758/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java b/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java
new file mode 100644
index 0000000..a574f59
--- /dev/null
+++ b/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.resource.activemq;
+
+import org.apache.openejb.config.EjbModule;
+import org.apache.openejb.jee.EjbJar;
+import org.apache.openejb.jee.oejb3.EjbDeployment;
+import org.apache.openejb.jee.oejb3.OpenejbJar;
+import org.apache.openejb.junit.ApplicationComposer;
+import org.apache.openejb.monitoring.LocalMBeanServer;
+import org.apache.openejb.testing.Classes;
+import org.apache.openejb.testing.Configuration;
+import org.apache.openejb.testing.Module;
+import org.apache.openejb.testng.PropertiesBuilder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import javax.annotation.Resource;
+import javax.ejb.MessageDriven;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.ObjectName;
+import java.util.Properties;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(ApplicationComposer.class)
+public class ActiveMQResourceAdapterControlTest {
+    @Resource(name = "ActiveMQResourceAdapterControlTest/test/ejb/Mdb")
+    private Queue queue;
+
+    @Resource
+    private ConnectionFactory connectionFactory;
+
+    @Configuration
+    public Properties config() {
+        return new PropertiesBuilder()
+                .p("ra", "new://Resource?type=ActiveMQResourceAdapter")
+                .p("ra.brokerXmlConfig", "broker:(vm://localhost)?useJmx=false&persistent=false")
+                .p("ra.serverUrl", "vm://localhost")
+
+                .p("mdb", "new://Container?type=MESSAGE")
+                .p("mdb.resourceAdapter", "ra")
+
+                .p("cf", "new://Resource?type=javax.jms.ConnectionFactory")
+                .p("cf.resourceAdapter", "ra")
+
+                .p("openejb.deploymentId.format", "{appId}/{ejbJarId}/{ejbName}")
+                .build();
+    }
+
+    @Module
+    @Classes(value = Mdb.class)
+    public EjbModule app() {
+        return new EjbModule(new EjbJar("test"), new OpenejbJar() {{
+            setId("test");
+            getEjbDeployment().add(new EjbDeployment() {{
+                setEjbName("ejb/Mdb");
+                getProperties().put("MdbActiveOnStartup", "false");
+                getProperties().put("MdbJMXControl", "default:type=test");
+            }});
+        }});
+    }
+
+    @Test
+    public void ensureControl() throws Exception {
+        assertFalse(Mdb.awaiter.message, sendAndWait("Will be received after", 10, TimeUnit.SECONDS));
+
+        setControl("start");
+        assertTrue(Mdb.awaiter.semaphore.tryAcquire(1, TimeUnit.MINUTES));
+        assertEquals("Will be received after", Mdb.awaiter.message);
+
+        final long start = System.currentTimeMillis();
+        assertTrue(sendAndWait("First", 1, TimeUnit.MINUTES));
+        assertEquals("First", Mdb.awaiter.message);
+        final long end = System.currentTimeMillis();
+
+        Mdb.awaiter.message = null;
+        setControl("stop");
+        // default would be wait 10s, but if machine is slow we compute it from the first msg stats
+        final long waitWithoutResponse = Math.max(10, 5 * (end - start) / 1000);
+        System.out.println("We'll wait " + waitWithoutResponse + "s to get a message on a stopped listener");
+        assertFalse(Mdb.awaiter.message, sendAndWait("Will be received after", waitWithoutResponse, TimeUnit.SECONDS));
+        assertNull(Mdb.awaiter.message);
+
+        setControl("start");
+        assertTrue(sendAndWait("Second", 1, TimeUnit.MINUTES));
+        assertEquals("Will be received after", Mdb.awaiter.message);
+
+        Mdb.awaiter.message = null;
+        assertTrue(Mdb.awaiter.semaphore.tryAcquire(1, TimeUnit.MINUTES));
+        assertEquals("Second", Mdb.awaiter.message);
+    }
+
+    private void setControl(final String action) throws Exception {
+        LocalMBeanServer.get().invoke(
+                new ObjectName("default:type=test"),
+                action, new Object[0], new String[0]);
+    }
+
+    private boolean sendAndWait(final String second, final long wait, final TimeUnit unit) throws JMSException {
+        doSend(second);
+        try {
+            return Mdb.awaiter.semaphore.tryAcquire(wait, unit);
+        } catch (final InterruptedException e) {
+            Thread.interrupted();
+            fail();
+            return false;
+        }
+    }
+
+    private void doSend(final String txt) throws JMSException {
+        Connection c = null;
+        try {
+            c = connectionFactory.createConnection();
+            Session session = null;
+            try {
+                session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageProducer producer = null;
+                try {
+                    producer = session.createProducer(queue);
+                    producer.send(session.createTextMessage(txt));
+                } finally {
+                    if (producer != null) {
+                        producer.close();
+                    }
+                }
+            } finally {
+                if (session != null) {
+                    session.close();
+                }
+            }
+        } finally {
+            if (c != null) {
+                c.close();
+            }
+        }
+    }
+
+    @MessageDriven(name = "ejb/Mdb")
+    public static class Mdb implements MessageListener {
+        static final MessageAwaiter awaiter = new MessageAwaiter();
+
+        @Override
+        public synchronized void onMessage(final Message message) {
+            try {
+                awaiter.message = TextMessage.class.cast(message).getText();
+            } catch (final JMSException e) {
+                throw new IllegalStateException(e);
+            } finally {
+                awaiter.semaphore.release();
+            }
+        }
+    }
+
+    public static class MessageAwaiter {
+        private final Semaphore semaphore = new Semaphore(0);
+        private volatile String message;
+    }
+}


Re: tomee git commit: TOMEE-2021 allow to control through JMX MDB listening state

Posted by Jonathan Gallimore <jo...@gmail.com>.
Hope no-one minds - I have backported this to 1.7.x:
https://github.com/apache/tomee/pull/62

If no-one objects, I'll merge this in.

Many thanks

Jon

On Thu, Mar 9, 2017 at 7:09 PM, <rm...@apache.org> wrote:

> Repository: tomee
> Updated Branches:
>   refs/heads/master 933bd899d -> 9c661758a
>
>
> TOMEE-2021 allow to control through JMX MDB listening state
>
>
> Project: http://git-wip-us.apache.org/repos/asf/tomee/repo
> Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/9c661758
> Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/9c661758
> Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/9c661758
>
> Branch: refs/heads/master
> Commit: 9c661758aeff813d05748e91d01c1e522c09c0a4
> Parents: 933bd89
> Author: rmannibucau <rm...@apache.org>
> Authored: Thu Mar 9 20:09:50 2017 +0100
> Committer: rmannibucau <rm...@apache.org>
> Committed: Thu Mar 9 20:09:50 2017 +0100
>
> ----------------------------------------------------------------------
>  .../apache/openejb/core/mdb/MdbContainer.java   |  21 ++-
>  .../activemq/ActiveMQResourceAdapter.java       | 179 ++++++++++++++++++
>  .../ActiveMQResourceAdapterControlTest.java     | 187 +++++++++++++++++++
>  3 files changed, 386 insertions(+), 1 deletion(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/tomee/blob/9c661758/
> container/openejb-core/src/main/java/org/apache/openejb/
> core/mdb/MdbContainer.java
> ----------------------------------------------------------------------
> diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
> b/container/openejb-core/src/main/java/org/apache/openejb/
> core/mdb/MdbContainer.java
> index 389d228..a17f342 100644
> --- a/container/openejb-core/src/main/java/org/apache/openejb/
> core/mdb/MdbContainer.java
> +++ b/container/openejb-core/src/main/java/org/apache/openejb/
> core/mdb/MdbContainer.java
> @@ -71,6 +71,9 @@ import static org.apache.openejb.core.transaction.
> EjbTransactionUtil.handleSyste
>
>  public class MdbContainer implements RpcContainer {
>      private static final Logger logger = Logger.getInstance(LogCategory.OPENEJB,
> "org.apache.openejb.util.resources");
> +
> +    private static final ThreadLocal<BeanContext> CURRENT = new
> ThreadLocal<>();
> +
>      private static final Object[] NO_ARGS = new Object[0];
>
>      private final Object containerID;
> @@ -183,6 +186,7 @@ public class MdbContainer implements RpcContainer {
>          }
>
>          // activate the endpoint
> +        CURRENT.set(beanContext);
>          try {
>              resourceAdapter.endpointActivation(endpointFactory,
> activationSpec);
>          } catch (final ResourceException e) {
> @@ -192,6 +196,8 @@ public class MdbContainer implements RpcContainer {
>              deployments.remove(deploymentId);
>
>              throw new OpenEJBException(e);
> +        } finally {
> +            CURRENT.remove();
>          }
>      }
>
> @@ -276,7 +282,12 @@ public class MdbContainer implements RpcContainer {
>          try {
>              final EndpointFactory endpointFactory = (EndpointFactory)
> beanContext.getContainerData();
>              if (endpointFactory != null) {
> -                resourceAdapter.endpointDeactivation(endpointFactory,
> endpointFactory.getActivationSpec());
> +                CURRENT.set(beanContext);
> +                try {
> +                    resourceAdapter.endpointDeactivation(endpointFactory,
> endpointFactory.getActivationSpec());
> +                } finally {
> +                    CURRENT.remove();
> +                }
>
>                  final MBeanServer server = LocalMBeanServer.get();
>                  for (final ObjectName objectName :
> endpointFactory.jmxNames) {
> @@ -481,6 +492,14 @@ public class MdbContainer implements RpcContainer {
>          }
>      }
>
> +    public static BeanContext current() {
> +        final BeanContext beanContext = CURRENT.get();
> +        if (beanContext == null) {
> +            CURRENT.remove();
> +        }
> +        return beanContext;
> +    }
> +
>      private static class MdbCallContext {
>          private Method deliveryMethod;
>          private TransactionPolicy txPolicy;
>
> http://git-wip-us.apache.org/repos/asf/tomee/blob/9c661758/
> container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/
> ActiveMQResourceAdapter.java
> ----------------------------------------------------------------------
> diff --git a/container/openejb-core/src/main/java/org/apache/openejb/
> resource/activemq/ActiveMQResourceAdapter.java
> b/container/openejb-core/src/main/java/org/apache/openejb/
> resource/activemq/ActiveMQResourceAdapter.java
> index dde1e2b..73762ce 100644
> --- a/container/openejb-core/src/main/java/org/apache/openejb/
> resource/activemq/ActiveMQResourceAdapter.java
> +++ b/container/openejb-core/src/main/java/org/apache/openejb/
> resource/activemq/ActiveMQResourceAdapter.java
> @@ -22,9 +22,15 @@ import org.apache.activemq.ActiveMQConnectionFactory;
>  import org.apache.activemq.RedeliveryPolicy;
>  import org.apache.activemq.broker.BrokerService;
>  import org.apache.activemq.ra.ActiveMQConnectionRequestInfo;
> +import org.apache.activemq.ra.ActiveMQEndpointActivationKey;
> +import org.apache.activemq.ra.ActiveMQEndpointWorker;
>  import org.apache.activemq.ra.ActiveMQManagedConnection;
>  import org.apache.activemq.ra.MessageActivationSpec;
> +import org.apache.openejb.BeanContext;
> +import org.apache.openejb.core.mdb.MdbContainer;
>  import org.apache.openejb.loader.SystemInstance;
> +import org.apache.openejb.monitoring.LocalMBeanServer;
> +import org.apache.openejb.monitoring.ObjectNameBuilder;
>  import org.apache.openejb.resource.AutoConnectionTracker;
>  import org.apache.openejb.resource.activemq.jms2.TomEEConnectionFactory;
>  import org.apache.openejb.resource.activemq.jms2.
> TomEEManagedConnectionProxy;
> @@ -38,9 +44,28 @@ import org.apache.openejb.util.reflection.Reflections;
>
>  import javax.jms.Connection;
>  import javax.jms.JMSException;
> +import javax.management.Attribute;
> +import javax.management.AttributeList;
> +import javax.management.AttributeNotFoundException;
> +import javax.management.DynamicMBean;
> +import javax.management.InvalidAttributeValueException;
> +import javax.management.MBeanAttributeInfo;
> +import javax.management.MBeanConstructorInfo;
> +import javax.management.MBeanException;
> +import javax.management.MBeanInfo;
> +import javax.management.MBeanNotificationInfo;
> +import javax.management.MBeanOperationInfo;
> +import javax.management.MBeanParameterInfo;
> +import javax.management.MalformedObjectNameException;
> +import javax.management.ObjectName;
> +import javax.management.ReflectionException;
>  import javax.naming.NamingException;
> +import javax.resource.NotSupportedException;
> +import javax.resource.ResourceException;
> +import javax.resource.spi.ActivationSpec;
>  import javax.resource.spi.BootstrapContext;
>  import javax.resource.spi.ResourceAdapterInternalException;
> +import javax.resource.spi.endpoint.MessageEndpointFactory;
>  import java.lang.reflect.InvocationHandler;
>  import java.lang.reflect.Method;
>  import java.lang.reflect.Proxy;
> @@ -48,9 +73,13 @@ import java.net.URISyntaxException;
>  import java.util.Collection;
>  import java.util.Iterator;
>  import java.util.Locale;
> +import java.util.Map;
>  import java.util.Properties;
> +import java.util.concurrent.ConcurrentHashMap;
>  import java.util.concurrent.TimeUnit;
>
> +import static javax.management.MBeanOperationInfo.ACTION;
> +
>  @SuppressWarnings("UnusedDeclaration")
>  public class ActiveMQResourceAdapter extends org.apache.activemq.ra.ActiveMQResourceAdapter
> {
>
> @@ -58,6 +87,7 @@ public class ActiveMQResourceAdapter extends
> org.apache.activemq.ra.ActiveMQReso
>      private String useDatabaseLock;
>      private String startupTimeout = "60000";
>      private BootstrapContext bootstrapContext;
> +    private final Map<BeanContext, ObjectName> mbeanNames = new
> ConcurrentHashMap<>();
>
>      public String getDataSource() {
>          return dataSource;
> @@ -161,6 +191,87 @@ public class ActiveMQResourceAdapter extends
> org.apache.activemq.ra.ActiveMQReso
>      }
>
>      @Override
> +    public void endpointActivation(final MessageEndpointFactory
> endpointFactory, final ActivationSpec activationSpec) throws
> ResourceException {
> +        final BeanContext current = MdbContainer.current();
> +        if (current != null && "false".equalsIgnoreCase(
> current.getProperties().getProperty("MdbActiveOnStartup"))) {
> +            if (!equals(activationSpec.getResourceAdapter())) {
> +                throw new ResourceException("Activation spec not
> initialized with this ResourceAdapter instance (" + activationSpec.getResourceAdapter()
> + " != " + this + ")");
> +            }
> +            if (!(activationSpec instanceof MessageActivationSpec)) {
> +                throw new NotSupportedException("That type of
> ActivationSpec not supported: " + activationSpec.getClass());
> +            }
> +
> +            final ActiveMQEndpointActivationKey key = new
> ActiveMQEndpointActivationKey(endpointFactory,
> MessageActivationSpec.class.cast(activationSpec));
> +            Map.class.cast(Reflections.get(this,
> "endpointWorkers")).put(key, new ActiveMQEndpointWorker(this, key) {
> +            });
> +            // we dont want that worker.start();
> +        } else {
> +            super.endpointActivation(endpointFactory, activationSpec);
> +        }
> +
> +        if (current != null) {
> +            addJMxControl(current, current.getProperties().
> getProperty("MdbJMXControl"));
> +        }
> +    }
> +
> +    private void addJMxControl(final BeanContext current, final String
> name) throws ResourceException {
> +        if (name == null || "false".equalsIgnoreCase(name)) {
> +            return;
> +        }
> +
> +        final ActiveMQEndpointWorker worker = getWorker(current);
> +        final ObjectName jmxName;
> +        try {
> +            jmxName = "true".equalsIgnoreCase(name) ? new
> ObjectNameBuilder()
> +                    .set("J2EEServer", "openejb")
> +                    .set("J2EEApplication", null)
> +                    .set("EJBModule", current.getModuleID())
> +                    .set("StatelessSessionBean", current.getEjbName())
> +                    .set("j2eeType", "control")
> +                    .set("name", current.getEjbName())
> +                    .build() : new ObjectName(name);
> +        } catch (final MalformedObjectNameException e) {
> +            throw new IllegalArgumentException(e);
> +        }
> +        mbeanNames.put(current, jmxName);
> +
> +        final MBeanInfo info = new MBeanInfo(
> +                "com.tomitribe.tomee.mdb.MdbControl",
> +                "Allows to control a MDB listener",
> +                new MBeanAttributeInfo[0],
> +                new MBeanConstructorInfo[0],
> +                new MBeanOperationInfo[]{
> +                        new MBeanOperationInfo("start", "Ensure the
> listener is active.", new MBeanParameterInfo[0], "void", ACTION),
> +                        new MBeanOperationInfo("stop", "Ensure the
> listener is not active.", new MBeanParameterInfo[0], "void", ACTION)
> +                },
> +                new MBeanNotificationInfo[0]
> +        );
> +        LocalMBeanServer.registerSilently(new MdbJmxControl(worker),
> jmxName);
> +        log.info("Deployed MDB control for " + current.getDeploymentID()
> + " on " + jmxName);
> +    }
> +
> +    @Override
> +    public void endpointDeactivation(final MessageEndpointFactory
> endpointFactory, final ActivationSpec activationSpec) {
> +        final BeanContext current = MdbContainer.current();
> +        if (current != null && "true".equalsIgnoreCase(
> current.getProperties().getProperty("MdbJMXControl"))) {
> +            LocalMBeanServer.unregisterSilently(mbeanNames.
> remove(current));
> +            log.info("Undeployed MDB control for " +
> current.getDeploymentID());
> +        }
> +        super.endpointDeactivation(endpointFactory, activationSpec);
> +    }
> +
> +    private ActiveMQEndpointWorker getWorker(final BeanContext
> beanContext) throws ResourceException {
> +        final Map<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker>
> workers = Map.class.cast(Reflections.get(
> +                MdbContainer.class.cast(beanContext.getContainer()).getResourceAdapter(),
> "endpointWorkers"));
> +        for (final Map.Entry<ActiveMQEndpointActivationKey,
> ActiveMQEndpointWorker> entry : workers.entrySet()) {
> +            if (entry.getKey().getMessageEndpointFactory() ==
> beanContext.getContainerData()) {
> +                return entry.getValue();
> +            }
> +        }
> +        throw new IllegalStateException("No worker for " +
> beanContext.getDeploymentID());
> +    }
> +
> +    @Override
>      public BootstrapContext getBootstrapContext() {
>          return this.bootstrapContext;
>      }
> @@ -296,4 +407,72 @@ public class ActiveMQResourceAdapter extends
> org.apache.activemq.ra.ActiveMQReso
>              //Ignore
>          }
>      }
> +
> +    public static class MdbJmxControl implements DynamicMBean {
> +        private static final AttributeList ATTRIBUTE_LIST = new
> AttributeList();
> +        private static final MBeanInfo INFO = new MBeanInfo(
> +                "org.apache.openejb.resource.activemq.
> ActiveMQResourceAdapter.MdbJmxControl",
> +                "Allows to control a MDB (start/stop)",
> +                new MBeanAttributeInfo[0],
> +                new MBeanConstructorInfo[0],
> +                new MBeanOperationInfo[]{
> +                        new MBeanOperationInfo("start", "Ensure the
> listener is active.", new MBeanParameterInfo[0], "void", ACTION),
> +                        new MBeanOperationInfo("stop", "Ensure the
> listener is not active.", new MBeanParameterInfo[0], "void", ACTION)
> +                },
> +                new MBeanNotificationInfo[0]);
> +
> +        private final ActiveMQEndpointWorker worker;
> +
> +        private MdbJmxControl(final ActiveMQEndpointWorker worker) {
> +            this.worker = worker;
> +        }
> +
> +        @Override
> +        public Object invoke(final String actionName, final Object[]
> params, final String[] signature) throws MBeanException,
> ReflectionException {
> +            switch (actionName) {
> +                case "stop":
> +                    try {
> +                        worker.stop();
> +                    } catch (final InterruptedException e) {
> +                        Thread.interrupted();
> +                    }
> +                    break;
> +                case "start":
> +                    try {
> +                        worker.start();
> +                    } catch (ResourceException e) {
> +                        throw new MBeanException(new
> IllegalStateException(e.getMessage()));
> +                    }
> +                    break;
> +                default:
> +                    throw new MBeanException(new IllegalStateException("unsupported
> operation: " + actionName));
> +            }
> +            return null;
> +        }
> +
> +        @Override
> +        public MBeanInfo getMBeanInfo() {
> +            return INFO;
> +        }
> +
> +        @Override
> +        public Object getAttribute(final String attribute) throws
> AttributeNotFoundException, MBeanException, ReflectionException {
> +            throw new AttributeNotFoundException();
> +        }
> +
> +        @Override
> +        public void setAttribute(final Attribute attribute) throws
> AttributeNotFoundException, InvalidAttributeValueException,
> MBeanException, ReflectionException {
> +            throw new AttributeNotFoundException();
> +        }
> +
> +        @Override
> +        public AttributeList getAttributes(final String[] attributes) {
> +            return ATTRIBUTE_LIST;
> +        }
> +
> +        @Override
> +        public AttributeList setAttributes(final AttributeList
> attributes) {
> +            return ATTRIBUTE_LIST;
> +        }
> +    }
>  }
>
> http://git-wip-us.apache.org/repos/asf/tomee/blob/9c661758/
> container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/
> ActiveMQResourceAdapterControlTest.java
> ----------------------------------------------------------------------
> diff --git a/container/openejb-core/src/test/java/org/apache/openejb/
> resource/activemq/ActiveMQResourceAdapterControlTest.java
> b/container/openejb-core/src/test/java/org/apache/openejb/
> resource/activemq/ActiveMQResourceAdapterControlTest.java
> new file mode 100644
> index 0000000..a574f59
> --- /dev/null
> +++ b/container/openejb-core/src/test/java/org/apache/openejb/
> resource/activemq/ActiveMQResourceAdapterControlTest.java
> @@ -0,0 +1,187 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.openejb.resource.activemq;
> +
> +import org.apache.openejb.config.EjbModule;
> +import org.apache.openejb.jee.EjbJar;
> +import org.apache.openejb.jee.oejb3.EjbDeployment;
> +import org.apache.openejb.jee.oejb3.OpenejbJar;
> +import org.apache.openejb.junit.ApplicationComposer;
> +import org.apache.openejb.monitoring.LocalMBeanServer;
> +import org.apache.openejb.testing.Classes;
> +import org.apache.openejb.testing.Configuration;
> +import org.apache.openejb.testing.Module;
> +import org.apache.openejb.testng.PropertiesBuilder;
> +import org.junit.Test;
> +import org.junit.runner.RunWith;
> +
> +import javax.annotation.Resource;
> +import javax.ejb.MessageDriven;
> +import javax.jms.Connection;
> +import javax.jms.ConnectionFactory;
> +import javax.jms.JMSException;
> +import javax.jms.Message;
> +import javax.jms.MessageListener;
> +import javax.jms.MessageProducer;
> +import javax.jms.Queue;
> +import javax.jms.Session;
> +import javax.jms.TextMessage;
> +import javax.management.ObjectName;
> +import java.util.Properties;
> +import java.util.concurrent.Semaphore;
> +import java.util.concurrent.TimeUnit;
> +
> +import static org.junit.Assert.assertEquals;
> +import static org.junit.Assert.assertFalse;
> +import static org.junit.Assert.assertNull;
> +import static org.junit.Assert.assertTrue;
> +import static org.junit.Assert.fail;
> +
> +@RunWith(ApplicationComposer.class)
> +public class ActiveMQResourceAdapterControlTest {
> +    @Resource(name = "ActiveMQResourceAdapterControlTest/test/ejb/Mdb")
> +    private Queue queue;
> +
> +    @Resource
> +    private ConnectionFactory connectionFactory;
> +
> +    @Configuration
> +    public Properties config() {
> +        return new PropertiesBuilder()
> +                .p("ra", "new://Resource?type=ActiveMQResourceAdapter")
> +                .p("ra.brokerXmlConfig", "broker:(vm://localhost)?
> useJmx=false&persistent=false")
> +                .p("ra.serverUrl", "vm://localhost")
> +
> +                .p("mdb", "new://Container?type=MESSAGE")
> +                .p("mdb.resourceAdapter", "ra")
> +
> +                .p("cf", "new://Resource?type=javax.
> jms.ConnectionFactory")
> +                .p("cf.resourceAdapter", "ra")
> +
> +                .p("openejb.deploymentId.format",
> "{appId}/{ejbJarId}/{ejbName}")
> +                .build();
> +    }
> +
> +    @Module
> +    @Classes(value = Mdb.class)
> +    public EjbModule app() {
> +        return new EjbModule(new EjbJar("test"), new OpenejbJar() {{
> +            setId("test");
> +            getEjbDeployment().add(new EjbDeployment() {{
> +                setEjbName("ejb/Mdb");
> +                getProperties().put("MdbActiveOnStartup", "false");
> +                getProperties().put("MdbJMXControl",
> "default:type=test");
> +            }});
> +        }});
> +    }
> +
> +    @Test
> +    public void ensureControl() throws Exception {
> +        assertFalse(Mdb.awaiter.message, sendAndWait("Will be received
> after", 10, TimeUnit.SECONDS));
> +
> +        setControl("start");
> +        assertTrue(Mdb.awaiter.semaphore.tryAcquire(1,
> TimeUnit.MINUTES));
> +        assertEquals("Will be received after", Mdb.awaiter.message);
> +
> +        final long start = System.currentTimeMillis();
> +        assertTrue(sendAndWait("First", 1, TimeUnit.MINUTES));
> +        assertEquals("First", Mdb.awaiter.message);
> +        final long end = System.currentTimeMillis();
> +
> +        Mdb.awaiter.message = null;
> +        setControl("stop");
> +        // default would be wait 10s, but if machine is slow we compute
> it from the first msg stats
> +        final long waitWithoutResponse = Math.max(10, 5 * (end - start) /
> 1000);
> +        System.out.println("We'll wait " + waitWithoutResponse + "s to
> get a message on a stopped listener");
> +        assertFalse(Mdb.awaiter.message, sendAndWait("Will be received
> after", waitWithoutResponse, TimeUnit.SECONDS));
> +        assertNull(Mdb.awaiter.message);
> +
> +        setControl("start");
> +        assertTrue(sendAndWait("Second", 1, TimeUnit.MINUTES));
> +        assertEquals("Will be received after", Mdb.awaiter.message);
> +
> +        Mdb.awaiter.message = null;
> +        assertTrue(Mdb.awaiter.semaphore.tryAcquire(1,
> TimeUnit.MINUTES));
> +        assertEquals("Second", Mdb.awaiter.message);
> +    }
> +
> +    private void setControl(final String action) throws Exception {
> +        LocalMBeanServer.get().invoke(
> +                new ObjectName("default:type=test"),
> +                action, new Object[0], new String[0]);
> +    }
> +
> +    private boolean sendAndWait(final String second, final long wait,
> final TimeUnit unit) throws JMSException {
> +        doSend(second);
> +        try {
> +            return Mdb.awaiter.semaphore.tryAcquire(wait, unit);
> +        } catch (final InterruptedException e) {
> +            Thread.interrupted();
> +            fail();
> +            return false;
> +        }
> +    }
> +
> +    private void doSend(final String txt) throws JMSException {
> +        Connection c = null;
> +        try {
> +            c = connectionFactory.createConnection();
> +            Session session = null;
> +            try {
> +                session = c.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
> +                MessageProducer producer = null;
> +                try {
> +                    producer = session.createProducer(queue);
> +                    producer.send(session.createTextMessage(txt));
> +                } finally {
> +                    if (producer != null) {
> +                        producer.close();
> +                    }
> +                }
> +            } finally {
> +                if (session != null) {
> +                    session.close();
> +                }
> +            }
> +        } finally {
> +            if (c != null) {
> +                c.close();
> +            }
> +        }
> +    }
> +
> +    @MessageDriven(name = "ejb/Mdb")
> +    public static class Mdb implements MessageListener {
> +        static final MessageAwaiter awaiter = new MessageAwaiter();
> +
> +        @Override
> +        public synchronized void onMessage(final Message message) {
> +            try {
> +                awaiter.message = TextMessage.class.cast(
> message).getText();
> +            } catch (final JMSException e) {
> +                throw new IllegalStateException(e);
> +            } finally {
> +                awaiter.semaphore.release();
> +            }
> +        }
> +    }
> +
> +    public static class MessageAwaiter {
> +        private final Semaphore semaphore = new Semaphore(0);
> +        private volatile String message;
> +    }
> +}
>
>