You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/07/26 09:42:29 UTC
activemq git commit: AMQ-7021 - add unsynchronised accessors to
destination map for usage with rw lock from abstract region;
allow concurrent read of the destination map
Repository: activemq
Updated Branches:
refs/heads/master 28819aea4 -> 0b76d3a0e
AMQ-7021 - add unsynchronised accessors to destination map for usage with rw lock from abstract region; allow concurrent read of the destination map
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0b76d3a0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0b76d3a0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0b76d3a0
Branch: refs/heads/master
Commit: 0b76d3a0eac9802941b9dfc2a85589dac95ed40a
Parents: 28819ae
Author: gtully <ga...@gmail.com>
Authored: Thu Jul 26 10:42:10 2018 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Jul 26 10:42:10 2018 +0100
----------------------------------------------------------------------
.../activemq/broker/region/AbstractRegion.java | 26 ++--
.../region/virtual/MappedQueueFilter.java | 5 +-
.../apache/activemq/filter/DestinationMap.java | 28 +++-
.../VirtualTopicDestinationMapAccessTest.java | 136 +++++++++++++++++++
4 files changed, 172 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/0b76d3a0/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
index 50cd324..a18e793 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
@@ -162,7 +162,7 @@ public abstract class AbstractRegion implements Region {
addSubscriptionsForDestination(context, dest);
destinations.put(destination, dest);
updateRegionDestCounts(destination, 1);
- destinationMap.put(destination, dest);
+ destinationMap.unsynchronizedPut(destination, dest);
}
if (dest == null) {
throw new DestinationDoesNotExistException(destination.getQualifiedName());
@@ -217,7 +217,7 @@ public abstract class AbstractRegion implements Region {
// If a destination isn't specified, then just count up
// non-advisory destinations (ie count all destinations)
int destinationSize = (int) (entry.getDestination() != null ?
- destinationMap.get(entry.getDestination()).size() : regionStatistics.getDestinations().getCount());
+ destinationMap.unsynchronizedGet(entry.getDestination()).size() : regionStatistics.getDestinations().getCount());
if (destinationSize >= entry.getMaxDestinations()) {
if (entry.getDestination() != null) {
throw new IllegalStateException(
@@ -296,7 +296,7 @@ public abstract class AbstractRegion implements Region {
dest.removeSubscription(context, sub, 0l);
}
}
- destinationMap.remove(destination, dest);
+ destinationMap.unsynchronizedRemove(destination, dest);
dispose(context, dest);
DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
if (destinationInterceptor != null) {
@@ -321,7 +321,7 @@ public abstract class AbstractRegion implements Region {
public Set<Destination> getDestinations(ActiveMQDestination destination) {
destinationsLock.readLock().lock();
try{
- return destinationMap.get(destination);
+ return destinationMap.unsynchronizedGet(destination);
} finally {
destinationsLock.readLock().unlock();
}
@@ -387,7 +387,7 @@ public abstract class AbstractRegion implements Region {
List<Destination> addList = new ArrayList<Destination>();
destinationsLock.readLock().lock();
try {
- for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
+ for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
addList.add(dest);
}
// ensure sub visible to any new dest addSubscriptionsForDestination
@@ -467,7 +467,7 @@ public abstract class AbstractRegion implements Region {
List<Destination> removeList = new ArrayList<Destination>();
destinationsLock.readLock().lock();
try {
- for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
+ for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
removeList.add(dest);
}
} finally {
@@ -552,15 +552,7 @@ public abstract class AbstractRegion implements Region {
// Try to auto create the destination... re-invoke broker
// from the
// top so that the proper security checks are performed.
- context.getBroker().addDestination(context, destination, createTemporary);
- dest = addDestination(context, destination, false);
- // We should now have the dest created.
- destinationsLock.readLock().lock();
- try {
- dest = destinations.get(destination);
- } finally {
- destinationsLock.readLock().unlock();
- }
+ dest = context.getBroker().addDestination(context, destination, createTemporary);
}
if (dest == null) {
@@ -644,7 +636,7 @@ public abstract class AbstractRegion implements Region {
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
destinationsLock.readLock().lock();
try {
- for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
+ for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
dest.addProducer(context, info);
}
} finally {
@@ -665,7 +657,7 @@ public abstract class AbstractRegion implements Region {
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
destinationsLock.readLock().lock();
try {
- for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
+ for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
dest.removeProducer(context, info);
}
} finally {
http://git-wip-us.apache.org/repos/asf/activemq/blob/0b76d3a0/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
index 490bf7b..2baa33a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
@@ -59,7 +59,7 @@ public class MappedQueueFilter extends DestinationFilter {
final Set<Destination> virtualDests = regionBroker.getDestinations(virtualDestination);
final ActiveMQDestination newDestination = sub.getActiveMQDestination();
- final BaseDestination regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]);
+ BaseDestination regionDest = null;
for (Destination virtualDest : virtualDests) {
if (virtualDest.getActiveMQDestination().isTopic() &&
@@ -75,6 +75,9 @@ public class MappedQueueFilter extends DestinationFilter {
final Message copy = message.copy();
copy.setOriginalDestination(message.getDestination());
copy.setDestination(newDestination);
+ if (regionDest == null) {
+ regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]);
+ }
copy.setRegionDestination(regionDest);
sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/0b76d3a0/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java
index 624b10f..e16d80e 100644
--- a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java
+++ b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.activemq.command.ActiveMQDestination;
@@ -60,13 +59,20 @@ public class DestinationMap {
* matching values.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
- public synchronized Set get(ActiveMQDestination key) {
+ public Set get(ActiveMQDestination key) {
+ synchronized (this) {
+ return unsynchronizedGet(key);
+ }
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public Set unsynchronizedGet(ActiveMQDestination key) {
if (key.isComposite()) {
ActiveMQDestination[] destinations = key.getCompositeDestinations();
Set answer = new HashSet(destinations.length);
for (int i = 0; i < destinations.length; i++) {
ActiveMQDestination childDestination = destinations[i];
- Object value = get(childDestination);
+ Object value = unsynchronizedGet(childDestination);
if (value instanceof Set) {
answer.addAll((Set) value);
} else if (value != null) {
@@ -78,7 +84,13 @@ public class DestinationMap {
return findWildcardMatches(key);
}
- public synchronized void put(ActiveMQDestination key, Object value) {
+ public void put(ActiveMQDestination key, Object value) {
+ synchronized (this) {
+ unsynchronizedPut(key, value);
+ }
+ }
+
+ public void unsynchronizedPut(ActiveMQDestination key, Object value) {
if (key.isComposite()) {
ActiveMQDestination[] destinations = key.getCompositeDestinations();
for (int i = 0; i < destinations.length; i++) {
@@ -95,7 +107,13 @@ public class DestinationMap {
/**
* Removes the value from the associated destination
*/
- public synchronized void remove(ActiveMQDestination key, Object value) {
+ public void remove(ActiveMQDestination key, Object value) {
+ synchronized (this) {
+ unsynchronizedRemove(key, value);
+ }
+ }
+
+ public void unsynchronizedRemove(ActiveMQDestination key, Object value) {
if (key.isComposite()) {
ActiveMQDestination[] destinations = key.getCompositeDestinations();
for (int i = 0; i < destinations.length; i++) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/0b76d3a0/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDestinationMapAccessTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDestinationMapAccessTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDestinationMapAccessTest.java
new file mode 100644
index 0000000..c2f2b0e
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDestinationMapAccessTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class VirtualTopicDestinationMapAccessTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDestinationMapAccessTest.class);
+
+ BrokerService brokerService;
+ ConnectionFactory connectionFactory;
+
+ @Before
+ public void createBroker() throws Exception {
+ createBroker(true);
+ }
+
+ public void createBroker(boolean delete) throws Exception {
+ brokerService = new BrokerService();
+ brokerService.setDeleteAllMessagesOnStartup(delete);
+ brokerService.setAdvisorySupport(false);
+ brokerService.start();
+
+ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
+ ActiveMQPrefetchPolicy zeroPrefetch = new ActiveMQPrefetchPolicy();
+ zeroPrefetch.setAll(0);
+ activeMQConnectionFactory.setPrefetchPolicy(zeroPrefetch);
+ connectionFactory = activeMQConnectionFactory;
+ }
+
+ @After
+ public void stopBroker() throws Exception {
+ brokerService.stop();
+ }
+
+ @Test
+ @Ignore("perf test that needs manual comparator")
+ public void testX() throws Exception {
+
+ final int numConnections = 200;
+ final int numDestinations = 10000;
+ final AtomicInteger numConsumers = new AtomicInteger(numDestinations);
+ final AtomicInteger numProducers = new AtomicInteger(numDestinations);
+
+ ExecutorService executorService = Executors.newFixedThreadPool(numConnections);
+
+ // precreate dests to accentuate read access
+ for (int i=0; i<numDestinations; i++ ) {
+ brokerService.getRegionBroker().addDestination(
+ brokerService.getAdminConnectionContext(),
+ new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST-" + i),
+ false);
+ brokerService.getRegionBroker().addDestination(
+ brokerService.getAdminConnectionContext(), new ActiveMQTopic("VirtualTopic.TEST-" + i), false);
+
+ }
+
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+
+ try {
+ int opsCount = 0;
+
+ Connection connection1 = connectionFactory.createConnection();
+ connection1.start();
+
+ Session session = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ do {
+ boolean consumerOrProducer = opsCount++ % 2 == 0;
+ int i = consumerOrProducer ? numConsumers.decrementAndGet() : numProducers.decrementAndGet();
+ if (i > 0) {
+ if (consumerOrProducer) {
+ session.createConsumer(new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST-" + i));
+ } else {
+ producer.send(new ActiveMQTopic("VirtualTopic.TEST-" + i), new ActiveMQMessage());
+ }
+ }
+ } while (numConsumers.get() > 0 || numProducers.get() > 0);
+ connection1.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ for (int i = 0; i < numConnections; i++) {
+ executorService.execute(runnable);
+ }
+
+ long start = System.currentTimeMillis();
+ LOG.info("Starting timer: " + start);
+ executorService.shutdown();
+ executorService.awaitTermination(5, TimeUnit.MINUTES);
+ LOG.info("Done, duration: " + (System.currentTimeMillis() - start));
+
+ }
+}