You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/08/19 01:04:39 UTC

[pulsar] 02/02: KeyShared dispatcher on non-persistent topics was not respecting consumer flow-control (#11692)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bd5f75f6f351bb0ac0dfeb50fec1dbac501fe748
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Aug 17 18:22:11 2021 -0700

    KeyShared dispatcher on non-persistent topics was not respecting consumer flow-control (#11692)
    
    ### Motivation
    
    Fixes #10734
    
    The KeyShared dispatcher for non-persistent topics does not take into account the flow control advertised by consumers, so no back pressure from the consumers is applied. As a result, the broker pushes messages to consumers without restriction, causing memory issues in the consumers.
    
    (cherry picked from commit 5835fd23e5347dd73ff073c371366a6b4ba8d6c4)
---
 ...istentStickyKeyDispatcherMultipleConsumers.java | 25 +++++++++++++---
 ...ntStickyKeyDispatcherMultipleConsumersTest.java | 34 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index dc0d8a6..704fd93 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
+import org.apache.pulsar.common.protocol.Commands;
 
 public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers {
 
@@ -89,7 +90,11 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
 
         for (Entry entry : entries) {
             Consumer consumer = selector.select(peekStickyKey(entry.getDataBuffer()));
-            groupedEntries.computeIfAbsent(consumer, k -> new ArrayList<>()).add(entry);
+            if (consumer != null) {
+                groupedEntries.computeIfAbsent(consumer, k -> new ArrayList<>()).add(entry);
+            } else {
+                entry.release();
+            }
         }
 
         for (Map.Entry<Consumer, List<Entry>> entriesByConsumer : groupedEntries.entrySet()) {
@@ -99,9 +104,21 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
             SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
             EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForConsumer.size());
             filterEntriesForConsumer(entriesForConsumer, batchSizes, sendMessageInfo, null, null, false);
-            consumer.sendMessages(entriesForConsumer, batchSizes, null, sendMessageInfo.getTotalMessages(),
-                    sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
-            TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
+
+            if (consumer.getAvailablePermits() > 0 && consumer.isWritable()) {
+                consumer.sendMessages(entriesForConsumer, batchSizes, null, sendMessageInfo.getTotalMessages(),
+                        sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
+                        getRedeliveryTracker());
+                TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
+            } else {
+                entriesForConsumer.forEach(e -> {
+                    int totalMsgs = Commands.getNumberOfMessagesInBatch(e.getDataBuffer(), subscription.toString(), -1);
+                    if (totalMsgs > 0) {
+                        msgDrop.recordEvent(totalMsgs);
+                    }
+                    e.release();
+                });
+            }
         }
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
index 49dac2a..990bd8f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -58,6 +58,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
 
@@ -118,6 +119,8 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {
     @Test(timeOut = 10000)
     public void testSendMessage() throws BrokerServiceException {
         Consumer consumerMock = mock(Consumer.class);
+        when(consumerMock.getAvailablePermits()).thenReturn(1000);
+        when(consumerMock.isWritable()).thenReturn(true);
         nonpersistentDispatcher.addConsumer(consumerMock);
 
         List<Entry> entries = new ArrayList<>();
@@ -146,6 +149,37 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {
                 eq(null), anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class));
     }
 
+    @Test(timeOut = 10000)
+    public void testSendMessageRespectFlowControl() throws BrokerServiceException {
+        Consumer consumerMock = mock(Consumer.class);
+        nonpersistentDispatcher.addConsumer(consumerMock);
+
+        List<Entry> entries = new ArrayList<>();
+        entries.add(EntryImpl.create(1, 1, createMessage("message1", 1)));
+        entries.add(EntryImpl.create(1, 2, createMessage("message2", 2)));
+        doAnswer(invocationOnMock -> {
+            ChannelPromise mockPromise = mock(ChannelPromise.class);
+            List<Entry> receivedEntries = invocationOnMock.getArgument(0, List.class);
+            for (int index = 1; index <= receivedEntries.size(); index++) {
+                Entry entry = receivedEntries.get(index - 1);
+                assertEquals(entry.getLedgerId(), 1);
+                assertEquals(entry.getEntryId(), index);
+                ByteBuf byteBuf = entry.getDataBuffer();
+                MessageMetadata messageMetadata = Commands.parseMessageMetadata(byteBuf);
+                assertEquals(byteBuf.toString(UTF_8), "message" + index);
+            };
+            return mockPromise;
+        }).when(consumerMock).sendMessages(any(List.class), any(EntryBatchSizes.class), any(),
+                anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class));
+        try {
+            nonpersistentDispatcher.sendMessages(entries);
+        } catch (Exception e) {
+            fail("Failed to sendMessages.", e);
+        }
+        verify(consumerMock, times(0)).sendMessages(any(List.class), any(EntryBatchSizes.class),
+                eq(null), anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class));
+    }
+
     private ByteBuf createMessage(String message, int sequenceId) {
         return createMessage(message, sequenceId, "testKey");
     }