You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/06/11 19:19:14 UTC

[GitHub] [pulsar] merlimat commented on a change in pull request #7236: Fixed readers backlog stats after data is skipped

merlimat commented on a change in pull request #7236:
URL: https://github.com/apache/pulsar/pull/7236#discussion_r439014421



##########
File path: managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
##########
@@ -678,6 +681,48 @@ public void testGetSlowestConsumer() throws Exception {
         ledger.close();
     }
 
+    @Test
+    public void testBacklogStatsWhenDroppingData() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testBacklogStatsWhenDroppingData",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+        ManagedCursor c1 = ledger.openCursor("c1");
+        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
+
+        List<Position> positions = Lists.newArrayList();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+        }
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 10);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 10);
+
+        c1.markDelete(positions.get(4));
+        assertEquals(c1.getNumberOfEntries(), 5);
+        assertEquals(c1.getNumberOfEntriesInBacklog(true), 5);
+
+        // Since the durable cursor has moved, the data will be trimmed
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 6);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 6);

Review comment:
       That's because the other cursor is positioned at the end of the 5th ledger, but not on the 6th. That means that only 4 ledgers are deleted. That cursor would move forward on the next mark-delete.
   
   When advancing the non-durable cursor, we advance to the first available ledger and that might be before the durable cursor mark-delete position, but that's ok.
   

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2125,6 +2128,37 @@ public void operationFailed(MetaStoreException e) {
         }
     }
 
+    /**
+     * Non-durable cursors have to be moved forward when data is trimmed since they are not retain that data.
+     * This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped
+     * entries and the stats are reported correctly.
+     */
+    private void advanceNonDurableCursors(List<LedgerInfo> ledgersToDelete) {
+        if (ledgersToDelete.isEmpty()) {
+            return;
+        }
+
+        long firstNonDeletedLedger = ledgers
+                .ceilingKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId() + 1);
+        PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1);
+
+        cursors.forEach(cursor -> {
+            if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) {

Review comment:
       No need for that, a durable cursor would have been already moved ahead, otherwise we wouldn't be trimming that ledger. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org