You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by jg...@apache.org on 2022/04/13 13:03:17 UTC

[tomee] branch tomee-8.x updated: TOMEE-3902: Allow properties to be injected into any activation properties

This is an automated email from the ASF dual-hosted git repository.

jgallimore pushed a commit to branch tomee-8.x
in repository https://gitbox.apache.org/repos/asf/tomee.git


The following commit(s) were added to refs/heads/tomee-8.x by this push:
     new 01bf4103c3 TOMEE-3902: Allow properties to be injected into any activation properties
     new 9866a4d5e2 Merge remote-tracking branch 'apache/tomee-8.x' into tomee-8.x
01bf4103c3 is described below

commit 01bf4103c321dc265ab8c346b1478079343a7c8f
Author: Jonathan Gallimore <jo...@jrg.me.uk>
AuthorDate: Wed Apr 13 11:37:11 2022 +0100

    TOMEE-3902: Allow properties to be injected into any activation properties
---
 .../org/apache/openejb/core/mdb/MdbContainer.java  |  45 ++++--
 .../openejb/core/mdb/ActivationConfigTest.java     | 131 +++++++++++++++++
 .../mdb/MdbContainerClientIdActivationTest.java    | 159 +++++++++++++++++++++
 itests/ejb/pom.xml                                 |   6 +
 .../apache/tomee/itests/ejb/MessageCounter.java    |  37 +++++
 .../apache/tomee/itests/ejb/MessageReceiver.java   |  38 +++++
 .../apache/tomee/itests/ejb/MessageResource.java   |  53 +++++++
 .../org/apache/tomee/itests/ejb/MessageSender.java |  54 +++++++
 .../itests/ejb/MultiTomEETopicSubscriberTest.java  | 115 +++++++++++++++
 .../org/apache/tomee/itests/ejb/PortReuseTest.java |   2 +-
 itests/pom.xml                                     |   1 +
 11 files changed, 631 insertions(+), 10 deletions(-)

diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
index 0f763c1392..e2d9343b87 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
@@ -38,6 +38,7 @@ import org.apache.openejb.resource.XAResourceWrapper;
 import org.apache.openejb.spi.SecurityService;
 import org.apache.openejb.util.LogCategory;
 import org.apache.openejb.util.Logger;
+import org.apache.openejb.util.StringTemplate;
 import org.apache.xbean.recipe.ObjectRecipe;
 import org.apache.xbean.recipe.Option;
 
@@ -67,12 +68,9 @@ import javax.validation.ConstraintViolationException;
 import javax.validation.Validator;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TreeSet;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -274,17 +272,46 @@ public class MdbContainer implements RpcContainer, BaseMdbContainer {
         }
     }
 
