You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:05 UTC

[pulsar] 19/38: Fix check backlogged cursors without consumer (#6766)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5b74f81555a5ff12c4889d0de023b538cebdc136
Author: zhaorongsheng <zh...@users.noreply.github.com>
AuthorDate: Thu Apr 23 02:42:22 2020 +0800

    Fix check backlogged cursors without consumer (#6766)
    
    * fix the backlogged cursors bug
    
    * add some test
    
    * change test case
    
    * change test case
    
    * change test case
    
    * change check logical
    
    * move backlogged cursor check after consumer add
    
    Co-authored-by: zhaorongsheng <zh...@bigo.sg>
    (cherry picked from commit 4d59f2b8a002c48676ca67bbb137ca0078f7fa8d)
---
 .../apache/bookkeeper/mledger/ManagedLedger.java   |   6 -
 .../mledger/ManagedLedgerFactoryConfig.java        |   5 -
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  15 ---
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  80 ------------
 .../pulsar/broker/ManagedLedgerClientFactory.java  |   1 -
 .../apache/pulsar/broker/service/PulsarStats.java  |   2 +-
 .../org/apache/pulsar/broker/service/Topic.java    |   6 +
 .../service/nonpersistent/NonPersistentTopic.java  |   5 +
 .../broker/service/persistent/PersistentTopic.java |  20 ++-
 .../pulsar/broker/service/PersistentTopicTest.java | 142 ++++++++++++++++++++-
 .../client/api/SimpleProducerConsumerTest.java     |   4 +-
 11 files changed, 172 insertions(+), 114 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 50439f5..8f52905 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -328,12 +328,6 @@ public interface ManagedLedger {
      */
     long getOffloadedSize();
 
-    /**
-     * Activate cursors those caught up backlog-threshold entries and deactivate slow cursors which are creating
-     * backlog.
-     */
-    void checkBackloggedCursors();
-
     void asyncTerminate(TerminateCallback callback, Object ctx);
 
     /**
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index b5d0a7c..4d1eb31 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -48,11 +48,6 @@ public class ManagedLedgerFactoryConfig {
     private long cacheEvictionTimeThresholdMillis = 1000;
 
     /**
-     * Threshould to consider a cursor as "backlogged"
-     */
-    private long thresholdBackloggedCursor = 1000;
-
-    /**
      * Whether we should make a copy of the entry payloads when inserting in cache
      */
     private boolean copyEntriesInCache = false;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index bf7ad2b..12ca6cd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -234,8 +234,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             .newUpdater(ManagedLedgerImpl.class, "addOpCount");
     private volatile long addOpCount = 0;
 
-    private final long backloggedCursorThresholdEntries;
-
     // last read-operation's callback to check read-timeout on it.
     private volatile ReadEntryCallbackWrapper lastReadCallback = null;
     private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, ReadEntryCallbackWrapper> LAST_READ_CALLBACK_UPDATER = AtomicReferenceFieldUpdater
@@ -271,7 +269,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         this.waitingCursors = Queues.newConcurrentLinkedQueue();
         this.uninitializedCursors = Maps.newHashMap();
         this.clock = config.getClock();
-        this.backloggedCursorThresholdEntries = factory.getConfig().getThresholdBackloggedCursor();
 
         // Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time
         this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0));
@@ -908,18 +905,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     @Override
-    public void checkBackloggedCursors() {
-        // activate caught up cursors
-        cursors.forEach(cursor -> {
-            if (cursor.getNumberOfEntries() < backloggedCursorThresholdEntries) {
-                cursor.setActive();
-            } else {
-                cursor.setInactive();
-            }
-        });
-    }
-
-    @Override
     public long getEstimatedBacklogSize() {
 
         PositionImpl pos = getMarkDeletePositionOfSlowestConsumer();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index c891a89..c2886cf 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2070,71 +2070,6 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
     }
 
     @Test
-    public void testBacklogCursor() throws Exception {
-        int backloggedThreshold = 10;
-        ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
-        factoryConf.setThresholdBackloggedCursor(backloggedThreshold);
-        ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, factoryConf);
-        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("cache_backlog_ledger");
-
-        // Open Cursor also adds cursor into activeCursor-container
-        ManagedCursor cursor1 = ledger.openCursor("c1");
-        ManagedCursor cursor2 = ledger.openCursor("c2");
-
-        CountDownLatch latch = new CountDownLatch(backloggedThreshold);
-        for (int i = 0; i < backloggedThreshold + 1; i++) {
-            String content = "entry"; // 5 bytes
-            ByteBuf entry = getMessageWithMetadata(content.getBytes());
-            ledger.asyncAddEntry(entry, new AddEntryCallback() {
-                @Override
-                public void addComplete(Position position, Object ctx) {
-                    latch.countDown();
-                    entry.release();
-                }
-
-                @Override
-                public void addFailed(ManagedLedgerException exception, Object ctx) {
-                    latch.countDown();
-                    entry.release();
-                }
-
-            }, null);
-        }
-        latch.await();
-
-        // Verify: cursors are active as :haven't started deactivateBacklogCursor scan
-        assertTrue(cursor1.isActive());
-        assertTrue(cursor2.isActive());
-
-        // deactivate backlog cursors
-        ledger.checkBackloggedCursors();
-
-        // both cursors have to be inactive
-        assertFalse(cursor1.isActive());
-        assertFalse(cursor2.isActive());
-
-        // read entries so, cursor1 reaches maxBacklog threshold again to be active again
-        List<Entry> entries1 = cursor1.readEntries(50);
-        for (Entry entry : entries1) {
-            log.info("Read entry. Position={} Content='{}'", entry.getPosition(), new String(entry.getData()));
-            entry.release();
-        }
-
-        // activate cursors which caught up maxbacklog threshold
-        ledger.checkBackloggedCursors();
-
-        // verify: cursor1 has consumed messages so, under maxBacklog threshold => active
-        assertTrue(cursor1.isActive());
-
-        // verify: cursor2 has not consumed messages so, above maxBacklog threshold => inactive
-        assertFalse(cursor2.isActive());
-
-        ledger.close();
-
-        factory.shutdown();
-    }
-
-    @Test
     public void testConcurrentOpenCursor() throws Exception {
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testConcurrentOpenCursor");
 
@@ -2271,21 +2206,6 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         }
     }
 
