You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/07/28 19:03:53 UTC

[pulsar] branch master updated: Issue 16802: fix Repeated messages of shared dispatcher (#16812)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 825b68db7be Issue 16802: fix Repeated messages of shared dispatcher (#16812)
825b68db7be is described below

commit 825b68db7bed1c79af4b7b69b48bee76ebe75af5
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Thu Jul 28 21:03:43 2022 +0200

    Issue 16802: fix Repeated messages of shared dispatcher (#16812)
---
 .../PersistentDispatcherMultipleConsumers.java     | 47 ++++++++++----
 ...istentStickyKeyDispatcherMultipleConsumers.java | 13 ++--
 .../service/persistent/DelayedDeliveryTest.java    |  1 -
 ...ntStickyKeyDispatcherMultipleConsumersTest.java | 75 +++++++++++++---------
 4 files changed, 87 insertions(+), 49 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 71faeb7adba..cf58bfd43ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -89,7 +89,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     protected volatile PositionImpl minReplayedPosition = null;
     protected boolean shouldRewindBeforeReadingOrReplaying = false;
     protected final String name;
-
+    protected boolean sendInProgress;
     protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
             TOTAL_AVAILABLE_PERMITS_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
@@ -240,6 +240,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     }
 
     public synchronized void readMoreEntries() {
+        if (sendInProgress) {
+            // we cannot read more entries while sending the previous batch
+            // otherwise we could re-read the same entries and send duplicates
+            return;
+        }
         if (shouldPauseDeliveryForDelayTracker()) {
             return;
         }
@@ -496,7 +501,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     }
 
     @Override
-    public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
+    public final synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
         ReadType readType = (ReadType) ctx;
         if (readType == ReadType.Normal) {
             havePendingRead = false;
@@ -528,18 +533,39 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size());
         }
 
+        // dispatch messages to a separate thread, but still in order for this subscription
+        // sendMessagesToConsumers is responsible for running broker-side filters
+        // that may be quite expensive
         if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
-            // dispatch messages to a separate thread, but still in order for this subscription
-            // sendMessagesToConsumers is responsible for running broker-side filters
-            // that may be quite expensive
+            // setting sendInProgress here, because sendMessagesToConsumers will be executed
+            // in a separate thread, and we want to prevent more reads
+            sendInProgress = true;
             dispatchMessagesThread.execute(safeRun(() -> sendMessagesToConsumers(readType, entries)));
         } else {
             sendMessagesToConsumers(readType, entries);
         }
     }
 
-    protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
+    protected final synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
+        sendInProgress = true;
+        boolean readMoreEntries;
+        try {
+            readMoreEntries = trySendMessagesToConsumers(readType, entries);
+        } finally {
+            sendInProgress = false;
+        }
+        if (readMoreEntries) {
+            readMoreEntries();
+        }
+    }
 
+    /**
+     * Dispatch the messages to the Consumers.
+     * @return true if you want to trigger a new read.
+     * This method is overridden by other classes; please take a look at the other implementations
+     * if you need to change it.
+     */
+    protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
         if (needTrimAckedMessages()) {
             cursor.trimDeletedEntries(entries);
         }
@@ -547,8 +573,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         int entriesToDispatch = entries.size();
         // Trigger read more messages
         if (entriesToDispatch == 0) {
-            readMoreEntries();
-            return;
+            return true;
         }
         final MessageMetadata[] metadataArray = entries.stream()
                 .map(entry -> Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), subscription.toString(), -1))
@@ -578,7 +603,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 log.info("[{}] rewind because no available consumer found from total {}", name, consumerList.size());
                 entries.subList(start, entries.size()).forEach(Entry::release);
                 cursor.rewind();
-                return;
+                return false;
             }
 
             // round-robin dispatch batch size for this consumer
@@ -623,7 +648,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             entriesToDispatch -= messagesForC;
             TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
                     -(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));
-            if (log.isDebugEnabled()){
+            if (log.isDebugEnabled()) {
                 log.debug("[{}] Added -({} minus {}) permits to TOTAL_AVAILABLE_PERMITS_UPDATER in "
                                 + "PersistentDispatcherMultipleConsumers",
                         name, msgSent, batchIndexesAcks.getTotalAckedIndexCount());
@@ -658,7 +683,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 entry.release();
             });
         }
-        readMoreEntries();
+        return true;
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 558e3f129ce..e42995e9247 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -152,7 +152,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             };
 
     @Override
-    protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
+    protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
         long totalMessagesSent = 0;
         long totalBytesSent = 0;
         long totalEntries = 0;
@@ -160,14 +160,13 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
         // Trigger read more messages
         if (entriesCount == 0) {
-            readMoreEntries();
-            return;
+            return true;
         }
 
         if (consumerSet.isEmpty()) {
             entries.forEach(Entry::release);
             cursor.rewind();
-            return;
+            return false;
         }
 
         // A corner case that we have to retry a readMoreEntries in order to preserver order delivery.
@@ -201,8 +200,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                         } else if (readType == ReadType.Replay) {
                             entries.forEach(Entry::release);
                         }
-                        readMoreEntries();
-                        return;
+                        return true;
                     }
                 }
             }
@@ -331,14 +329,17 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             }
             // readMoreEntries should run regardless whether or not stuck is caused by
             // stuckConsumers for avoid stopping dispatch.
