You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/01 09:56:01 UTC
[pulsar] 02/02: [Broker] Fix NonDurableCursorImpl initialPosition
by startCursorPosition greater than lastConfirmedEntry problem. (#10095)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 925fb1da99a0a9daea894b9c2b1d722d54dabcab
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Apr 1 16:18:00 2021 +0800
[Broker] Fix NonDurableCursorImpl initialPosition by startCursorPosition greater than lastConfirmedEntry problem. (#10095)
## Motivation
In order to fix reader set ```startMessageId``` greater than ```lastConfirmedEntry``` probelm.
When the reader set ```startMessageId``` greater than ```lastConfirmedEntry```, generate ```NonDurableCursorImpl``` will carry a ```MarkdeletePosition``` greater than lastConfirmedEntry.
When use ```getNumberOfEntriesInBacklog``` will throw
![image](https://user-images.githubusercontent.com/39078850/113100735-5f4d8980-922e-11eb-8758-7d3cfb1435f3.png)
## implement
When ```startMessageId``` greater than ```lastConfirmedEntry``` use ```lastConfirmedEntry``` as ```markdeletePosition```
### Verifying this change
Add the tests for it
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)
(cherry picked from commit 7d84e72a08e3baca8cb5fdbfc4bbfc3a1ae3ec92)
---
.../org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java | 2 +-
.../org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java | 7 +++++++
2 files changed, 8 insertions(+), 1 deletion(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 6d1bdfb..167bcec 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -40,7 +40,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
// Compare with "latest" position marker by using only the ledger id. Since the C++ client is using 48bits to
// store the entryId, it's not able to pass a Long.max() as entryId. In this case there's no point to require
// both ledgerId and entryId to be Long.max()
- if (startCursorPosition == null || startCursorPosition.getLedgerId() == PositionImpl.latest.getLedgerId()) {
+ if (startCursorPosition == null || startCursorPosition.compareTo(ledger.lastConfirmedEntry) > 0) {
// Start from last entry
switch (initialPosition) {
case Latest:
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 7a15035..3d806c9 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -323,6 +323,13 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
}
@Test(timeOut = 20000)
+ void markDeleteGreaterThanLastConfirmedEntry() throws Exception {
+ ManagedLedger ml1 = factory.open("my_test_ledger");
+ ManagedCursor mc1 = ml1.newNonDurableCursor(PositionImpl.get(Long.MAX_VALUE - 1, Long.MAX_VALUE - 1));
+ assertEquals(mc1.getMarkDeletedPosition(), ml1.getLastConfirmedEntry());
+ }
+
+ @Test(timeOut = 20000)
void testResetCursor() throws Exception {
ManagedLedger ledger = factory.open("my_test_move_cursor_ledger",
new ManagedLedgerConfig().setMaxEntriesPerLedger(10));