You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by jg...@apache.org on 2022/04/13 13:03:17 UTC
[tomee] branch tomee-8.x updated: TOMEE-3902: Allow properties to be injected into any activation properties
This is an automated email from the ASF dual-hosted git repository.
jgallimore pushed a commit to branch tomee-8.x
in repository https://gitbox.apache.org/repos/asf/tomee.git
The following commit(s) were added to refs/heads/tomee-8.x by this push:
new 01bf4103c3 TOMEE-3902: Allow properties to be injected into any activation properties
new 9866a4d5e2 Merge remote-tracking branch 'apache/tomee-8.x' into tomee-8.x
01bf4103c3 is described below
commit 01bf4103c321dc265ab8c346b1478079343a7c8f
Author: Jonathan Gallimore <jo...@jrg.me.uk>
AuthorDate: Wed Apr 13 11:37:11 2022 +0100
TOMEE-3902: Allow properties to be injected into any activation properties
---
.../org/apache/openejb/core/mdb/MdbContainer.java | 45 ++++--
.../openejb/core/mdb/ActivationConfigTest.java | 131 +++++++++++++++++
.../mdb/MdbContainerClientIdActivationTest.java | 159 +++++++++++++++++++++
itests/ejb/pom.xml | 6 +
.../apache/tomee/itests/ejb/MessageCounter.java | 37 +++++
.../apache/tomee/itests/ejb/MessageReceiver.java | 38 +++++
.../apache/tomee/itests/ejb/MessageResource.java | 53 +++++++
.../org/apache/tomee/itests/ejb/MessageSender.java | 54 +++++++
.../itests/ejb/MultiTomEETopicSubscriberTest.java | 115 +++++++++++++++
.../org/apache/tomee/itests/ejb/PortReuseTest.java | 2 +-
itests/pom.xml | 1 +
11 files changed, 631 insertions(+), 10 deletions(-)
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
index 0f763c1392..e2d9343b87 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
@@ -38,6 +38,7 @@ import org.apache.openejb.resource.XAResourceWrapper;
import org.apache.openejb.spi.SecurityService;
import org.apache.openejb.util.LogCategory;
import org.apache.openejb.util.Logger;
+import org.apache.openejb.util.StringTemplate;
import org.apache.xbean.recipe.ObjectRecipe;
import org.apache.xbean.recipe.Option;
@@ -67,12 +68,9 @@ import javax.validation.ConstraintViolationException;
import javax.validation.Validator;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TreeSet;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -274,17 +272,46 @@ public class MdbContainer implements RpcContainer, BaseMdbContainer {
}
}
- private ActivationSpec createActivationSpec(final BeanContext beanContext) throws OpenEJBException {
+ // visibility to allow unit testing
+ public ActivationSpec createActivationSpec(final BeanContext beanContext) throws OpenEJBException {
try {
// initialize the object recipe
final ObjectRecipe objectRecipe = new ObjectRecipe(activationSpecClass);
objectRecipe.allow(Option.IGNORE_MISSING_PROPERTIES);
objectRecipe.disallow(Option.FIELD_INJECTION);
-
final Map<String, String> activationProperties = beanContext.getActivationProperties();
+
+ final Map<String, String> context = new HashMap<>();
+ context.put("ejbJarId", beanContext.getModuleContext().getId());
+ context.put("ejbName", beanContext.getEjbName());
+ context.put("appId", beanContext.getModuleContext().getAppContext().getId());
+
+ String hostname;
+ try {
+ hostname = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ hostname = "hostname-unknown";
+ }
+
+ context.put("hostName", hostname);
+
+ String uniqueId = Long.toString(System.currentTimeMillis());
+ try {
+ Class idGen = Class.forName("org.apache.activemq.util.IdGenerator");
+ final Object generator = idGen.getConstructor().newInstance();
+ final Method generateId = idGen.getDeclaredMethod("generateId");
+ final Object ID = generateId.invoke(generator);
+
+ uniqueId = ID.toString();
+ } catch (Exception e) {
+ // ignore and use the timestamp
+ }
+
+ context.put("uniqueId", uniqueId);
+
for (final Map.Entry<String, String> entry : activationProperties.entrySet()) {
- objectRecipe.setMethodProperty(entry.getKey(), entry.getValue());
+ objectRecipe.setMethodProperty(entry.getKey(), new StringTemplate(entry.getValue()).apply(context));
}
objectRecipe.setMethodProperty("beanClass", beanContext.getBeanClass());
diff --git a/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/ActivationConfigTest.java b/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/ActivationConfigTest.java
new file mode 100644
index 0000000000..5a2b957338
--- /dev/null
+++ b/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/ActivationConfigTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.core.mdb;
+
+import org.apache.activemq.ra.ActiveMQActivationSpec;
+import org.apache.activemq.ra.ActiveMQResourceAdapter;
+import org.apache.openejb.AppContext;
+import org.apache.openejb.BeanContext;
+import org.apache.openejb.ModuleContext;
+import org.apache.openejb.core.ivm.naming.IvmContext;
+import org.apache.openejb.loader.SystemInstance;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.resource.spi.ActivationSpec;
+import javax.resource.spi.ResourceAdapter;
+import javax.validation.ConstraintViolation;
+import javax.validation.Validator;
+import javax.validation.executable.ExecutableValidator;
+import javax.validation.metadata.BeanDescriptor;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+public class ActivationConfigTest {
+
+ @Test
+ public void testShouldResolvePlaceHolder() throws Exception {
+ final ResourceAdapter ra = new ActiveMQResourceAdapter();
+ final MdbContainer container = new MdbContainer("TestMdbContainer", null, ra, MessageListener.class, ActiveMQActivationSpec.class, 10, false);
+
+
+ final Map<String, String> properties = new HashMap<>();
+ properties.put("clientId", "{appId}#{ejbJarId}#{ejbName}#{hostName}#{uniqueId}");
+ properties.put("subscriptionName", "subscription-{appId}#{ejbJarId}#{ejbName}#{hostName}#{uniqueId}");
+ properties.put("destination", "MyTopic");
+ properties.put("destinationType", "javax.jms.Topic");
+
+ final BeanContext beanContext = getMockBeanContext(properties);
+
+ final ActivationSpec activationSpec = container.createActivationSpec(beanContext);
+ Assert.assertTrue(activationSpec instanceof ActiveMQActivationSpec);
+
+ ActiveMQActivationSpec spec = (ActiveMQActivationSpec) activationSpec;
+
+ final String clientId = spec.getClientId();
+ final String[] parts = clientId.split("#");
+
+ final String hostname = (InetAddress.getLocalHost()).getHostName();
+
+ Assert.assertEquals("appId", parts[0]);
+ Assert.assertEquals("moduleId", parts[1]);
+ Assert.assertEquals("MyEjb", parts[2]);
+ Assert.assertEquals(hostname, parts[3]);
+
+ final Pattern pattern = Pattern.compile("ID:.*?-\\d+-\\d+-\\d+:\\d");
+ Assert.assertTrue(pattern.matcher(parts[4]).matches());
+ }
+
+ private BeanContext getMockBeanContext(final Map<String, String> properties) throws Exception {
+ final IvmContext context = new IvmContext();
+ context.bind("comp/Validator", new NoOpValidator());
+
+ final AppContext mockAppContext = new AppContext("appId", SystemInstance.get(), this.getClass().getClassLoader(), context, context, false);
+ final ModuleContext mockModuleContext = new ModuleContext("moduleId", new URI(""), "uniqueId", mockAppContext, context, this.getClass().getClassLoader());
+ final BeanContext mockBeanContext = new BeanContext("test", context, mockModuleContext, MyListener.class, MessageListener.class, properties);
+ mockBeanContext.setEjbName("MyEjb");
+
+ return mockBeanContext;
+ }
+
+ public static class MyListener implements MessageListener {
+
+ @Override
+ public void onMessage(Message message) {
+
+ }
+ }
+
+ public static class NoOpValidator implements Validator {
+ @Override
+ public <T> Set<ConstraintViolation<T>> validate(T object, Class<?>... groups) {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public <T> Set<ConstraintViolation<T>> validateProperty(T object, String propertyName, Class<?>... groups) {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public <T> Set<ConstraintViolation<T>> validateValue(Class<T> beanType, String propertyName, Object value, Class<?>... groups) {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public BeanDescriptor getConstraintsForClass(Class<?> clazz) {
+ return null;
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> type) {
+ return null;
+ }
+
+ @Override
+ public ExecutableValidator forExecutables() {
+ return null;
+ }
+ }
+}
diff --git a/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/MdbContainerClientIdActivationTest.java b/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/MdbContainerClientIdActivationTest.java
new file mode 100644
index 0000000000..68f4339397
--- /dev/null
+++ b/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/MdbContainerClientIdActivationTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.core.mdb;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.openejb.jee.MessageDrivenBean;
+import org.apache.openejb.junit.ApplicationComposer;
+import org.apache.openejb.testing.Configuration;
+import org.apache.openejb.testing.Module;
+import org.apache.openejb.testng.PropertiesBuilder;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import javax.annotation.Resource;
+import javax.ejb.ActivationConfigProperty;
+import javax.ejb.MessageDriven;
+import javax.jms.*;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.regex.Pattern;
+
+@RunWith(ApplicationComposer.class)
+public class MdbContainerClientIdActivationTest {
+
+ private static BrokerService broker;
+
+ @Resource(name = "target")
+ private Topic destination;
+
+ @Resource(name = "cf")
+ private ConnectionFactory cf;
+
+ @Configuration
+ public Properties config() throws Exception{
+ final TransportConnector tc = broker.getTransportConnectors().iterator().next();
+ final int port = tc.getConnectUri().getPort();
+
+ return new PropertiesBuilder()
+
+ .p("amq", "new://Resource?type=ActiveMQResourceAdapter")
+
+ .p("amq.DataSource", "")
+ .p("amq.BrokerXmlConfig", "") //connect to an external broker
+ .p("amq.ServerUrl", "tcp://localhost:" + port)
+
+ .p("target", "new://Resource?type=Topic")
+
+ .p("mdbs", "new://Container?type=MESSAGE")
+ .p("mdbs.ResourceAdapter", "amq")
+ .p("mdbs.pool", "false")
+ .p("cf", "new://Resource?type=" + ConnectionFactory.class.getName())
+ .p("cf.ResourceAdapter", "amq")
+
+ .p("mdb.activation.clientId", "{ejbName}-{uniqueId}")
+
+ .build();
+ }
+
+ @Module
+ public MessageDrivenBean jar() {
+ return new MessageDrivenBean(Listener.class);
+ }
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.setUseJmx(true);
+ broker.addConnector("tcp://localhost:0"); // pick a random available port
+
+ broker.start();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ broker.stop();
+ }
+
+ @Test
+ public void shouldHaveAUniqueClientID() throws Exception {
+ final Connection connection = cf.createConnection();
+ connection.start();
+
+ final Session session = connection.createSession();
+ final MessageProducer producer = session.createProducer(this.destination);
+ final TextMessage msg = session.createTextMessage("Hello");
+ producer.send(msg);
+ producer.close();
+ session.close();
+ connection.close();
+
+ Listener.latch.await();
+
+
+ final MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
+ final Set<ObjectName> objectNames = platformMBeanServer.queryNames(new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName=target,endpoint=Consumer,*"), null);
+
+ ObjectName match = null;
+
+ for (final ObjectName objectName : objectNames) {
+ if (objectName.getKeyProperty("clientId").startsWith("testMDB")) {
+ match = objectName;
+ break;
+ }
+ }
+
+ Assert.assertNotNull(match);
+
+ final String clientId = match.getKeyProperty("clientId");
+ final String uniquePart = clientId.substring(8);
+
+ Assert.assertNotNull(clientId);
+ Assert.assertNotNull(uniquePart);
+
+ final Pattern pattern = Pattern.compile("ID_.*?-\\d+-\\d+-\\d+_\\d");
+ Assert.assertTrue(pattern.matcher(uniquePart).matches());
+ }
+
+ @MessageDriven(name="testMDB", activationConfig = {
+ @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
+ @ActivationConfigProperty(propertyName = "destination", propertyValue = "target")
+ })
+ public static class Listener implements MessageListener {
+ public static CountDownLatch latch = new CountDownLatch(1);
+
+ @Override
+ public void onMessage(final Message message) {
+ latch.countDown();
+ }
+
+
+ }
+
+}
\ No newline at end of file
diff --git a/itests/ejb/pom.xml b/itests/ejb/pom.xml
index 794139546d..c41ec3053c 100644
--- a/itests/ejb/pom.xml
+++ b/itests/ejb/pom.xml
@@ -77,6 +77,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-broker</artifactId>
+ <version>${org.apache.activemq.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
diff --git a/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageCounter.java b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageCounter.java
new file mode 100644
index 0000000000..5687b26875
--- /dev/null
+++ b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageCounter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomee.itests.ejb;
+
+import javax.ejb.Lock;
+import javax.ejb.LockType;
+import javax.ejb.Singleton;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Singleton
+@Lock(LockType.READ)
+public class MessageCounter {
+
+ private static final AtomicInteger COUNTER = new AtomicInteger();
+
+ public void increment() {
+ COUNTER.incrementAndGet();
+ }
+
+ public int getCount() {
+ return COUNTER.get();
+ }
+}
diff --git a/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageReceiver.java b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageReceiver.java
new file mode 100644
index 0000000000..a554ffd280
--- /dev/null
+++ b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageReceiver.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomee.itests.ejb;
+
+import javax.ejb.ActivationConfigProperty;
+import javax.ejb.EJB;
+import javax.ejb.MessageDriven;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+@MessageDriven(name="Receiver", activationConfig = {
+ @ActivationConfigProperty(propertyName = "destination", propertyValue = "target"),
+ @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic")
+})
+public class MessageReceiver implements MessageListener {
+
+ @EJB
+ private MessageCounter counter;
+
+ @Override
+ public void onMessage(Message message) {
+ counter.increment();
+ }
+}
diff --git a/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageResource.java b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageResource.java
new file mode 100644
index 0000000000..2a135a1389
--- /dev/null
+++ b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageResource.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomee.itests.ejb;
+
+import javax.ejb.EJB;
+import javax.ejb.Lock;
+import javax.ejb.LockType;
+import javax.ejb.Singleton;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+@Singleton
+@Lock(LockType.READ)
+@Path("api/messages")
+public class MessageResource {
+
+ @EJB
+ private MessageCounter counter;
+
+ @EJB
+ private MessageSender sender;
+
+ @GET
+ @Path("count")
+ @Produces(MediaType.TEXT_PLAIN)
+ public Response count() {
+ return Response.ok(counter.getCount()).build();
+ }
+
+ @GET
+ @Path("test")
+ public void sendTestMessages() {
+ sender.sendMessage();
+ }
+
+}
diff --git a/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageSender.java b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageSender.java
new file mode 100644
index 0000000000..56557b217a
--- /dev/null
+++ b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MessageSender.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomee.itests.ejb;
+
+import javax.annotation.Resource;
+import javax.ejb.Lock;
+import javax.ejb.LockType;
+import javax.ejb.Singleton;
+import javax.inject.Inject;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSContext;
+import javax.jms.Topic;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Singleton
+@Lock(LockType.READ)
+public class MessageSender {
+
+ @Resource
+ private ConnectionFactory cf;
+
+ @Inject
+ private JMSContext jmsContext;
+
+ @Resource(name = "target")
+ private Topic eventTopic;
+
+
+ public void sendMessage() {
+ final String message = "hello world";
+ for (int i = 0; i < 1000; i++) {
+ sendMessage(eventTopic, message);
+ }
+ }
+
+ private void sendMessage(final Topic topic, final String message) {
+ jmsContext.createProducer().send(topic, jmsContext.createTextMessage(message));
+ }
+
+}
diff --git a/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MultiTomEETopicSubscriberTest.java b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MultiTomEETopicSubscriberTest.java
new file mode 100644
index 0000000000..9c110a8651
--- /dev/null
+++ b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/MultiTomEETopicSubscriberTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomee.itests.ejb;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.tomee.server.composer.Archive;
+import org.apache.tomee.server.composer.TomEE;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.tomitribe.util.Files;
+import org.tomitribe.util.IO;
+
+import javax.jms.ConnectionFactory;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+
+public class MultiTomEETopicSubscriberTest {
+
+ private BrokerService broker;
+
+ @Before
+ public void setUp() throws Exception {
+ // start an ActiveMQ broker
+ broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.setUseJmx(true);
+ broker.addConnector("tcp://localhost:0"); // pick a random available port
+
+ broker.start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ broker.stop();
+ }
+
+ @Test
+ public void test() throws Exception {
+ // get the ActiveMQ OpenWire port
+ final TransportConnector tc = broker.getTransportConnectors().iterator().next();
+ final int port = tc.getConnectUri().getPort();
+
+ // start 2 TomEE servers
+ final TomEE tomee1 = buildTomEE(port);
+ final TomEE tomee2 = buildTomEE(port);
+
+ // the key thing here is that both of these servers should be able to subscribe to the topic
+ // from their respective MDBs without exceptions.
+
+ // lets send some test messages from 1 of the servers
+ IO.slurp(new URL("http://localhost:" + tomee1.getPort() + "/test/api/messages/test"));
+ Thread.sleep(5000);
+
+ // and check that all the messages were received on both servers
+ final String result1 = IO.slurp(new URL("http://localhost:" + tomee1.getPort() + "/test/api/messages/count"));
+ final String result2 = IO.slurp(new URL("http://localhost:" + tomee2.getPort() + "/test/api/messages/count"));
+
+ Assert.assertEquals(1000, Integer.parseInt(result1));
+ Assert.assertEquals(1000, Integer.parseInt(result2));
+ }
+
+ private TomEE buildTomEE(final int activemqPort) throws Exception {
+ return TomEE.plus()
+ .add("webapps/test/WEB-INF/lib/app.jar", Archive.archive()
+ .add(MessageCounter.class)
+ .add(MessageReceiver.class)
+ .add(MessageResource.class)
+ .add(MessageSender.class)
+ .asJar())
+ .home(h -> updateSystemProperties(h, activemqPort))
+ .build();
+ }
+
+ private void updateSystemProperties(final File home, final int activemqPort) {
+ try {
+ final File systemProps = Files.file(home, "conf", "system.properties");
+ String props = IO.slurp(systemProps);
+
+ props = props + "\namq=new://Resource?type=ActiveMQResourceAdapter" +
+ "\namq.DataSource=" +
+ "\namq.BrokerXmlConfig=" +
+ "\namq.ServerUrl=tcp://localhost:" + activemqPort +
+ "\ntarget=new://Resource?type=Topic" +
+ "\nmdbs=new://Container?type=MESSAGE" +
+ "\nmdbs.ResourceAdapter=amq" +
+ "\nmdbs.pool=false" +
+ "\ncf=new://Resource?type=" + ConnectionFactory.class.getName() +
+ "\ncf.ResourceAdapter=amq" +
+ "\nmdb.activation.clientId={ejbName}-{uniqueId}";
+
+ IO.copy(IO.read(props), systemProps);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/PortReuseTest.java b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/PortReuseTest.java
index 54f897fad4..d35f9c5dbe 100644
--- a/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/PortReuseTest.java
+++ b/itests/ejb/src/test/java/org/apache/tomee/itests/ejb/PortReuseTest.java
@@ -41,7 +41,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-public class PortReuse Test {
+public class PortReuseTest {
@Test
public void test() throws Exception {
diff --git a/itests/pom.xml b/itests/pom.xml
index 9db4403743..db0eb4eaef 100644
--- a/itests/pom.xml
+++ b/itests/pom.xml
@@ -47,6 +47,7 @@
<module>openejb-itests-servlets</module>
<module>openejb-itests-web</module>
<module>jaxrs</module>
+ <module>ejb</module>
</modules>
</project>