-    public ByteBuf getMessageWithMetadata(byte[] data) throws IOException {
-        MessageMetadata messageData = MessageMetadata.newBuilder().setPublishTime(System.currentTimeMillis())
-                .setProducerName("prod-name").setSequenceId(0).build();
-        ByteBuf payload = Unpooled.wrappedBuffer(data, 0, data.length);
-
-        int msgMetadataSize = messageData.getSerializedSize();
-        int headersSize = 4 + msgMetadataSize;
-        ByteBuf headers = PulsarByteBufAllocator.DEFAULT.buffer(headersSize, headersSize);
-        ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(headers);
-        headers.writeInt(msgMetadataSize);
-        messageData.writeTo(outStream);
-        outStream.recycle();
-        return ByteBufPair.coalesce(ByteBufPair.get(headers, payload));
-    }
-
     @Test
     public void testConsumerSubscriptionInitializePosition() throws Exception{
         final int MAX_ENTRY_PER_LEDGER = 2;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index bb286ad..2f640e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -55,7 +55,6 @@ public class ManagedLedgerClientFactory implements Closeable {
         managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(conf.getManagedLedgerNumSchedulerThreads());
         managedLedgerFactoryConfig.setCacheEvictionFrequency(conf.getManagedLedgerCacheEvictionFrequency());
         managedLedgerFactoryConfig.setCacheEvictionTimeThresholdMillis(conf.getManagedLedgerCacheEvictionTimeThresholdMillis());
-        managedLedgerFactoryConfig.setThresholdBackloggedCursor(conf.getManagedLedgerCursorBackloggedThreshold());
         managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());
 
         this.defaultBkClient = bookkeeperProvider.create(conf, zkClient, Optional.empty(), null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
index 678abf3..66c5b04 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
@@ -138,7 +138,7 @@ public class PulsarStats implements Closeable {
                                 }
                                 // this task: helps to activate inactive-backlog-cursors which have caught up and
                                 // connected, also deactivate active-backlog-cursors which has backlog
-                                ((PersistentTopic) topic).getManagedLedger().checkBackloggedCursors();
+                                ((PersistentTopic) topic).checkBackloggedCursors();
                             }else if (topic instanceof NonPersistentTopic) {
                                 tempNonPersistentTopics.add((NonPersistentTopic) topic);
                             } else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 890e02a..aa147d4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -129,6 +129,12 @@ public interface Topic {
 
     void checkInactiveSubscriptions();
 
+    /**
+     * Activate cursors that have caught up to within the backlog threshold and deactivate slow cursors that are
+     * building up a backlog.
+     */
+    void checkBackloggedCursors();
+
     void checkMessageExpiry();
 
     void checkMessageDeduplicationInfo();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 2792234..332abfb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -865,6 +865,11 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
     }
 
     @Override
+    public void checkBackloggedCursors() {
+        // intentionally a no-op for non-persistent topics
+    }
+
+    @Override
     public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 18d371b..b7470fa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -157,6 +157,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
     private Optional<SubscribeRateLimiter> subscribeRateLimiter = Optional.empty();
     public volatile long delayedDeliveryTickTimeMillis = 1000;
+    private final long backloggedCursorThresholdEntries;
     public volatile boolean delayedDeliveryEnabled = false;
     public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
 
@@ -215,6 +216,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         USAGE_COUNT_UPDATER.set(this, 0);
         this.delayedDeliveryEnabled = brokerService.pulsar().getConfiguration().isDelayedDeliveryEnabled();
         this.delayedDeliveryTickTimeMillis = brokerService.pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis();
+        this.backloggedCursorThresholdEntries = brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
 
         initializeDispatchRateLimiterIfNeeded(Optional.empty());
 
@@ -270,6 +272,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
         this.replicators = new ConcurrentOpenHashMap<>(16, 1);
         this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
+        this.backloggedCursorThresholdEntries = brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
     }
 
     private void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
