You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/08/06 13:14:15 UTC

activemq git commit: AMQ-5920 - use implicit broker transaction for virtual topic fanout and add concurrentSend=true option to use an executor to fanout. Combination gives a 3x reduction in fanout roundtrip for small persistent messages to 100 consumer q

Repository: activemq
Updated Branches:
  refs/heads/master d8c0ff141 -> 340728f2d


AMQ-5920 - use implicit broker transaction for virtual topic fanout and add concurrentSend=true option to use an executor to fanout. Combination gives a 3x reduction in fanout roundtrip for small persistent messages to 100 consumer queues


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/340728f2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/340728f2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/340728f2

Branch: refs/heads/master
Commit: 340728f2d1eb4c1aa74f4002c5b16bf7a70c57b7
Parents: d8c0ff1
Author: gtully <ga...@gmail.com>
Authored: Thu Aug 6 11:55:45 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Aug 6 11:56:47 2015 +0100

----------------------------------------------------------------------
 .../SelectorAwareVirtualTopicInterceptor.java   |  17 +--
 .../broker/region/virtual/VirtualTopic.java     |  17 ++-
 .../region/virtual/VirtualTopicInterceptor.java |  95 ++++++++++++++++-
 .../activemq/transaction/Transaction.java       |   4 +-
 .../VirtualTopicDisconnectSelectorTest.java     |  10 +-
 .../virtual/VirtualTopicFanoutPerfTest.java     | 103 +++++++++++++++++++
 6 files changed, 224 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/340728f2/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
index 0c19565..b528f40 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
@@ -40,8 +40,8 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto
     LRUCache<String,BooleanExpression> expressionCache = new LRUCache<String,BooleanExpression>();
     private SubQueueSelectorCacheBroker selectorCachePlugin;
 
-    public SelectorAwareVirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
-        super(next, prefix, postfix, local);
+    public SelectorAwareVirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) {
+        super(next, virtualTopic);
     }
 
     /**
@@ -49,18 +49,7 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto
      * the virtual queues, hence there is no build up of unmatched messages on these destinations
      */
     @Override
