You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/07/19 18:35:23 UTC

[pulsar] branch master updated: [improve][broker] Create the cursor ledger lazily to improve the subscribe performance (#16389)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cccf2520f6b [improve][broker] Create the cursor ledger lazily to improve the subscribe performance (#16389)
cccf2520f6b is described below

commit cccf2520f6b6242388f6e4776b9649e4230a589a
Author: Penghui Li <pe...@apache.org>
AuthorDate: Wed Jul 20 02:35:15 2022 +0800

    [improve][broker] Create the cursor ledger lazily to improve the subscribe performance (#16389)
    
    * [feature][broker] Support create ledger cursor lazily
    
    ### Motivation
    
    Provide a way to create the ledger cursor lazily. It can reduce the subscription creation time-consuming.
    In the case of millions of topics, consumers can complete subscriptions more quickly.
    After enabling this feature, the cursor ledger will create will be created during the message acknowledgment.
    If there are no message acknowledgments happened on a subscription, the cursor ledger will not be created.
    
    ### Modification
    
    Added new configuration to enable the cursor ledger lazy creation
    
    ```
    # Whether to create the cursor ledger lazily when recovering a managed cursor backing a durable subscription.
    # It can reduce the subscription creation time-consuming. In the case of millions of topics, consumers can complete
    # subscriptions more quickly.
    #
    # After enabling this option, the cursor ledger will create will be created during the message acknowledgment.
    # If there are no message acknowledgments happened on a subscription, the cursor ledger will not be created.
    
    # Default is false.
    managedLedgerLazyCursorLedgerCreationEnabled=false
    ```
    
    * add doc
    
    * Remove configuraiton
    
    * Address comment.
    
    * address comments
    
    * address comments
    
    * Fix test
    
    * Fix test
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 25 +++---
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 88 ++++++++++++++++++++--
 .../mledger/impl/ManagedLedgerErrorsTest.java      |  9 ++-
 .../mledger/impl/ManagedLedgerFactoryTest.java     |  6 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  4 +-
 .../mledger/impl/NonDurableCursorTest.java         |  6 +-
 .../broker/admin/v3/AdminApiTransactionTest.java   |  8 +-
 .../broker/stats/ManagedCursorMetricsTest.java     | 13 +++-
 .../client/impl/BrokerClientIntegrationTest.java   | 22 ++++++
 9 files changed, 145 insertions(+), 36 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 2e50c712614..c52cd6db933 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -646,19 +646,18 @@ public class ManagedCursorImpl implements ManagedCursor {
             log.debug("[{}] Consumer {} cursor initialized with counters: consumed {} mdPos {} rdPos {}",
                     ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition);
         }
-
-        createNewMetadataLedger(new VoidCallback() {
-            @Override
-            public void operationComplete() {
-                STATE_UPDATER.set(ManagedCursorImpl.this, State.Open);
-                callback.operationComplete();
-            }
-
-            @Override
-            public void operationFailed(ManagedLedgerException exception) {
-                callback.operationFailed(exception);
-            }
-        });
+        persistPositionMetaStore(cursorLedger != null ? cursorLedger.getId() : -1L, position, properties,
+                new MetaStoreCallback<>() {
+                    @Override
+                    public void operationComplete(Void result, Stat stat) {
+                        STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger);
+                        callback.operationComplete();
+                    }
+                    @Override
+                    public void operationFailed(MetaStoreException e) {
+                        callback.operationFailed(e);
+                    }
+        }, false);
     }
 
     @Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 9768389455c..b5d677f8b04 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -517,9 +517,11 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         ManagedCursor cursor = ledger.openCursor("c1");
         ledger.addEntry("dummy-entry-1".getBytes(Encoding));
         List<Entry> entries = cursor.readEntries(100);
-
-        stopBookKeeper();
         assertEquals(entries.size(), 1);
+        cursor.markDelete(entries.get(0).getPosition());
+        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
+        entries = cursor.readEntries(100);
+        stopBookKeeper();
 
         // Mark-delete should succeed if BK is down
         cursor.markDelete(entries.get(0).getPosition());
@@ -1338,9 +1340,11 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
 
         bkc.failAfter(4, BKException.Code.MetadataVersionException);
-
+        ledger = factory2.open("my_test_ledger");
+        ManagedCursor cursor = ledger.openCursor("c1");
+        Position position = ledger.addEntry("test".getBytes());
         try {
-            ledger = factory2.open("my_test_ledger");
+            cursor.markDelete(position);
             fail("should have failed");
         } catch (ManagedLedgerException e) {
             // ok
@@ -3262,6 +3266,11 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         deleteBatchIndex(cursor, positions[3], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(9)));
         deleteBatchIndex(cursor, positions[4], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(9)));
 
+        ManagedCursor finalCursor = cursor;
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(finalCursor.getMarkDeletedPosition(), positions[4]);
+        });
+
         cursor.close();
         ledger.close();
         ledger = factory.open("test_batch_indexes_deletion_persistent", managedLedgerConfig);
