You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/05/05 18:14:32 UTC
[1/2] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-6100 - Virtual topic message
destination should be the target queue
Repository: activemq
Updated Branches:
refs/heads/activemq-5.13.x 0e78877f6 -> 6abf89f0a
https://issues.apache.org/jira/browse/AMQ-6100 - Virtual topic message destination should be the target queue
(cherry picked from commit 4e63ee7cc7c4ed7d1fb8ae916c0984b974c175c0)
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c276e2e6
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c276e2e6
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c276e2e6
Branch: refs/heads/activemq-5.13.x
Commit: c276e2e652d2964cdc69ebf9b72663f0d985031d
Parents: 0e78877
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Mon Dec 21 15:19:01 2015 +0100
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu May 5 14:14:11 2016 -0400
----------------------------------------------------------------------
.../region/virtual/VirtualTopicInterceptor.java | 11 +-
.../MessageDestinationVirtualTopicTest.java | 120 +++++++++++++++++++
.../broker/virtual/SimpleMessageListener.java | 79 ++++++++++++
.../broker/virtual/VirtualTopicDLQTest.java | 2 +-
.../virtual/virtual-topic-network-test.xml | 100 ++++++++++++++++
5 files changed, 309 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/c276e2e6/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
index f673770..65d3efc 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
@@ -91,7 +91,7 @@ public class VirtualTopicInterceptor extends DestinationFilter {
public void run() {
try {
if (exceptionAtomicReference.get() == null) {
- dest.send(context, message.copy());
+ dest.send(context, copy(message, dest.getActiveMQDestination()));
}
} catch (Exception e) {
exceptionAtomicReference.set(e);
@@ -112,7 +112,7 @@ public class VirtualTopicInterceptor extends DestinationFilter {
} else {
for (final Destination dest : destinations) {
if (shouldDispatch(broker, message, dest)) {
- dest.send(context, message.copy());
+ dest.send(context, copy(message, dest.getActiveMQDestination()));
}
}
}
@@ -121,6 +121,13 @@ public class VirtualTopicInterceptor extends DestinationFilter {
}
}
+ private Message copy(Message original, ActiveMQDestination target) { // builds the per-destination message that the send paths above hand to dest.send(...)
+ Message msg = original.copy(); // every setter below is applied to this copy, never to 'original'
+ msg.setDestination(target); // callers pass dest.getActiveMQDestination(), i.e. the destination being dispatched to
+ msg.setOriginalDestination(original.getDestination()); // keeps the destination the incoming message carried
+ return msg;
+ }
+
private LocalTransactionId beginLocalTransaction(int numDestinations, ConnectionContext connectionContext, Message message) throws Exception {
LocalTransactionId result = null;
if (transactedSend && numDestinations > 1 && message.isPersistent() && message.getTransactionId() == null) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/c276e2e6/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java
new file mode 100644
index 0000000..f370efc
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import javax.annotation.Resource;
+import javax.jms.*;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(SpringJUnit4ClassRunner.class) // executed with Spring's JUnit 4 runner
+@ContextConfiguration({ "virtual-topic-network-test.xml" }) // context file that declares the beans with ids broker1 and broker2
+public class MessageDestinationVirtualTopicTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MessageDestinationVirtualTopicTest.class);
+
+ private SimpleMessageListener listener1; // attached in init() to queue Consumer.C.VirtualTopic.T1 over tcp://localhost:61616
+
+ private SimpleMessageListener listener2; // attached in init() to queue Consumer.D.VirtualTopic.T1 over tcp://localhost:62616
+
+ @Resource(name = "broker1")
+ private BrokerService broker1;
+
+ @Resource(name = "broker2")
+ private BrokerService broker2;
+
+ private MessageProducer producer; // created in init(); publishes to topic VirtualTopic.T1 through session1
+
+ private Session session1; // kept as a field because the test method creates its TextMessage from it
+
+ public void init() throws JMSException { // carries no @Before annotation; testDestinationNames() calls it explicitly after waiting for both brokers
+ // Create connection on Broker B2
+ ConnectionFactory broker2ConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:62616");
+ Connection connection2 = broker2ConnectionFactory.createConnection(); // NOTE(review): local variable; nothing in this class closes it
+ connection2.start();
+ Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue consumerDQueue = session2.createQueue("Consumer.D.VirtualTopic.T1");
+
+ // Bind listener on queue for consumer D
+ MessageConsumer consumer = session2.createConsumer(consumerDQueue);
+ listener2 = new SimpleMessageListener();
+ consumer.setMessageListener(listener2);
+
+ // Create connection on Broker B1
+ ConnectionFactory broker1ConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+ Connection connection1 = broker1ConnectionFactory.createConnection(); // NOTE(review): local variable; nothing in this class closes it
+ connection1.start();
+ session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue consumerCQueue = session1.createQueue("Consumer.C.VirtualTopic.T1");
+
+ // Bind listener on queue for consumer C
+ MessageConsumer consumer1 = session1.createConsumer(consumerCQueue);
+ listener1 = new SimpleMessageListener();
+ consumer1.setMessageListener(listener1);
+
+ // Create producer for topic, on B1
+ Topic virtualTopicT1 = session1.createTopic("VirtualTopic.T1");
+ producer = session1.createProducer(virtualTopicT1);
+ }
+
+ @Test
+ public void testDestinationNames() throws Exception { // sends one message to VirtualTopic.T1 and checks the JMSDestination each queue consumer observed
+
+ LOG.info("Started waiting for broker 1 and 2");
+ broker1.waitUntilStarted();
+ broker2.waitUntilStarted();
+ LOG.info("Broker 1 and 2 have started");
+
+ init();
+
+ // Create a monitor shared by both listeners
+ CountDownLatch monitor = new CountDownLatch(2); // count of 2: each of the two listeners is expected to count down once
+ listener1.setCountDown(monitor);
+ listener2.setCountDown(monitor);
+
+ LOG.info("Sending message");
+ // Send a message on the topic
+ TextMessage message = session1.createTextMessage("Hello World !");
+ producer.send(message);
+ LOG.info("Waiting for message reception");
+ // Wait until both listeners have counted down
+ monitor.await(); // NOTE(review): no timeout - blocks indefinitely if either listener never counts down
+
+ // Get the message destinations
+ String lastJMSDestination2 = listener2.getLastJMSDestination();
+ System.err.println(lastJMSDestination2); // NOTE(review): prints to stderr although LOG is available
+ String lastJMSDestination1 = listener1.getLastJMSDestination();
+ System.err.println(lastJMSDestination1); // NOTE(review): prints to stderr although LOG is available
+
+ // Each listener must report its own consumer queue as the destination
+ assertEquals("queue://Consumer.D.VirtualTopic.T1", lastJMSDestination2);
+ assertEquals("queue://Consumer.C.VirtualTopic.T1", lastJMSDestination1);
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/c276e2e6/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/SimpleMessageListener.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/SimpleMessageListener.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/SimpleMessageListener.java
new file mode 100644
index 0000000..166bfb5
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/SimpleMessageListener.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SimpleMessageListener implements MessageListener { // test helper: remembers the destination of the last TextMessage and counts down a latch per message
+
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleMessageListener.class);
+
+ private CountDownLatch messageReceivedToken; // assigned only by setCountDown(...); onMessage dereferences it without a null check
+
+ private String lastJMSDestination; // toString() of the JMSDestination of the most recent TextMessage; stays null until one arrives
+
+ @Override
+ public void onMessage(Message message) {
+ try {
+ Thread.sleep(2000L); // delays handling of every message by two seconds; the reason is not stated in this file
+ if (message instanceof TextMessage) { // only TextMessage instances update lastJMSDestination
+ LOG.info("Dest:" + message.getJMSDestination());
+ lastJMSDestination = message.getJMSDestination().toString();
+
+ Enumeration propertyNames = message.getPropertyNames(); // NOTE(review): raw Enumeration type
+ while (propertyNames.hasMoreElements()) { // iterates over the property names and discards each one
+ Object object = propertyNames.nextElement(); // NOTE(review): 'object' is never read
+ }
+
+ }
+ messageReceivedToken.countDown(); // reached for any message type, but skipped when one of the exceptions below is thrown
+
+ }
+ catch (JMSException e) {
+ LOG.error("Error while listening to a message", message); // NOTE(review): passes 'message', not the caught exception 'e', to the logger
+ }
+ catch (InterruptedException e) {
+ LOG.error("Interrupted while listening to a message", message); // NOTE(review): 'e' is not logged and the interrupt status is not restored
+ }
+ }
+
+ /**
+ * @param countDown
+ * the countDown to set
+ */
+ public void setCountDown(CountDownLatch countDown) {
+ this.messageReceivedToken = countDown;
+ }
+
+ /**
+ * @return the lastJMSDestination
+ */
+ public String getLastJMSDestination() {
+ return lastJMSDestination;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/activemq/blob/c276e2e6/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java
index 7c853cf..11e2d7f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java
@@ -69,7 +69,7 @@ public class VirtualTopicDLQTest extends TestCase {
// Expected Individual Dead Letter Queue names that are tied to the
// Subscriber Queues
- private static final String dlqPrefix = "ActiveMQ.DLQ.Topic.";
+ private static final String dlqPrefix = "ActiveMQ.DLQ.Queue.";
// Number of messages
private static final int numberMessages = 6;
http://git-wip-us.apache.org/repos/asf/activemq/blob/c276e2e6/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml
new file mode 100644
index 0000000..0c2b1ec
--- /dev/null
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml
@@ -0,0 +1,100 @@
+<!-- START SNIPPET: xbean -->
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:jms="http://www.springframework.org/schema/jms"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.11.0.xsd">
+
+ <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+ <!-- Broker 1 definition -->
+ <amq:broker xmlns="http://activemq.apache.org/schema/core" id="broker1" brokerName="B1" useJmx="false" useShutdownHook="false" useVirtualTopics="true" persistent="false" start="true" startAsync="true">
+
+ <!-- Transport protocol -->
+ <amq:transportConnectors>
+ <amq:transportConnector uri="tcp://localhost:61616" />
+ </amq:transportConnectors>
+
+ <!-- Network of brokers setup -->
+ <!--amq:networkConnectors>
+ <amq:networkConnector name="linkToBrokerB2" uri="static:(tcp://localhost:62616)" networkTTL="1" duplex="false"/>
+ </amq:networkConnectors-->
+
+ <amq:destinationInterceptors>
+
+ <amq:virtualDestinationInterceptor>
+ <amq:virtualDestinations>
+ <!-- Virtual topic policies -->
+ <!-- they should be local to avoid duplicate messages -->
+ <amq:virtualTopic name="VirtualTopic.>" prefix="Consumer.*."/>
+ </amq:virtualDestinations>
+ </amq:virtualDestinationInterceptor>
+ </amq:destinationInterceptors>
+
+
+ <destinationPolicy>
+ <policyMap>
+ <policyEntries>
+ <policyEntry queue=">" producerFlowControl="true" memoryLimit="4 mb">
+ <networkBridgeFilterFactory>
+ <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true" />
+ </networkBridgeFilterFactory>
+ </policyEntry>
+ </policyEntries>
+ </policyMap>
+ </destinationPolicy>
+
+
+ <amq:destinations>
+ <!-- topics -->
+ <amq:topic physicalName="VirtualTopic.T1" />
+ </amq:destinations>
+
+ </amq:broker>
+
+ <!-- Broker 2 definition -->
+ <amq:broker xmlns="http://activemq.apache.org/schema/core" id="broker2" brokerName="B2" useJmx="false" useShutdownHook="false" useVirtualTopics="true" persistent="false" start="true" startAsync="true">
+
+ <!-- Transport protocol -->
+ <amq:transportConnectors>
+ <amq:transportConnector uri="tcp://localhost:62616" />
+ </amq:transportConnectors>
+
+ <!-- Network of brokers setup -->
+ <amq:networkConnectors>
+ <amq:networkConnector name="linkToBrokerB1" uri="static:(tcp://localhost:61616)" networkTTL="1" duplex="true" />
+ </amq:networkConnectors>
+
+ <amq:destinationInterceptors>
+
+ <amq:virtualDestinationInterceptor>
+ <amq:virtualDestinations>
+ <!-- Virtual topic policies -->
+ <!-- they should be local to avoid duplicate messages -->
+ <amq:virtualTopic name=">" prefix="Consumer.*."/>
+ </amq:virtualDestinations>
+ </amq:virtualDestinationInterceptor>
+ </amq:destinationInterceptors>
+
+ <destinationPolicy>
+ <policyMap>
+ <policyEntries>
+ <policyEntry queue=">" producerFlowControl="true" memoryLimit="4 mb">
+ <networkBridgeFilterFactory>
+ <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true" />
+ </networkBridgeFilterFactory>
+ </policyEntry>
+ </policyEntries>
+ </policyMap>
+ </destinationPolicy>
+
+ <amq:destinations>
+ <!-- topics -->
+ <amq:topic physicalName="VirtualTopic.T1" />
+ </amq:destinations>
+
+ </amq:broker>
+
+</beans>
+ <!-- END SNIPPET: xbean -->
[2/2] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-6100
Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-6100
Adding missing LICENSE header
(cherry picked from commit b7db97d8e215ba16a812b5f8577f2e8d91fc7787)
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6abf89f0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6abf89f0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6abf89f0
Branch: refs/heads/activemq-5.13.x
Commit: 6abf89f0a5e2819d6eb280de3949e13c4a8531de
Parents: c276e2e
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Wed Jan 6 15:49:50 2016 +0000
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu May 5 14:14:23 2016 -0400
----------------------------------------------------------------------
.../broker/virtual/virtual-topic-network-test.xml | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/6abf89f0/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml
index 0c2b1ec..6b6199c 100644
--- a/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml
@@ -1,4 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
<!-- START SNIPPET: xbean -->
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"