You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2015/04/13 11:17:00 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5718 - don't add messages to subscriber while it's discarding

Repository: activemq
Updated Branches:
  refs/heads/master b29eb384b -> 2562cf21a


https://issues.apache.org/jira/browse/AMQ-5718 - don't add messages to subscriber while it's discarding


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2562cf21
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2562cf21
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2562cf21

Branch: refs/heads/master
Commit: 2562cf21a2fdfd3b0301160e28b7109d85f38cfe
Parents: b29eb38
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Mon Apr 13 11:16:35 2015 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon Apr 13 11:16:55 2015 +0200

----------------------------------------------------------------------
 .../broker/region/TopicSubscription.java        | 33 +++++++++++++-------
 .../apache/activemq/advisory/AdvisoryTests.java | 28 +++++++++++++++--
 2 files changed, 47 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2562cf21/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index e81be74..fe3d911 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -73,6 +73,7 @@ public class TopicSubscription extends AbstractSubscription {
     protected boolean enableAudit = false;
     protected ActiveMQMessageAudit audit;
     protected boolean active = false;
+    protected boolean discarding = false;
 
     public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
         super(broker, context, info);
@@ -107,6 +108,11 @@ public class TopicSubscription extends AbstractSubscription {
         node = new IndirectMessageReference(node.getMessage());
         enqueueCounter.incrementAndGet();
         synchronized (matchedListMutex) {
+            // if this subscriber is already discarding a message, we don't want to add
+            // any more messages to it as those messages can only be advisories generated in the process,
+            // which can trigger the recursive call loop
+            if (discarding) return;
+
             if (!isFull() && matched.isEmpty()) {
                 // if maximumPendingMessages is set we will only discard messages which
                 // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
@@ -639,18 +645,23 @@ public class TopicSubscription extends AbstractSubscription {
     }
 
     private void discard(MessageReference message) {
-        message.decrementReferenceCount();
-        matched.remove(message);
-        discarded++;
-        if(destination != null) {
-            destination.getDestinationStatistics().getDequeues().increment();
-        }
-        LOG.debug("{}, discarding message {}", this, message);
-        Destination dest = (Destination) message.getRegionDestination();
-        if (dest != null) {
-            dest.messageDiscarded(getContext(), this, message);
+        discarding = true;
+        try {
+            message.decrementReferenceCount();
+            matched.remove(message);
+            discarded++;
+            if (destination != null) {
+                destination.getDestinationStatistics().getDequeues().increment();
+            }
+            LOG.debug("{}, discarding message {}", this, message);
+            Destination dest = (Destination) message.getRegionDestination();
+            if (dest != null) {
+                dest.messageDiscarded(getContext(), this, message);
+            }
+            broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId()));
+        } finally {
+            discarding = false;
         }
-        broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/2562cf21/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
index 4bb9053..5e5eb7f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
@@ -33,11 +33,11 @@ import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.*;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
 
 /**
  *
@@ -161,6 +161,28 @@ public class AdvisoryTests extends TestCase {
         assertNotNull(msg);
     }
 
+    public void testMessageDLQd() throws Exception {
+        ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
+        policy.setTopicPrefetch(2);
+        ((ActiveMQConnection)connection).setPrefetchPolicy(policy);
+        Session s = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Topic topic = s.createTopic(getClass().getName());
+
+        Topic advisoryTopic = s.createTopic(">");
+        for (int i = 0; i < 100; i++) {
+            MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
+        }
+
+
+        MessageProducer producer = s.createProducer(topic);
+        int count = 10;
+        for (int i = 0; i < count; i++) {
+            BytesMessage m = s.createBytesMessage();
+            producer.send(m);
+        }
+        // we should get here without StackOverflow
+    }
+
     public void xtestMessageDiscardedAdvisory() throws Exception {
         Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Topic topic = s.createTopic(getClass().getName());