You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/12/11 15:50:05 UTC
[1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5483 - fix and test - assigned group counts are updated when lru map evicts a group assignment
Repository: activemq
Updated Branches:
refs/heads/trunk 1409acb36 -> d25c52ccb
https://issues.apache.org/jira/browse/AMQ-5483 - fix and test - assigned group counts are updated when lru map evicts a group assignment
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d25c52cc
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d25c52cc
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d25c52cc
Branch: refs/heads/trunk
Commit: d25c52ccb2c9c23535f9d4488fe8be8600148852
Parents: 15eba48
Author: gtully <ga...@gmail.com>
Authored: Thu Dec 11 14:40:56 2014 +0000
Committer: gtully <ga...@gmail.com>
Committed: Thu Dec 11 14:42:00 2014 +0000
----------------------------------------------------------------------
.../apache/activemq/broker/region/Queue.java | 1 +
.../region/group/CachedMessageGroupMap.java | 30 +++++++-
.../region/group/MessageGroupHashBucket.java | 3 +
.../broker/region/group/MessageGroupMap.java | 3 +
.../region/group/SimpleMessageGroupMap.java | 3 +
.../apache/activemq/command/ConsumerInfo.java | 4 +
.../MessageGroupReconnectDistributionTest.java | 79 +++++++++++++-------
7 files changed, 96 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/d25c52cc/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index f8373ac..a6515c4 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1013,6 +1013,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
public MessageGroupMap getMessageGroupOwners() {
if (messageGroupOwners == null) {
messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap();
+ messageGroupOwners.setDestination(this);
}
return messageGroupOwners;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/d25c52cc/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java
index 084e8d0..b17f8ce 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java
@@ -20,6 +20,8 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.memory.LRUMap;
@@ -31,8 +33,26 @@ import org.apache.activemq.memory.LRUMap;
public class CachedMessageGroupMap implements MessageGroupMap {
private final LRUMap<String, ConsumerId> cache;
private final int maximumCacheSize;
+ Destination destination;
+
CachedMessageGroupMap(int size){
- cache = new LRUMap<String, ConsumerId>(size);
+ cache = new LRUMap<String, ConsumerId>(size) {
+ @Override
+ public boolean removeEldestEntry(final Map.Entry eldest) {
+ boolean remove = super.removeEldestEntry(eldest);
+ if (remove) {
+ if (destination != null) {
+ for (Subscription s : destination.getConsumers()) {
+ if (s.getConsumerInfo().getConsumerId().equals(eldest.getValue())) {
+ s.getConsumerInfo().decrementAssignedGroupCount();
+ break;
+ }
+ }
+ }
+ }
+ return remove;
+ }
+ };
maximumCacheSize = size;
}
public synchronized void put(String groupId, ConsumerId consumerId) {
@@ -68,6 +88,11 @@ public class CachedMessageGroupMap implements MessageGroupMap {
@Override
public synchronized void removeAll(){
cache.clear();
+ if (destination != null) {
+ for (Subscription s : destination.getConsumers()) {
+ s.getConsumerInfo().clearAssignedGroupCount();
+ }
+ }
}
@Override
@@ -92,4 +117,7 @@ public class CachedMessageGroupMap implements MessageGroupMap {
return "message groups: " + cache.size();
}
+ public void setDestination(Destination destination) {
+ this.destination = destination;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/d25c52cc/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucket.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucket.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucket.java
index 652b985..5145b16 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucket.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucket.java
@@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.group;
import java.util.Map;
+import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.memory.LRUMap;
@@ -93,6 +94,8 @@ public class MessageGroupHashBucket implements MessageGroupMap {
return "bucket";
}
+ public void setDestination(Destination destination) {}
+
public int getBucketCount(){
return bucketCount;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/d25c52cc/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMap.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMap.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMap.java
index c952c94..8998c23 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMap.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMap.java
@@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.group;
import java.util.Map;
+import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ConsumerId;
/**
@@ -44,4 +45,6 @@ public interface MessageGroupMap {
String getType();
+ void setDestination(Destination destination);
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/d25c52cc/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMap.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMap.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMap.java
index e3fd4ed..941477a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMap.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMap.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ConsumerId;
/**
@@ -78,6 +79,8 @@ public class SimpleMessageGroupMap implements MessageGroupMap {
return "simple";
}
+ public void setDestination(Destination destination) {}
+
public String toString() {
return "message groups: " + map.size();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/d25c52cc/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
index c9fc3e6..09b6be5 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
@@ -498,6 +498,10 @@ public class ConsumerInfo extends BaseCommand {
this.assignedGroupCount++;
}
+ public void clearAssignedGroupCount() {
+ this.assignedGroupCount=0;
+ }
+
public void decrementAssignedGroupCount() {
this.assignedGroupCount--;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/d25c52cc/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupReconnectDistributionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupReconnectDistributionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupReconnectDistributionTest.java
index 1fd7e07..da2f367 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupReconnectDistributionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupReconnectDistributionTest.java
@@ -17,6 +17,7 @@
package org.apache.activemq.usecases;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -40,7 +41,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.junit.runners.BlockJUnit4ClassRunner;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,38 +49,54 @@ import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-@RunWith(BlockJUnit4ClassRunner.class)
+@RunWith(Parameterized.class)
public class MessageGroupReconnectDistributionTest {
public static final Logger LOG = LoggerFactory.getLogger(MessageGroupReconnectDistributionTest.class);
final Random random = new Random();
protected Connection connection;
protected Session session;
protected MessageProducer producer;
- protected Destination destination;
+ protected ActiveMQQueue destination = new ActiveMQQueue("GroupQ");
protected TransportConnector connector;
+ ActiveMQConnectionFactory connFactory;
BrokerService broker;
+ int numMessages = 10000;
+ int groupSize = 10;
+ int batchSize = 20;
+
+ @Parameterized.Parameter(0)
+ public int numConsumers = 4;
+
+ @Parameterized.Parameter(1)
+ public boolean consumerPriority = true;
+
+ @Parameterized.Parameters(name="numConsumers={0},consumerPriority={1}")
+ public static Iterable<Object[]> combinations() {
+ return Arrays.asList(new Object[][]{{4, true}, {10, true}});
+ }
@Before
public void setUp() throws Exception {
broker = createBroker();
broker.start();
- ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(connector.getConnectUri() + "?jms.prefetchPolicy.all=30");
+ connFactory = new ActiveMQConnectionFactory(connector.getConnectUri() + "?jms.prefetchPolicy.all=200");
+ connFactory.setWatchTopicAdvisories(false);
connection = connFactory.createConnection();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- destination = new ActiveMQQueue("GroupQ");
producer = session.createProducer(destination);
connection.start();
}
protected BrokerService createBroker() throws Exception {
BrokerService service = new BrokerService();
+ service.setAdvisorySupport(false);
service.setPersistent(false);
service.setUseJmx(true);
PolicyMap policyMap = new PolicyMap();
PolicyEntry policy = new PolicyEntry();
- policy.setUseConsumerPriority(true);
- policy.setMessageGroupMapFactoryType("cached");
+ policy.setUseConsumerPriority(consumerPriority);
+ policy.setMessageGroupMapFactoryType("cached?cacheSize=" + (numConsumers - 1));
policyMap.setDefaultEntry(policy);
service.setDestinationPolicy(policyMap);
@@ -95,35 +112,35 @@ public class MessageGroupReconnectDistributionTest {
broker.stop();
}
- public int getBatchSize(int bound) throws Exception {
- return bound + random.nextInt(bound);
- }
-
@Test(timeout = 5 * 60 * 1000)
public void testReconnect() throws Exception {
- final int numMessages = 50000;
- final int numConsumers = 10;
final AtomicLong totalConsumed = new AtomicLong(0);
- produceMessages(numMessages);
-
- ExecutorService executorService = Executors.newCachedThreadPool();
+ ExecutorService executorService = Executors.newFixedThreadPool(numConsumers);
final ArrayList<AtomicLong> consumedCounters = new ArrayList<AtomicLong>(numConsumers);
+ final ArrayList<AtomicLong> batchCounters = new ArrayList<AtomicLong>(numConsumers);
+
for (int i = 0; i < numConsumers; i++) {
consumedCounters.add(new AtomicLong(0l));
+ batchCounters.add(new AtomicLong(0l));
+
final int id = i;
executorService.submit(new Runnable() {
- Session connectionSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ int getBatchSize() {
+ return (id + 1) * batchSize;
+ }
@Override
public void run() {
try {
- MessageConsumer messageConsumer = connectionSession.createConsumer(destination);
+ Session connectionSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ int batchSize = getBatchSize();
+ MessageConsumer messageConsumer = connectionSession.createConsumer(destWithPrefetch(destination));
- long batchSize = getBatchSize(numConsumers);
Message message;
AtomicLong consumed = consumedCounters.get(id);
+ AtomicLong batches = batchCounters.get(id);
LOG.info("Consumer: " + id + ", batchSize:" + batchSize + ", totalConsumed:" + totalConsumed.get() + ", consumed:" + consumed.get());
@@ -138,19 +155,21 @@ public class MessageGroupReconnectDistributionTest {
if (totalConsumed.get() == numMessages) {
break;
} else {
- messageConsumer = connectionSession.createConsumer(destination);
+ batchSize = getBatchSize();
+ messageConsumer = connectionSession.createConsumer(destWithPrefetch(destination));
+ batches.incrementAndGet();
continue;
}
}
- message.acknowledge();
consumed.incrementAndGet();
totalConsumed.incrementAndGet();
- if (consumed.get() > 0 && consumed.longValue() % batchSize == 0) {
+ if (consumed.get() > 0 && consumed.intValue() % batchSize == 0) {
messageConsumer.close();
- messageConsumer = connectionSession.createConsumer(destination);
- batchSize = getBatchSize(numConsumers);
+ batchSize = getBatchSize();
+ messageConsumer = connectionSession.createConsumer(destWithPrefetch(destination));
+ batches.incrementAndGet();
}
}
} catch (Exception e) {
@@ -158,14 +177,19 @@ public class MessageGroupReconnectDistributionTest {
}
}
});
+ TimeUnit.MILLISECONDS.sleep(200);
}
+ TimeUnit.SECONDS.sleep(1);
+ produceMessages(numMessages);
+
executorService.shutdown();
assertTrue("threads done on time", executorService.awaitTermination(10, TimeUnit.MINUTES));
assertEquals("All consumed", numMessages, totalConsumed.intValue());
LOG.info("Distribution: " + consumedCounters);
+ LOG.info("Batches: " + batchCounters);
double max = consumedCounters.get(0).longValue() * 1.5;
double min = consumedCounters.get(0).longValue() * 0.5;
@@ -176,11 +200,14 @@ public class MessageGroupReconnectDistributionTest {
}
}
+ private Destination destWithPrefetch(ActiveMQQueue destination) throws Exception {
+ return destination;
+ }
+
private void produceMessages(int numMessages) throws JMSException {
int groupID=0;
for (int i = 0; i < numMessages; i++) {
- // groups of 10
- if (i>0 && i%10==0) {
+ if (i>0 && i%groupSize==0) {
groupID++;
}
TextMessage msga = session.createTextMessage("hello " + i);
[2/2] activemq git commit: fix mvn warn on deploy profile
Posted by gt...@apache.org.
fix mvn warn on deploy profile
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/15eba485
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/15eba485
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/15eba485
Branch: refs/heads/trunk
Commit: 15eba485643d710a55e82c9ed10516374b1ae857
Parents: 1409acb
Author: gtully <ga...@gmail.com>
Authored: Wed Dec 10 12:27:09 2014 +0000
Committer: gtully <ga...@gmail.com>
Committed: Thu Dec 11 14:42:00 2014 +0000
----------------------------------------------------------------------
pom.xml | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/15eba485/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0711032..c27a2bd 100755
--- a/pom.xml
+++ b/pom.xml
@@ -1563,6 +1563,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
+ <version>${maven-javadoc-plugin-version}</version>
<executions>
<execution>
<id>attach-javadocs</id>