You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/08/06 13:14:15 UTC
activemq git commit: AMQ-5920 - use implicit broker transaction for
virtual topic fanout and add concurrentSend=true option to use an executor to
fanout. Combination gives a 3x reduction in fanout roundtrip for small
persistent messages to 100 consumer queues
Repository: activemq
Updated Branches:
refs/heads/master d8c0ff141 -> 340728f2d
AMQ-5920 - use implicit broker transaction for virtual topic fanout and add concurrentSend=true option to use an executor to fanout. Combination gives a 3x reduction in fanout roundtrip for small persistent messages to 100 consumer queues
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/340728f2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/340728f2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/340728f2
Branch: refs/heads/master
Commit: 340728f2d1eb4c1aa74f4002c5b16bf7a70c57b7
Parents: d8c0ff1
Author: gtully <ga...@gmail.com>
Authored: Thu Aug 6 11:55:45 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Aug 6 11:56:47 2015 +0100
----------------------------------------------------------------------
.../SelectorAwareVirtualTopicInterceptor.java | 17 +--
.../broker/region/virtual/VirtualTopic.java | 17 ++-
.../region/virtual/VirtualTopicInterceptor.java | 95 ++++++++++++++++-
.../activemq/transaction/Transaction.java | 4 +-
.../VirtualTopicDisconnectSelectorTest.java | 10 +-
.../virtual/VirtualTopicFanoutPerfTest.java | 103 +++++++++++++++++++
6 files changed, 224 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/340728f2/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
index 0c19565..b528f40 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
@@ -40,8 +40,8 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto
LRUCache<String,BooleanExpression> expressionCache = new LRUCache<String,BooleanExpression>();
private SubQueueSelectorCacheBroker selectorCachePlugin;
- public SelectorAwareVirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
- super(next, prefix, postfix, local);
+ public SelectorAwareVirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) {
+ super(next, virtualTopic);
}
/**
@@ -49,18 +49,7 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto
* the virtual queues, hence there is no build up of unmatched messages on these destinations
*/
@Override
- protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception {
- Broker broker = context.getConnectionContext().getBroker();
- Set<Destination> destinations = broker.getDestinations(destination);
-
- for (Destination dest : destinations) {
- if (matchesSomeConsumer(broker, message, dest)) {
- dest.send(context, message.copy());
- }
- }
- }
-
- private boolean matchesSomeConsumer(final Broker broker, Message message, Destination dest) throws IOException {
+ protected boolean shouldDispatch(final Broker broker, Message message, Destination dest) throws IOException {
boolean matches = false;
MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
msgContext.setDestination(dest.getActiveMQDestination());
http://git-wip-us.apache.org/repos/asf/activemq/blob/340728f2/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
index 769c784..95fa333 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
@@ -42,6 +42,7 @@ public class VirtualTopic implements VirtualDestination {
private String name = ">";
private boolean selectorAware = false;
private boolean local = false;
+ private boolean concurrentSend = false;
@Override
public ActiveMQDestination getVirtualDestination() {
@@ -50,8 +51,8 @@ public class VirtualTopic implements VirtualDestination {
@Override
public Destination intercept(Destination destination) {
- return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, getPrefix(), getPostfix(), isLocal()) : new VirtualTopicInterceptor(
- destination, getPrefix(), getPostfix(), isLocal());
+ return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, this) :
+ new VirtualTopicInterceptor(destination, this);
}
@Override
@@ -168,4 +169,16 @@ public class VirtualTopic implements VirtualDestination {
append(postfix).append(',').append(selectorAware).
append(',').append(local).toString();
}
+
+ public boolean isConcurrentSend() { // current value of the concurrentSend option (false by default); see setConcurrentSend
+ return concurrentSend;
+ }
+
+ /**
+ * When true, dispatch to matching destinations in parallel (in multiple threads). Each VirtualTopicInterceptor copies this value when it is constructed.
+ * @param concurrentSend {@code true} to run each per-destination send as a task on the broker's task runner factory; only used when a send has more than one target destination
+ */
+ public void setConcurrentSend(boolean concurrentSend) {
+ this.concurrentSend = concurrentSend;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/340728f2/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
index 21c1d23..7967562 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
@@ -16,12 +16,21 @@
*/
package org.apache.activemq.broker.region.virtual;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Message;
import org.apache.activemq.util.LRUCache;
@@ -33,13 +42,15 @@ public class VirtualTopicInterceptor extends DestinationFilter {
private final String prefix;
private final String postfix;
private final boolean local;
+ private final boolean concurrentSend;
private final LRUCache<ActiveMQDestination, ActiveMQQueue> cache = new LRUCache<ActiveMQDestination, ActiveMQQueue>();
- public VirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
+ public VirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) {
super(next);
- this.prefix = prefix;
- this.postfix = postfix;
- this.local = local;
+ this.prefix = virtualTopic.getPrefix();
+ this.postfix = virtualTopic.getPostfix();
+ this.local = virtualTopic.isLocal();
+ this.concurrentSend = virtualTopic.isConcurrentSend();
}
public Topic getTopic() {
@@ -55,6 +66,82 @@ public class VirtualTopicInterceptor extends DestinationFilter {
super.send(context, message);
}
+ @Override // fans a message out to every destination the broker returns for 'destination', sending each one its own copy
+ protected void send(final ProducerBrokerExchange context, final Message message, ActiveMQDestination destination) throws Exception {
+ final Broker broker = context.getConnectionContext().getBroker();
+ final Set<Destination> destinations = broker.getDestinations(destination);
+ final int numDestinations = destinations.size();
+
+ final LocalTransactionId localBrokerTransactionToCoalesceJournalSync = // null when beginLocalTransaction did not start an implicit transaction
+ beginLocalTransaction(numDestinations, context.getConnectionContext(), message);
+ try {
+ if (concurrentSend && numDestinations > 1) { // parallel path: one task per destination that passes shouldDispatch
+
+ final CountDownLatch concurrent = new CountDownLatch(destinations.size()); // one count per destination, whether dispatched or filtered out
+ final AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<Exception>(); // failure recorded by a task; a later failure overwrites an earlier one
+ final BrokerService brokerService = broker.getBrokerService();
+
+ for (final Destination dest : destinations) {
+ if (shouldDispatch(broker, message, dest)) { // NOTE(review): if this throws mid-loop, the finally below runs without waiting for tasks already submitted — confirm
+ brokerService.getTaskRunnerFactory().execute(new Runnable() { // NOTE(review): 'context' is shared by all tasks — confirm it is safe for concurrent use
+ @Override
+ public void run() {
+ try {
+ if (exceptionAtomicReference.get() == null) { // skip the send once some task has already recorded a failure
+ dest.send(context, message.copy());
+ }
+ } catch (Exception e) {
+ exceptionAtomicReference.set(e);
+ } finally {
+ concurrent.countDown(); // always count down so await() below can return
+ }
+ }
+ });
+ } else {
+ concurrent.countDown(); // a filtered-out destination still releases its latch slot
+ }
+ }
+ concurrent.await(); // block the calling thread until every destination has been accounted for; may throw InterruptedException
+ if (exceptionAtomicReference.get() != null) {
+ throw exceptionAtomicReference.get(); // rethrow the recorded task failure on the calling thread
+ }
+
+ } else { // sequential path: send on the calling thread, stopping at the first exception
+ for (final Destination dest : destinations) {
+ if (shouldDispatch(broker, message, dest)) {
+ dest.send(context, message.copy());
+ }
+ }
+ }
+ } finally {
+ commit(localBrokerTransactionToCoalesceJournalSync, context.getConnectionContext(), message); // NOTE(review): also runs when a send failed, so a partial fan-out is committed rather than rolled back — confirm this is intended
+ }
+ }
+
+ private LocalTransactionId beginLocalTransaction(int numDestinations, ConnectionContext connectionContext, Message message) throws Exception { // begins an implicit broker transaction around a multi-destination fan-out; returns its id, or null when none was started
+ LocalTransactionId result = null;
+ if (numDestinations > 1 && message.isPersistent() && message.getTransactionId() == null) { // only for persistent messages that are not already in a transaction and have more than one target
+ result = new LocalTransactionId(new ConnectionId(message.getMessageId().getProducerId().toString()), message.getMessageId().getProducerSequenceId()); // id built from the message's producer id and producer sequence id
+ connectionContext.getBroker().beginTransaction(connectionContext, result);
+ connectionContext.setTransaction(connectionContext.getTransactions().get(result)); // presumably beginTransaction registered the transaction under 'result' — TODO confirm get() cannot return null here
+ message.setTransactionId(result); // presumably so the copies made in send() carry the transaction id — confirm against Message.copy()
+ }
+ return result;
+ }
+
+ private void commit(LocalTransactionId tx, ConnectionContext connectionContext, Message message) throws Exception { // counterpart of beginLocalTransaction: commits tx and clears it from the context and the message; does nothing when tx is null
+ if (tx != null) {
+ connectionContext.getBroker().commitTransaction(connectionContext, tx, true); // third argument is presumably the one-phase flag — TODO confirm; NOTE(review): if this throws, the cleanup below is skipped
+ connectionContext.getTransactions().remove(tx);
+ connectionContext.setTransaction(null);
+ message.setTransactionId(null); // clear the id that beginLocalTransaction set on the message
+ }
+ }
+
+ protected boolean shouldDispatch(Broker broker, Message message, Destination dest) throws IOException { // per-destination filter consulted by send(); this base implementation accepts every destination, subclasses may override
+ return true;
+ }
+
protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination original) {
ActiveMQQueue queue;
synchronized (cache) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/340728f2/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
index 99758d1..6843871 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
@@ -74,7 +74,9 @@ public abstract class Transaction {
}
public void addSynchronization(Synchronization r) {
- synchronizations.add(r);
+ synchronized (synchronizations) {
+ synchronizations.add(r);
+ }
if (state == START_STATE) {
state = IN_USE_STATE;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/340728f2/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java
index 8b95345..cf2c67e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java
@@ -31,6 +31,7 @@ import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.plugin.SubQueueSelectorCacheBroker;
import org.apache.activemq.spring.ConsumerBean;
import org.apache.activemq.xbean.XBeanBrokerFactory;
import org.slf4j.Logger;
@@ -107,7 +108,7 @@ public class VirtualTopicDisconnectSelectorTest extends EmbeddedBrokerTestSuppor
assertMessagesArrived(messageList, expected ,10000);
}
- protected Destination getConsumerDsetination() {
+ protected ActiveMQQueue getConsumerDsetination() {
return new ActiveMQQueue("Consumer.VirtualTopic.TEST");
}
@@ -182,4 +183,11 @@ public class VirtualTopicDisconnectSelectorTest extends EmbeddedBrokerTestSuppor
return answer;
}
+
+ protected void startBroker() throws Exception { // starts the broker via the superclass, then deletes cached selectors for the consumer queue
+ super.startBroker();
+ // start with a clean slate
+ SubQueueSelectorCacheBroker selectorCacheBroker = (SubQueueSelectorCacheBroker) broker.getBroker().getAdaptor(SubQueueSelectorCacheBroker.class); // NOTE(review): assumes the selector cache plugin is configured; result is used without a null check — confirm
+ selectorCacheBroker.deleteAllSelectorsForDestination(getConsumerDsetination().getQualifiedName());
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/340728f2/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java
new file mode 100644
index 0000000..90cdeea
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualTopic;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VirtualTopicFanoutPerfTest { // manual timing test: sends messages to a virtual topic with many consumer queues and logs how long the producer takes
+
+ private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicFanoutPerfTest.class);
+
+ int numConsumers = 100; // number of Consumer.<i>.VirtualTopic.TEST queues that get a consumer
+ int total = 500; // number of messages the producer sends
+ BrokerService brokerService;
+ ConnectionFactory connectionFactory;
+
+ @Before
+ public void createBroker() throws Exception { // starts a fresh broker, enables concurrentSend on its virtual topics, and prepares a vm:// connection factory
+ brokerService = new BrokerService();
+ brokerService.setDeleteAllMessagesOnStartup(true);
+ brokerService.start();
+
+ for (DestinationInterceptor destinationInterceptor : brokerService.getDestinationInterceptors()) {
+ for (VirtualDestination virtualDestination : ((VirtualDestinationInterceptor) destinationInterceptor).getVirtualDestinations()) { // NOTE(review): unchecked cast assumes every interceptor is a VirtualDestinationInterceptor — confirm for the default configuration
+ if (virtualDestination instanceof VirtualTopic) {
+ ((VirtualTopic) virtualDestination).setConcurrentSend(true); // flip to false to time the non-concurrent fan-out for comparison
+ }
+ }
+ }
+ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
+ ActiveMQPrefetchPolicy zeroPrefetch = new ActiveMQPrefetchPolicy();
+ zeroPrefetch.setAll(0); // prefetch of 0 for all consumer types
+ activeMQConnectionFactory.setPrefetchPolicy(zeroPrefetch);
+ connectionFactory = activeMQConnectionFactory;
+ }
+
+ @After
+ public void stopBroker() throws Exception { // stops the broker created in createBroker
+ brokerService.stop();
+ }
+
+ @Test
+ @Ignore("comparison test - concurrentSend=true virtual topic, use transaction")
+ public void testFanoutDuration() throws Exception { // logs the producer-side duration only; makes no assertions and consumes no messages
+
+
+ Session session = createStartAndTrackConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
+ for (int i=0; i<numConsumers; i++) {
+ session.createConsumer(new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST")); // one consumer per queue; the consumers are never read from in this test
+ }
+
+ // create topic producer
+ Session producerSession = createStartAndTrackConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(new ActiveMQTopic("VirtualTopic.TEST")); // delivery mode is left at the producer default — presumably persistent, TODO confirm
+
+ long start = System.currentTimeMillis();
+ LOG.info("Starting producer: " + start);
+ for (int i = 0; i < total; i++) {
+ producer.send(producerSession.createTextMessage("message: " + i));
+ }
+ LOG.info("Done producer, duration: " + (System.currentTimeMillis() - start) ); // elapsed wall-clock milliseconds for all sends
+
+
+ }
+
+ private Connection createStartAndTrackConnection() throws Exception { // creates and starts a connection; NOTE(review): despite the name nothing is tracked here, and no visible code closes the connection
+ Connection connection = connectionFactory.createConnection();
+ connection.start();
+ return connection;
+ }
+
+}