You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/02/26 08:05:30 UTC

[GitHub] [pulsar] lhotari commented on a change in pull request #9732: Fix marking individual deletes as dirty

lhotari commented on a change in pull request #9732:
URL: https://github.com/apache/pulsar/pull/9732#discussion_r583451264



##########
File path: managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
##########
@@ -3401,5 +3401,61 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
         factory2.shutdown();
     }
 
+    @Test
+    public void testFlushCursorAfterIndividualDeleteInactivity() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setThrottleMarkDelete(1.0);
+
+        ManagedLedgerFactoryConfig factoryConfig = new ManagedLedgerFactoryConfig();
+        factoryConfig.setCursorPositionFlushSeconds(1);
+        ManagedLedgerFactory factory1 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle(), factoryConfig);
+        ManagedLedger ledger1 = factory1.open("testFlushCursorAfterInactivity", config);
+        ManagedCursor c1 = ledger1.openCursor("c");
+        List<Position> positions = new ArrayList<Position>();
+
+        for (int i = 0; i < 20; i++) {
+            positions.add(ledger1.addEntry(new byte[1024]));
+        }
+
+        CountDownLatch latch = new CountDownLatch(positions.size());
+
+        positions.forEach(p -> c1.asyncDelete(p, new DeleteCallback() {
+            @Override
+            public void deleteComplete(Object ctx) {
+                latch.countDown();
+            }
+
+            @Override
+            public void deleteFailed(ManagedLedgerException exception, Object ctx) {
+                throw new RuntimeException(exception);
+            }
+        }, null));
+
+        latch.await();
+
+        assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() - 1));
+
+        // reopen the cursor and we should see entries not be flushed
+        ManagedLedgerFactory dirtyFactory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+        ManagedLedger ledgerDirty = dirtyFactory.open("testFlushCursorAfterInactivity", config);
+        ManagedCursor dirtyCursor = ledgerDirty.openCursor("c");
+
+        assertNotEquals(dirtyCursor.getMarkDeletedPosition(), positions.get(positions.size() - 1));
+
+        // Give chance to the flush to be automatically triggered.
+        Thread.sleep(3000);

Review comment:
       Using `Thread.sleep` in test could lead to flaky tests. Is there a possibility to use Awaitility to poll for some condition or assertion? Similar logic with retrying using Awaitility:
   ```
           Awaitility.await()
                   // Give chance to the flush to be automatically triggered.
                   .pollDelay(Duration.ofMillis(3000))
                   .untilAsserted(() -> {
                       // Abruptly re-open the managed ledger without graceful close
                       ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
                       try {
                           ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config);
                           ManagedCursor c2 = ledger2.openCursor("c");
   
                           assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));
                       } finally {
                           factory2.shutdown();
                       }
                   });
   ```
   By default, [Awaitility will retry up to 10 seconds](https://github.com/awaitility/awaitility/wiki/Usage#defaults). 
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org