You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/05 09:53:17 UTC
[pulsar] branch branch-2.7 updated: [Branch-2.7][Cherry-pick] Fix skips compacted data for reader/consumer (#16301)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 75e6a204d86 [Branch-2.7][Cherry-pick] Fix skips compacted data for reader/consumer (#16301)
75e6a204d86 is described below
commit 75e6a204d86d440196b13cc6c88358600b8257df
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Jul 5 17:53:11 2022 +0800
[Branch-2.7][Cherry-pick] Fix skips compacted data for reader/consumer (#16301)
---
.../src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java | 6 +++++-
.../java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 6 ++----
.../apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java | 2 +-
.../main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java | 7 ++++++-
4 files changed, 14 insertions(+), 7 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index a9724de2b39..763610f5740 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -398,7 +398,11 @@ public interface ManagedCursor {
* @param newReadPosition
* the position where to move the cursor
*/
- void seek(Position newReadPosition);
+ default void seek(Position newReadPosition) {
+ seek(newReadPosition, false);
+ }
+
+ void seek(Position newReadPosition, boolean force);
/**
* Clear the cursor backlog.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index b932ebb78eb..77012b6db8d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2124,18 +2124,16 @@ public class ManagedCursorImpl implements ManagedCursor {
}
@Override
- public void seek(Position newReadPositionInt) {
+ public void seek(Position newReadPositionInt, boolean force) {
checkArgument(newReadPositionInt instanceof PositionImpl);
PositionImpl newReadPosition = (PositionImpl) newReadPositionInt;
lock.writeLock().lock();
try {
- if (newReadPosition.compareTo(markDeletePosition) <= 0) {
+ if (!force && newReadPosition.compareTo(markDeletePosition) <= 0) {
// Make sure the newReadPosition comes after the mark delete position
newReadPosition = ledger.getNextValidPosition(markDeletePosition);
}
-
- PositionImpl oldReadPosition = readPosition;
readPosition = newReadPosition;
} finally {
lock.writeLock().unlock();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index a43e6b4f922..3b64f3f0ee5 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -160,7 +160,7 @@ public class ManagedCursorContainerTest {
}
@Override
- public void seek(Position newReadPosition) {
+ public void seek(Position newReadPosition, boolean force) {
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 5d0ea33cc8a..5b95f1413ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -122,7 +122,12 @@ public class CompactedTopicImpl implements CompactedTopic {
return readEntries(context.ledger, startPoint, endPoint)
.thenAccept((entries) -> {
Entry lastEntry = entries.get(entries.size() - 1);
- cursor.seek(lastEntry.getPosition().getNext());
+ // The compaction task depends on the last snapshot and the incremental
+ // entries to build the new snapshot. So for the compaction cursor, we
+ // need to force seek the read position to ensure the compactor can read
+ // the complete last snapshot, because the compactor will read the data
+ // before the compaction cursor's mark-delete position.
+ cursor.seek(lastEntry.getPosition().getNext(), true);
callback.readEntriesComplete(entries, consumer);
});
}