+            sendInProgress = false;
             topic.getBrokerService().executor().execute(() -> readMoreEntries());
         }  else if (currentThreadKeyNumber == 0) {
+            sendInProgress = false;
             topic.getBrokerService().executor().schedule(() -> {
                 synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
                     readMoreEntries();
                 }
             }, 100, TimeUnit.MILLISECONDS);
         }
+        return false;
     }
 
     private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, int maxMessages,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index aa787907329..8b62845572d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -278,7 +278,6 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
         for (int i = 0; i < N; i++) {
             msg = consumer.receive(10, TimeUnit.SECONDS);
             receivedMsgs.add(msg.getValue());
-            consumer.acknowledge(msg);
         }
 
         assertEquals(receivedMsgs.size(), N);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 72286b01c76..aa87b2aaa25 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -46,6 +46,7 @@ import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import io.netty.channel.EventLoopGroup;
@@ -69,6 +70,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
+import org.awaitility.Awaitility;
 import org.mockito.ArgumentCaptor;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
@@ -99,6 +101,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
         doReturn(100).when(configMock).getDispatcherMaxReadBatchSize();
         doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing();
         doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
+        doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
 
         pulsarMock = mock(PulsarService.class);
         doReturn(configMock).when(pulsarMock).getConfiguration();
@@ -115,7 +118,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
         EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
         doReturn(eventLoopGroup).when(brokerMock).executor();
         doAnswer(invocation -> {
-            ((Runnable)invocation.getArguments()[0]).run();
+            orderedExecutor.execute(((Runnable)invocation.getArguments()[0]));
             return null;
         }).when(eventLoopGroup).execute(any(Runnable.class));
 
@@ -180,19 +183,21 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
             fail("Failed to readEntriesComplete.", e);
         }
 
-        ArgumentCaptor<Integer> totalMessagesCaptor = ArgumentCaptor.forClass(Integer.class);
-        verify(consumerMock, times(1)).sendMessages(
-                anyList(),
-                any(EntryBatchSizes.class),
-                any(EntryBatchIndexesAcks.class),
-                totalMessagesCaptor.capture(),
-                anyLong(),
-                anyLong(),
-                any(RedeliveryTracker.class)
-        );
-
-        List<Integer> allTotalMessagesCaptor = totalMessagesCaptor.getAllValues();
-        Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5);
+        Awaitility.await().untilAsserted(() -> {
+                    ArgumentCaptor<Integer> totalMessagesCaptor = ArgumentCaptor.forClass(Integer.class);
+                    verify(consumerMock, times(1)).sendMessages(
+                            anyList(),
+                            any(EntryBatchSizes.class),
+                            any(EntryBatchIndexesAcks.class),
+                            totalMessagesCaptor.capture(),
+                            anyLong(),
+                            anyLong(),
+                            any(RedeliveryTracker.class)
+                    );
+
+                    List<Integer> allTotalMessagesCaptor = totalMessagesCaptor.getAllValues();
+                    Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5);
+                });
     }
 
     @Test(timeOut = 10000)
@@ -283,21 +288,23 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
         // and then stop to dispatch to slowConsumer
         persistentDispatcher.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal, redeliverEntries);
 
-        verify(consumerMock, times(1)).sendMessages(
-                argThat(arg -> {
-                    assertEquals(arg.size(), 1);
-                    Entry entry = arg.get(0);
-                    assertEquals(entry.getLedgerId(), 1);
-                    assertEquals(entry.getEntryId(), 3);
-                    return true;
-                }),
-                any(EntryBatchSizes.class),
-                any(EntryBatchIndexesAcks.class),
-                anyInt(),
-                anyLong(),
-                anyLong(),
-                any(RedeliveryTracker.class)
-        );
+        Awaitility.await().untilAsserted(() -> {
+            verify(consumerMock, times(1)).sendMessages(
+                    argThat(arg -> {
+                        assertEquals(arg.size(), 1);
+                        Entry entry = arg.get(0);
+                        assertEquals(entry.getLedgerId(), 1);
+                        assertEquals(entry.getEntryId(), 3);
+                        return true;
+                    }),
+                    any(EntryBatchSizes.class),
+                    any(EntryBatchIndexesAcks.class),
+                    anyInt(),
+                    anyLong(),
+                    anyLong(),
+                    any(RedeliveryTracker.class)
+            );
+        });
         verify(slowConsumerMock, times(0)).sendMessages(
                 anyList(),
                 any(EntryBatchSizes.class),
@@ -399,9 +406,15 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
                 eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay), anyBoolean());
 
         // Mock Cursor#asyncReadEntriesOrWait
+        AtomicBoolean asyncReadEntriesOrWaitCalled = new AtomicBoolean();
         doAnswer(invocationOnMock -> {
-            ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2))
-                    .readEntriesComplete(readEntries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+            if (asyncReadEntriesOrWaitCalled.compareAndSet(false, true)) {
+                ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2))
+                        .readEntriesComplete(readEntries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+            } else {
+                ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2))
+                        .readEntriesComplete(Collections.emptyList(), PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+            }
             return null;
         }).when(cursorMock).asyncReadEntriesOrWait(anyInt(), anyLong(),
                 any(PersistentStickyKeyDispatcherMultipleConsumers.class),