You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/05 09:53:17 UTC

[pulsar] branch branch-2.7 updated: [Branch-2.7][Cherry-pick] Fix skips compacted data for reader/consumer (#16301)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 75e6a204d86 [Branch-2.7][Cherry-pick] Fix skips compacted data for reader/consumer (#16301)
75e6a204d86 is described below

commit 75e6a204d86d440196b13cc6c88358600b8257df
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Jul 5 17:53:11 2022 +0800

    [Branch-2.7][Cherry-pick] Fix skips compacted data for reader/consumer (#16301)
---
 .../src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java | 6 +++++-
 .../java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 6 ++----
 .../apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java | 2 +-
 .../main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java | 7 ++++++-
 4 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index a9724de2b39..763610f5740 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -398,7 +398,11 @@ public interface ManagedCursor {
      * @param newReadPosition
      *            the position where to move the cursor
      */
-    void seek(Position newReadPosition);
+    default void seek(Position newReadPosition) {
+        seek(newReadPosition, false);
+    }
+
+    void seek(Position newReadPosition, boolean force);
 
     /**
      * Clear the cursor backlog.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index b932ebb78eb..77012b6db8d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2124,18 +2124,16 @@ public class ManagedCursorImpl implements ManagedCursor {
     }
 
     @Override
-    public void seek(Position newReadPositionInt) {
+    public void seek(Position newReadPositionInt, boolean force) {
         checkArgument(newReadPositionInt instanceof PositionImpl);
         PositionImpl newReadPosition = (PositionImpl) newReadPositionInt;
 
         lock.writeLock().lock();
         try {
-            if (newReadPosition.compareTo(markDeletePosition) <= 0) {
+            if (!force && newReadPosition.compareTo(markDeletePosition) <= 0) {
                 // Make sure the newReadPosition comes after the mark delete position
                 newReadPosition = ledger.getNextValidPosition(markDeletePosition);
             }
-
-            PositionImpl oldReadPosition = readPosition;
             readPosition = newReadPosition;
         } finally {
             lock.writeLock().unlock();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index a43e6b4f922..3b64f3f0ee5 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -160,7 +160,7 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
-        public void seek(Position newReadPosition) {
+        public void seek(Position newReadPosition, boolean force) {
         }
 
         @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 5d0ea33cc8a..5b95f1413ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -122,7 +122,12 @@ public class CompactedTopicImpl implements CompactedTopic {
                                 return readEntries(context.ledger, startPoint, endPoint)
                                     .thenAccept((entries) -> {
                                         Entry lastEntry = entries.get(entries.size() - 1);
-                                        cursor.seek(lastEntry.getPosition().getNext());
+                                        // The compaction task depends on the last snapshot and the incremental
+                                        // entries to build the new snapshot. So for the compaction cursor, we
+                                        // need to force-seek the read position to ensure the compactor can read
+                                        // the complete last snapshot, because the compactor will read data
+                                        // located before the compaction cursor's mark-delete position.
+                                        cursor.seek(lastEntry.getPosition().getNext(), true);
                                         callback.readEntriesComplete(entries, consumer);
                                     });
                             }