You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/06 05:32:08 UTC

[pulsar] branch branch-2.7 updated: Fix lost compaction data due to compaction properties missed during reset-cursor. (#16404)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 77d5bb5ff1b Fix lost compaction data due to compaction properties missed during reset-cursor. (#16404)
77d5bb5ff1b is described below

commit 77d5bb5ff1bc68b7fff7ef869b35c071b550b670
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Jul 6 13:31:55 2022 +0800

    Fix lost compaction data due to compaction properties missed during reset-cursor. (#16404)
---
 .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java  | 10 ++++++++--
 .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java  |  3 ++-
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 77012b6db8d..5986334cb06 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -188,6 +188,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     private long entriesReadCount;
     private long entriesReadSize;
     private int individualDeletedMessagesSerializedSize;
+    private static final String COMPACTION_CURSOR_NAME = "__compaction";
 
     class MarkDeleteEntry {
         final PositionImpl newPosition;
@@ -1033,7 +1034,8 @@ public class ManagedCursorImpl implements ManagedCursor {
                                 Range.closedOpen(markDeletePosition, newMarkDeletePosition)));
                     }
                     markDeletePosition = newMarkDeletePosition;
-                    lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, Collections.emptyMap(),
+                    lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor() ?
+                            getProperties() : Collections.emptyMap(),
                             null, null);
                     individualDeletedMessages.clear();
                     if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
@@ -1084,7 +1086,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         };
 
         lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, getProperties(), null, null);
-        internalAsyncMarkDelete(newPosition, Collections.emptyMap(), new MarkDeleteCallback() {
+        internalAsyncMarkDelete(newPosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(), new MarkDeleteCallback() {
             @Override
             public void markDeleteComplete(Object ctx) {
                 finalCallback.operationComplete();
@@ -3016,5 +3018,9 @@ public class ManagedCursorImpl implements ManagedCursor {
         return Math.min(maxEntriesBasedOnSize, maxEntries);
     }
 
+    private boolean isCompactionCursor() {
+        return COMPACTION_CURSOR_NAME.equals(name);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index f8d7098d042..b66b9a8aa8d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2294,7 +2294,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0
                     && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0
                     && !(!cursor.isDurable() && cursor instanceof NonDurableCursorImpl && ((NonDurableCursorImpl) cursor).isReadCompacted())) {
-                cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() {
+                cursor.asyncMarkDelete(highestPositionToDelete, cursor.getProperties(), new MarkDeleteCallback() {
+
                     @Override
                     public void markDeleteComplete(Object ctx) {
                     }