You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/07/28 19:03:53 UTC
[pulsar] branch master updated: Issue 16802: fix Repeated messages of shared dispatcher (#16812)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 825b68db7be Issue 16802: fix Repeated messages of shared dispatcher (#16812)
825b68db7be is described below
commit 825b68db7bed1c79af4b7b69b48bee76ebe75af5
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Thu Jul 28 21:03:43 2022 +0200
Issue 16802: fix Repeated messages of shared dispatcher (#16812)
---
.../PersistentDispatcherMultipleConsumers.java | 47 ++++++++++----
...istentStickyKeyDispatcherMultipleConsumers.java | 13 ++--
.../service/persistent/DelayedDeliveryTest.java | 1 -
...ntStickyKeyDispatcherMultipleConsumersTest.java | 75 +++++++++++++---------
4 files changed, 87 insertions(+), 49 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 71faeb7adba..cf58bfd43ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -89,7 +89,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
protected volatile PositionImpl minReplayedPosition = null;
protected boolean shouldRewindBeforeReadingOrReplaying = false;
protected final String name;
-
+ protected boolean sendInProgress;
protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
TOTAL_AVAILABLE_PERMITS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
@@ -240,6 +240,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
public synchronized void readMoreEntries() {
+ if (sendInProgress) {
+ // we cannot read more entries while sending the previous batch
+ // otherwise we could re-read the same entries and send duplicates
+ return;
+ }
if (shouldPauseDeliveryForDelayTracker()) {
return;
}
@@ -496,7 +501,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
@Override
- public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
+ public final synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
ReadType readType = (ReadType) ctx;
if (readType == ReadType.Normal) {
havePendingRead = false;
@@ -528,18 +533,39 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size());
}
+ // dispatch messages to a separate thread, but still in order for this subscription
+ // sendMessagesToConsumers is responsible for running broker-side filters
+ // that may be quite expensive
if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
- // dispatch messages to a separate thread, but still in order for this subscription
- // sendMessagesToConsumers is responsible for running broker-side filters
- // that may be quite expensive
+ // setting sendInProgress here, because sendMessagesToConsumers will be executed
+ // in a separate thread, and we want to prevent more reads
+ sendInProgress = true;
dispatchMessagesThread.execute(safeRun(() -> sendMessagesToConsumers(readType, entries)));
} else {
sendMessagesToConsumers(readType, entries);
}
}
- protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
+ protected final synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
+ sendInProgress = true;
+ boolean readMoreEntries;
+ try {
+ readMoreEntries = trySendMessagesToConsumers(readType, entries);
+ } finally {
+ sendInProgress = false;
+ }
+ if (readMoreEntries) {
+ readMoreEntries();
+ }
+ }
+ /**
+ * Dispatch the messages to the Consumers.
+ * This method is overridden by other classes, please take a look at the other implementations
+ * if you need to change it.
+ * @return true if you want to trigger a new read.
+ */
+ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
if (needTrimAckedMessages()) {
cursor.trimDeletedEntries(entries);
}
@@ -547,8 +573,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
int entriesToDispatch = entries.size();
// Trigger read more messages
if (entriesToDispatch == 0) {
- readMoreEntries();
- return;
+ return true;
}
final MessageMetadata[] metadataArray = entries.stream()
.map(entry -> Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), subscription.toString(), -1))
@@ -578,7 +603,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
log.info("[{}] rewind because no available consumer found from total {}", name, consumerList.size());
entries.subList(start, entries.size()).forEach(Entry::release);
cursor.rewind();
- return;
+ return false;
}
// round-robin dispatch batch size for this consumer
@@ -623,7 +648,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
entriesToDispatch -= messagesForC;
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
-(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));
- if (log.isDebugEnabled()){
+ if (log.isDebugEnabled()) {
log.debug("[{}] Added -({} minus {}) permits to TOTAL_AVAILABLE_PERMITS_UPDATER in "
+ "PersistentDispatcherMultipleConsumers",
name, msgSent, batchIndexesAcks.getTotalAckedIndexCount());
@@ -658,7 +683,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
entry.release();
});
}
- readMoreEntries();
+ return true;
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 558e3f129ce..e42995e9247 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -152,7 +152,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
};
@Override
- protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
+ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
@@ -160,14 +160,13 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
// Trigger read more messages
if (entriesCount == 0) {
- readMoreEntries();
- return;
+ return true;
}
if (consumerSet.isEmpty()) {
entries.forEach(Entry::release);
cursor.rewind();
- return;
+ return false;
}
// A corner case that we have to retry a readMoreEntries in order to preserver order delivery.
@@ -201,8 +200,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
} else if (readType == ReadType.Replay) {
entries.forEach(Entry::release);
}
- readMoreEntries();
- return;
+ return true;
}
}
}
@@ -331,14 +329,17 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
// readMoreEntries should run regardless whether or not stuck is caused by
// stuckConsumers for avoid stopping dispatch.
+ sendInProgress = false;
topic.getBrokerService().executor().execute(() -> readMoreEntries());
} else if (currentThreadKeyNumber == 0) {
+ sendInProgress = false;
topic.getBrokerService().executor().schedule(() -> {
synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
readMoreEntries();
}
}, 100, TimeUnit.MILLISECONDS);
}
+ return false;
}
private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, int maxMessages,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index aa787907329..8b62845572d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -278,7 +278,6 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
for (int i = 0; i < N; i++) {
msg = consumer.receive(10, TimeUnit.SECONDS);
receivedMsgs.add(msg.getValue());
- consumer.acknowledge(msg);
}
assertEquals(receivedMsgs.size(), N);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 72286b01c76..aa87b2aaa25 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -46,6 +46,7 @@ import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import io.netty.channel.EventLoopGroup;
@@ -69,6 +70,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
+import org.awaitility.Awaitility;
import org.mockito.ArgumentCaptor;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
@@ -99,6 +101,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
doReturn(100).when(configMock).getDispatcherMaxReadBatchSize();
doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing();
doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
+ doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
pulsarMock = mock(PulsarService.class);
doReturn(configMock).when(pulsarMock).getConfiguration();
@@ -115,7 +118,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
doReturn(eventLoopGroup).when(brokerMock).executor();
doAnswer(invocation -> {
- ((Runnable)invocation.getArguments()[0]).run();
+ orderedExecutor.execute(((Runnable)invocation.getArguments()[0]));
return null;
}).when(eventLoopGroup).execute(any(Runnable.class));
@@ -180,19 +183,21 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
fail("Failed to readEntriesComplete.", e);
}
- ArgumentCaptor<Integer> totalMessagesCaptor = ArgumentCaptor.forClass(Integer.class);
- verify(consumerMock, times(1)).sendMessages(
- anyList(),
- any(EntryBatchSizes.class),
- any(EntryBatchIndexesAcks.class),
- totalMessagesCaptor.capture(),
- anyLong(),
- anyLong(),
- any(RedeliveryTracker.class)
- );
-
- List<Integer> allTotalMessagesCaptor = totalMessagesCaptor.getAllValues();
- Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5);
+ Awaitility.await().untilAsserted(() -> {
+ ArgumentCaptor<Integer> totalMessagesCaptor = ArgumentCaptor.forClass(Integer.class);
+ verify(consumerMock, times(1)).sendMessages(
+ anyList(),
+ any(EntryBatchSizes.class),
+ any(EntryBatchIndexesAcks.class),
+ totalMessagesCaptor.capture(),
+ anyLong(),
+ anyLong(),
+ any(RedeliveryTracker.class)
+ );
+
+ List<Integer> allTotalMessagesCaptor = totalMessagesCaptor.getAllValues();
+ Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5);
+ });
}
@Test(timeOut = 10000)
@@ -283,21 +288,23 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
// and then stop to dispatch to slowConsumer
persistentDispatcher.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal, redeliverEntries);
- verify(consumerMock, times(1)).sendMessages(
- argThat(arg -> {
- assertEquals(arg.size(), 1);
- Entry entry = arg.get(0);
- assertEquals(entry.getLedgerId(), 1);
- assertEquals(entry.getEntryId(), 3);
- return true;
- }),
- any(EntryBatchSizes.class),
- any(EntryBatchIndexesAcks.class),
- anyInt(),
- anyLong(),
- anyLong(),
- any(RedeliveryTracker.class)
- );
+ Awaitility.await().untilAsserted(() -> {
+ verify(consumerMock, times(1)).sendMessages(
+ argThat(arg -> {
+ assertEquals(arg.size(), 1);
+ Entry entry = arg.get(0);
+ assertEquals(entry.getLedgerId(), 1);
+ assertEquals(entry.getEntryId(), 3);
+ return true;
+ }),
+ any(EntryBatchSizes.class),
+ any(EntryBatchIndexesAcks.class),
+ anyInt(),
+ anyLong(),
+ anyLong(),
+ any(RedeliveryTracker.class)
+ );
+ });
verify(slowConsumerMock, times(0)).sendMessages(
anyList(),
any(EntryBatchSizes.class),
@@ -399,9 +406,15 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay), anyBoolean());
// Mock Cursor#asyncReadEntriesOrWait
+ AtomicBoolean asyncReadEntriesOrWaitCalled = new AtomicBoolean();
doAnswer(invocationOnMock -> {
- ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2))
- .readEntriesComplete(readEntries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+ if (asyncReadEntriesOrWaitCalled.compareAndSet(false, true)) {
+ ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2))
+ .readEntriesComplete(readEntries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+ } else {
+ ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2))
+ .readEntriesComplete(Collections.emptyList(), PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+ }
return null;
}).when(cursorMock).asyncReadEntriesOrWait(anyInt(), anyLong(),
any(PersistentStickyKeyDispatcherMultipleConsumers.class),