You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/07/26 09:42:29 UTC

activemq git commit: AMQ-7021 - add unsynchronised accessors to destination map for usage with rw lock from abstract region; allow concurrent read of the destination map

Repository: activemq
Updated Branches:
  refs/heads/master 28819aea4 -> 0b76d3a0e


AMQ-7021 - add unsynchronised accessors to destination map for usage with rw lock from abstract region; allow concurrent read of the destination map


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0b76d3a0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0b76d3a0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0b76d3a0

Branch: refs/heads/master
Commit: 0b76d3a0eac9802941b9dfc2a85589dac95ed40a
Parents: 28819ae
Author: gtully <ga...@gmail.com>
Authored: Thu Jul 26 10:42:10 2018 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Jul 26 10:42:10 2018 +0100

----------------------------------------------------------------------
 .../activemq/broker/region/AbstractRegion.java  |  26 ++--
 .../region/virtual/MappedQueueFilter.java       |   5 +-
 .../apache/activemq/filter/DestinationMap.java  |  28 +++-
 .../VirtualTopicDestinationMapAccessTest.java   | 136 +++++++++++++++++++
 4 files changed, 172 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/0b76d3a0/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
index 50cd324..a18e793 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
@@ -162,7 +162,7 @@ public abstract class AbstractRegion implements Region {
                     addSubscriptionsForDestination(context, dest);
                     destinations.put(destination, dest);
                     updateRegionDestCounts(destination, 1);
-                    destinationMap.put(destination, dest);
+                    destinationMap.unsynchronizedPut(destination, dest);
                 }
                 if (dest == null) {
                     throw new DestinationDoesNotExistException(destination.getQualifiedName());
@@ -217,7 +217,7 @@ public abstract class AbstractRegion implements Region {
                 // If a destination isn't specified, then just count up
                 // non-advisory destinations (ie count all destinations)
                 int destinationSize = (int) (entry.getDestination() != null ?
-                        destinationMap.get(entry.getDestination()).size() : regionStatistics.getDestinations().getCount());
+                        destinationMap.unsynchronizedGet(entry.getDestination()).size() : regionStatistics.getDestinations().getCount());
                 if (destinationSize >= entry.getMaxDestinations()) {
                     if (entry.getDestination() != null) {
                         throw new IllegalStateException(
@@ -296,7 +296,7 @@ public abstract class AbstractRegion implements Region {
                         dest.removeSubscription(context, sub, 0l);
                     }
                 }
-                destinationMap.remove(destination, dest);
+                destinationMap.unsynchronizedRemove(destination, dest);
                 dispose(context, dest);
                 DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
                 if (destinationInterceptor != null) {
@@ -321,7 +321,7 @@ public abstract class AbstractRegion implements Region {
     public Set<Destination> getDestinations(ActiveMQDestination destination) {
         destinationsLock.readLock().lock();
         try{
-            return destinationMap.get(destination);
+            return destinationMap.unsynchronizedGet(destination);
         } finally {
             destinationsLock.readLock().unlock();
         }
@@ -387,7 +387,7 @@ public abstract class AbstractRegion implements Region {
             List<Destination> addList = new ArrayList<Destination>();
             destinationsLock.readLock().lock();
             try {
-                for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
+                for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
                     addList.add(dest);
                 }
                 // ensure sub visible to any new dest addSubscriptionsForDestination
@@ -467,7 +467,7 @@ public abstract class AbstractRegion implements Region {
             List<Destination> removeList = new ArrayList<Destination>();
             destinationsLock.readLock().lock();
             try {
-                for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
+                for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
                     removeList.add(dest);
                 }
             } finally {
@@ -552,15 +552,7 @@ public abstract class AbstractRegion implements Region {
                 // Try to auto create the destination... re-invoke broker
                 // from the
                 // top so that the proper security checks are performed.
-                context.getBroker().addDestination(context, destination, createTemporary);
-                dest = addDestination(context, destination, false);
-                // We should now have the dest created.
-                destinationsLock.readLock().lock();
-                try {
-                    dest = destinations.get(destination);
-                } finally {
-                    destinationsLock.readLock().unlock();
-                }
+                dest = context.getBroker().addDestination(context, destination, createTemporary);
             }
 
             if (dest == null) {
@@ -644,7 +636,7 @@ public abstract class AbstractRegion implements Region {
     public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         destinationsLock.readLock().lock();
         try {
-            for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
+            for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
                 dest.addProducer(context, info);
             }
         } finally {
@@ -665,7 +657,7 @@ public abstract class AbstractRegion implements Region {
     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         destinationsLock.readLock().lock();
         try {
-            for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
+            for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
                 dest.removeProducer(context, info);
             }
         } finally {

http://git-wip-us.apache.org/repos/asf/activemq/blob/0b76d3a0/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
index 490bf7b..2baa33a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
@@ -59,7 +59,7 @@ public class MappedQueueFilter extends DestinationFilter {
             final Set<Destination> virtualDests = regionBroker.getDestinations(virtualDestination);
 
             final ActiveMQDestination newDestination = sub.getActiveMQDestination();
-            final BaseDestination regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]);
+            BaseDestination regionDest = null;
 
             for (Destination virtualDest : virtualDests) {
                 if (virtualDest.getActiveMQDestination().isTopic() &&
@@ -75,6 +75,9 @@ public class MappedQueueFilter extends DestinationFilter {
                             final Message copy = message.copy();
                             copy.setOriginalDestination(message.getDestination());
                             copy.setDestination(newDestination);
+                            if (regionDest == null) {
+                                regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]);
+                            }
                             copy.setRegionDestination(regionDest);
                             sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy);
                         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/0b76d3a0/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java
index 624b10f..e16d80e 100644
--- a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java
+++ b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import org.apache.activemq.command.ActiveMQDestination;
@@ -60,13 +59,20 @@ public class DestinationMap {
      *         matching values.
      */
     @SuppressWarnings({"rawtypes", "unchecked"})
-    public synchronized Set get(ActiveMQDestination key) {
+    public Set get(ActiveMQDestination key) {
+        synchronized (this) {
+            return unsynchronizedGet(key);
+        }
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public Set unsynchronizedGet(ActiveMQDestination key) {
         if (key.isComposite()) {
             ActiveMQDestination[] destinations = key.getCompositeDestinations();
             Set answer = new HashSet(destinations.length);
             for (int i = 0; i < destinations.length; i++) {
                 ActiveMQDestination childDestination = destinations[i];
-                Object value = get(childDestination);
+                Object value = unsynchronizedGet(childDestination);
                 if (value instanceof Set) {
                     answer.addAll((Set) value);
                 } else if (value != null) {
@@ -78,7 +84,13 @@ public class DestinationMap {
         return findWildcardMatches(key);
     }
 
-    public synchronized void put(ActiveMQDestination key, Object value) {
+    public void put(ActiveMQDestination key, Object value) {
+        synchronized (this) {
+            unsynchronizedPut(key, value);
+        }
+    }
+
+    public void unsynchronizedPut(ActiveMQDestination key, Object value) {
         if (key.isComposite()) {
             ActiveMQDestination[] destinations = key.getCompositeDestinations();
             for (int i = 0; i < destinations.length; i++) {
@@ -95,7 +107,13 @@ public class DestinationMap {
     /**
      * Removes the value from the associated destination
      */
-    public synchronized void remove(ActiveMQDestination key, Object value) {
+    public void remove(ActiveMQDestination key, Object value) {
+        synchronized (this) {
+            unsynchronizedRemove(key, value);
+        }
+    }
+
+    public void unsynchronizedRemove(ActiveMQDestination key, Object value) {
         if (key.isComposite()) {
             ActiveMQDestination[] destinations = key.getCompositeDestinations();
             for (int i = 0; i < destinations.length; i++) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/0b76d3a0/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDestinationMapAccessTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDestinationMapAccessTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDestinationMapAccessTest.java
new file mode 100644
index 0000000..c2f2b0e
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDestinationMapAccessTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class VirtualTopicDestinationMapAccessTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDestinationMapAccessTest.class);
+
+    BrokerService brokerService;
+    ConnectionFactory connectionFactory;
+
+    @Before
+    public void createBroker() throws Exception {
+        createBroker(true);
+    }
+
+    public void createBroker(boolean delete) throws Exception  {
+        brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(delete);
+        brokerService.setAdvisorySupport(false);
+        brokerService.start();
+
+        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
+        ActiveMQPrefetchPolicy zeroPrefetch = new ActiveMQPrefetchPolicy();
+        zeroPrefetch.setAll(0);
+        activeMQConnectionFactory.setPrefetchPolicy(zeroPrefetch);
+        connectionFactory = activeMQConnectionFactory;
+    }
+
+    @After
+    public void stopBroker() throws Exception  {
+        brokerService.stop();
+    }
+
+    @Test
+    @Ignore("perf test that needs manual comparator")
+    public void testX() throws Exception {
+
+        final int numConnections = 200;
+        final int numDestinations = 10000;
+        final AtomicInteger numConsumers = new AtomicInteger(numDestinations);
+        final AtomicInteger numProducers = new AtomicInteger(numDestinations);
+
+        ExecutorService executorService = Executors.newFixedThreadPool(numConnections);
+
+        // precreate dests to accentuate read access
+        for (int i=0; i<numDestinations; i++ ) {
+            brokerService.getRegionBroker().addDestination(
+                    brokerService.getAdminConnectionContext(),
+                    new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST-" + i),
+                    false);
+            brokerService.getRegionBroker().addDestination(
+                    brokerService.getAdminConnectionContext(), new ActiveMQTopic("VirtualTopic.TEST-" + i), false);
+
+        }
+
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+
+                try {
+                    int opsCount = 0;
+
+                    Connection connection1 = connectionFactory.createConnection();
+                    connection1.start();
+
+                    Session session = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = session.createProducer(null);
+
+                    do {
+                        boolean consumerOrProducer = opsCount++ % 2 == 0;
+                        int i = consumerOrProducer ? numConsumers.decrementAndGet() : numProducers.decrementAndGet();
+                        if (i > 0) {
+                            if (consumerOrProducer) {
+                                session.createConsumer(new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST-" + i));
+                            } else {
+                                producer.send(new ActiveMQTopic("VirtualTopic.TEST-" + i), new ActiveMQMessage());
+                            }
+                        }
+                    } while (numConsumers.get() > 0 || numProducers.get() > 0);
+                    connection1.close();
+
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+        for (int i = 0; i < numConnections; i++) {
+            executorService.execute(runnable);
+        }
+
+        long start = System.currentTimeMillis();
+        LOG.info("Starting timer: " + start);
+        executorService.shutdown();
+        executorService.awaitTermination(5, TimeUnit.MINUTES);
+        LOG.info("Done, duration: " + (System.currentTimeMillis() - start));
+
+    }
+}