You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/31 15:48:32 UTC
[pulsar] branch master updated: FIX Key_Shared dispatch ordering of
message redelivery. (#4406)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9db06a2 FIX Key_Shared dispatch ordering of message redelivery. (#4406)
9db06a2 is described below
commit 9db06a2c1e27dc61104ac97802a2bc07c2bf9dad
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri May 31 23:48:28 2019 +0800
FIX Key_Shared dispatch ordering of message redelivery. (#4406)
---
.../org/apache/bookkeeper/mledger/ManagedCursor.java | 19 ++++++++++++++++++-
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 12 ++++++++++++
.../mledger/impl/ManagedCursorContainerTest.java | 5 +++++
.../PersistentDispatcherMultipleConsumers.java | 9 ++++++---
...ersistentStickyKeyDispatcherMultipleConsumers.java | 7 +++++++
.../pulsar/client/api/KeySharedSubscriptionTest.java | 9 +++++++++
6 files changed, 57 insertions(+), 4 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index e1bcec7..fc3f6a4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -467,7 +467,7 @@ public interface ManagedCursor {
throws InterruptedException, ManagedLedgerException;
/**
- * Read the specified set of positions from ManagedLedger.
+ * Read the specified set of positions from ManagedLedger without ordering.
*
* @param positions
* set of positions to read
@@ -482,6 +482,23 @@ public interface ManagedCursor {
Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx);
/**
+ * Read the specified set of positions from ManagedLedger.
+ *
+ * @param positions
+ * set of positions to read
+ * @param callback
+ * callback object returning the list of entries
+ * @param ctx
+ * opaque context
+ * @param sortEntries
+ * whether to sort the entries by ledger id and then entry id before invoking the callback
+ * @return skipped positions
+ * set of positions which are already deleted/acknowledged and skipped while replaying them
+ */
+ Set<? extends Position> asyncReplayEntries(
+ Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx, boolean sortEntries);
+
+ /**
* Close the cursor and releases the associated resources.
*
* @throws InterruptedException
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index cdad87d..124f82a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -30,6 +30,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
+import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
@@ -1027,6 +1028,12 @@ public class ManagedCursorImpl implements ManagedCursor {
@Override
public Set<? extends Position> asyncReplayEntries(final Set<? extends Position> positions,
ReadEntriesCallback callback, Object ctx) {
+ return asyncReplayEntries(positions, callback, ctx, false);
+ }
+
+ @Override
+ public Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions,
+ ReadEntriesCallback callback, Object ctx, boolean sortEntries) {
List<Entry> entries = Lists.newArrayListWithExpectedSize(positions.size());
if (positions.isEmpty()) {
callback.readEntriesComplete(entries, ctx);
@@ -1062,6 +1069,11 @@ public class ManagedCursorImpl implements ManagedCursor {
} else {
entries.add(entry);
if (--pendingCallbacks == 0) {
+ if (sortEntries) {
+ entries.sort((e1, e2) -> ComparisonChain.start()
+ .compare(e1.getLedgerId(), e2.getLedgerId())
+ .compare(e1.getEntryId(), e2.getEntryId()).result());
+ }
callback.readEntriesComplete(entries, ctx);
}
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 458ca4c..dfd233a 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -238,6 +238,11 @@ public class ManagedCursorContainerTest {
}
@Override
+ public Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx, boolean sortEntries) {
+ return Sets.newConcurrentHashSet();
+ }
+
+ @Override
public List<Entry> readEntriesOrWait(int numberOfEntriesToRead)
throws InterruptedException, ManagedLedgerException {
return null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index fad8343..ffc9ce2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -68,7 +68,7 @@ import org.slf4j.LoggerFactory;
/**
*/
-public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers implements Dispatcher, ReadEntriesCallback {
+public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers implements Dispatcher, ReadEntriesCallback {
protected final PersistentTopic topic;
protected final ManagedCursor cursor;
@@ -303,8 +303,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu
}
havePendingReplayRead = true;
- Set<? extends Position> deletedMessages = cursor.asyncReplayEntries(messagesToReplayNow, this,
- ReadType.Replay);
+ Set<? extends Position> deletedMessages = asyncReplayEntries(messagesToReplayNow);
// clear already acked positions from replay bucket
deletedMessages.forEach(position -> messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(),
@@ -335,6 +334,10 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu
}
}
+ protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions) {
+ return cursor.asyncReplayEntries(positions, this, ReadType.Replay);
+ }
+
@Override
public boolean isConsumerConnected() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index ca4f525..e13a0c3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -25,10 +25,12 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
@@ -161,6 +163,11 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
return SubType.Key_Shared;
}
+ @Override
+ protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions) {
+ return cursor.asyncReplayEntries(positions, this, ReadType.Replay, true);
+ }
+
private byte[] peekStickyKey(ByteBuf metadataAndPayload) {
metadataAndPayload.markReaderIndex();
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 9c67817..a6f357e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -390,6 +390,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
int redeliveryCount = check.getValue() / 2;
log.info("[{}] Consumer wait for {} messages redelivery ...", redeliveryCount);
// messages not acked, test redelivery
+ lastMessageForKey = new HashMap<>();
for (int i = 0; i < redeliveryCount; i++) {
Message<Integer> message = check.getKey().receive();
received++;
@@ -397,6 +398,14 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
String key = message.hasOrderingKey() ? new String(message.getOrderingKey()) : message.getKey();
log.info("[{}] Receive redeliver message key: {} value: {} messageId: {}",
check.getKey().getConsumerName(), key, message.getValue(), message.getMessageId());
+ // check that redelivered messages arrive in order for each key
+ if (lastMessageForKey.get(key) == null) {
+ Assert.assertNotNull(message);
+ } else {
+ Assert.assertTrue(message.getValue()
+ .compareTo(lastMessageForKey.get(key).getValue()) > 0);
+ }
+ lastMessageForKey.put(key, message);
}
Message noMessages = null;
try {