You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by rm...@apache.org on 2017/03/09 19:09:57 UTC
tomee git commit: TOMEE-2021 allow to control through JMX MDB
listening state
Repository: tomee
Updated Branches:
refs/heads/master 933bd899d -> 9c661758a
TOMEE-2021 allow to control through JMX MDB listening state
Project: http://git-wip-us.apache.org/repos/asf/tomee/repo
Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/9c661758
Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/9c661758
Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/9c661758
Branch: refs/heads/master
Commit: 9c661758aeff813d05748e91d01c1e522c09c0a4
Parents: 933bd89
Author: rmannibucau <rm...@apache.org>
Authored: Thu Mar 9 20:09:50 2017 +0100
Committer: rmannibucau <rm...@apache.org>
Committed: Thu Mar 9 20:09:50 2017 +0100
----------------------------------------------------------------------
.../apache/openejb/core/mdb/MdbContainer.java | 21 ++-
.../activemq/ActiveMQResourceAdapter.java | 179 ++++++++++++++++++
.../ActiveMQResourceAdapterControlTest.java | 187 +++++++++++++++++++
3 files changed, 386 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tomee/blob/9c661758/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
index 389d228..a17f342 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
@@ -71,6 +71,9 @@ import static org.apache.openejb.core.transaction.EjbTransactionUtil.handleSyste
public class MdbContainer implements RpcContainer {
private static final Logger logger = Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources");
+
+ private static final ThreadLocal<BeanContext> CURRENT = new ThreadLocal<>();
+
private static final Object[] NO_ARGS = new Object[0];
private final Object containerID;
@@ -183,6 +186,7 @@ public class MdbContainer implements RpcContainer {
}
// activate the endpoint
+ CURRENT.set(beanContext);
try {
resourceAdapter.endpointActivation(endpointFactory, activationSpec);
} catch (final ResourceException e) {
@@ -192,6 +196,8 @@ public class MdbContainer implements RpcContainer {
deployments.remove(deploymentId);
throw new OpenEJBException(e);
+ } finally {
+ CURRENT.remove();
}
}
@@ -276,7 +282,12 @@ public class MdbContainer implements RpcContainer {
try {
final EndpointFactory endpointFactory = (EndpointFactory) beanContext.getContainerData();
if (endpointFactory != null) {
- resourceAdapter.endpointDeactivation(endpointFactory, endpointFactory.getActivationSpec());
+ CURRENT.set(beanContext);
+ try {
+ resourceAdapter.endpointDeactivation(endpointFactory, endpointFactory.getActivationSpec());
+ } finally {
+ CURRENT.remove();
+ }
final MBeanServer server = LocalMBeanServer.get();
for (final ObjectName objectName : endpointFactory.jmxNames) {
@@ -481,6 +492,14 @@ public class MdbContainer implements RpcContainer {
}
}
+ public static BeanContext current() {
+ final BeanContext beanContext = CURRENT.get();
+ if (beanContext == null) {
+ CURRENT.remove();
+ }
+ return beanContext;
+ }
+
private static class MdbCallContext {
private Method deliveryMethod;
private TransactionPolicy txPolicy;
http://git-wip-us.apache.org/repos/asf/tomee/blob/9c661758/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
index dde1e2b..73762ce 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
@@ -22,9 +22,15 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.ra.ActiveMQConnectionRequestInfo;
+import org.apache.activemq.ra.ActiveMQEndpointActivationKey;
+import org.apache.activemq.ra.ActiveMQEndpointWorker;
import org.apache.activemq.ra.ActiveMQManagedConnection;
import org.apache.activemq.ra.MessageActivationSpec;
+import org.apache.openejb.BeanContext;
+import org.apache.openejb.core.mdb.MdbContainer;
import org.apache.openejb.loader.SystemInstance;
+import org.apache.openejb.monitoring.LocalMBeanServer;
+import org.apache.openejb.monitoring.ObjectNameBuilder;
import org.apache.openejb.resource.AutoConnectionTracker;
import org.apache.openejb.resource.activemq.jms2.TomEEConnectionFactory;
import org.apache.openejb.resource.activemq.jms2.TomEEManagedConnectionProxy;
@@ -38,9 +44,28 @@ import org.apache.openejb.util.reflection.Reflections;
import javax.jms.Connection;
import javax.jms.JMSException;
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.AttributeNotFoundException;
+import javax.management.DynamicMBean;
+import javax.management.InvalidAttributeValueException;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanConstructorInfo;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanNotificationInfo;
+import javax.management.MBeanOperationInfo;
+import javax.management.MBeanParameterInfo;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
import javax.naming.NamingException;
+import javax.resource.NotSupportedException;
+import javax.resource.ResourceException;
+import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.ResourceAdapterInternalException;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
@@ -48,9 +73,13 @@ import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Locale;
+import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import static javax.management.MBeanOperationInfo.ACTION;
+
@SuppressWarnings("UnusedDeclaration")
public class ActiveMQResourceAdapter extends org.apache.activemq.ra.ActiveMQResourceAdapter {
@@ -58,6 +87,7 @@ public class ActiveMQResourceAdapter extends org.apache.activemq.ra.ActiveMQReso
private String useDatabaseLock;
private String startupTimeout = "60000";
private BootstrapContext bootstrapContext;
+ private final Map<BeanContext, ObjectName> mbeanNames = new ConcurrentHashMap<>();
public String getDataSource() {
return dataSource;
@@ -161,6 +191,87 @@ public class ActiveMQResourceAdapter extends org.apache.activemq.ra.ActiveMQReso
}
@Override
+ public void endpointActivation(final MessageEndpointFactory endpointFactory, final ActivationSpec activationSpec) throws ResourceException {
+ final BeanContext current = MdbContainer.current();
+ if (current != null && "false".equalsIgnoreCase(current.getProperties().getProperty("MdbActiveOnStartup"))) {
+ if (!equals(activationSpec.getResourceAdapter())) {
+ throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance (" + activationSpec.getResourceAdapter() + " != " + this + ")");
+ }
+ if (!(activationSpec instanceof MessageActivationSpec)) {
+ throw new NotSupportedException("That type of ActivationSpec not supported: " + activationSpec.getClass());
+ }
+
+ final ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, MessageActivationSpec.class.cast(activationSpec));
+ Map.class.cast(Reflections.get(this, "endpointWorkers")).put(key, new ActiveMQEndpointWorker(this, key) {
+ });
+ // we dont want that worker.start();
+ } else {
+ super.endpointActivation(endpointFactory, activationSpec);
+ }
+
+ if (current != null) {
+ addJMxControl(current, current.getProperties().getProperty("MdbJMXControl"));
+ }
+ }
+
+ private void addJMxControl(final BeanContext current, final String name) throws ResourceException {
+ if (name == null || "false".equalsIgnoreCase(name)) {
+ return;
+ }
+
+ final ActiveMQEndpointWorker worker = getWorker(current);
+ final ObjectName jmxName;
+ try {
+ jmxName = "true".equalsIgnoreCase(name) ? new ObjectNameBuilder()
+ .set("J2EEServer", "openejb")
+ .set("J2EEApplication", null)
+ .set("EJBModule", current.getModuleID())
+ .set("StatelessSessionBean", current.getEjbName())
+ .set("j2eeType", "control")
+ .set("name", current.getEjbName())
+ .build() : new ObjectName(name);
+ } catch (final MalformedObjectNameException e) {
+ throw new IllegalArgumentException(e);
+ }
+ mbeanNames.put(current, jmxName);
+
+ final MBeanInfo info = new MBeanInfo(
+ "com.tomitribe.tomee.mdb.MdbControl",
+ "Allows to control a MDB listener",
+ new MBeanAttributeInfo[0],
+ new MBeanConstructorInfo[0],
+ new MBeanOperationInfo[]{
+ new MBeanOperationInfo("start", "Ensure the listener is active.", new MBeanParameterInfo[0], "void", ACTION),
+ new MBeanOperationInfo("stop", "Ensure the listener is not active.", new MBeanParameterInfo[0], "void", ACTION)
+ },
+ new MBeanNotificationInfo[0]
+ );
+ LocalMBeanServer.registerSilently(new MdbJmxControl(worker), jmxName);
+ log.info("Deployed MDB control for " + current.getDeploymentID() + " on " + jmxName);
+ }
+
+ @Override
+ public void endpointDeactivation(final MessageEndpointFactory endpointFactory, final ActivationSpec activationSpec) {
+ final BeanContext current = MdbContainer.current();
+ if (current != null && "true".equalsIgnoreCase(current.getProperties().getProperty("MdbJMXControl"))) {
+ LocalMBeanServer.unregisterSilently(mbeanNames.remove(current));
+ log.info("Undeployed MDB control for " + current.getDeploymentID());
+ }
+ super.endpointDeactivation(endpointFactory, activationSpec);
+ }
+
+ private ActiveMQEndpointWorker getWorker(final BeanContext beanContext) throws ResourceException {
+ final Map<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> workers = Map.class.cast(Reflections.get(
+ MdbContainer.class.cast(beanContext.getContainer()).getResourceAdapter(), "endpointWorkers"));
+ for (final Map.Entry<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> entry : workers.entrySet()) {
+ if (entry.getKey().getMessageEndpointFactory() == beanContext.getContainerData()) {
+ return entry.getValue();
+ }
+ }
+ throw new IllegalStateException("No worker for " + beanContext.getDeploymentID());
+ }
+
+ @Override
public BootstrapContext getBootstrapContext() {
return this.bootstrapContext;
}
@@ -296,4 +407,72 @@ public class ActiveMQResourceAdapter extends org.apache.activemq.ra.ActiveMQReso
//Ignore
}
}
+
+ public static class MdbJmxControl implements DynamicMBean {
+ private static final AttributeList ATTRIBUTE_LIST = new AttributeList();
+ private static final MBeanInfo INFO = new MBeanInfo(
+ "org.apache.openejb.resource.activemq.ActiveMQResourceAdapter.MdbJmxControl",
+ "Allows to control a MDB (start/stop)",
+ new MBeanAttributeInfo[0],
+ new MBeanConstructorInfo[0],
+ new MBeanOperationInfo[]{
+ new MBeanOperationInfo("start", "Ensure the listener is active.", new MBeanParameterInfo[0], "void", ACTION),
+ new MBeanOperationInfo("stop", "Ensure the listener is not active.", new MBeanParameterInfo[0], "void", ACTION)
+ },
+ new MBeanNotificationInfo[0]);
+
+ private final ActiveMQEndpointWorker worker;
+
+ private MdbJmxControl(final ActiveMQEndpointWorker worker) {
+ this.worker = worker;
+ }
+
+ @Override
+ public Object invoke(final String actionName, final Object[] params, final String[] signature) throws MBeanException, ReflectionException {
+ switch (actionName) {
+ case "stop":
+ try {
+ worker.stop();
+ } catch (final InterruptedException e) {
+ Thread.interrupted();
+ }
+ break;
+ case "start":
+ try {
+ worker.start();
+ } catch (ResourceException e) {
+ throw new MBeanException(new IllegalStateException(e.getMessage()));
+ }
+ break;
+ default:
+ throw new MBeanException(new IllegalStateException("unsupported operation: " + actionName));
+ }
+ return null;
+ }
+
+ @Override
+ public MBeanInfo getMBeanInfo() {
+ return INFO;
+ }
+
+ @Override
+ public Object getAttribute(final String attribute) throws AttributeNotFoundException, MBeanException, ReflectionException {
+ throw new AttributeNotFoundException();
+ }
+
+ @Override
+ public void setAttribute(final Attribute attribute) throws AttributeNotFoundException, InvalidAttributeValueException, MBeanException, ReflectionException {
+ throw new AttributeNotFoundException();
+ }
+
+ @Override
+ public AttributeList getAttributes(final String[] attributes) {
+ return ATTRIBUTE_LIST;
+ }
+
+ @Override
+ public AttributeList setAttributes(final AttributeList attributes) {
+ return ATTRIBUTE_LIST;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tomee/blob/9c661758/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java b/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java
new file mode 100644
index 0000000..a574f59
--- /dev/null
+++ b/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.resource.activemq;
+
+import org.apache.openejb.config.EjbModule;
+import org.apache.openejb.jee.EjbJar;
+import org.apache.openejb.jee.oejb3.EjbDeployment;
+import org.apache.openejb.jee.oejb3.OpenejbJar;
+import org.apache.openejb.junit.ApplicationComposer;
+import org.apache.openejb.monitoring.LocalMBeanServer;
+import org.apache.openejb.testing.Classes;
+import org.apache.openejb.testing.Configuration;
+import org.apache.openejb.testing.Module;
+import org.apache.openejb.testng.PropertiesBuilder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import javax.annotation.Resource;
+import javax.ejb.MessageDriven;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.ObjectName;
+import java.util.Properties;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(ApplicationComposer.class)
+public class ActiveMQResourceAdapterControlTest {
+ @Resource(name = "ActiveMQResourceAdapterControlTest/test/ejb/Mdb")
+ private Queue queue;
+
+ @Resource
+ private ConnectionFactory connectionFactory;
+
+ @Configuration
+ public Properties config() {
+ return new PropertiesBuilder()
+ .p("ra", "new://Resource?type=ActiveMQResourceAdapter")
+ .p("ra.brokerXmlConfig", "broker:(vm://localhost)?useJmx=false&persistent=false")
+ .p("ra.serverUrl", "vm://localhost")
+
+ .p("mdb", "new://Container?type=MESSAGE")
+ .p("mdb.resourceAdapter", "ra")
+
+ .p("cf", "new://Resource?type=javax.jms.ConnectionFactory")
+ .p("cf.resourceAdapter", "ra")
+
+ .p("openejb.deploymentId.format", "{appId}/{ejbJarId}/{ejbName}")
+ .build();
+ }
+
+ @Module
+ @Classes(value = Mdb.class)
+ public EjbModule app() {
+ return new EjbModule(new EjbJar("test"), new OpenejbJar() {{
+ setId("test");
+ getEjbDeployment().add(new EjbDeployment() {{
+ setEjbName("ejb/Mdb");
+ getProperties().put("MdbActiveOnStartup", "false");
+ getProperties().put("MdbJMXControl", "default:type=test");
+ }});
+ }});
+ }
+
+ @Test
+ public void ensureControl() throws Exception {
+ assertFalse(Mdb.awaiter.message, sendAndWait("Will be received after", 10, TimeUnit.SECONDS));
+
+ setControl("start");
+ assertTrue(Mdb.awaiter.semaphore.tryAcquire(1, TimeUnit.MINUTES));
+ assertEquals("Will be received after", Mdb.awaiter.message);
+
+ final long start = System.currentTimeMillis();
+ assertTrue(sendAndWait("First", 1, TimeUnit.MINUTES));
+ assertEquals("First", Mdb.awaiter.message);
+ final long end = System.currentTimeMillis();
+
+ Mdb.awaiter.message = null;
+ setControl("stop");
+ // default would be wait 10s, but if machine is slow we compute it from the first msg stats
+ final long waitWithoutResponse = Math.max(10, 5 * (end - start) / 1000);
+ System.out.println("We'll wait " + waitWithoutResponse + "s to get a message on a stopped listener");
+ assertFalse(Mdb.awaiter.message, sendAndWait("Will be received after", waitWithoutResponse, TimeUnit.SECONDS));
+ assertNull(Mdb.awaiter.message);
+
+ setControl("start");
+ assertTrue(sendAndWait("Second", 1, TimeUnit.MINUTES));
+ assertEquals("Will be received after", Mdb.awaiter.message);
+
+ Mdb.awaiter.message = null;
+ assertTrue(Mdb.awaiter.semaphore.tryAcquire(1, TimeUnit.MINUTES));
+ assertEquals("Second", Mdb.awaiter.message);
+ }
+
+ private void setControl(final String action) throws Exception {
+ LocalMBeanServer.get().invoke(
+ new ObjectName("default:type=test"),
+ action, new Object[0], new String[0]);
+ }
+
+ private boolean sendAndWait(final String second, final long wait, final TimeUnit unit) throws JMSException {
+ doSend(second);
+ try {
+ return Mdb.awaiter.semaphore.tryAcquire(wait, unit);
+ } catch (final InterruptedException e) {
+ Thread.interrupted();
+ fail();
+ return false;
+ }
+ }
+
+ private void doSend(final String txt) throws JMSException {
+ Connection c = null;
+ try {
+ c = connectionFactory.createConnection();
+ Session session = null;
+ try {
+ session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = null;
+ try {
+ producer = session.createProducer(queue);
+ producer.send(session.createTextMessage(txt));
+ } finally {
+ if (producer != null) {
+ producer.close();
+ }
+ }
+ } finally {
+ if (session != null) {
+ session.close();
+ }
+ }
+ } finally {
+ if (c != null) {
+ c.close();
+ }
+ }
+ }
+
+ @MessageDriven(name = "ejb/Mdb")
+ public static class Mdb implements MessageListener {
+ static final MessageAwaiter awaiter = new MessageAwaiter();
+
+ @Override
+ public synchronized void onMessage(final Message message) {
+ try {
+ awaiter.message = TextMessage.class.cast(message).getText();
+ } catch (final JMSException e) {
+ throw new IllegalStateException(e);
+ } finally {
+ awaiter.semaphore.release();
+ }
+ }
+ }
+
+ public static class MessageAwaiter {
+ private final Semaphore semaphore = new Semaphore(0);
+ private volatile String message;
+ }
+}
Re: tomee git commit: TOMEE-2021 allow to control through JMX MDB
listening state
Posted by Jonathan Gallimore <jo...@gmail.com>.
Hope no-one minds - I have backported this to 1.7.x:
https://github.com/apache/tomee/pull/62
If no-one objects, I'll merge this in.
Many thanks
Jon
On Thu, Mar 9, 2017 at 7:09 PM, <rm...@apache.org> wrote:
> Repository: tomee
> Updated Branches:
> refs/heads/master 933bd899d -> 9c661758a
>
>
> TOMEE-2021 allow to control through JMX MDB listening state
>
>
> Project: http://git-wip-us.apache.org/repos/asf/tomee/repo
> Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/9c661758
> Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/9c661758
> Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/9c661758
>
> Branch: refs/heads/master
> Commit: 9c661758aeff813d05748e91d01c1e522c09c0a4
> Parents: 933bd89
> Author: rmannibucau <rm...@apache.org>
> Authored: Thu Mar 9 20:09:50 2017 +0100
> Committer: rmannibucau <rm...@apache.org>
> Committed: Thu Mar 9 20:09:50 2017 +0100
>
> ----------------------------------------------------------------------
> .../apache/openejb/core/mdb/MdbContainer.java | 21 ++-
> .../activemq/ActiveMQResourceAdapter.java | 179 ++++++++++++++++++
> .../ActiveMQResourceAdapterControlTest.java | 187 +++++++++++++++++++
> 3 files changed, 386 insertions(+), 1 deletion(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/tomee/blob/9c661758/
> container/openejb-core/src/main/java/org/apache/openejb/
> core/mdb/MdbContainer.java
> ----------------------------------------------------------------------
> diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
> b/container/openejb-core/src/main/java/org/apache/openejb/
> core/mdb/MdbContainer.java
> index 389d228..a17f342 100644
> --- a/container/openejb-core/src/main/java/org/apache/openejb/
> core/mdb/MdbContainer.java
> +++ b/container/openejb-core/src/main/java/org/apache/openejb/
> core/mdb/MdbContainer.java
> @@ -71,6 +71,9 @@ import static org.apache.openejb.core.transaction.
> EjbTransactionUtil.handleSyste
>
> public class MdbContainer implements RpcContainer {
> private static final Logger logger = Logger.getInstance(LogCategory.OPENEJB,
> "org.apache.openejb.util.resources");
> +
> + private static final ThreadLocal<BeanContext> CURRENT = new
> ThreadLocal<>();
> +
> private static final Object[] NO_ARGS = new Object[0];
>
> private final Object containerID;
> @@ -183,6 +186,7 @@ public class MdbContainer implements RpcContainer {
> }
>
> // activate the endpoint
> + CURRENT.set(beanContext);
> try {
> resourceAdapter.endpointActivation(endpointFactory,
> activationSpec);
> } catch (final ResourceException e) {
> @@ -192,6 +196,8 @@ public class MdbContainer implements RpcContainer {
> deployments.remove(deploymentId);
>
> throw new OpenEJBException(e);
> + } finally {
> + CURRENT.remove();
> }
> }
>
> @@ -276,7 +282,12 @@ public class MdbContainer implements RpcContainer {
> try {
> final EndpointFactory endpointFactory = (EndpointFactory)
> beanContext.getContainerData();
> if (endpointFactory != null) {
> - resourceAdapter.endpointDeactivation(endpointFactory,
> endpointFactory.getActivationSpec());
> + CURRENT.set(beanContext);
> + try {
> + resourceAdapter.endpointDeactivation(endpointFactory,
> endpointFactory.getActivationSpec());
> + } finally {
> + CURRENT.remove();
> + }
>
> final MBeanServer server = LocalMBeanServer.get();
> for (final ObjectName objectName :
> endpointFactory.jmxNames) {
> @@ -481,6 +492,14 @@ public class MdbContainer implements RpcContainer {
> }
> }
>
> + public static BeanContext current() {
> + final BeanContext beanContext = CURRENT.get();
> + if (beanContext == null) {
> + CURRENT.remove();
> + }
> + return beanContext;
> + }
> +
> private static class MdbCallContext {
> private Method deliveryMethod;
> private TransactionPolicy txPolicy;
>
> http://git-wip-us.apache.org/repos/asf/tomee/blob/9c661758/
> container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/
> ActiveMQResourceAdapter.java
> ----------------------------------------------------------------------
> diff --git a/container/openejb-core/src/main/java/org/apache/openejb/
> resource/activemq/ActiveMQResourceAdapter.java
> b/container/openejb-core/src/main/java/org/apache/openejb/
> resource/activemq/ActiveMQResourceAdapter.java
> index dde1e2b..73762ce 100644
> --- a/container/openejb-core/src/main/java/org/apache/openejb/
> resource/activemq/ActiveMQResourceAdapter.java
> +++ b/container/openejb-core/src/main/java/org/apache/openejb/
> resource/activemq/ActiveMQResourceAdapter.java
> @@ -22,9 +22,15 @@ import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.RedeliveryPolicy;
> import org.apache.activemq.broker.BrokerService;
> import org.apache.activemq.ra.ActiveMQConnectionRequestInfo;
> +import org.apache.activemq.ra.ActiveMQEndpointActivationKey;
> +import org.apache.activemq.ra.ActiveMQEndpointWorker;
> import org.apache.activemq.ra.ActiveMQManagedConnection;
> import org.apache.activemq.ra.MessageActivationSpec;
> +import org.apache.openejb.BeanContext;
> +import org.apache.openejb.core.mdb.MdbContainer;
> import org.apache.openejb.loader.SystemInstance;
> +import org.apache.openejb.monitoring.LocalMBeanServer;
> +import org.apache.openejb.monitoring.ObjectNameBuilder;
> import org.apache.openejb.resource.AutoConnectionTracker;
> import org.apache.openejb.resource.activemq.jms2.TomEEConnectionFactory;
> import org.apache.openejb.resource.activemq.jms2.
> TomEEManagedConnectionProxy;
> @@ -38,9 +44,28 @@ import org.apache.openejb.util.reflection.Reflections;
>
> import javax.jms.Connection;
> import javax.jms.JMSException;
> +import javax.management.Attribute;
> +import javax.management.AttributeList;
> +import javax.management.AttributeNotFoundException;
> +import javax.management.DynamicMBean;
> +import javax.management.InvalidAttributeValueException;
> +import javax.management.MBeanAttributeInfo;
> +import javax.management.MBeanConstructorInfo;
> +import javax.management.MBeanException;
> +import javax.management.MBeanInfo;
> +import javax.management.MBeanNotificationInfo;
> +import javax.management.MBeanOperationInfo;
> +import javax.management.MBeanParameterInfo;
> +import javax.management.MalformedObjectNameException;
> +import javax.management.ObjectName;
> +import javax.management.ReflectionException;
> import javax.naming.NamingException;
> +import javax.resource.NotSupportedException;
> +import javax.resource.ResourceException;
> +import javax.resource.spi.ActivationSpec;
> import javax.resource.spi.BootstrapContext;
> import javax.resource.spi.ResourceAdapterInternalException;
> +import javax.resource.spi.endpoint.MessageEndpointFactory;
> import java.lang.reflect.InvocationHandler;
> import java.lang.reflect.Method;
> import java.lang.reflect.Proxy;
> @@ -48,9 +73,13 @@ import java.net.URISyntaxException;
> import java.util.Collection;
> import java.util.Iterator;
> import java.util.Locale;
> +import java.util.Map;
> import java.util.Properties;
> +import java.util.concurrent.ConcurrentHashMap;
> import java.util.concurrent.TimeUnit;
>
> +import static javax.management.MBeanOperationInfo.ACTION;
> +
> @SuppressWarnings("UnusedDeclaration")
> public class ActiveMQResourceAdapter extends org.apache.activemq.ra.ActiveMQResourceAdapter
> {
>
> @@ -58,6 +87,7 @@ public class ActiveMQResourceAdapter extends
> org.apache.activemq.ra.ActiveMQReso
> private String useDatabaseLock;
> private String startupTimeout = "60000";
> private BootstrapContext bootstrapContext;
> + private final Map<BeanContext, ObjectName> mbeanNames = new
> ConcurrentHashMap<>();
>
> public String getDataSource() {
> return dataSource;
> @@ -161,6 +191,87 @@ public class ActiveMQResourceAdapter extends
> org.apache.activemq.ra.ActiveMQReso
> }
>
> @Override
> + public void endpointActivation(final MessageEndpointFactory
> endpointFactory, final ActivationSpec activationSpec) throws
> ResourceException {
> + final BeanContext current = MdbContainer.current();
> + if (current != null && "false".equalsIgnoreCase(
> current.getProperties().getProperty("MdbActiveOnStartup"))) {
> + if (!equals(activationSpec.getResourceAdapter())) {
> + throw new ResourceException("Activation spec not
> initialized with this ResourceAdapter instance (" + activationSpec.getResourceAdapter()
> + " != " + this + ")");
> + }
> + if (!(activationSpec instanceof MessageActivationSpec)) {
> + throw new NotSupportedException("That type of
> ActivationSpec not supported: " + activationSpec.getClass());
> + }
> +
> + final ActiveMQEndpointActivationKey key = new
> ActiveMQEndpointActivationKey(endpointFactory,
> MessageActivationSpec.class.cast(activationSpec));
> + Map.class.cast(Reflections.get(this,
> "endpointWorkers")).put(key, new ActiveMQEndpointWorker(this, key) {
> + });
> + // we dont want that worker.start();
> + } else {
> + super.endpointActivation(endpointFactory, activationSpec);
> + }
> +
> + if (current != null) {
> + addJMxControl(current, current.getProperties().
> getProperty("MdbJMXControl"));
> + }
> + }
> +
> + private void addJMxControl(final BeanContext current, final String
> name) throws ResourceException {
> + if (name == null || "false".equalsIgnoreCase(name)) {
> + return;
> + }
> +
> + final ActiveMQEndpointWorker worker = getWorker(current);
> + final ObjectName jmxName;
> + try {
> + jmxName = "true".equalsIgnoreCase(name) ? new
> ObjectNameBuilder()
> + .set("J2EEServer", "openejb")
> + .set("J2EEApplication", null)
> + .set("EJBModule", current.getModuleID())
> + .set("StatelessSessionBean", current.getEjbName())
> + .set("j2eeType", "control")
> + .set("name", current.getEjbName())
> + .build() : new ObjectName(name);
> + } catch (final MalformedObjectNameException e) {
> + throw new IllegalArgumentException(e);
> + }
> + mbeanNames.put(current, jmxName);
> +
> + final MBeanInfo info = new MBeanInfo(
> + "com.tomitribe.tomee.mdb.MdbControl",
> + "Allows to control a MDB listener",
> + new MBeanAttributeInfo[0],
> + new MBeanConstructorInfo[0],
> + new MBeanOperationInfo[]{
> + new MBeanOperationInfo("start", "Ensure the
> listener is active.", new MBeanParameterInfo[0], "void", ACTION),
> + new MBeanOperationInfo("stop", "Ensure the
> listener is not active.", new MBeanParameterInfo[0], "void", ACTION)
> + },
> + new MBeanNotificationInfo[0]
> + );
> + LocalMBeanServer.registerSilently(new MdbJmxControl(worker),
> jmxName);
> + log.info("Deployed MDB control for " + current.getDeploymentID()
> + " on " + jmxName);
> + }
> +
> + @Override
> + public void endpointDeactivation(final MessageEndpointFactory
> endpointFactory, final ActivationSpec activationSpec) {
> + final BeanContext current = MdbContainer.current();
> + if (current != null && "true".equalsIgnoreCase(
> current.getProperties().getProperty("MdbJMXControl"))) {
> + LocalMBeanServer.unregisterSilently(mbeanNames.
> remove(current));
> + log.info("Undeployed MDB control for " +
> current.getDeploymentID());
> + }
> + super.endpointDeactivation(endpointFactory, activationSpec);
> + }
> +
> + private ActiveMQEndpointWorker getWorker(final BeanContext
> beanContext) throws ResourceException {
> + final Map<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker>
> workers = Map.class.cast(Reflections.get(
> + MdbContainer.class.cast(beanContext.getContainer()).getResourceAdapter(),
> "endpointWorkers"));
> + for (final Map.Entry<ActiveMQEndpointActivationKey,
> ActiveMQEndpointWorker> entry : workers.entrySet()) {
> + if (entry.getKey().getMessageEndpointFactory() ==
> beanContext.getContainerData()) {
> + return entry.getValue();
> + }
> + }
> + throw new IllegalStateException("No worker for " +
> beanContext.getDeploymentID());
> + }
> +
> + @Override
> public BootstrapContext getBootstrapContext() {
> return this.bootstrapContext;
> }
> @@ -296,4 +407,72 @@ public class ActiveMQResourceAdapter extends
> org.apache.activemq.ra.ActiveMQReso
> //Ignore
> }
> }
> +
> + public static class MdbJmxControl implements DynamicMBean {
> + private static final AttributeList ATTRIBUTE_LIST = new
> AttributeList();
> + private static final MBeanInfo INFO = new MBeanInfo(
> + "org.apache.openejb.resource.activemq.
> ActiveMQResourceAdapter.MdbJmxControl",
> + "Allows to control a MDB (start/stop)",
> + new MBeanAttributeInfo[0],
> + new MBeanConstructorInfo[0],
> + new MBeanOperationInfo[]{
> + new MBeanOperationInfo("start", "Ensure the
> listener is active.", new MBeanParameterInfo[0], "void", ACTION),
> + new MBeanOperationInfo("stop", "Ensure the
> listener is not active.", new MBeanParameterInfo[0], "void", ACTION)
> + },
> + new MBeanNotificationInfo[0]);
> +
> + private final ActiveMQEndpointWorker worker;
> +
> + private MdbJmxControl(final ActiveMQEndpointWorker worker) {
> + this.worker = worker;
> + }
> +
> + @Override
> + public Object invoke(final String actionName, final Object[]
> params, final String[] signature) throws MBeanException,
> ReflectionException {
> + switch (actionName) {
> + case "stop":
> + try {
> + worker.stop();
> + } catch (final InterruptedException e) {
> + Thread.interrupted();
> + }
> + break;
> + case "start":
> + try {
> + worker.start();
> + } catch (ResourceException e) {
> + throw new MBeanException(new
> IllegalStateException(e.getMessage()));
> + }
> + break;
> + default:
> + throw new MBeanException(new IllegalStateException("unsupported
> operation: " + actionName));
> + }
> + return null;
> + }
> +
> + @Override
> + public MBeanInfo getMBeanInfo() {
> + return INFO;
> + }
> +
> + @Override
> + public Object getAttribute(final String attribute) throws
> AttributeNotFoundException, MBeanException, ReflectionException {
> + throw new AttributeNotFoundException();
> + }
> +
> + @Override
> + public void setAttribute(final Attribute attribute) throws
> AttributeNotFoundException, InvalidAttributeValueException,
> MBeanException, ReflectionException {
> + throw new AttributeNotFoundException();
> + }
> +
> + @Override
> + public AttributeList getAttributes(final String[] attributes) {
> + return ATTRIBUTE_LIST;
> + }
> +
> + @Override
> + public AttributeList setAttributes(final AttributeList
> attributes) {
> + return ATTRIBUTE_LIST;
> + }
> + }
> }
>
> http://git-wip-us.apache.org/repos/asf/tomee/blob/9c661758/
> container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/
> ActiveMQResourceAdapterControlTest.java
> ----------------------------------------------------------------------
> diff --git a/container/openejb-core/src/test/java/org/apache/openejb/
> resource/activemq/ActiveMQResourceAdapterControlTest.java
> b/container/openejb-core/src/test/java/org/apache/openejb/
> resource/activemq/ActiveMQResourceAdapterControlTest.java
> new file mode 100644
> index 0000000..a574f59
> --- /dev/null
> +++ b/container/openejb-core/src/test/java/org/apache/openejb/
> resource/activemq/ActiveMQResourceAdapterControlTest.java
> @@ -0,0 +1,187 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.openejb.resource.activemq;
> +
> +import org.apache.openejb.config.EjbModule;
> +import org.apache.openejb.jee.EjbJar;
> +import org.apache.openejb.jee.oejb3.EjbDeployment;
> +import org.apache.openejb.jee.oejb3.OpenejbJar;
> +import org.apache.openejb.junit.ApplicationComposer;
> +import org.apache.openejb.monitoring.LocalMBeanServer;
> +import org.apache.openejb.testing.Classes;
> +import org.apache.openejb.testing.Configuration;
> +import org.apache.openejb.testing.Module;
> +import org.apache.openejb.testng.PropertiesBuilder;
> +import org.junit.Test;
> +import org.junit.runner.RunWith;
> +
> +import javax.annotation.Resource;
> +import javax.ejb.MessageDriven;
> +import javax.jms.Connection;
> +import javax.jms.ConnectionFactory;
> +import javax.jms.JMSException;
> +import javax.jms.Message;
> +import javax.jms.MessageListener;
> +import javax.jms.MessageProducer;
> +import javax.jms.Queue;
> +import javax.jms.Session;
> +import javax.jms.TextMessage;
> +import javax.management.ObjectName;
> +import java.util.Properties;
> +import java.util.concurrent.Semaphore;
> +import java.util.concurrent.TimeUnit;
> +
> +import static org.junit.Assert.assertEquals;
> +import static org.junit.Assert.assertFalse;
> +import static org.junit.Assert.assertNull;
> +import static org.junit.Assert.assertTrue;
> +import static org.junit.Assert.fail;
> +
> +@RunWith(ApplicationComposer.class)
> +public class ActiveMQResourceAdapterControlTest {
> + @Resource(name = "ActiveMQResourceAdapterControlTest/test/ejb/Mdb")
> + private Queue queue;
> +
> + @Resource
> + private ConnectionFactory connectionFactory;
> +
> + @Configuration
> + public Properties config() {
> + return new PropertiesBuilder()
> + .p("ra", "new://Resource?type=ActiveMQResourceAdapter")
> + .p("ra.brokerXmlConfig", "broker:(vm://localhost)?
> useJmx=false&persistent=false")
> + .p("ra.serverUrl", "vm://localhost")
> +
> + .p("mdb", "new://Container?type=MESSAGE")
> + .p("mdb.resourceAdapter", "ra")
> +
> + .p("cf", "new://Resource?type=javax.
> jms.ConnectionFactory")
> + .p("cf.resourceAdapter", "ra")
> +
> + .p("openejb.deploymentId.format",
> "{appId}/{ejbJarId}/{ejbName}")
> + .build();
> + }
> +
> + @Module
> + @Classes(value = Mdb.class)
> + public EjbModule app() {
> + return new EjbModule(new EjbJar("test"), new OpenejbJar() {{
> + setId("test");
> + getEjbDeployment().add(new EjbDeployment() {{
> + setEjbName("ejb/Mdb");
> + getProperties().put("MdbActiveOnStartup", "false");
> + getProperties().put("MdbJMXControl",
> "default:type=test");
> + }});
> + }});
> + }
> +
> + @Test
> + public void ensureControl() throws Exception {
> + assertFalse(Mdb.awaiter.message, sendAndWait("Will be received
> after", 10, TimeUnit.SECONDS));
> +
> + setControl("start");
> + assertTrue(Mdb.awaiter.semaphore.tryAcquire(1,
> TimeUnit.MINUTES));
> + assertEquals("Will be received after", Mdb.awaiter.message);
> +
> + final long start = System.currentTimeMillis();
> + assertTrue(sendAndWait("First", 1, TimeUnit.MINUTES));
> + assertEquals("First", Mdb.awaiter.message);
> + final long end = System.currentTimeMillis();
> +
> + Mdb.awaiter.message = null;
> + setControl("stop");
> + // default would be wait 10s, but if machine is slow we compute
> it from the first msg stats
> + final long waitWithoutResponse = Math.max(10, 5 * (end - start) /
> 1000);
> + System.out.println("We'll wait " + waitWithoutResponse + "s to
> get a message on a stopped listener");
> + assertFalse(Mdb.awaiter.message, sendAndWait("Will be received
> after", waitWithoutResponse, TimeUnit.SECONDS));
> + assertNull(Mdb.awaiter.message);
> +
> + setControl("start");
> + assertTrue(sendAndWait("Second", 1, TimeUnit.MINUTES));
> + assertEquals("Will be received after", Mdb.awaiter.message);
> +
> + Mdb.awaiter.message = null;
> + assertTrue(Mdb.awaiter.semaphore.tryAcquire(1,
> TimeUnit.MINUTES));
> + assertEquals("Second", Mdb.awaiter.message);
> + }
> +
> + private void setControl(final String action) throws Exception {
> + LocalMBeanServer.get().invoke(
> + new ObjectName("default:type=test"),
> + action, new Object[0], new String[0]);
> + }
> +
> + private boolean sendAndWait(final String second, final long wait,
> final TimeUnit unit) throws JMSException {
> + doSend(second);
> + try {
> + return Mdb.awaiter.semaphore.tryAcquire(wait, unit);
> + } catch (final InterruptedException e) {
> + Thread.interrupted();
> + fail();
> + return false;
> + }
> + }
> +
> + private void doSend(final String txt) throws JMSException {
> + Connection c = null;
> + try {
> + c = connectionFactory.createConnection();
> + Session session = null;
> + try {
> + session = c.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
> + MessageProducer producer = null;
> + try {
> + producer = session.createProducer(queue);
> + producer.send(session.createTextMessage(txt));
> + } finally {
> + if (producer != null) {
> + producer.close();
> + }
> + }
> + } finally {
> + if (session != null) {
> + session.close();
> + }
> + }
> + } finally {
> + if (c != null) {
> + c.close();
> + }
> + }
> + }
> +
> + @MessageDriven(name = "ejb/Mdb")
> + public static class Mdb implements MessageListener {
> + static final MessageAwaiter awaiter = new MessageAwaiter();
> +
> + @Override
> + public synchronized void onMessage(final Message message) {
> + try {
> + awaiter.message = TextMessage.class.cast(
> message).getText();
> + } catch (final JMSException e) {
> + throw new IllegalStateException(e);
> + } finally {
> + awaiter.semaphore.release();
> + }
> + }
> + }
> +
> + public static class MessageAwaiter {
> + private final Semaphore semaphore = new Semaphore(0);
> + private volatile String message;
> + }
> +}
>
>