@@ -589,12 +592,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
         subscriptionFuture.thenAccept(subscription -> {
             try {
-                ledger.checkBackloggedCursors();
-
                 Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
                                                  maxUnackedMessages, cnx, cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta);
                 subscription.addConsumer(consumer);
 
+                checkBackloggedCursors();
+
                 if (!cnx.isActive()) {
                     consumer.close();
                     if (log.isDebugEnabled()) {
@@ -1692,6 +1695,19 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         });
     }
 
+    @Override
+    public void checkBackloggedCursors() {
+        // activate cursors that have consumers and fewer entries than the backlog threshold; deactivate the rest
+        subscriptions.forEach((subName, subscription) -> {
+            if (!subscription.getConsumers().isEmpty()
+                && subscription.getCursor().getNumberOfEntries() < backloggedCursorThresholdEntries) {
+                subscription.getCursor().setActive();
+            } else {
+                subscription.getCursor().setInactive();
+            }
+        });
+    }
+
     /**
      * Check whether the topic should be retained (based on time), even tough there are no producers/consumers and it's
      * marked as inactive.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index bc92ee4..87a8d98 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.broker.service;
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import static org.mockito.Mockito.any;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.matches;
@@ -40,12 +40,14 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -68,13 +70,17 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -93,6 +99,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
@@ -100,8 +107,11 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
@@ -126,7 +136,7 @@ import io.netty.buffer.Unpooled;
 
 /**
  */
-public class PersistentTopicTest {
+public class PersistentTopicTest extends MockedBookKeeperTestCase {
     private PulsarService pulsar;
     private BrokerService brokerService;
     private ManagedLedgerFactory mlFactoryMock;
@@ -1505,4 +1515,132 @@ public class PersistentTopicTest {
         topic.checkCompaction();
         verify(compactor, times(0)).compact(anyString());
     }
+
+    @Test
+    public void testBacklogCursor() throws Exception {
+        int backloggedThreshold = 10;
+        pulsar.getConfiguration().setManagedLedgerCursorBackloggedThreshold(backloggedThreshold);
+
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("cache_backlog_ledger");
+        PersistentTopic topic = new PersistentTopic(successTopicName, ledger, brokerService);
+
+        // STEP1: prepare cursors
+        // Open cursor1, register sub1 on the topic and attach consumer1 to it
+        ManagedCursor cursor1 = ledger.openCursor("c1");
+        PersistentSubscription sub1 = new PersistentSubscription(topic, "sub-1", cursor1, false);
+        Consumer consumer1 = new Consumer(sub1, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
+            50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null);
+        topic.getSubscriptions().put(Codec.decode(cursor1.getName()), sub1);
+        sub1.addConsumer(consumer1);
+        // Open cursor2, register sub2 on the topic and attach consumer2 to it
+        ManagedCursor cursor2 = ledger.openCursor("c2");
+        PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursor2, false);
+        Consumer consumer2 = new Consumer(sub2, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
+            50000, serverCnx, "myrole-2", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null);
+        topic.getSubscriptions().put(Codec.decode(cursor2.getName()), sub2);
+        sub2.addConsumer(consumer2);
+        // Open cursor3 and register sub3 on the topic, but do not attach consumer3 to it yet
+        ManagedCursor cursor3 = ledger.openCursor("c3");
+        PersistentSubscription sub3 = new PersistentSubscription(topic, "sub-3", cursor3, false);
+        Consumer consumer3 = new Consumer(sub3, SubType.Exclusive, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */,
+            50000, serverCnx, "myrole-3", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null);
+        topic.getSubscriptions().put(Codec.decode(cursor3.getName()), sub3);
+
+        // Case1: all cursors are active because the backlogged-cursor check has not run yet
+        assertTrue(cursor1.isActive());
+        assertTrue(cursor2.isActive());
+        assertTrue(cursor3.isActive());
+
+        // deactivate cursors whose subscription has no consumers
+        topic.checkBackloggedCursors();
+
+        // Case2: cursor3 becomes inactive because its subscription has no consumer
+        assertTrue(cursor1.isActive());
+        assertTrue(cursor2.isActive());
+        assertFalse(cursor3.isActive());
+
+        // Write backloggedThreshold + 1 messages to the ledger; the latch must count every one of them
+        CountDownLatch latch = new CountDownLatch(backloggedThreshold + 1);
+        for (int i = 0; i < backloggedThreshold + 1; i++) {
+            String content = "entry"; // 5 bytes
+            ByteBuf entry = getMessageWithMetadata(content.getBytes());
+            ledger.asyncAddEntry(entry, new AddEntryCallback() {
+                @Override
+                public void addComplete(Position position, Object ctx) {
+                    latch.countDown();
+                    entry.release();
+                }
+
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object ctx) {
+                    latch.countDown();
+                    entry.release();
+                }
+
+            }, null);
+        }
+        latch.await();
+
+        assertTrue(cursor1.isActive());
+        assertTrue(cursor2.isActive());
+        assertFalse(cursor3.isActive());
+
+        // deactivate backlogged cursors
+        topic.checkBackloggedCursors();
+
+        // Case3: cursor1 and cursor2 become inactive because of the backlog
+        assertFalse(cursor1.isActive());
+        assertFalse(cursor2.isActive());
+        assertFalse(cursor3.isActive());
+
+        // read entries so that cursor1 drops below the backlog threshold and can become active again
+        List<Entry> entries1 = cursor1.readEntries(50);
+        for (Entry entry : entries1) {
+            log.info("Read entry. Position={} Content='{}'", entry.getPosition(), new String(entry.getData()));
+            entry.release();
+        }
+        List<Entry> entries3 = cursor3.readEntries(50);
+        for (Entry entry : entries3) {
+            log.info("Read entry. Position={} Content='{}'", entry.getPosition(), new String(entry.getData()));
+            entry.release();
+        }
+
+        // activate cursors that have caught up to within the backlog threshold
+        topic.checkBackloggedCursors();
+
+        // Case4:
+        // cursor1 has consumed messages, so it is under the backlog threshold => active
+        assertTrue(cursor1.isActive());
+        // cursor2 has not consumed messages, so it is above the backlog threshold => inactive
+        assertFalse(cursor2.isActive());
+        // cursor3 has no consumer, so it stays inactive even though it has caught up
+        assertFalse(cursor3.isActive());
+
+        // add consumer to sub3 and read entries
+        sub3.addConsumer(consumer3);
+        entries3 = cursor3.readEntries(50);
+        for (Entry entry : entries3) {
+            log.info("Read entry. Position={} Content='{}'", entry.getPosition(), new String(entry.getData()));
+            entry.release();
+        }
+
+        topic.checkBackloggedCursors();
+        // Case5: cursor3 now has a consumer, so it becomes active
+        assertTrue(cursor3.isActive());
+    }
+
+    private ByteBuf getMessageWithMetadata(byte[] data) throws IOException {
+        MessageMetadata messageData = MessageMetadata.newBuilder().setPublishTime(System.currentTimeMillis())
+            .setProducerName("prod-name").setSequenceId(0).build();
+        ByteBuf payload = Unpooled.wrappedBuffer(data, 0, data.length);
+
+        int msgMetadataSize = messageData.getSerializedSize();
+        int headersSize = 4 + msgMetadataSize; // 4 bytes for the int size prefix written below + serialized metadata
+        ByteBuf headers = PulsarByteBufAllocator.DEFAULT.buffer(headersSize, headersSize);
+        ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(headers);
+        headers.writeInt(msgMetadataSize); // size prefix precedes the metadata bytes
+        messageData.writeTo(outStream);
+        outStream.recycle();
+        return ByteBufPair.coalesce(ByteBufPair.get(headers, payload));
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index ad039e1..51a7eef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -1040,7 +1040,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         Thread.sleep(maxMessageCacheRetentionTimeMillis);
 
         // 4. deactivate subscriber which has built the backlog
-        ledger.checkBackloggedCursors();
+        topicRef.checkBackloggedCursors();
         Thread.sleep(100);
 
         // 5. verify: active subscribers
@@ -1055,7 +1055,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             subscriber2.acknowledge(msg);
         }
 
-        ledger.checkBackloggedCursors();
+        topicRef.checkBackloggedCursors();
 
         activeSubscriber.clear();
         ledger.getActiveCursors().forEach(c -> activeSubscriber.add(c.getName()));