@@ -3533,10 +3542,11 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
             positions.add(ledger1.addEntry(new byte[1024]));
         }
 
+        c1.markDelete(positions.get(0));
+        Thread.sleep(3000);
         // Simulate BK write error
         bkc.failNow(BKException.Code.NotEnoughBookiesException);
         metadataStore.setAlwaysFail(new MetadataStoreException.BadVersionException(""));
-
         try {
             c1.markDelete(positions.get(positions.size() - 1));
             fail("should have failed");
@@ -3694,6 +3704,10 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
 
         cursor.delete(positions.get(0));
 
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(cursor.getMarkDeletedPosition(), positions.get(0));
+        });
+
         long initialLedgerId = cursor.getCursorLedger();
 
         metadataStore.triggerSessionEvent(SessionEvent.SessionLost);
@@ -3702,6 +3716,10 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
             cursor.delete(positions.get(i));
         }
 
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(cursor.getMarkDeletedPosition(), positions.get(positions.size() - 1));
+        });
+
         assertEquals(cursor.getCursorLedger(), initialLedgerId);
 
         // After the session gets reestablished, the rollover should restart
@@ -3812,5 +3830,65 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
                 Arrays.asList(1, 1, 1));
     }
 
+    @Test
+    public void testLazyCursorLedgerCreation() throws Exception {
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory
+                .open("testLazyCursorLedgerCreation", managedLedgerConfig);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("test");
+        assertEquals(cursor.getState(), "NoLedger");
+        assertEquals(cursor.getMarkDeletedPosition(), ledger.getLastPosition());
+        Position lastPosition = null;
+        for (int i = 0; i < 10; i++) {
+            lastPosition = ledger.addEntry("test".getBytes(Encoding));
+        }
+        cursor.markDelete(lastPosition);
+        Position finalLastPosition = lastPosition;
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(cursor.getState(), "Open");
+            assertEquals(cursor.getMarkDeletedPosition(), finalLastPosition);
+            assertEquals(cursor.getPersistentMarkDeletedPosition(), finalLastPosition);
+        });
+
+        // Make sure the recovered mark delete position is correct.
+        cursor.close();
+        ledger.close();
+        ledger = (ManagedLedgerImpl) factory
+                .open("testLazyCursorLedgerCreation", managedLedgerConfig);
+        ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger.openCursor("test");
+        assertEquals(cursor1.getState(), "NoLedger");
+        assertEquals(cursor1.getMarkDeletedPosition(), finalLastPosition);
+
+        // Verify the recovered cursor can work with new mark delete.
+        lastPosition = null;
+        for (int i = 0; i < 10; i++) {
+            lastPosition = ledger.addEntry("test".getBytes(Encoding));
+        }
+        cursor1.markDelete(lastPosition);
+        Position finalLastPosition2 = lastPosition;
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(cursor1.getState(), "Open");
+            assertEquals(cursor1.getMarkDeletedPosition(), finalLastPosition2);
+            assertEquals(cursor1.getPersistentMarkDeletedPosition(), finalLastPosition2);
+        });
+        cursor1.close();
+        ledger.close();
+    }
+
+    @Test
+    public void testLazyCursorLedgerCreationForSubscriptionCreation() throws Exception {
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testLazyCursorLedgerCreation", managedLedgerConfig);
+        Position p1 = ledger.addEntry("test".getBytes());
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("test");
+        assertEquals(cursor.getMarkDeletedPosition(), p1);
+        ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
+        ledger = (ManagedLedgerImpl) factory2.open("testLazyCursorLedgerCreation", managedLedgerConfig);
+        assertNotNull(ledger.getCursors().get("test"));
+        ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger.openCursor("test");
+        assertEquals(cursor1.getMarkDeletedPosition(), p1);
+        factory2.shutdown();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
index dd0ffb980cf..8fd2fd0003d 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
@@ -75,7 +75,7 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
         ledger.deleteCursor("c1");
 
         assertFalse(metadataStore.exists("/managed-ledgers/my_test_ledger/c1").join());
-        assertEquals(bkc.getLedgers().size(), 2);
+        assertEquals(bkc.getLedgers().size(), 1);
     }
 
     @Test
@@ -491,7 +491,8 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
         ManagedLedger ledger = factory.open("my_test_ledger");
         ManagedCursor cursor = ledger.openCursor("my-cursor");
         Position position = ledger.addEntry("entry".getBytes());
