You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/06/22 10:01:51 UTC
[pulsar] branch branch-2.8 updated: [Broker] Fix the backlog issue
with --precise-backlog=true (#10966)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new b61832d [Broker] Fix the backlog issue with --precise-backlog=true (#10966)
b61832d is described below
commit b61832d4e6bafddd2b2d61fb29baa98bc46cbe19
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue Jun 22 07:31:53 2021 +0800
[Broker] Fix the backlog issue with --precise-backlog=true (#10966)
## Motivation
Fix the backlog issue with --precise-backlog=true.
Currently, when `managedLedger` finishes creating a new `ledger`, if `markDelete` is at the LAC of the `previousLedger`, the previousLedger is deleted from `managedLedger`. When getting the backlog we call `getNumberOfEntries` with `Range.closed` and subtract 1; if the previousLedger no longer exists, this yields the wrong number.
![image](https://user-images.githubusercontent.com/39078850/122502847-fce47800-d029-11eb-81b3-abc9e595d93e.png)
## implement
Use `Range.openClosed()` instead of `Range.closed()` minus 1 when computing the backlog.
### Verifying this change
Add the tests for it
(cherry picked from commit e3a97ee4caa80522663f5900b07ae331927de68e)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 4 +--
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 31 ++++++++++++++++++++++
2 files changed, 33 insertions(+), 2 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 1c120a5..bc19373 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -883,13 +883,13 @@ public class ManagedCursorImpl implements ManagedCursor {
messagesConsumedCounter, markDeletePosition, readPosition);
}
if (isPrecise) {
- return getNumberOfEntries(Range.closed(markDeletePosition, ledger.getLastPosition())) - 1;
+ return getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
}
long backlog = ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger) - messagesConsumedCounter;
if (backlog < 0) {
// In some case the counters get incorrect values, fall back to the precise backlog count
- backlog = getNumberOfEntries(Range.closed(markDeletePosition, ledger.getLastPosition())) - 1;
+ backlog = getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
}
return backlog;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 03327ff..2a648ea 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -46,6 +46,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
@@ -82,6 +83,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
@@ -3526,5 +3528,34 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
ledger.close();
}
+
+ @Test
+ public void testCursorGetBacklog() throws Exception {
+ ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+ managedLedgerConfig.setMaxEntriesPerLedger(2);
+ managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("get-backlog", managedLedgerConfig);
+ ManagedCursor managedCursor = ledger.openCursor("test");
+
+ Position position = ledger.addEntry("test".getBytes(Encoding));
+ ledger.addEntry("test".getBytes(Encoding));
+ Position position1 = ledger.addEntry("test".getBytes(Encoding));
+ ledger.addEntry("test".getBytes(Encoding));
+
+ Assert.assertEquals(managedCursor.getNumberOfEntriesInBacklog(true), 4);
+ Assert.assertEquals(managedCursor.getNumberOfEntriesInBacklog(false), 4);
+ Field field = ManagedLedgerImpl.class.getDeclaredField("ledgers");
+ field.setAccessible(true);
+
+ ((ConcurrentSkipListMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>) field.get(ledger)).remove(position.getLedgerId());
+ field = ManagedCursorImpl.class.getDeclaredField("markDeletePosition");
+ field.setAccessible(true);
+ field.set(managedCursor, PositionImpl.get(position1.getLedgerId(), -1));
+
+
+ Assert.assertEquals(managedCursor.getNumberOfEntriesInBacklog(true), 2);
+ Assert.assertEquals(managedCursor.getNumberOfEntriesInBacklog(false), 4);
+ }
+
private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}