You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2015/04/13 11:17:00 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5718 -
don't add messages to subscriber while it's discarding
Repository: activemq
Updated Branches:
refs/heads/master b29eb384b -> 2562cf21a
https://issues.apache.org/jira/browse/AMQ-5718 - don't add messages to subscriber while it's discarding
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2562cf21
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2562cf21
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2562cf21
Branch: refs/heads/master
Commit: 2562cf21a2fdfd3b0301160e28b7109d85f38cfe
Parents: b29eb38
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Mon Apr 13 11:16:35 2015 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon Apr 13 11:16:55 2015 +0200
----------------------------------------------------------------------
.../broker/region/TopicSubscription.java | 33 +++++++++++++-------
.../apache/activemq/advisory/AdvisoryTests.java | 28 +++++++++++++++--
2 files changed, 47 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/2562cf21/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index e81be74..fe3d911 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -73,6 +73,7 @@ public class TopicSubscription extends AbstractSubscription {
protected boolean enableAudit = false;
protected ActiveMQMessageAudit audit;
protected boolean active = false;
+ protected boolean discarding = false;
public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
super(broker, context, info);
@@ -107,6 +108,11 @@ public class TopicSubscription extends AbstractSubscription {
node = new IndirectMessageReference(node.getMessage());
enqueueCounter.incrementAndGet();
synchronized (matchedListMutex) {
+ // if this subscriber is already discarding a message, we don't want to add
+ // any more messages to it as those messages can only be advisories generated in the process,
+ // which can trigger the recursive call loop
+ if (discarding) return;
+
if (!isFull() && matched.isEmpty()) {
// if maximumPendingMessages is set we will only discard messages which
// have not been dispatched (i.e. we allow the prefetch buffer to be filled)
@@ -639,18 +645,23 @@ public class TopicSubscription extends AbstractSubscription {
}
private void discard(MessageReference message) {
- message.decrementReferenceCount();
- matched.remove(message);
- discarded++;
- if(destination != null) {
- destination.getDestinationStatistics().getDequeues().increment();
- }
- LOG.debug("{}, discarding message {}", this, message);
- Destination dest = (Destination) message.getRegionDestination();
- if (dest != null) {
- dest.messageDiscarded(getContext(), this, message);
+ discarding = true;
+ try {
+ message.decrementReferenceCount();
+ matched.remove(message);
+ discarded++;
+ if (destination != null) {
+ destination.getDestinationStatistics().getDequeues().increment();
+ }
+ LOG.debug("{}, discarding message {}", this, message);
+ Destination dest = (Destination) message.getRegionDestination();
+ if (dest != null) {
+ dest.messageDiscarded(getContext(), this, message);
+ }
+ broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId()));
+ } finally {
+ discarding = false;
}
- broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId()));
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/2562cf21/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
index 4bb9053..5e5eb7f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
@@ -33,11 +33,11 @@ import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.*;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
/**
*
@@ -161,6 +161,28 @@ public class AdvisoryTests extends TestCase {
assertNotNull(msg);
}
+ public void testMessageDLQd() throws Exception {
+ ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
+ policy.setTopicPrefetch(2);
+ ((ActiveMQConnection)connection).setPrefetchPolicy(policy);
+ Session s = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Topic topic = s.createTopic(getClass().getName());
+
+ Topic advisoryTopic = s.createTopic(">");
+ for (int i = 0; i < 100; i++) {
+ MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
+ }
+
+
+ MessageProducer producer = s.createProducer(topic);
+ int count = 10;
+ for (int i = 0; i < count; i++) {
+ BytesMessage m = s.createBytesMessage();
+ producer.send(m);
+ }
+ // we should get here without StackOverflow
+ }
+
public void xtestMessageDiscardedAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = s.createTopic(getClass().getName());