You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/05/15 22:49:39 UTC
[activemq-artemis] branch master updated: ARTEMIS-2339
Compatibility around prefixing
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new b9b6fdd ARTEMIS-2339 Compatibility around prefixing
new ea973ce This closes #2669
b9b6fdd is described below
commit b9b6fddeea4082bd11d9dc1d3748ab80684bfd78
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Wed May 15 09:10:53 2019 -0400
ARTEMIS-2339 Compatibility around prefixing
There are a few issues with prefixing and compatibility.
This is basically an issue when integrated with Wildfly or any other case
where prefix is activated
and playing with older versions.
---
.../artemis/jms/client/ActiveMQSession.java | 6 +-
.../compatible1X/ActiveMQCompatibleMessage.java | 2 +-
...age.java => ActiveMQTextCompatibleMessage.java} | 15 ++-
.../protocol/core/ServerSessionPacketHandler.java | 10 +-
.../resources/jmsReplyToQueue/artemisServer.groovy | 53 +++++++++
.../jmsReplyToQueue/receiveMessages.groovy | 55 +++++++++
.../jmsReplyToQueue/sendMessagesAddress.groovy | 55 +++++++++
.../jmsReplyToTempQueue/artemisServer.groovy | 49 ++++++++
.../jmsReplyToTempQueue/receiveMessages.groovy | 55 +++++++++
.../jmsReplyToTempQueue/sendMessagesAddress.groovy | 55 +++++++++
.../jmsReplyToTempTopic/artemisServer.groovy | 52 +++++++++
.../jmsReplyToTempTopic/receiveMessages.groovy | 58 ++++++++++
.../jmsReplyToTempTopic/sendMessagesAddress.groovy | 71 ++++++++++++
.../resources/jmsReplyToTopic/artemisServer.groovy | 49 ++++++++
.../jmsReplyToTopic/receiveMessages.groovy | 55 +++++++++
.../jmsReplyToTopic/sendMessagesAddress.groovy | 55 +++++++++
.../tests/compatibility/JmsReplyToQueueTest.java | 125 +++++++++++++++++++++
.../compatibility/JmsReplyToTempQueueTest.java | 125 +++++++++++++++++++++
.../compatibility/JmsReplyToTempTopicTest.java | 125 +++++++++++++++++++++
.../tests/compatibility/JmsReplyToTopicTest.java | 125 +++++++++++++++++++++
20 files changed, 1186 insertions(+), 9 deletions(-)
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
index f6223a6..5041b57 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
@@ -68,7 +68,7 @@ import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQCompatibleMes
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQMapCompatibleMessage;
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQObjectCompatibleMessage;
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQStreamCompatibleMessage;
-import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQTextCompabileMessage;
+import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQTextCompatibleMessage;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.activemq.artemis.utils.CompositeAddress;
@@ -234,7 +234,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
ActiveMQTextMessage msg;
if (enable1xPrefixes) {
- msg = new ActiveMQTextCompabileMessage(session);
+ msg = new ActiveMQTextCompatibleMessage(session);
} else {
msg = new ActiveMQTextMessage(session);
}
@@ -249,7 +249,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
ActiveMQTextMessage msg;
if (enable1xPrefixes) {
- msg = new ActiveMQTextCompabileMessage(session);
+ msg = new ActiveMQTextCompatibleMessage(session);
} else {
msg = new ActiveMQTextMessage(session);
}
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQCompatibleMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQCompatibleMessage.java
index ec4e720..9248e8e 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQCompatibleMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQCompatibleMessage.java
@@ -190,7 +190,7 @@ public class ActiveMQCompatibleMessage extends ActiveMQMessage {
}
case ActiveMQTextMessage.TYPE: // 3
{
- msg = new ActiveMQTextCompabileMessage(message, session);
+ msg = new ActiveMQTextCompatibleMessage(message, session);
break;
}
default: {
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQTextCompabileMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQTextCompatibleMessage.java
similarity index 75%
rename from artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQTextCompabileMessage.java
rename to artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQTextCompatibleMessage.java
index ae8aa52..4b0d09c 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQTextCompabileMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQTextCompatibleMessage.java
@@ -21,11 +21,12 @@ import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
-public class ActiveMQTextCompabileMessage extends ActiveMQTextMessage {
+public class ActiveMQTextCompatibleMessage extends ActiveMQTextMessage {
@Override
public void setJMSReplyTo(Destination dest) throws JMSException {
@@ -40,15 +41,21 @@ public class ActiveMQTextCompabileMessage extends ActiveMQTextMessage {
return replyTo;
}
- public ActiveMQTextCompabileMessage(ClientSession session) {
+ public ActiveMQTextCompatibleMessage(ClientSession session) {
super(session);
}
- public ActiveMQTextCompabileMessage(ClientMessage message, ClientSession session) {
+ public ActiveMQTextCompatibleMessage(ClientMessage message, ClientSession session) {
super(message, session);
}
- public ActiveMQTextCompabileMessage(TextMessage foreign, ClientSession session) throws JMSException {
+ public ActiveMQTextCompatibleMessage(TextMessage foreign, ClientSession session) throws JMSException {
super(foreign, session);
}
+
+
+ @Override
+ protected SimpleString checkPrefix(SimpleString address) {
+ return ActiveMQCompatibleMessage.checkPrefix1X(address);
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 6730b15..f32c013 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -347,7 +347,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case CREATE_QUEUE: {
CreateQueueMessage request = (CreateQueueMessage) packet;
requiresResponse = request.isRequiresResponse();
- session.createQueue(request.getAddress(), request.getQueueName(), RoutingType.MULTICAST, request.getFilterString(), request.isTemporary(), request.isDurable());
+ session.createQueue(request.getAddress(), request.getQueueName(), getRoutingTypeFromAddress(request.getAddress()), request.getFilterString(), request.isTemporary(), request.isDurable());
if (requiresResponse) {
response = createNullResponseMessage(packet);
}
@@ -634,6 +634,14 @@ public class ServerSessionPacketHandler implements ChannelHandler {
}
}
+ private RoutingType getRoutingTypeFromAddress(SimpleString address) {
+ if (address.startsWith(PacketImpl.OLD_QUEUE_PREFIX) || address.startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX)) {
+ return RoutingType.ANYCAST;
+ }
+ return RoutingType.MULTICAST;
+ }
+
+
private Packet createNullResponseMessage(Packet packet) {
final Packet response;
if (!packet.isResponseAsync() || channel.getConnection().isVersionBeforeAsyncResponseChange()) {
diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToQueue/artemisServer.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToQueue/artemisServer.groovy
new file mode 100644
index 0000000..444eaf1
--- /dev/null
+++ b/tests/compatibility-tests/src/main/resources/jmsReplyToQueue/artemisServer.groovy
@@ -0,0 +1,53 @@
+package jmsReplyToQueue
+
+import org.apache.activemq.artemis.api.core.RoutingType
+import org.apache.activemq.artemis.api.core.SimpleString
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
+import org.apache.activemq.artemis.core.server.JournalType
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// starts an artemis server
+import org.apache.activemq.artemis.core.server.impl.AddressInfo
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
+
+String folder = arg[0];
+String queueAddress = "jms.queue.myQueue";
+String replyQueueAddress = "jms.queue.myReplyQueue";
+
+configuration = new ConfigurationImpl();
+configuration.setJournalType(JournalType.NIO);
+configuration.setBrokerInstance(new File(folder + "/server"));
+configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
+configuration.setSecurityEnabled(false);
+configuration.setPersistenceEnabled(false);
+
+
+jmsConfiguration = new JMSConfigurationImpl();
+
+server = new EmbeddedJMS();
+server.setConfiguration(configuration);
+server.setJmsConfiguration(jmsConfiguration);
+server.start();
+
+server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST));
+server.getActiveMQServer().createQueue(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST, SimpleString.toSimpleString(queueAddress), null, true, false);
+
+server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(replyQueueAddress), RoutingType.ANYCAST));
+server.getActiveMQServer().createQueue(SimpleString.toSimpleString(replyQueueAddress), RoutingType.ANYCAST, SimpleString.toSimpleString(replyQueueAddress), null, true, false);
diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToQueue/receiveMessages.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToQueue/receiveMessages.groovy
new file mode 100644
index 0000000..2eb9fc5
--- /dev/null
+++ b/tests/compatibility-tests/src/main/resources/jmsReplyToQueue/receiveMessages.groovy
@@ -0,0 +1,55 @@
+package jmsReplyToQueue
+
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun
+
+import javax.jms.*
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
+try {
+ cf.setEnable1xPrefixes(true);
+} catch (Throwable totallyIgnored) {
+ // older versions will not have this method, dont even bother about seeing the stack trace or exception
+}
+Connection connection = cf.createConnection();
+Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+Queue myQueue = session.createQueue("myQueue");
+MessageConsumer queueConsumer = session.createConsumer(myQueue);
+consumerCreated.countDown();
+connection.start()
+
+Message message = queueConsumer.receive(5000);
+GroovyRun.assertNotNull(message)
+session.commit();
+System.out.println("Received " + message + " from: " + myQueue);
+queueConsumer.close();
+
+System.out.println("Sending message to: " + message.getJMSReplyTo());
+MessageProducer producer = session.createProducer(message.getJMSReplyTo());
+message = session.createMessage();
+producer.send(message);
+session.commit();
+
+connection.close();
+
+latch.countDown();
+
+
+
diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToQueue/sendMessagesAddress.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToQueue/sendMessagesAddress.groovy
new file mode 100644
index 0000000..fd6baf8
--- /dev/null
+++ b/tests/compatibility-tests/src/main/resources/jmsReplyToQueue/sendMessagesAddress.groovy
@@ -0,0 +1,55 @@
+package jmsReplyToQueue
+
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun
+
+import javax.jms.*
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
+try {
+ cf.setEnable1xPrefixes(true);
+} catch (Throwable totallyIgnored) {
+ // older versions will not have this method, dont even bother about seeing the stack trace or exception
+}
+Connection connection = cf.createConnection();
+Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+connection.start();
+
+Queue myQueue = session.createQueue("myQueue");
+Queue temporaryQueue = session.createQueue("myTemporaryQueue");
+MessageConsumer consumer = session.createConsumer(temporaryQueue);
+
+MessageProducer queueProducer = session.createProducer(myQueue)
+
+queueProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+Message message = session.createMessage();
+message.setJMSReplyTo(temporaryQueue);
+System.out.println("Sending " + message + " to: " + myQueue);
+queueProducer.send(message);
+session.commit();
+
+System.out.println("Receiving message from: " + temporaryQueue);
+message = consumer.receive(10000);
+GroovyRun.assertNotNull(message);
+session.commit();
+System.out.println("Received message: " + message);
+
+connection.close();
+senderLatch.countDown();
\ No newline at end of file
diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToTempQueue/artemisServer.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToTempQueue/artemisServer.groovy
new file mode 100644
index 0000000..2b06830
--- /dev/null
+++ b/tests/compatibility-tests/src/main/resources/jmsReplyToTempQueue/artemisServer.groovy
@@ -0,0 +1,49 @@
+package jmsReplyToTempQueue
+
+import org.apache.activemq.artemis.api.core.RoutingType
+import org.apache.activemq.artemis.api.core.SimpleString
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
+import org.apache.activemq.artemis.core.server.JournalType
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// starts an artemis server
+import org.apache.activemq.artemis.core.server.impl.AddressInfo
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
+
+String folder = arg[0];
+String queueAddress = "jms.queue.myQueue";
+
+configuration = new ConfigurationImpl();
+configuration.setJournalType(JournalType.NIO);
+configuration.setBrokerInstance(new File(folder + "/server"));
+configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
+configuration.setSecurityEnabled(false);
+configuration.setPersistenceEnabled(false);
+
+
+jmsConfiguration = new JMSConfigurationImpl();
+
+server = new EmbeddedJMS();
+server.setConfiguration(configuration);
+server.setJmsConfiguration(jmsConfiguration);
+server.start();
+
+server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST));
+server.getActiveMQServer().createQueue(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST, SimpleString.toSimpleString(queueAddress), null, true, false);
diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToTempQueue/receiveMessages.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToTempQueue/receiveMessages.groovy
new file mode 100644
index 0000000..a7a1157
--- /dev/null
+++ b/tests/compatibility-tests/src/main/resources/jmsReplyToTempQueue/receiveMessages.groovy
@@ -0,0 +1,55 @@
+package jmsReplyToTempQueue
+
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun
+
+import javax.jms.*
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
+try {
+ cf.setEnable1xPrefixes(true);
+} catch (Throwable totallyIgnored) {
+ // older versions will not have this method, dont even bother about seeing the stack trace or exception
+}
+Connection connection = cf.createConnection();
+Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+Queue myQueue = session.createQueue("myQueue");
+MessageConsumer queueConsumer = session.createConsumer(myQueue);
+consumerCreated.countDown();
+connection.start()
+
+Message message = queueConsumer.receive(5000);
+GroovyRun.assertNotNull(message)
+session.commit();
+System.out.println("Received " + message + " from: " + myQueue);
+queueConsumer.close();
+
+System.out.println("Sending message to: " + message.getJMSReplyTo());
+MessageProducer producer = session.createProducer(message.getJMSReplyTo());
+message = session.createMessage();
+producer.send(message);
+session.commit();
+
+connection.close();
+
+latch.countDown();
+
+
+
diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToTempQueue/sendMessagesAddress.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToTempQueue/sendMessagesAddress.groovy
new file mode 100644
index 0000000..968b8ad
--- /dev/null
+++ b/tests/compatibility-tests/src/main/resources/jmsReplyToTempQueue/sendMessagesAddress.groovy
@@ -0,0 +1,55 @@
+package jmsReplyToTempQueue
+
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun
+
+import javax.jms.*
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
+try {
+ cf.setEnable1xPrefixes(true);
+} catch (Throwable totallyIgnored) {
+ // older versions will not have this method, dont even bother about seeing the stack trace or exception
+}
+Connection connection = cf.createConnection();
+Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+connection.start();
+
+Queue myQueue = session.createQueue("myQueue");
+Queue temporaryQueue = session.createTemporaryQueue();
+MessageConsumer consumer = session.createConsumer(temporaryQueue);
+
+MessageProducer queueProducer = session.createProducer(myQueue)
+
+queueProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+Message message = session.createMessage();
+message.setJMSReplyTo(temporaryQueue);
+System.out.println("Sending " + message + " to: " + myQueue);
+queueProducer.send(message);
+session.commit();
+
+System.out.println("Receiving message from: " + temporaryQueue);
+message = consumer.receive(10000);
+GroovyRun.assertNotNull(message);
+session.commit();
+System.out.println("Received message: " + message);
+
+connection.close();
+senderLatch.countDown();
\ No newline at end of file
diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToTempTopic/artemisServer.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToTempTopic/artemisServer.groovy
new file mode 100644
index 0000000..9e85473
--- /dev/null
+++ b/tests/compatibility-tests/src/main/resources/jmsReplyToTempTopic/artemisServer.groovy
@@ -0,0 +1,52 @@
+package jmsReplyToTempTopic
+
+import org.apache.activemq.artemis.api.core.RoutingType
+import org.apache.activemq.artemis.api.core.SimpleString
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
+import org.apache.activemq.artemis.core.server.JournalType
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// starts an artemis server
+import org.apache.activemq.artemis.core.server.impl.AddressInfo
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
+
+String folder = arg[0];
+String queueAddress = "jms.queue.myQueue";
+String replyTopicAddress = "jms.topic.myReplyTopic";
+
+configuration = new ConfigurationImpl();
+configuration.setJournalType(JournalType.NIO);
+configuration.setBrokerInstance(new File(folder + "/server"));
+configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
+configuration.setSecurityEnabled(false);
+configuration.setPersistenceEnabled(false);
+
+
+jmsConfiguration = new JMSConfigurationImpl();
+
+server = new EmbeddedJMS();
+server.setConfiguration(configuration);
+server.setJmsConfiguration(jmsConfiguration);
+server.start();
+
+server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST));
+server.getActiveMQServer().createQueue(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST, SimpleString.toSimpleString(queueAddress), null, true, false);
+
+server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(replyTopicAddress), RoutingType.MULTICAST));
diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToTempTopic/receiveMessages.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToTempTopic/receiveMessages.groovy
new file mode 100644
index 0000000..0ab25bf
--- /dev/null
+++ b/tests/compatibility-tests/src/main/resources/jmsReplyToTempTopic/receiveMessages.groovy
@@ -0,0 +1,58 @@
+package jmsReplyToTempTopic
+
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun
+
+import javax.jms.*
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
+try {
+ cf.setEnable1xPrefixes(true);
+} catch (Throwable totallyIgnored) {
+ // older versions will not have this method, dont even bother about seeing the stack trace or exception
+}
+Connection connection = cf.createConnection();
+Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+Queue myQueue = session.createQueue("myQueue");
+MessageConsumer queueConsumer = session.createConsumer(myQueue);
+consumerCreated.countDown();
+connection.start()
+
+for (int i = 0; i < 5; i++) {
+ Message message = queueConsumer.receive(5000);
+ GroovyRun.assertNotNull(message)
+ System.out.println("Received " + message + " from: " + myQueue);
+
+ GroovyRun.assertEquals("myQueue", ((Queue)message.getJMSDestination()).getQueueName());
+
+ System.out.println("Sending message to: " + message.getJMSReplyTo());
+ MessageProducer producer = session.createProducer(message.getJMSReplyTo());
+ message = session.createMessage();
+ producer.send(message);
+}
+queueConsumer.close();
+session.commit();
+
+connection.close();
+
+latch.countDown();
+
+
+
diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToTempTopic/sendMessagesAddress.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToTempTopic/sendMessagesAddress.groovy
new file mode 100644
index 0000000..97b9fed
--- /dev/null
+++ b/tests/compatibility-tests/src/main/resources/jmsReplyToTempTopic/sendMessagesAddress.groovy
@@ -0,0 +1,71 @@
+package jmsReplyToTempTopic
+
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun
+
+import javax.jms.*
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
+try {
+ cf.setEnable1xPrefixes(true);
+} catch (Throwable totallyIgnored) {
+ // older versions will not have this method, dont even bother about seeing the stack trace or exception
+}
+Connection connection = cf.createConnection();
+Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+connection.start();
+
+Queue myQueue = session.createQueue("myQueue");
+
+System.out.println("myQueue::" + myQueue);
+TemporaryTopic replyTopic = session.createTemporaryTopic();
+MessageConsumer consumer = session.createConsumer(replyTopic);
+
+System.out.println("Temporary Topic " + replyTopic);
+
+MessageProducer queueProducer = session.createProducer(myQueue)
+
+queueProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+sendMessage(session.createTextMessage("hello"), replyTopic, myQueue, queueProducer);
+sendMessage(session.createMapMessage(), replyTopic, myQueue, queueProducer);
+sendMessage(session.createObjectMessage(), replyTopic, myQueue, queueProducer);
+sendMessage(session.createStreamMessage(), replyTopic, myQueue, queueProducer);
+sendMessage(session.createMessage(), replyTopic, myQueue, queueProducer);
+session.commit();
+
+
+System.out.println("Receiving message from: " + replyTopic);
+for (int i = 0; i < 5; i++) {
+ message = consumer.receive(10000);
+ GroovyRun.assertNotNull(message);
+}
+GroovyRun.assertNull(consumer.receiveNoWait());
+session.commit();
+System.out.println("Received message: " + message);
+
+connection.close();
+senderLatch.countDown();
+
+
+void sendMessage(Message message, TemporaryTopic replyTopic, Queue myQueue, MessageProducer queueProducer) {
+ message.setJMSReplyTo(replyTopic);
+ System.out.println("Sending " + message + " to: " + myQueue);
+ queueProducer.send(message);
+}
\ No newline at end of file
diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToTopic/artemisServer.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToTopic/artemisServer.groovy
new file mode 100644
index 0000000..37a6aa0
--- /dev/null
+++ b/tests/compatibility-tests/src/main/resources/jmsReplyToTopic/artemisServer.groovy
@@ -0,0 +1,49 @@
+package jmsReplyToTopic
+
+import org.apache.activemq.artemis.api.core.RoutingType
+import org.apache.activemq.artemis.api.core.SimpleString
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
+import org.apache.activemq.artemis.core.server.JournalType
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// starts an artemis server
+import org.apache.activemq.artemis.core.server.impl.AddressInfo
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
+
+String folder = arg[0];
+String queueAddress = "jms.queue.myQueue";
+
+configuration = new ConfigurationImpl();
+configuration.setJournalType(JournalType.NIO);
+configuration.setBrokerInstance(new File(folder + "/server"));
+configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
+configuration.setSecurityEnabled(false);
+configuration.setPersistenceEnabled(false);
+
+
+jmsConfiguration = new JMSConfigurationImpl();
+
+server = new EmbeddedJMS();
+server.setConfiguration(configuration);
+server.setJmsConfiguration(jmsConfiguration);
+server.start();
+
+server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST));
+server.getActiveMQServer().createQueue(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST, SimpleString.toSimpleString(queueAddress), null, true, false);
diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToTopic/receiveMessages.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToTopic/receiveMessages.groovy
new file mode 100644
index 0000000..6883f4e
--- /dev/null
+++ b/tests/compatibility-tests/src/main/resources/jmsReplyToTopic/receiveMessages.groovy
@@ -0,0 +1,55 @@
+package jmsReplyToTopic
+
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun
+
+import javax.jms.*
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
+try {
+ cf.setEnable1xPrefixes(true);
+} catch (Throwable totallyIgnored) {
+ // older versions will not have this method, dont even bother about seeing the stack trace or exception
+}
+Connection connection = cf.createConnection();
+Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+Queue myQueue = session.createQueue("myQueue");
+MessageConsumer queueConsumer = session.createConsumer(myQueue);
+consumerCreated.countDown();
+connection.start()
+
+Message message = queueConsumer.receive(5000);
+GroovyRun.assertNotNull(message)
+session.commit();
+System.out.println("Received " + message + " from: " + myQueue);
+queueConsumer.close();
+
+System.out.println("Sending message to: " + message.getJMSReplyTo());
+MessageProducer producer = session.createProducer(message.getJMSReplyTo());
+message = session.createMessage();
+producer.send(message);
+session.commit();
+
+connection.close();
+
+latch.countDown();
+
+
+
diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToTopic/sendMessagesAddress.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToTopic/sendMessagesAddress.groovy
new file mode 100644
index 0000000..02f6137
--- /dev/null
+++ b/tests/compatibility-tests/src/main/resources/jmsReplyToTopic/sendMessagesAddress.groovy
@@ -0,0 +1,55 @@
+package jmsReplyToTopic
+
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun
+
+import javax.jms.*
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
+try {
+ cf.setEnable1xPrefixes(true);
+} catch (Throwable totallyIgnored) {
+ // older versions will not have this method, dont even bother about seeing the stack trace or exception
+}
+Connection connection = cf.createConnection();
+Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+connection.start();
+
+Queue myQueue = session.createQueue("myQueue");
+Topic replyTopic = session.createTopic("myReplyTopic");
+MessageConsumer consumer = session.createConsumer(replyTopic);
+
+MessageProducer queueProducer = session.createProducer(myQueue)
+
+queueProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+Message message = session.createMessage();
+message.setJMSReplyTo(replyTopic);
+System.out.println("Sending " + message + " to: " + myQueue);
+queueProducer.send(message);
+session.commit();
+
+System.out.println("Receiving message from: " + replyTopic);
+message = consumer.receive(10000);
+GroovyRun.assertNotNull(message);
+session.commit();
+System.out.println("Received message: " + message);
+
+connection.close();
+senderLatch.countDown();
\ No newline at end of file
diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToQueueTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToQueueTest.java
new file mode 100644
index 0000000..3930544
--- /dev/null
+++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToQueueTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.compatibility;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE;
+import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
+
+@RunWith(Parameterized.class)
+public class JmsReplyToQueueTest extends VersionedBase {
+
+ @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
+ public static Collection getParameters() {
+ List<Object[]> combinations = new ArrayList<>();
+ combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, SNAPSHOT});
+ combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, ONE_FIVE});
+ combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, SNAPSHOT});
+ return combinations;
+ }
+
+ public JmsReplyToQueueTest(String server, String sender, String receiver) throws Exception {
+ super(server, sender, receiver);
+ }
+
+
+ @Before
+ public void setUp() throws Throwable {
+ FileUtil.deleteDirectory(serverFolder.getRoot());
+ }
+
+ @After
+ public void stopTest() throws Exception {
+ execute(serverClassloader, "server.stop()");
+ }
+
+ @Test
+ public void testJmsReplyToQueue() throws Throwable {
+ evaluate(serverClassloader, "jmsReplyToQueue/artemisServer.groovy", serverFolder.getRoot().getAbsolutePath(), server);
+
+ CountDownLatch consumerCreated = new CountDownLatch(1);
+ CountDownLatch receiverLatch = new CountDownLatch(1);
+ CountDownLatch senderLatch = new CountDownLatch(1);
+
+ setVariable(receiverClassloader, "latch", receiverLatch);
+ setVariable(receiverClassloader, "consumerCreated", consumerCreated);
+
+ AtomicInteger errors = new AtomicInteger(0);
+ Thread t1 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ evaluate(receiverClassloader, "jmsReplyToQueue/receiveMessages.groovy", receiver);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ };
+ t1.start();
+
+ Assert.assertTrue(consumerCreated.await(10, TimeUnit.SECONDS));
+
+ setVariable(senderClassloader, "senderLatch", senderLatch);
+ Thread t2 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ evaluate(senderClassloader, "jmsReplyToQueue/sendMessagesAddress.groovy", sender);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ };
+ t2.start();
+
+ try {
+ Assert.assertTrue("Sender did not get message from queue", senderLatch.await(10, TimeUnit.SECONDS));
+ Assert.assertTrue("Receiver did not receive messages", receiverLatch.await(10, TimeUnit.SECONDS));
+ } finally {
+
+ t1.join(TimeUnit.SECONDS.toMillis(1));
+ t2.join(TimeUnit.SECONDS.toMillis(1));
+
+ if (t1.isAlive()) {
+ t1.interrupt();
+ }
+
+ if (t2.isAlive()) {
+ t2.interrupt();
+ }
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToTempQueueTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToTempQueueTest.java
new file mode 100644
index 0000000..3bb5cc0
--- /dev/null
+++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToTempQueueTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.compatibility;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE;
+import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
+
+@RunWith(Parameterized.class)
+public class JmsReplyToTempQueueTest extends VersionedBase {
+
+ @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
+ public static Collection getParameters() {
+ List<Object[]> combinations = new ArrayList<>();
+ combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, SNAPSHOT});
+ combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, ONE_FIVE});
+ combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, SNAPSHOT});
+ return combinations;
+ }
+
+ public JmsReplyToTempQueueTest(String server, String sender, String receiver) throws Exception {
+ super(server, sender, receiver);
+ }
+
+
+ @Before
+ public void setUp() throws Throwable {
+ FileUtil.deleteDirectory(serverFolder.getRoot());
+ }
+
+ @After
+ public void stopTest() throws Exception {
+ execute(serverClassloader, "server.stop()");
+ }
+
+ @Test
+ public void testJmsReplyToTempQueue() throws Throwable {
+ evaluate(serverClassloader, "jmsReplyToTempQueue/artemisServer.groovy", serverFolder.getRoot().getAbsolutePath(), server);
+
+ CountDownLatch consumerCreated = new CountDownLatch(1);
+ CountDownLatch receiverLatch = new CountDownLatch(1);
+ CountDownLatch senderLatch = new CountDownLatch(1);
+
+ setVariable(receiverClassloader, "latch", receiverLatch);
+ setVariable(receiverClassloader, "consumerCreated", consumerCreated);
+
+ AtomicInteger errors = new AtomicInteger(0);
+ Thread t1 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ evaluate(receiverClassloader, "jmsReplyToTempQueue/receiveMessages.groovy", receiver);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ };
+ t1.start();
+
+ Assert.assertTrue(consumerCreated.await(10, TimeUnit.SECONDS));
+
+ setVariable(senderClassloader, "senderLatch", senderLatch);
+ Thread t2 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ evaluate(senderClassloader, "jmsReplyToTempQueue/sendMessagesAddress.groovy", sender);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ };
+ t2.start();
+
+ try {
+ Assert.assertTrue("Sender did not get message from temporary queue", senderLatch.await(10, TimeUnit.SECONDS));
+ Assert.assertTrue("Receiver did not receive messages", receiverLatch.await(10, TimeUnit.SECONDS));
+ } finally {
+
+ t1.join(TimeUnit.SECONDS.toMillis(1));
+ t2.join(TimeUnit.SECONDS.toMillis(1));
+
+ if (t1.isAlive()) {
+ t1.interrupt();
+ }
+
+ if (t2.isAlive()) {
+ t2.interrupt();
+ }
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToTempTopicTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToTempTopicTest.java
new file mode 100644
index 0000000..1efa3de
--- /dev/null
+++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToTempTopicTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.compatibility;
+
+import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE;
+import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
+
+@RunWith(Parameterized.class)
+public class JmsReplyToTempTopicTest extends VersionedBase {
+
+ @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
+ public static Collection getParameters() {
+ List<Object[]> combinations = new ArrayList<>();
+ combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, SNAPSHOT});
+ combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, ONE_FIVE});
+ combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, SNAPSHOT});
+ return combinations;
+ }
+
+ public JmsReplyToTempTopicTest(String server, String sender, String receiver) throws Exception {
+ super(server, sender, receiver);
+ }
+
+
+ @Before
+ public void setUp() throws Throwable {
+ FileUtil.deleteDirectory(serverFolder.getRoot());
+ }
+
+ @After
+ public void stopTest() throws Exception {
+ execute(serverClassloader, "server.stop()");
+ }
+
+ @Test
+ public void testJmsReplyToTempTopic() throws Throwable {
+ evaluate(serverClassloader, "jmsReplyToTempTopic/artemisServer.groovy", serverFolder.getRoot().getAbsolutePath(), server);
+
+ CountDownLatch consumerCreated = new CountDownLatch(1);
+ CountDownLatch receiverLatch = new CountDownLatch(1);
+ CountDownLatch senderLatch = new CountDownLatch(1);
+
+ setVariable(receiverClassloader, "latch", receiverLatch);
+ setVariable(receiverClassloader, "consumerCreated", consumerCreated);
+
+ AtomicInteger errors = new AtomicInteger(0);
+ Thread t1 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ evaluate(receiverClassloader, "jmsReplyToTempTopic/receiveMessages.groovy", receiver);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ };
+ t1.start();
+
+ Assert.assertTrue(consumerCreated.await(10, TimeUnit.SECONDS));
+
+ setVariable(senderClassloader, "senderLatch", senderLatch);
+ Thread t2 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ evaluate(senderClassloader, "jmsReplyToTempTopic/sendMessagesAddress.groovy", sender);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ };
+ t2.start();
+
+ try {
+ Assert.assertTrue("Sender did not get message from temporary topic", senderLatch.await(10, TimeUnit.SECONDS));
+ Assert.assertTrue("Receiver did not receive messages", receiverLatch.await(10, TimeUnit.SECONDS));
+ } finally {
+
+ t1.join(TimeUnit.SECONDS.toMillis(1));
+ t2.join(TimeUnit.SECONDS.toMillis(1));
+
+ if (t1.isAlive()) {
+ t1.interrupt();
+ }
+
+ if (t2.isAlive()) {
+ t2.interrupt();
+ }
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToTopicTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToTopicTest.java
new file mode 100644
index 0000000..3d2406b
--- /dev/null
+++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToTopicTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.compatibility;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE;
+import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
+
+@RunWith(Parameterized.class)
+public class JmsReplyToTopicTest extends VersionedBase {
+
+ @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
+ public static Collection getParameters() {
+ List<Object[]> combinations = new ArrayList<>();
+ combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, SNAPSHOT});
+ combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, ONE_FIVE});
+ combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, SNAPSHOT});
+ return combinations;
+ }
+
+ public JmsReplyToTopicTest(String server, String sender, String receiver) throws Exception {
+ super(server, sender, receiver);
+ }
+
+
+ @Before
+ public void setUp() throws Throwable {
+ FileUtil.deleteDirectory(serverFolder.getRoot());
+ }
+
+ @After
+ public void stopTest() throws Exception {
+ execute(serverClassloader, "server.stop()");
+ }
+
+ @Test
+ public void testJmsReplyToTopic() throws Throwable {
+ evaluate(serverClassloader, "jmsReplyToTopic/artemisServer.groovy", serverFolder.getRoot().getAbsolutePath(), server);
+
+ CountDownLatch consumerCreated = new CountDownLatch(1);
+ CountDownLatch receiverLatch = new CountDownLatch(1);
+ CountDownLatch senderLatch = new CountDownLatch(1);
+
+ setVariable(receiverClassloader, "latch", receiverLatch);
+ setVariable(receiverClassloader, "consumerCreated", consumerCreated);
+
+ AtomicInteger errors = new AtomicInteger(0);
+ Thread t1 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ evaluate(receiverClassloader, "jmsReplyToTopic/receiveMessages.groovy", receiver);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ };
+ t1.start();
+
+ Assert.assertTrue(consumerCreated.await(10, TimeUnit.SECONDS));
+
+ setVariable(senderClassloader, "senderLatch", senderLatch);
+ Thread t2 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ evaluate(senderClassloader, "jmsReplyToTopic/sendMessagesAddress.groovy", sender);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ };
+ t2.start();
+
+ try {
+ Assert.assertTrue("Sender did not get message from topic", senderLatch.await(10, TimeUnit.SECONDS));
+ Assert.assertTrue("Receiver did not receive messages", receiverLatch.await(10, TimeUnit.SECONDS));
+ } finally {
+
+ t1.join(TimeUnit.SECONDS.toMillis(1));
+ t2.join(TimeUnit.SECONDS.toMillis(1));
+
+ if (t1.isAlive()) {
+ t1.interrupt();
+ }
+
+ if (t2.isAlive()) {
+ t2.interrupt();
+ }
+ }
+
+ }
+
+}
\ No newline at end of file