You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/06/11 01:22:12 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #7236: Fixed readers backlog stats after data is skipped

codelipenghui commented on a change in pull request #7236:
URL: https://github.com/apache/pulsar/pull/7236#discussion_r438490315



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2125,6 +2128,37 @@ public void operationFailed(MetaStoreException e) {
         }
     }
 
+    /**
+     * Non-durable cursors have to be moved forward when data is trimmed since they are not retain that data.
+     * This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped
+     * entries and the stats are reported correctly.
+     */
+    private void advanceNonDurableCursors(List<LedgerInfo> ledgersToDelete) {
+        if (ledgersToDelete.isEmpty()) {
+            return;
+        }
+
+        long firstNonDeletedLedger = ledgers
+                .ceilingKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId() + 1);

Review comment:
       ```suggestion
           long firstNonDeletedLedger = ledgers
                   .higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
   ```

##########
File path: managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
##########
@@ -678,6 +681,48 @@ public void testGetSlowestConsumer() throws Exception {
         ledger.close();
     }
 
+    @Test
+    public void testBacklogStatsWhenDroppingData() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testBacklogStatsWhenDroppingData",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+        ManagedCursor c1 = ledger.openCursor("c1");
+        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
+
+        List<Position> positions = Lists.newArrayList();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+        }
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 10);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 10);
+
+        c1.markDelete(positions.get(4));
+        assertEquals(c1.getNumberOfEntries(), 5);
+        assertEquals(c1.getNumberOfEntriesInBacklog(true), 5);
+
+        // Since the durable cursor has moved, the data will be trimmed
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 6);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 6);

Review comment:
       Why durable cursor has 5 backlogs, non-durable cursor has 6 backlogs? Shouldn't they be the same?

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2125,6 +2128,37 @@ public void operationFailed(MetaStoreException e) {
         }
     }
 
+    /**
+     * Non-durable cursors have to be moved forward when data is trimmed since they are not retain that data.
+     * This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped
+     * entries and the stats are reported correctly.
+     */
+    private void advanceNonDurableCursors(List<LedgerInfo> ledgersToDelete) {
+        if (ledgersToDelete.isEmpty()) {
+            return;
+        }
+
+        long firstNonDeletedLedger = ledgers
+                .ceilingKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId() + 1);
+        PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1);
+
+        cursors.forEach(cursor -> {
+            if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) {

Review comment:
       Shall we need to add check for non-durable cursor?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org