You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2017/12/14 21:01:33 UTC

activemq git commit: AMQ-6875 - Use the correct destination for Virtual destination consumers when using Virtual Topics

Repository: activemq
Updated Branches:
  refs/heads/master d3e439378 -> 56baba96c


AMQ-6875 - Use the correct destination for Virtual destination consumers
when using Virtual Topics


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/56baba96
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/56baba96
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/56baba96

Branch: refs/heads/master
Commit: 56baba96c657d4e44b88955a964d6c92ff39b822
Parents: d3e4393
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Thu Dec 14 16:00:37 2017 -0500
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Thu Dec 14 16:01:10 2017 -0500

----------------------------------------------------------------------
 .../activemq/advisory/AdvisoryBroker.java       | 46 +++++++++++++++++++-
 .../network/VirtualConsumerDemandTest.java      | 24 ++++++++--
 2 files changed, 65 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/56baba96/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 1acd524..1508c61 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.advisory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -42,6 +43,7 @@ import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.broker.region.TopicSubscription;
 import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualTopic;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -58,6 +60,7 @@ import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.SessionId;
+import org.apache.activemq.filter.DestinationPath;
 import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.usage.Usage;
@@ -604,7 +607,7 @@ public class AdvisoryBroker extends BrokerFilter {
 
                 if(brokerConsumerDests.putIfAbsent(pair, info) == null) {
                     LOG.debug("Virtual consumer pair added: {} for consumer: {} ", pair, info);
-                    info.setDestination(virtualDestination.getVirtualDestination());
+                    setConsumerInfoVirtualDest(info, virtualDestination, activeMQDest);
                     ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
 
                     if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
@@ -616,7 +619,7 @@ public class AdvisoryBroker extends BrokerFilter {
         //this is the case of a real consumer coming online
         } else {
             info = info.copy();
-            info.setDestination(virtualDestination.getVirtualDestination());
+            setConsumerInfoVirtualDest(info, virtualDestination, activeMQDest);
             ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
 
             if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
@@ -626,6 +629,45 @@ public class AdvisoryBroker extends BrokerFilter {
         }
     }
 
+    /**
+     * Sets the virtual destination on the ConsumerInfo
+     * If this is a VirtualTopic then the destination used will be the actual topic subscribed
+     * to in order to track demand properly
+     *
+     * @param info
+     * @param virtualDestination
+     * @param activeMQDest
+     */
+    private void setConsumerInfoVirtualDest(ConsumerInfo info, VirtualDestination virtualDestination, ActiveMQDestination activeMQDest) {
+        info.setDestination(virtualDestination.getVirtualDestination());
+        if (virtualDestination instanceof VirtualTopic) {
+            VirtualTopic vt = (VirtualTopic) virtualDestination;
+            String prefix = vt.getPrefix() != null ? vt.getPrefix() : "";
+            String postfix = vt.getPostfix() != null ? vt.getPostfix() : "";
+            if (prefix.endsWith(".")) {
+                prefix = prefix.substring(0, prefix.length() - 1);
+            }
+            if (postfix.startsWith(".")) {
+                postfix = postfix.substring(1, postfix.length());
+            }
+            ActiveMQDestination prefixDestination = prefix.length() > 0 ? new ActiveMQTopic(prefix) : null;
+            ActiveMQDestination postfixDestination = postfix.length() > 0 ? new ActiveMQTopic(postfix) : null;
+
+            String[] prefixPaths = prefixDestination != null ? prefixDestination.getDestinationPaths() : new String[] {};
+            String[] activeMQDestPaths = activeMQDest.getDestinationPaths();
+            String[] postfixPaths = postfixDestination != null ? postfixDestination.getDestinationPaths() : new String[] {};
+
+            //sanity check
+            if (activeMQDestPaths.length > prefixPaths.length + postfixPaths.length) {
+                String[] topicPath = Arrays.copyOfRange(activeMQDestPaths, 0 + prefixPaths.length,
+                        activeMQDestPaths.length - postfixPaths.length);
+
+                ActiveMQTopic newTopic = new ActiveMQTopic(DestinationPath.toString(topicPath));
+                info.setDestination(newTopic);
+            }
+        }
+    }
+
     @Override
     public void virtualDestinationRemoved(ConnectionContext context,
             VirtualDestination virtualDestination) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/56baba96/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
index 782f53f..af5c316 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
@@ -118,31 +118,49 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
      * @throws Exception
      */
     @Test(timeout = 60 * 1000)
-    public void testVirtualTopic() throws Exception {
+    public void testVirtualTopics() throws Exception {
         Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
         doSetUp(true, null);
 
         MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer("VirtualTopic.>");
 
         MessageProducer includedProducer = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar"));
+        MessageProducer includedProducer2 = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar2"));
+        MessageProducer includedProducer3 = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar3"));
         Thread.sleep(2000);
         Message test = localSession.createTextMessage("test");
 
         final DestinationStatistics destinationStatistics = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar")).getDestinationStatistics();
+        final DestinationStatistics destinationStatistics2 = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar2")).getDestinationStatistics();
+
+        //No queue destination on the remote side so should not forward
+        final DestinationStatistics destinationStatistics3 = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar3")).getDestinationStatistics();
 
         //this will create the destination so messages accumulate
         final DestinationStatistics remoteStats = remoteBroker.getDestination(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar")).getDestinationStatistics();
+        final DestinationStatistics remoteStats2 = remoteBroker.getDestination(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar2")).getDestinationStatistics();
+
         waitForConsumerCount(destinationStatistics, 1);
+        waitForConsumerCount(destinationStatistics2, 1);
 
         includedProducer.send(test);
+        includedProducer2.send(localSession.createTextMessage("test2"));
+        includedProducer3.send(localSession.createTextMessage("test3"));
 
         //assert statistics
         waitForDispatchFromLocalBroker(destinationStatistics, 1);
+        waitForDispatchFromLocalBroker(destinationStatistics2, 1);
         assertLocalBrokerStatistics(destinationStatistics, 1);
+        assertLocalBrokerStatistics(destinationStatistics2, 1);
         assertEquals("remote dest messages", 1, remoteStats.getMessages().getCount());
+        assertEquals("remote dest messages", 1, remoteStats2.getMessages().getCount());
 
-        assertRemoteAdvisoryCount(advisoryConsumer, 1);
-        assertAdvisoryBrokerCounts(1,1,1);
+        assertRemoteAdvisoryCount(advisoryConsumer, 2);
+        assertAdvisoryBrokerCounts(1,2,2);
+
+        //should not have forwarded for 3rd topic
+        Thread.sleep(1000);
+        assertEquals("local broker dest stat dispatched", 0, destinationStatistics3.getDispatched().getCount());
     }