You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/02/06 19:57:56 UTC

[pulsar] branch master updated: Prevent StackOverFlowException in KEY_SHARED subscription (#14121)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f97a76e  Prevent StackOverFlowException in KEY_SHARED subscription (#14121)
f97a76e is described below

commit f97a76e80a6966c601f73e24fe4cfb2af0114df4
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Sun Feb 6 20:56:43 2022 +0100

    Prevent StackOverFlowException in KEY_SHARED subscription (#14121)
---
 .../PersistentStickyKeyDispatcherMultipleConsumers.java           | 2 +-
 .../PersistentStickyKeyDispatcherMultipleConsumersTest.java       | 8 ++++++++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 94850a6..ed19bdc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -320,7 +320,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             }
             // readMoreEntries should run regardless whether or not stuck is caused by
             // stuckConsumers for avoid stopping dispatch.
-            readMoreEntries();
+            topic.getBrokerService().executor().execute(() -> readMoreEntries());
         }  else if (currentThreadKeyNumber == 0) {
             topic.getBrokerService().executor().schedule(() -> {
                 synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 9d085ff..db51af4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -50,6 +50,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import io.netty.channel.EventLoopGroup;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
@@ -120,6 +121,13 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
         HierarchyTopicPolicies topicPolicies = new HierarchyTopicPolicies();
         topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(0);
 
+        EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
+        doReturn(eventLoopGroup).when(brokerMock).executor();
+        doAnswer(invocation -> {
+            ((Runnable)invocation.getArguments()[0]).run();
+            return null;
+        }).when(eventLoopGroup).execute(any(Runnable.class));
+
         topicMock = mock(PersistentTopic.class);
         doReturn(brokerMock).when(topicMock).getBrokerService();
         doReturn(topicName).when(topicMock).getName();