-
+        Position position1 = ledger.addEntry("entry".getBytes());
+        cursor.markDelete(position);
         bkc.failNow(BKException.Code.BookieHandleNotAvailableException);
         metadataStore.failConditional(new MetadataStoreException("error"), (op, path) ->
                 path.equals("/managed-ledgers/my_test_ledger/my-cursor")
@@ -499,7 +500,7 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
         );
 
         try {
-            cursor.markDelete(position);
+            cursor.markDelete(position1);
             fail("should fail");
         } catch (ManagedLedgerException e) {
             // ok
@@ -509,7 +510,7 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
         Thread.sleep(100);
 
         // Next markDelete should succeed
-        cursor.markDelete(position);
+        cursor.markDelete(position1);
     }
 
     @Test
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
index 8d0c9172f54..f307bfa72a5 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
@@ -52,9 +52,9 @@ public class ManagedLedgerFactoryTest extends MockedBookKeeperTestCase {
         assertEquals(info.ledgers.size(), 4);
 
         assertEquals(info.ledgers.get(0).ledgerId, 3);
-        assertEquals(info.ledgers.get(1).ledgerId, 5);
-        assertEquals(info.ledgers.get(2).ledgerId, 6);
-        assertEquals(info.ledgers.get(3).ledgerId, 7);
+        assertEquals(info.ledgers.get(1).ledgerId, 4);
+        assertEquals(info.ledgers.get(2).ledgerId, 5);
+        assertEquals(info.ledgers.get(3).ledgerId, 6);
 
         assertEquals(info.cursors.size(), 1);
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 02696ff504d..0baaa47fe08 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -1632,7 +1632,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ledger.openCursor("c1");
 
         ledger.addEntry("data".getBytes(Encoding));
-        assertEquals(bkc.getLedgers().size(), 2);
+        assertEquals(bkc.getLedgers().size(), 1);
 
         ledger.delete();
         assertEquals(bkc.getLedgers().size(), 0);
@@ -1644,7 +1644,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ledger.openCursor("c1");
 
         ledger.addEntry("data".getBytes(Encoding));
-        assertEquals(bkc.getLedgers().size(), 2);
+        assertEquals(bkc.getLedgers().size(), 1);
 
         final CountDownLatch latch = new CountDownLatch(1);
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 4b3f0ea61ff..29ba99687ba 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -282,13 +282,15 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
         ManagedLedger ledger = factory.open("my_test_ledger");
         ManagedCursor cursor = ledger.openCursor("c1");
         ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
         List<Entry> entries = cursor.readEntries(100);
+        assertEquals(entries.size(), 2);
+        cursor.markDelete(entries.get(0).getPosition());
 
         stopBookKeeper();
-        assertEquals(entries.size(), 1);
 
         // Mark-delete should succeed if BK is down
-        cursor.markDelete(entries.get(0).getPosition());
+        cursor.markDelete(entries.get(1).getPosition());
 
         entries.forEach(Entry::release);
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 0e9bbc10251..4afb2030eca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -712,9 +712,11 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
                 messageId.getLedgerId(), messageId.getEntryId(), null);
          assertEquals(result.state, PositionInPendingAckStats.State.PendingAck);
          transaction.commit().get();
-         result = admin.transactions().getPositionStatsInPendingAck(topic, subName,
-                 messageId.getLedgerId(), messageId.getEntryId(), null);
-         assertEquals(result.state, PositionInPendingAckStats.State.MarkDelete);
+         Awaitility.await().untilAsserted(() -> {
+             PositionInPendingAckStats r = admin.transactions().getPositionStatsInPendingAck(topic, subName,
+                     messageId.getLedgerId(), messageId.getEntryId(), null);
+             assertEquals(r.state, PositionInPendingAckStats.State.MarkDelete);
+         });
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
index a12392c435b..8ab3ee30693 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
@@ -85,11 +85,16 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
                 .isAckReceiptEnabled(true)
                 .subscribe();
 
+
         @Cleanup
         Producer<byte[]> producer = this.pulsarClient.newProducer()
                 .topic(topicName)
                 .create();
 
+        producer.send("trigger-cursor-ledger-creation".getBytes());
+        // Trigger the cursor ledger creation
+        consumer.acknowledge(consumer.receive().getMessageId());
+
         for(PulsarMockLedgerHandle ledgerHandle : mockBookKeeper.getLedgerMap().values()) {
             ledgerHandle.close();
         }
@@ -160,12 +165,12 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
         }
         metricsList = metrics.generate();
         Assert.assertEquals(metricsList.size(), 2);
-        Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L);
-        Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 13L);
+        Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 0L);
+        Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 0L);
         Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);
 
-        Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L);
-        Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 13L);
+        Assert.assertNotEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 0L);
+        Assert.assertNotEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 0L);
         Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 630e27d0361..6d6bb9e3f1f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -95,11 +95,13 @@ import org.apache.pulsar.client.impl.schema.writer.JacksonJsonWriter;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.protocol.PulsarHandler;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1053,4 +1055,24 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
+    public void testManagedLedgerLazyCursorLedgerCreation() throws Exception {
+        String topic = "persistent://my-property/my-ns/testManagedLedgerLazyCursorLedgerCreationEnabled";
+        String sub = "my-subscriber-name";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(sub).subscribe();
+        PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
+        assertEquals(stats.cursors.get(sub).state, "NoLedger");
+        producer.send("test".getBytes(UTF_8));
+        consumer.acknowledgeCumulative(consumer.receive());
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats1 = admin.topics().getInternalStats(topic);
+            assertEquals(stats1.cursors.get(sub).state, "Open");
+            assertEquals(stats1.lastConfirmedEntry, stats1.cursors.get(sub).markDeletePosition);
+        });
+    }
+
 }