You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/31 15:48:32 UTC

[pulsar] branch master updated: FIX Key_Shared dispatch ordering of message redelivery. (#4406)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9db06a2  FIX Key_Shared dispatch ordering of message redelivery. (#4406)
9db06a2 is described below

commit 9db06a2c1e27dc61104ac97802a2bc07c2bf9dad
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri May 31 23:48:28 2019 +0800

    FIX Key_Shared dispatch ordering of message redelivery. (#4406)
---
 .../org/apache/bookkeeper/mledger/ManagedCursor.java  | 19 ++++++++++++++++++-
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java    | 12 ++++++++++++
 .../mledger/impl/ManagedCursorContainerTest.java      |  5 +++++
 .../PersistentDispatcherMultipleConsumers.java        |  9 ++++++---
 ...ersistentStickyKeyDispatcherMultipleConsumers.java |  7 +++++++
 .../pulsar/client/api/KeySharedSubscriptionTest.java  |  9 +++++++++
 6 files changed, 57 insertions(+), 4 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index e1bcec7..fc3f6a4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -467,7 +467,7 @@ public interface ManagedCursor {
             throws InterruptedException, ManagedLedgerException;
 
     /**
-     * Read the specified set of positions from ManagedLedger.
+     * Read the specified set of positions from ManagedLedger; entries are returned in no particular order.
      *
      * @param positions
      *            set of positions to read
@@ -482,6 +482,23 @@ public interface ManagedCursor {
         Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx);
 
     /**
+     * Read the specified set of positions from ManagedLedger, optionally sorting the returned entries.
+     *
+     * @param positions
+     *            set of positions to read
+     * @param callback
+     *            callback object returning the list of entries
+     * @param ctx
+     *            opaque context
+     * @param sortEntries
+     *            if true, sort the entries by ledger id and then entry id before invoking the callback
+     * @return skipped positions
+     *              set of positions which are already deleted/acknowledged and skipped while replaying them
+     */
+    Set<? extends Position> asyncReplayEntries(
+            Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx, boolean sortEntries);
+
+    /**
      * Close the cursor and releases the associated resources.
      *
      * @throws InterruptedException
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index cdad87d..124f82a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -30,6 +30,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
+import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
@@ -1027,6 +1028,12 @@ public class ManagedCursorImpl implements ManagedCursor {
     @Override
     public Set<? extends Position> asyncReplayEntries(final Set<? extends Position> positions,
             ReadEntriesCallback callback, Object ctx) {
+        return asyncReplayEntries(positions, callback, ctx, false);
+    }
+
+    @Override
+    public Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions,
+            ReadEntriesCallback callback, Object ctx, boolean sortEntries) {
         List<Entry> entries = Lists.newArrayListWithExpectedSize(positions.size());
         if (positions.isEmpty()) {
             callback.readEntriesComplete(entries, ctx);
@@ -1062,6 +1069,11 @@ public class ManagedCursorImpl implements ManagedCursor {
                 } else {
                     entries.add(entry);
                     if (--pendingCallbacks == 0) {
+                        if (sortEntries) {
+                            entries.sort((e1, e2) -> ComparisonChain.start()
+                                    .compare(e1.getLedgerId(), e2.getLedgerId())
+                                    .compare(e1.getEntryId(), e2.getEntryId()).result());
+                        }
                         callback.readEntriesComplete(entries, ctx);
                     }
                 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 458ca4c..dfd233a 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -238,6 +238,11 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
+        public Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx, boolean sortEntries) {
+            return Sets.newConcurrentHashSet();
+        }
+
+        @Override
         public List<Entry> readEntriesOrWait(int numberOfEntriesToRead)
                 throws InterruptedException, ManagedLedgerException {
             return null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index fad8343..ffc9ce2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -68,7 +68,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  */
-public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMultipleConsumers implements Dispatcher, ReadEntriesCallback {
+public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers implements Dispatcher, ReadEntriesCallback {
 
     protected final PersistentTopic topic;
     protected final ManagedCursor cursor;
@@ -303,8 +303,7 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
                 }
 
                 havePendingReplayRead = true;
-                Set<? extends Position> deletedMessages = cursor.asyncReplayEntries(messagesToReplayNow, this,
-                        ReadType.Replay);
+                Set<? extends Position> deletedMessages = asyncReplayEntries(messagesToReplayNow);
                 // clear already acked positions from replay bucket
 
                 deletedMessages.forEach(position -> messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(),
@@ -335,6 +334,10 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
         }
     }
 
+    protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions) {
+        return cursor.asyncReplayEntries(positions, this, ReadType.Replay);
+    }
+
 
     @Override
     public boolean isConsumerConnected() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index ca4f525..e13a0c3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -25,10 +25,12 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Consumer;
@@ -161,6 +163,11 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         return SubType.Key_Shared;
     }
 
+    @Override
+    protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions) {
+        return cursor.asyncReplayEntries(positions, this, ReadType.Replay, true);
+    }
+
     private byte[] peekStickyKey(ByteBuf metadataAndPayload) {
         metadataAndPayload.markReaderIndex();
         MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 9c67817..a6f357e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -390,6 +390,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
             int redeliveryCount = check.getValue() / 2;
             log.info("[{}] Consumer wait for {} messages redelivery ...", redeliveryCount);
             // messages not acked, test redelivery
+            lastMessageForKey = new HashMap<>();
             for (int i = 0; i < redeliveryCount; i++) {
                 Message<Integer> message = check.getKey().receive();
                 received++;
@@ -397,6 +398,14 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
                 String key = message.hasOrderingKey() ? new String(message.getOrderingKey()) : message.getKey();
                 log.info("[{}] Receive redeliver message key: {} value: {} messageId: {}",
                         check.getKey().getConsumerName(), key, message.getValue(), message.getMessageId());
+                // check that redelivered messages are ordered per key
+                if (lastMessageForKey.get(key) == null) {
+                    Assert.assertNotNull(message);
+                } else {
+                    Assert.assertTrue(message.getValue()
+                            .compareTo(lastMessageForKey.get(key).getValue()) > 0);
+                }
+                lastMessageForKey.put(key, message);
             }
             Message noMessages = null;
             try {