-    protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception {
-        Broker broker = context.getConnectionContext().getBroker();
-        Set<Destination> destinations = broker.getDestinations(destination);
-
-        for (Destination dest : destinations) {
-            if (matchesSomeConsumer(broker, message, dest)) {
-                dest.send(context, message.copy());
-            }
-        }
-    }
-
-    private boolean matchesSomeConsumer(final Broker broker, Message message, Destination dest) throws IOException {
+    protected boolean shouldDispatch(final Broker broker, Message message, Destination dest) throws IOException {
         boolean matches = false;
         MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
         msgContext.setDestination(dest.getActiveMQDestination());

http://git-wip-us.apache.org/repos/asf/activemq/blob/340728f2/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
index 769c784..95fa333 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
@@ -42,6 +42,7 @@ public class VirtualTopic implements VirtualDestination {
     private String name = ">";
     private boolean selectorAware = false;
     private boolean local = false;
+    private boolean concurrentSend = false;
 
     @Override
     public ActiveMQDestination getVirtualDestination() {
@@ -50,8 +51,8 @@ public class VirtualTopic implements VirtualDestination {
 
     @Override
     public Destination intercept(Destination destination) {
-        return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, getPrefix(), getPostfix(), isLocal()) : new VirtualTopicInterceptor(
-            destination, getPrefix(), getPostfix(), isLocal());
+        return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, this) :
+                new VirtualTopicInterceptor(destination, this);
     }
 
     @Override
@@ -168,4 +169,16 @@ public class VirtualTopic implements VirtualDestination {
                                                   append(postfix).append(',').append(selectorAware).
                                                   append(',').append(local).toString();
     }
+
+    public boolean isConcurrentSend() {
+        return concurrentSend;
+    }
+
+    /**
+     * When true, dispatch to matching destinations in parallel (in multiple threads)
+     * @param concurrentSend
+     */
+    public void setConcurrentSend(boolean concurrentSend) {
+        this.concurrentSend = concurrentSend;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/340728f2/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
index 21c1d23..7967562 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
@@ -16,12 +16,21 @@
  */
 package org.apache.activemq.broker.region.virtual;
 
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationFilter;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.util.LRUCache;
 
@@ -33,13 +42,15 @@ public class VirtualTopicInterceptor extends DestinationFilter {
     private final String prefix;
     private final String postfix;
     private final boolean local;
+    private final boolean concurrentSend;
     private final LRUCache<ActiveMQDestination, ActiveMQQueue> cache = new LRUCache<ActiveMQDestination, ActiveMQQueue>();
 
-    public VirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
+    public VirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) {
         super(next);
-        this.prefix = prefix;
-        this.postfix = postfix;
-        this.local = local;
+        this.prefix = virtualTopic.getPrefix();
+        this.postfix = virtualTopic.getPostfix();
+        this.local = virtualTopic.isLocal();
+        this.concurrentSend = virtualTopic.isConcurrentSend();
     }
 
     public Topic getTopic() {
@@ -55,6 +66,82 @@ public class VirtualTopicInterceptor extends DestinationFilter {
         super.send(context, message);
     }
 
+    @Override
+    protected void send(final ProducerBrokerExchange context, final Message message, ActiveMQDestination destination) throws Exception {
+        final Broker broker = context.getConnectionContext().getBroker();
+        final Set<Destination> destinations = broker.getDestinations(destination);
+        final int numDestinations = destinations.size();
+
+        final LocalTransactionId localBrokerTransactionToCoalesceJournalSync =
+                beginLocalTransaction(numDestinations, context.getConnectionContext(), message);
+        try {
+            if (concurrentSend && numDestinations > 1) {
+
+                final CountDownLatch concurrent = new CountDownLatch(destinations.size());
+                final AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<Exception>();
+                final BrokerService brokerService = broker.getBrokerService();
+
+                for (final Destination dest : destinations) {
+                    if (shouldDispatch(broker, message, dest)) {
+                        brokerService.getTaskRunnerFactory().execute(new Runnable() {
+                            @Override
+                            public void run() {
+                                try {
+                                    if (exceptionAtomicReference.get() == null) {
+                                        dest.send(context, message.copy());
+                                    }
+                                } catch (Exception e) {
+                                    exceptionAtomicReference.set(e);
+                                } finally {
+                                    concurrent.countDown();
+                                }
+                            }
+                        });
+                    } else {
+                        concurrent.countDown();
+                    }
+                }
+                concurrent.await();
+                if (exceptionAtomicReference.get() != null) {
+                    throw exceptionAtomicReference.get();
+                }
+
+            } else {
+                for (final Destination dest : destinations) {
+                    if (shouldDispatch(broker, message, dest)) {
+                        dest.send(context, message.copy());
+                    }
+                }
+            }
+        } finally {
+            commit(localBrokerTransactionToCoalesceJournalSync, context.getConnectionContext(), message);
+        }
+    }
+
+    private LocalTransactionId beginLocalTransaction(int numDestinations, ConnectionContext connectionContext, Message message) throws Exception {
+        LocalTransactionId result = null;
+        if (numDestinations > 1 && message.isPersistent() && message.getTransactionId() == null) {
+            result = new LocalTransactionId(new ConnectionId(message.getMessageId().getProducerId().toString()), message.getMessageId().getProducerSequenceId());
+            connectionContext.getBroker().beginTransaction(connectionContext, result);
+            connectionContext.setTransaction(connectionContext.getTransactions().get(result));
+            message.setTransactionId(result);
+        }
+        return result;
+    }
+
+    private void commit(LocalTransactionId tx, ConnectionContext connectionContext, Message message) throws Exception {
+        if (tx != null) {
+            connectionContext.getBroker().commitTransaction(connectionContext, tx, true);
+            connectionContext.getTransactions().remove(tx);
+            connectionContext.setTransaction(null);
+            message.setTransactionId(null);
+        }
+    }
+
+    protected boolean shouldDispatch(Broker broker, Message message, Destination dest) throws IOException {
+        return true;
+    }
+
     protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination original) {
         ActiveMQQueue queue;
         synchronized (cache) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/340728f2/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
index 99758d1..6843871 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
@@ -74,7 +74,9 @@ public abstract class Transaction {
     }
 
     public void addSynchronization(Synchronization r) {
-        synchronizations.add(r);
+        synchronized (synchronizations) {
+            synchronizations.add(r);
+        }
         if (state == START_STATE) {
             state = IN_USE_STATE;
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/340728f2/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java
index 8b95345..cf2c67e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java
@@ -31,6 +31,7 @@ import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.plugin.SubQueueSelectorCacheBroker;
 import org.apache.activemq.spring.ConsumerBean;
 import org.apache.activemq.xbean.XBeanBrokerFactory;
 import org.slf4j.Logger;
@@ -107,7 +108,7 @@ public class VirtualTopicDisconnectSelectorTest extends EmbeddedBrokerTestSuppor
         assertMessagesArrived(messageList, expected ,10000);
     }
             
-    protected Destination getConsumerDsetination() {
+    protected ActiveMQQueue getConsumerDsetination() {
         return new ActiveMQQueue("Consumer.VirtualTopic.TEST");
     }
 
@@ -182,4 +183,11 @@ public class VirtualTopicDisconnectSelectorTest extends EmbeddedBrokerTestSuppor
         return answer;
     }
 
+
+    protected void startBroker() throws Exception {
+        super.startBroker();
+        // start with a clean slate
+        SubQueueSelectorCacheBroker selectorCacheBroker  = (SubQueueSelectorCacheBroker) broker.getBroker().getAdaptor(SubQueueSelectorCacheBroker.class);
+        selectorCacheBroker.deleteAllSelectorsForDestination(getConsumerDsetination().getQualifiedName());
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/340728f2/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java
new file mode 100644
index 0000000..90cdeea
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualTopic;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VirtualTopicFanoutPerfTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicFanoutPerfTest.class);
+
+    int numConsumers = 100;
+    int total = 500;
+    BrokerService brokerService;
+    ConnectionFactory connectionFactory;
+
+    @Before
+    public void createBroker() throws Exception  {
+        brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.start();
+
+        for (DestinationInterceptor destinationInterceptor  : brokerService.getDestinationInterceptors()) {
+                for (VirtualDestination virtualDestination : ((VirtualDestinationInterceptor) destinationInterceptor).getVirtualDestinations()) {
+                    if (virtualDestination instanceof VirtualTopic) {
+                        ((VirtualTopic) virtualDestination).setConcurrentSend(true);
+                }
+            }
+        }
+        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
+        ActiveMQPrefetchPolicy zeroPrefetch = new ActiveMQPrefetchPolicy();
+        zeroPrefetch.setAll(0);
+        activeMQConnectionFactory.setPrefetchPolicy(zeroPrefetch);
+        connectionFactory = activeMQConnectionFactory;
+    }
+
+    @After
+    public void stopBroker() throws Exception  {
+        brokerService.stop();
+    }
+
+    @Test
+    @Ignore("comparison test - concurrentSend=true virtual topic, use transaction")
+	public void testFanoutDuration() throws Exception {
+
+
+        Session session = createStartAndTrackConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
+        for (int i=0; i<numConsumers; i++) {
+            session.createConsumer(new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST"));
+        }
+
+        // create topic producer
+        Session producerSession = createStartAndTrackConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(new ActiveMQTopic("VirtualTopic.TEST"));
+
+        long start = System.currentTimeMillis();
+        LOG.info("Starting producer: " + start);
+        for (int i = 0; i < total; i++) {
+            producer.send(producerSession.createTextMessage("message: " + i));
+        }
+        LOG.info("Done producer, duration: " + (System.currentTimeMillis() - start) );
+
+
+    }
+
+    private Connection createStartAndTrackConnection() throws Exception {
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+        return connection;
+    }
+
+}