-    private ActivationSpec createActivationSpec(final BeanContext beanContext) throws OpenEJBException {
+    // visibility to allow unit testing
+    public ActivationSpec createActivationSpec(final BeanContext beanContext) throws OpenEJBException {
         try {
             // initialize the object recipe
             final ObjectRecipe objectRecipe = new ObjectRecipe(activationSpecClass);
             objectRecipe.allow(Option.IGNORE_MISSING_PROPERTIES);
             objectRecipe.disallow(Option.FIELD_INJECTION);
 
-
             final Map<String, String> activationProperties = beanContext.getActivationProperties();
+
+            final Map<String, String> context = new HashMap<>();
+            context.put("ejbJarId", beanContext.getModuleContext().getId());
+            context.put("ejbName", beanContext.getEjbName());
+            context.put("appId", beanContext.getModuleContext().getAppContext().getId());
+
+            String hostname;
+            try {
+                hostname = InetAddress.getLocalHost().getHostName();
+            } catch (UnknownHostException e) {
+                hostname = "hostname-unknown";
+            }
+
+            context.put("hostName", hostname);
+
+            String uniqueId = Long.toString(System.currentTimeMillis());
+            try {
+                Class idGen = Class.forName("org.apache.activemq.util.IdGenerator");
+                final Object generator = idGen.getConstructor().newInstance();
+                final Method generateId = idGen.getDeclaredMethod("generateId");
+                final Object ID = generateId.invoke(generator);
+
+                uniqueId = ID.toString();
+            } catch (Exception e) {
+                // ignore and use the timestamp
+            }
+
+            context.put("uniqueId", uniqueId);
+
             for (final Map.Entry<String, String> entry : activationProperties.entrySet()) {
-                objectRecipe.setMethodProperty(entry.getKey(), entry.getValue());
+                objectRecipe.setMethodProperty(entry.getKey(), new StringTemplate(entry.getValue()).apply(context));
             }
             objectRecipe.setMethodProperty("beanClass", beanContext.getBeanClass());
 
diff --git a/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/ActivationConfigTest.java b/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/ActivationConfigTest.java
new file mode 100644
index 0000000000..5a2b957338
--- /dev/null
+++ b/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/ActivationConfigTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.core.mdb;
+
+import org.apache.activemq.ra.ActiveMQActivationSpec;
+import org.apache.activemq.ra.ActiveMQResourceAdapter;
+import org.apache.openejb.AppContext;
+import org.apache.openejb.BeanContext;
+import org.apache.openejb.ModuleContext;
+import org.apache.openejb.core.ivm.naming.IvmContext;
+import org.apache.openejb.loader.SystemInstance;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.resource.spi.ActivationSpec;
+import javax.resource.spi.ResourceAdapter;
+import javax.validation.ConstraintViolation;
+import javax.validation.Validator;
+import javax.validation.executable.ExecutableValidator;
+import javax.validation.metadata.BeanDescriptor;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+public class ActivationConfigTest {
+
+    @Test
+    public void testShouldResolvePlaceHolder() throws Exception {
+        final ResourceAdapter ra = new ActiveMQResourceAdapter();
+        final MdbContainer container = new MdbContainer("TestMdbContainer", null, ra, MessageListener.class, ActiveMQActivationSpec.class, 10, false);
+
+
+        final Map<String, String> properties = new HashMap<>();
+        properties.put("clientId", "{appId}#{ejbJarId}#{ejbName}#{hostName}#{uniqueId}");
+        properties.put("subscriptionName", "subscription-{appId}#{ejbJarId}#{ejbName}#{hostName}#{uniqueId}");
+        properties.put("destination", "MyTopic");
+        properties.put("destinationType", "javax.jms.Topic");
+
+        final BeanContext beanContext = getMockBeanContext(properties);
+
+        final ActivationSpec activationSpec = container.createActivationSpec(beanContext);
+        Assert.assertTrue(activationSpec instanceof ActiveMQActivationSpec);
+
+        ActiveMQActivationSpec spec = (ActiveMQActivationSpec) activationSpec;
+
+        final String clientId = spec.getClientId();
+        final String[] parts = clientId.split("#");
+
+        final String hostname = (InetAddress.getLocalHost()).getHostName();
+
+        Assert.assertEquals("appId", parts[0]);
+        Assert.assertEquals("moduleId", parts[1]);
+        Assert.assertEquals("MyEjb", parts[2]);
+        Assert.assertEquals(hostname, parts[3]);
+
+        final Pattern pattern = Pattern.compile("ID:.*?-\\d+-\\d+-\\d+:\\d");
+        Assert.assertTrue(pattern.matcher(parts[4]).matches());
+    }
+
+    private BeanContext getMockBeanContext(final Map<String, String> properties) throws Exception {
+        final IvmContext context = new IvmContext();
+        context.bind("comp/Validator", new NoOpValidator());
+
+        final AppContext mockAppContext = new AppContext("appId", SystemInstance.get(),  this.getClass().getClassLoader(), context, context, false);
+        final ModuleContext mockModuleContext =  new ModuleContext("moduleId", new URI(""), "uniqueId", mockAppContext, context, this.getClass().getClassLoader());
+        final BeanContext mockBeanContext = new BeanContext("test", context, mockModuleContext, MyListener.class, MessageListener.class, properties);
+        mockBeanContext.setEjbName("MyEjb");
+
+        return mockBeanContext;
+    }
+
+    public static class MyListener implements MessageListener {
+
+        @Override
+        public void onMessage(Message message) {
+
+        }
+    }
+
+    public static class NoOpValidator implements Validator {
+        @Override
+        public <T> Set<ConstraintViolation<T>> validate(T object, Class<?>... groups) {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public <T> Set<ConstraintViolation<T>> validateProperty(T object, String propertyName, Class<?>... groups) {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public <T> Set<ConstraintViolation<T>> validateValue(Class<T> beanType, String propertyName, Object value, Class<?>... groups) {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public BeanDescriptor getConstraintsForClass(Class<?> clazz) {
+            return null;
+        }
+
+        @Override
+        public <T> T unwrap(Class<T> type) {
+            return null;
+        }
+
+        @Override
+        public ExecutableValidator forExecutables() {
+            return null;
+        }
+    }
+}
diff --git a/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/MdbContainerClientIdActivationTest.java b/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/MdbContainerClientIdActivationTest.java
new file mode 100644
index 0000000000..68f4339397
--- /dev/null
+++ b/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/MdbContainerClientIdActivationTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.core.mdb;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.openejb.jee.MessageDrivenBean;
+import org.apache.openejb.junit.ApplicationComposer;
+import org.apache.openejb.testing.Configuration;
+import org.apache.openejb.testing.Module;
+import org.apache.openejb.testng.PropertiesBuilder;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import javax.annotation.Resource;
+import javax.ejb.ActivationConfigProperty;
+import javax.ejb.MessageDriven;
+import javax.jms.*;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.regex.Pattern;
+
+@RunWith(ApplicationComposer.class)
+public class MdbContainerClientIdActivationTest {
+
+    private static BrokerService broker;
+
+    @Resource(name = "target")
+    private Topic destination;
+
+    @Resource(name = "cf")
+    private ConnectionFactory cf;
+
+    @Configuration
+    public Properties config() throws Exception{
+        final TransportConnector tc = broker.getTransportConnectors().iterator().next();
+        final int port = tc.getConnectUri().getPort();
+
+        return new PropertiesBuilder()
+
+                .p("amq", "new://Resource?type=ActiveMQResourceAdapter")
+
+                .p("amq.DataSource", "")
+                .p("amq.BrokerXmlConfig", "") //connect to an external broker
+                .p("amq.ServerUrl", "tcp://localhost:" + port)
+
+                .p("target", "new://Resource?type=Topic")
+
+                .p("mdbs", "new://Container?type=MESSAGE")
+                .p("mdbs.ResourceAdapter", "amq")
+                .p("mdbs.pool", "false")
+                .p("cf", "new://Resource?type=" + ConnectionFactory.class.getName())
+                .p("cf.ResourceAdapter", "amq")
+
+                .p("mdb.activation.clientId", "{ejbName}-{uniqueId}")
+
+                .build();
+    }
+
+    @Module
+    public MessageDrivenBean jar() {
+        return new MessageDrivenBean(Listener.class);
+    }
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(true);
+        broker.addConnector("tcp://localhost:0"); // pick a random available port
+
+        broker.start();
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        broker.stop();
+    }
+
+    @Test
+    public void shouldHaveAUniqueClientID() throws Exception {
+        final Connection connection = cf.createConnection();
+        connection.start();
+
+        final Session session = connection.createSession();
+        final MessageProducer producer = session.createProducer(this.destination);
+        final TextMessage msg = session.createTextMessage("Hello");
+        producer.send(msg);
+        producer.close();
+        session.close();
+        connection.close();
+
+        Listener.latch.await();
+
+
+        final MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
+        final Set<ObjectName> objectNames = platformMBeanServer.queryNames(new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName=target,endpoint=Consumer,*"), null);
+
+        ObjectName match = null;
+
+        for (final ObjectName objectName : objectNames) {
+            if (objectName.getKeyProperty("clientId").startsWith("testMDB")) {
+                match = objectName;
+                break;
+            }
+        }
+
+        Assert.assertNotNull(match);
+
+        final String clientId = match.getKeyProperty("clientId");
+        final String uniquePart = clientId.substring(8);
+
+        Assert.assertNotNull(clientId);
+        Assert.assertNotNull(uniquePart);
+
+        final Pattern pattern = Pattern.compile("ID_.*?-\\d+-\\d+-\\d+_\\d");
+        Assert.assertTrue(pattern.matcher(uniquePart).matches());
+    }
+
+    @MessageDriven(name="testMDB", activationConfig = {
+            @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
+            @ActivationConfigProperty(propertyName = "destination", propertyValue = "target")
+    })
+    public static class Listener implements MessageListener {
+        public static CountDownLatch latch = new CountDownLatch(1);
+
+        @Override
+        public void onMessage(final Message message) {
+            latch.countDown();
+        }
+
+
+    }
+
+}
\ No newline at end of file
diff --git a/itests/ejb/pom.xml b/itests/ejb/pom.xml
index 794139546d..c41ec3053c 100644
--- a/itests/ejb/pom.xml
+++ b/itests/ejb/pom.xml
@@ -77,6 +77,12 @@
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-broker</artifactId>
+      <version>${org.apache.activemq.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
diff --git a/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageCounter.java b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageCounter.java
new file mode 100644
index 0000000000..5687b26875
--- /dev/null
+++ b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageCounter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomee.itests.ejb;
+
+import javax.ejb.Lock;
+import javax.ejb.LockType;
+import javax.ejb.Singleton;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Singleton
+@Lock(LockType.READ)
+public class MessageCounter {
+
+    private static final AtomicInteger COUNTER = new AtomicInteger();
+
+    public void increment() {
+        COUNTER.incrementAndGet();
+    }
+
+    public int getCount() {
+        return COUNTER.get();
+    }
+}
diff --git a/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageReceiver.java b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageReceiver.java
new file mode 100644
index 0000000000..a554ffd280
--- /dev/null
+++ b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageReceiver.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomee.itests.ejb;
+
+import javax.ejb.ActivationConfigProperty;
+import javax.ejb.EJB;
+import javax.ejb.MessageDriven;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+@MessageDriven(name="Receiver", activationConfig = {
+        @ActivationConfigProperty(propertyName = "destination", propertyValue = "target"),
+        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic")
+})
+public class MessageReceiver implements MessageListener {
+
+    @EJB
+    private MessageCounter counter;
+
+    @Override
+    public void onMessage(Message message) {
+        counter.increment();
+    }
+}
diff --git a/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageResource.java b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageResource.java
new file mode 100644
index 0000000000..2a135a1389
--- /dev/null
+++ b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageResource.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomee.itests.ejb;
+
+import javax.ejb.EJB;
+import javax.ejb.Lock;
+import javax.ejb.LockType;
+import javax.ejb.Singleton;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+@Singleton
+@Lock(LockType.READ)
+@Path("api/messages")
+public class MessageResource {
+
+    @EJB
+    private MessageCounter counter;
+
+    @EJB
+    private MessageSender sender;
+
+    @GET
+    @Path("count")
+    @Produces(MediaType.TEXT_PLAIN)
+    public Response count() {
+        return Response.ok(counter.getCount()).build();
+    }
+
+    @GET
+    @Path("test")
+    public void sendTestMessages() {
+        sender.sendMessage();
+    }
+
+}
diff --git a/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageSender.java b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageSender.java
new file mode 100644
index 0000000000..56557b217a
--- /dev/null
+++ b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageSender.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomee.itests.ejb;
+
+import javax.annotation.Resource;
+import javax.ejb.Lock;
+import javax.ejb.LockType;
+import javax.ejb.Singleton;
+import javax.inject.Inject;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSContext;
+import javax.jms.Topic;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Singleton
+@Lock(LockType.READ)
+public class MessageSender {
+
+    @Resource
+    private ConnectionFactory cf;
+
+    @Inject
+    private JMSContext jmsContext;
+
+    @Resource(name = "target")
+    private Topic eventTopic;
+
+
+    public void sendMessage() {
+        final String message = "hello world";
+        for (int i = 0; i < 1000; i++) {
+            sendMessage(eventTopic, message);
+        }
+    }
+
+    private void sendMessage(final Topic topic, final String message) {
+        jmsContext.createProducer().send(topic, jmsContext.createTextMessage(message));
+    }
+
+}
diff --git a/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MultiTomEETopicSubscriberTest.java b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MultiTomEETopicSubscriberTest.java
new file mode 100644
index 0000000000..9c110a8651
--- /dev/null
+++ b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MultiTomEETopicSubscriberTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomee.itests.ejb;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.tomee.server.composer.Archive;
+import org.apache.tomee.server.composer.TomEE;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.tomitribe.util.Files;
+import org.tomitribe.util.IO;
+
+import javax.jms.ConnectionFactory;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+
+public class MultiTomEETopicSubscriberTest {
+
+    private BrokerService broker;
+
+    @Before
+    public void setUp() throws Exception {
+        // start an ActiveMQ broker
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(true);
+        broker.addConnector("tcp://localhost:0"); // pick a random available port
+
+        broker.start();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        broker.stop();
+    }
+
+    @Test
+    public void test() throws Exception {
+        // get the ActiveMQ OpenWire port
+        final TransportConnector tc = broker.getTransportConnectors().iterator().next();
+        final int port = tc.getConnectUri().getPort();
+
+        // start 2 TomEE servers
+        final TomEE tomee1 = buildTomEE(port);
+        final TomEE tomee2 = buildTomEE(port);
+
+        // the key thing here is that both of these servers should be able to subscribe to the topic
+        // from their respective MDBs without exceptions.
+
+        // lets send some test messages from 1 of the servers
+        IO.slurp(new URL("http://localhost:" + tomee1.getPort() + "/test/api/messages/test"));
+        Thread.sleep(5000);
+
+        // and check that all the messages were received on both servers
+        final String result1 = IO.slurp(new URL("http://localhost:" + tomee1.getPort() + "/test/api/messages/count"));
+        final String result2 = IO.slurp(new URL("http://localhost:" + tomee2.getPort() + "/test/api/messages/count"));
+
+        Assert.assertEquals(1000, Integer.parseInt(result1));
+        Assert.assertEquals(1000, Integer.parseInt(result2));
+    }
+
+    private TomEE buildTomEE(final int activemqPort) throws Exception {
+        return TomEE.plus()
+                .add("webapps/test/WEB-INF/lib/app.jar", Archive.archive()
+                        .add(MessageCounter.class)
+                        .add(MessageReceiver.class)
+                        .add(MessageResource.class)
+                        .add(MessageSender.class)
+                        .asJar())
+                .home(h -> updateSystemProperties(h, activemqPort))
+                .build();
+    }
+
+    private void updateSystemProperties(final File home, final int activemqPort) {
+        try {
+            final File systemProps = Files.file(home, "conf", "system.properties");
+            String props = IO.slurp(systemProps);
+
+            props = props + "\namq=new://Resource?type=ActiveMQResourceAdapter" +
+                    "\namq.DataSource=" +
+                    "\namq.BrokerXmlConfig=" +
+                    "\namq.ServerUrl=tcp://localhost:" + activemqPort +
+                    "\ntarget=new://Resource?type=Topic" +
+                    "\nmdbs=new://Container?type=MESSAGE" +
+                    "\nmdbs.ResourceAdapter=amq" +
+                    "\nmdbs.pool=false" +
+                    "\ncf=new://Resource?type=" + ConnectionFactory.class.getName() +
+                    "\ncf.ResourceAdapter=amq" +
+                    "\nmdb.activation.clientId={ejbName}-{uniqueId}";
+
+            IO.copy(IO.read(props), systemProps);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/PortReuseTest.java b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/PortReuseTest.java
index 54f897fad4..d35f9c5dbe 100644
--- a/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/PortReuseTest.java
+++ b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/PortReuseTest.java
@@ -41,7 +41,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-public class PortReuse  Test {
+public class PortReuseTest {
 
     @Test
     public void test() throws Exception {
diff --git a/itests/pom.xml b/itests/pom.xml
index 9db4403743..db0eb4eaef 100644
--- a/itests/pom.xml
+++ b/itests/pom.xml
@@ -47,6 +47,7 @@
     <module>openejb-itests-servlets</module>
     <module>openejb-itests-web</module>
     <module>jaxrs</module>
+    <module>ejb</module>
   </modules>
 
 </project>