You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/04/23 17:12:47 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5077 - provide concurrentSend option to composite destinations such that fanout can occur in parallel, resulting in write batching. little perf test that shows a large improvement in send rate w/o con

Repository: activemq
Updated Branches:
  refs/heads/trunk 44bb9fbea -> 08bb172f3


https://issues.apache.org/jira/browse/AMQ-5077 - provide concurrentSend option to composite destinations such that fanout can occur in parallel, resulting in write batching. little perf test that shows a large improvement in send rate w/o concurrentStoreAndDispatch


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/08bb172f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/08bb172f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/08bb172f

Branch: refs/heads/trunk
Commit: 08bb172f3c6fc39e5164b5ee8f875a809faa126b
Parents: 44bb9fb
Author: gtully <ga...@gmail.com>
Authored: Wed Apr 23 14:59:03 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Apr 23 15:02:24 2014 +0100

----------------------------------------------------------------------
 .../region/virtual/CompositeDestination.java    |  14 +-
 .../virtual/CompositeDestinationFilter.java     |  65 +++++++--
 .../broker/virtual/VirtualDestPerfTest.java     | 141 +++++++++++++++++++
 3 files changed, 205 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/08bb172f/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
index e3416f3..72c35b6 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
@@ -33,9 +33,10 @@ public abstract class CompositeDestination implements VirtualDestination {
     private Collection forwardTo;
     private boolean forwardOnly = true;
     private boolean copyMessage = true;
+    private boolean concurrentSend = false;
 
     public Destination intercept(Destination destination) {
-        return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isCopyMessage());
+        return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isCopyMessage(), isConcurrentSend());
     }
     
     public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) {
@@ -92,4 +93,15 @@ public abstract class CompositeDestination implements VirtualDestination {
         this.copyMessage = copyMessage;
     }
 
+    /**
+     * when true, sends are done in parallel with the broker executor
+     */
+    public void setConcurrentSend(boolean concurrentSend) {
+        this.concurrentSend = concurrentSend;
+    }
+
+    public boolean isConcurrentSend() {
+        return this.concurrentSend;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/08bb172f/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
index 7ca0049..14e52e7 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
@@ -18,8 +18,12 @@ package org.apache.activemq.broker.region.virtual;
 
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationFilter;
@@ -39,17 +43,20 @@ public class CompositeDestinationFilter extends DestinationFilter {
     private Collection forwardDestinations;
     private boolean forwardOnly;
     private boolean copyMessage;
+    private boolean concurrentSend = false;
 
-    public CompositeDestinationFilter(Destination next, Collection forwardDestinations, boolean forwardOnly, boolean copyMessage) {
+    public CompositeDestinationFilter(Destination next, Collection forwardDestinations, boolean forwardOnly, boolean copyMessage, boolean concurrentSend) {
         super(next);
         this.forwardDestinations = forwardDestinations;
         this.forwardOnly = forwardOnly;
         this.copyMessage = copyMessage;
+        this.concurrentSend = concurrentSend;
     }
 
-    public void send(ProducerBrokerExchange context, Message message) throws Exception {
+    public void send(final ProducerBrokerExchange context, final Message message) throws Exception {
         MessageEvaluationContext messageContext = null;
 
+        Collection<ActiveMQDestination> matchingDestinations = new LinkedList<ActiveMQDestination>();
         for (Iterator iter = forwardDestinations.iterator(); iter.hasNext();) {
             ActiveMQDestination destination = null;
             Object value = iter.next();
@@ -70,23 +77,53 @@ public class CompositeDestinationFilter extends DestinationFilter {
             if (destination == null) {
                 continue;
             }
+            matchingDestinations.add(destination);
+        }
 
-            Message forwarded_message;
-            if (copyMessage) {
-                forwarded_message = message.copy();
-                forwarded_message.setDestination(destination);
-            }
-            else {
-                forwarded_message = message;
+        final CountDownLatch concurrent = new CountDownLatch(concurrentSend ? matchingDestinations.size() : 0);
+        final AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<Exception>();
+        final BrokerService brokerService = context.getConnectionContext().getBroker().getBrokerService();
+        for (final ActiveMQDestination destination : matchingDestinations) {
+            if (concurrent.getCount() > 0) {
+                brokerService.getTaskRunnerFactory().execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            if (exceptionAtomicReference.get() == null) {
+                                doForward(context.copy(), message, brokerService.getRegionBroker(), destination);
+                            }
+                        } catch (Exception e) {
+                            exceptionAtomicReference.set(e);
+                        } finally {
+                            concurrent.countDown();
+                        }
+                    }
+                });
+            } else {
+                doForward(context, message, brokerService.getRegionBroker(), destination);
             }
-
-            // Send it back through the region broker for routing.
-            context.setMutable(true);
-            Broker regionBroker = context.getConnectionContext().getBroker().getBrokerService().getRegionBroker();
-            regionBroker.send(context, forwarded_message);
         }
         if (!forwardOnly) {
             super.send(context, message);
         }
+        concurrent.await();
+        if (exceptionAtomicReference.get() != null) {
+            throw exceptionAtomicReference.get();
+        }
+    }
+
+    private void doForward(ProducerBrokerExchange context, Message message, Broker regionBroker, ActiveMQDestination destination) throws Exception {
+        Message forwarded_message;
+        if (copyMessage) {
+            forwarded_message = message.copy();
+            forwarded_message.setDestination(destination);
+        }
+        else {
+            forwarded_message = message;
+        }
+
+        // Send it back through the region broker for routing.
+        context.setMutable(true);
+        regionBroker.send(context, forwarded_message);
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/08bb172f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java
new file mode 100644
index 0000000..1f80473
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.virtual.CompositeTopic;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.ByteSequence;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class VirtualDestPerfTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(VirtualDestPerfTest.class);
+    ActiveMQTopic target = new ActiveMQTopic("target");
+    BrokerService brokerService;
+    ActiveMQConnectionFactory connectionFactory;
+
+    @Test
+    @Ignore("comparison test - takes too long and really needs a peek at the graph")
+    public void testPerf() throws Exception {
+        LinkedHashMap<Integer, Long> resultsT = new LinkedHashMap<Integer, Long>();
+        LinkedHashMap<Integer, Long> resultsF = new LinkedHashMap<Integer, Long>();
+
+        for (int i=2;i<11;i++) {
+            for (Boolean concurrent : new Boolean[]{true, false}) {
+                startBroker(i, concurrent);
+
+                long startTime = System.currentTimeMillis();
+                produceMessages();
+                long endTime = System.currentTimeMillis();
+                long seconds = (endTime - startTime) / 1000;
+                LOG.info("For routes {} duration {}", i, seconds);
+                if (concurrent) {
+                    resultsT.put(i, seconds);
+                } else {
+                    resultsF.put(i, seconds);
+                }
+                brokerService.stop();
+                brokerService.waitUntilStopped();
+            }
+        }
+        LOG.info("results T{} F{}", resultsT, resultsF);
+        LOG.info("http://www.chartgo.com/samples.do?chart=line&border=1&show3d=0&width=600&height=500&roundedge=1&transparency=1&legend=1&title=Send:10k::Concurrent-v-Serial&xtitle=routes&ytitle=Duration(seconds)&chrtbkgndcolor=white&threshold=0.0&lang=en"
+                + "&xaxis1=" + toStr(resultsT.keySet())
+                + "&yaxis1=" + toStr(resultsT.values())
+                + "&group1=concurrent"
+                + "&xaxis2=" + toStr(resultsF.keySet())
+                + "&yaxis2=" + toStr(resultsF.values())
+                + "&group2=serial"
+                + "&from=linejsp");
+    }
+
+    private String toStr(Collection set) {
+        return set.toString().replace(",","%0D%0A").replace("[","").replace("]","").replace(" ", "");
+    }
+
+
+    protected void produceMessages() throws Exception {
+        Connection connection = connectionFactory.createConnection();
+        MessageProducer messageProducer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(target);
+        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        ActiveMQBytesMessage message = new ActiveMQBytesMessage();
+        message.setContent(new ByteSequence(new byte[5*1024]));
+        for (int i=0; i<10000; i++) {
+            messageProducer.send(message);
+        }
+        connection.close();
+    }
+
+    private void startBroker(int fanoutCount, boolean concurrentSend) throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setUseVirtualTopics(true);
+        brokerService.addConnector("tcp://0.0.0.0:0");
+        brokerService.setAdvisorySupport(false);
+        PolicyMap destPolicyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setExpireMessagesPeriod(0);
+        destPolicyMap.setDefaultEntry(defaultEntry);
+        brokerService.setDestinationPolicy(destPolicyMap);
+
+        CompositeTopic route = new CompositeTopic();
+        route.setName("target");
+        route.setForwardOnly(true);
+        route.setConcurrentSend(concurrentSend);
+        Collection<ActiveMQQueue> routes = new ArrayList<ActiveMQQueue>();
+        for (int i=0; i<fanoutCount; i++) {
+            routes.add(new ActiveMQQueue("route." + i));
+        }
+        route.setForwardTo(routes);
+        VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
+        interceptor.setVirtualDestinations(new VirtualDestination[]{route});
+        brokerService.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});
+        brokerService.start();
+
+        connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString());
+        connectionFactory.setUseAsyncSend(false);
+        if (brokerService.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
+
+            //with parallel sends and no consumers, concurrentStoreAnd dispatch, which uses a single thread by default
+            // will stop/impeed write batching. The num threads will need tweaking when consumers are in the mix but may introduce
+            // order issues
+            ((KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false);
+        }
+    }
+}