You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/26 12:16:28 UTC
[pulsar] 01/04: [Broker] Fix the backlog issue with
--precise-backlog=true (#10966)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2f9b6721c579300e6b1db3f0c062e8dbe38d1925
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue Jun 22 07:31:53 2021 +0800
[Broker] Fix the backlog issue with --precise-backlog=true (#10966)
fix backlog issuse with --precise-backlog=true.
Now when `managedLedger` create a new `ledger` complete. if `markDelete` is the `previousLedger` LAC it will delete the previousLedger from `managedLedger` . when get backlog we will use range.close to get `getNumberOfEntries` -1, if previousLedger not exist will get the wrong number.
![image](https://user-images.githubusercontent.com/39078850/122502847-fce47800-d029-11eb-81b3-abc9e595d93e.png)
use range.openClose() to `getBacklog`.
Add the tests for it
(cherry picked from commit e3a97ee4caa80522663f5900b07ae331927de68e)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 4 +--
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 30 ++++++++++++++++++++++
2 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index f732a75..8f671c5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -858,13 +858,13 @@ public class ManagedCursorImpl implements ManagedCursor {
messagesConsumedCounter, markDeletePosition, readPosition);
}
if (isPrecise) {
- return getNumberOfEntries(Range.closed(markDeletePosition, ledger.getLastPosition())) - 1;
+ return getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
}
long backlog = ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger) - messagesConsumedCounter;
if (backlog < 0) {
// In some case the counters get incorrect values, fall back to the precise backlog count
- backlog = getNumberOfEntries(Range.closed(markDeletePosition, ledger.getLastPosition())) - 1;
+ backlog = getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
}
return backlog;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 4f6e1a0..cfa7f0b 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -45,6 +45,8 @@ import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
@@ -79,6 +81,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
@@ -3421,5 +3424,32 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
dirtyFactory.shutdown();
}
+ @Test
+ public void testCursorGetBacklog() throws Exception {
+ ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+ managedLedgerConfig.setMaxEntriesPerLedger(2);
+ managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("get-backlog", managedLedgerConfig);
+ ManagedCursor managedCursor = ledger.openCursor("test");
+
+ PositionImpl position = (PositionImpl) ledger.addEntry("test".getBytes(Encoding));
+ ledger.addEntry("test".getBytes(Encoding));
+ PositionImpl position1 = (PositionImpl) ledger.addEntry("test".getBytes(Encoding));
+ ledger.addEntry("test".getBytes(Encoding));
+
+ Assert.assertEquals(managedCursor.getNumberOfEntriesInBacklog(true), 4);
+ Assert.assertEquals(managedCursor.getNumberOfEntriesInBacklog(false), 4);
+ Field field = ManagedLedgerImpl.class.getDeclaredField("ledgers");
+ field.setAccessible(true);
+
+ ((ConcurrentSkipListMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>) field.get(ledger)).remove(position.ledgerId);
+ field = ManagedCursorImpl.class.getDeclaredField("markDeletePosition");
+ field.setAccessible(true);
+ field.set(managedCursor, PositionImpl.get(position1.ledgerId, -1));
+
+
+ Assert.assertEquals(managedCursor.getNumberOfEntriesInBacklog(true), 2);
+ Assert.assertEquals(managedCursor.getNumberOfEntriesInBacklog(false), 4);
+ }
private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}