You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/30 01:41:43 UTC

[pulsar] branch master updated: PIP-187: Add API to analyze a subscription backlog and provide a accurate value (#16545)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1af9695ddf5 PIP-187: Add API to analyze a subscription backlog and provide a accurate value (#16545)
1af9695ddf5 is described below

commit 1af9695ddf54927fcbc0c3db5ee2647fb2d820e3
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Sat Jul 30 03:41:35 2022 +0200

    PIP-187: Add API to analyze a subscription backlog and provide a accurate value (#16545)
---
 conf/broker.conf                                   |   6 +
 .../apache/bookkeeper/mledger/AsyncCallbacks.java  |   6 +
 .../apache/bookkeeper/mledger/ManagedCursor.java   |  18 ++
 .../org/apache/bookkeeper/mledger/ScanOutcome.java |  37 ++++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  25 +++
 .../bookkeeper/mledger/impl/MetaStoreImpl.java     |   1 -
 .../org/apache/bookkeeper/mledger/impl/OpScan.java | 137 +++++++++++++++
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 122 +++++++++++++
 .../apache/pulsar/broker/ServiceConfiguration.java |  10 ++
 .../broker/admin/impl/PersistentTopicsBase.java    | 101 +++++++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  51 +++++-
 .../broker/service/AbstractBaseDispatcher.java     |  73 ++------
 .../broker/service/AnalyzeBacklogResult.java       |  46 +++++
 .../pulsar/broker/service/EntryFilterSupport.java  |  85 +++++++++
 .../apache/pulsar/broker/service/Subscription.java |   3 +
 .../nonpersistent/NonPersistentSubscription.java   |  10 ++
 .../service/persistent/PersistentSubscription.java | 114 ++++++++++++
 .../admin/AnalyzeBacklogSubscriptionTest.java      | 194 +++++++++++++++++++++
 .../broker/admin/CreateSubscriptionTest.java       |   4 +
 .../broker/service/plugin/EntryFilterTest.java     |   3 +-
 .../broker/service/plugin/FilterEntryTest.java     |  58 +++++-
 .../org/apache/pulsar/client/admin/Topics.java     |  43 +++++
 .../stats/AnalyzeSubscriptionBacklogResult.java    |  42 +++++
 .../pulsar/client/admin/internal/BaseResource.java |   9 +
 .../pulsar/client/admin/internal/TopicsImpl.java   |  42 +++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |   4 +
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  29 +++
 27 files changed, 1205 insertions(+), 68 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 41809f87116..7a0e32c95b3 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -235,6 +235,12 @@ subscriptionKeySharedUseConsistentHashing=true
 # The higher the number, the more equal the assignment of keys to consumers
 subscriptionKeySharedConsistentHashingReplicaPoints=100
 
+# Maximum time in ms for an analyze backlog operation to complete
+subscriptionBacklogScanMaxTimeMs=120000
+
+# Maximum number of entries to be read within an analyze backlog operation
+subscriptionBacklogScanMaxEntries=10000
+
 # Set the default behavior for message deduplication in the broker
 # This can be overridden per-namespace. If enabled, broker will reject
 # messages that were already stored in the topic
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
index e5fd0ddd1dd..395da52b2af 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
@@ -124,6 +124,12 @@ public interface AsyncCallbacks {
         void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx);
     }
 
+    /**
+     * Callback notified when a cursor scan terminates.
+     */
+    interface ScanCallback {
+        /**
+         * The scan terminated without errors; {@code scanOutcome} tells whether it ran to the end
+         * or was stopped earlier.
+         *
+         * @param position the last position seen by the scan (OpScan passes {@code null} if nothing was read)
+         * @param scanOutcome how the scan terminated
+         * @param ctx opaque context object supplied when the scan was started
+         */
+        void scanComplete(Position position, ScanOutcome scanOutcome, Object ctx);
+
+        /**
+         * The scan terminated because of an error.
+         *
+         * @param exception the failure
+         * @param failedReadPosition the position related to the failure, if known
+         * @param ctx opaque context object supplied when the scan was started
+         */
+        void scanFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx);
+    }
+
     interface ResetCursorCallback {
         void resetComplete(Object ctx);
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 46ca0f14003..3f6852e4085 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -22,6 +22,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Range;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
@@ -502,6 +503,23 @@ public interface ManagedCursor {
      */
     Position findNewestMatching(Predicate<Entry> condition) throws InterruptedException, ManagedLedgerException;
 
+    /**
+     * Scan the entries of the cursor, from the given position up to the end.
+     * Please note that this is an expensive operation.
+     * The default implementation does not perform any scan and returns a future that is already
+     * failed with an {@link UnsupportedOperationException}.
+     * @param startingPosition the position to start from, if not provided the scan will start from
+     *                         the first valid position after the mark-delete position
+     * @param condition a condition to continue the scan, the condition can access the entry
+     * @param batchSize number of entries to process at each read
+     * @param maxEntries maximum number of entries to scan
+     * @param timeOutMs maximum time to spend on this operation
+     * @return a future that is completed with the outcome of the scan, or completed exceptionally on failure
+     */
+    default CompletableFuture<ScanOutcome> scan(Optional<Position> startingPosition,
+                                                Predicate<Entry> condition,
+                                                int batchSize, long maxEntries, long timeOutMs) {
+        return CompletableFuture.failedFuture(new UnsupportedOperationException());
+    }
 
     /**
      * Find the newest entry that matches the given predicate.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ScanOutcome.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ScanOutcome.java
new file mode 100644
index 00000000000..c7a4cff474c
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ScanOutcome.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+/**
+ * Outcome of a scan operation that terminated without errors.
+ */
+public enum ScanOutcome {
+    /**
+     * The scan ran to completion: there were no more entries to read.
+     */
+    COMPLETED,
+    /**
+     * The scan was aborted by the system (timed out or too many entries...).
+     */
+    ABORTED,
+    /**
+     * The user-provided condition stopped the scan by returning {@code false}.
+     */
+    USER_INTERRUPTED
+}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 83ee356e5c3..4c688b2d9be 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -72,6 +72,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -83,6 +84,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedE
 import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
@@ -1037,6 +1039,29 @@ public class ManagedCursorImpl implements ManagedCursor {
         return findNewestMatching(FindPositionConstraint.SearchActiveEntries, condition);
     }
 
+    @Override
+    public CompletableFuture<ScanOutcome> scan(Optional<Position> position,
+                                               Predicate<Entry> condition,
+                                               int batchSize, long maxEntries, long timeOutMs) {
+        // Without an explicit start, begin at the first valid position after the mark-delete position.
+        PositionImpl startPosition = (PositionImpl) position.orElseGet(
+                () -> ledger.getNextValidPosition(markDeletePosition));
+        CompletableFuture<ScanOutcome> result = new CompletableFuture<>();
+        // Bridge the callback-based OpScan onto the future handed back to the caller.
+        ScanCallback bridge = new ScanCallback() {
+            @Override
+            public void scanComplete(Position lastPosition, ScanOutcome outcome, Object ctx) {
+                result.complete(outcome);
+            }
+
+            @Override
+            public void scanFailed(ManagedLedgerException error,
+                                   Optional<Position> failedReadPosition, Object ctx) {
+                result.completeExceptionally(error);
+            }
+        };
+        new OpScan(this, batchSize, startPosition, condition, bridge, null, maxEntries, timeOutMs).find();
+        return result;
+    }
+
     @Override
     public Position findNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition)
             throws InterruptedException, ManagedLedgerException {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index 3d1e565ac34..2a47cfdc537 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -235,7 +235,6 @@ public class MetaStoreImpl implements MetaStore {
                 log.debug("[{}] Updating consumer {} on meta-data store with {}", ledgerName, cursorName, info);
             }
         }
-
         store.put(path, content, Optional.of(expectedVersion))
                 .thenAcceptAsync(optStat -> callback.operationComplete(null, optStat), executor
                         .chooseThread(ledgerName))
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
new file mode 100644
index 00000000000..e65e418a5ec
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.base.Predicate;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+
+/**
+ * Scans the entries of a cursor in batches of {@code batchSize}, applying a user condition to every
+ * entry that survives {@code ManagedCursorImpl#filterReadEntries}.
+ *
+ * <p>The scan ends by invoking one of the {@link ScanCallback} methods:
+ * <ul>
+ *   <li>{@code scanComplete} with {@link ScanOutcome#COMPLETED} when there are no more entries to read;</li>
+ *   <li>{@code scanComplete} with {@link ScanOutcome#ABORTED} when the {@code maxEntries} budget is used up
+ *       or {@code timeOutMs} has elapsed (the deadline is only checked between two reads);</li>
+ *   <li>{@code scanComplete} with {@link ScanOutcome#USER_INTERRUPTED} when the condition returns
+ *       {@code false};</li>
+ *   <li>{@code scanFailed} when a read fails or an error is thrown while processing a batch.</li>
+ * </ul>
+ */
+@Slf4j
+class OpScan implements ReadEntriesCallback {
+    private final ManagedCursorImpl cursor;
+    private final ManagedLedgerImpl ledger;
+    private final ScanCallback callback;
+    private final Predicate<Entry> condition;
+    private final Object ctx;
+    // Budget of entries that may still be scanned; the scan is aborted once it drops to zero or below.
+    private final AtomicLong remainingEntries = new AtomicLong();
+    private final long timeOutMs;
+    // Creation time of the operation, compared against timeOutMs before each read.
+    private final long startTime = System.currentTimeMillis();
+    private final int batchSize;
+
+    // Position the next read starts from.
+    PositionImpl searchPosition;
+    // Position of the last entry of the most recent batch; null until a batch has been read.
+    Position lastSeenPosition = null;
+
+    /**
+     * @param cursor the cursor to scan, must not be null
+     * @param batchSize number of entries requested at each read, must be positive
+     * @param startPosition position of the first read
+     * @param condition invoked for each entry; returning {@code false} stops the scan
+     * @param callback notified when the scan terminates
+     * @param ctx opaque context passed back to the callback
+     * @param maxEntries maximum number of entries to scan before aborting
+     * @param timeOutMs maximum duration of the scan, in milliseconds, before aborting
+     * @throws IllegalArgumentException if {@code batchSize} is not positive
+     */
+    public OpScan(ManagedCursorImpl cursor, int batchSize,
+                  PositionImpl startPosition, Predicate<Entry> condition,
+                  ScanCallback callback, Object ctx, long maxEntries, long timeOutMs) {
+        this.batchSize = batchSize;
+        if (batchSize <= 0) {
+            throw new IllegalArgumentException("batchSize " + batchSize);
+        }
+        this.cursor = Objects.requireNonNull(cursor);
+        this.ledger = cursor.ledger;
+        this.callback = callback;
+        this.condition = condition;
+        this.ctx = ctx;
+        this.searchPosition = startPosition;
+        this.remainingEntries.set(maxEntries);
+        this.timeOutMs = timeOutMs;
+    }
+
+    /**
+     * Processes one batch: applies the condition to each non-filtered entry, then either terminates
+     * the scan through the callback or schedules the next read.
+     */
+    @Override
+    public void readEntriesComplete(List<Entry> entries, Object ctx) {
+        try {
+            Position lastPositionForBatch = entries.get(entries.size() - 1).getPosition();
+            lastSeenPosition = lastPositionForBatch;
+            // filter out the entry if it has been already deleted
+            // filterReadEntries will call entry.release if the entry is filtered out
+            List<Entry> entriesFiltered = this.cursor.filterReadEntries(entries);
+            // entries dropped by the filter still consume the maxEntries budget
+            int skippedEntries = entries.size() - entriesFiltered.size();
+            remainingEntries.addAndGet(-skippedEntries);
+            if (!entriesFiltered.isEmpty()) {
+                for (Entry entry : entriesFiltered) {
+                    // the budget is checked before the condition is applied to the entry
+                    if (remainingEntries.decrementAndGet() <= 0) {
+                        log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
+                        callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
+                        return;
+                    }
+                    if (!condition.apply(entry)) {
+                        log.warn("[{}] Scan abort due to user code", OpScan.this.cursor);
+                        callback.scanComplete(lastSeenPosition, ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx);
+                        return;
+                    }
+                }
+            }
+            searchPosition = ledger.getPositionAfterN((PositionImpl) lastPositionForBatch, 1,
+                    PositionBound.startExcluded);
+            if (log.isDebugEnabled()) {
+                // one argument per placeholder: cursor, end of this batch, start of the next read
+                log.debug("[{}] readEntriesComplete at {} next is {}", OpScan.this.cursor,
+                        lastPositionForBatch, searchPosition);
+            }
+
+            if (searchPosition.compareTo((PositionImpl) lastPositionForBatch) == 0) {
+                // we have reached the end of the ledger, as we are not doing progress
+                callback.scanComplete(lastSeenPosition, ScanOutcome.COMPLETED, OpScan.this.ctx);
+                return;
+            }
+        } catch (Throwable t) {
+            log.error("Unhandled error", t);
+            callback.scanFailed(ManagedLedgerException.getManagedLedgerException(t),
+                    Optional.ofNullable(lastSeenPosition), OpScan.this.ctx);
+            return;
+        } finally {
+            // NOTE(review): this releases every entry of the batch, including the ones that, according
+            // to the comment above, filterReadEntries has already released - confirm this is safe.
+            entries.forEach(Entry::release);
+        }
+        find();
+    }
+
+    /**
+     * Reports a failed read to the callback, together with the position the read started from.
+     */
+    @Override
+    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+        callback.scanFailed(exception, Optional.ofNullable(searchPosition), OpScan.this.ctx);
+    }
+
+    /**
+     * Starts the scan, or continues it with the next batch. Before reading, the entry budget and
+     * the deadline are verified; if either is exhausted the scan is aborted.
+     */
+    public void find() {
+        if (remainingEntries.get() <= 0) {
+            log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
+            callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
+            return;
+        }
+        if (System.currentTimeMillis() - startTime > timeOutMs) {
+            log.warn("[{}] Scan abort after hitting the deadline", OpScan.this.cursor);
+            callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
+            return;
+        }
+        if (cursor.hasMoreEntries(searchPosition)) {
+            OpReadEntry opReadEntry = OpReadEntry.create(cursor, searchPosition, batchSize,
+                    this, OpScan.this.ctx, null);
+            ledger.asyncReadEntries(opReadEntry);
+        } else {
+            callback.scanComplete(lastSeenPosition, ScanOutcome.COMPLETED, OpScan.this.ctx);
+        }
+    }
+}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index b5d677f8b04..4fb51885b9e 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -37,6 +37,7 @@ import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -50,6 +51,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
@@ -84,6 +86,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
@@ -1832,6 +1835,125 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
                 c1.findNewestMatching(entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding))));
     }
 
+    @DataProvider(name = "testScanValues")
+    public static Object[][] testScanValues() {
+        return new Object[][] {
+                { 10, 1 }, // each row is { numEntries, batchSize }; here: a single entry per read
+                { 10, 3 }, // batches with remainder (batch size does not divide the number of entries)
+                { 10, 5 }, // batches, each one half of the entries
+                { 10, 1000 }, // big batch size, scan whole ledger in one round
+                { 0, 10 } // empty ledger
+        };
+    }
+
+    /**
+     * Exercises the cursor scan for every { numEntries, batchSize } pair of the data provider:
+     * full scan, scan from an explicit position, interruption by the condition, the max-entries and
+     * time limits, an exception thrown by the condition, and scans after deleting entries.
+     */
+    @Test(dataProvider = "testScanValues", timeOut = 30000)
+    void testScan(int numEntries, int batchSize) throws Exception {
+        ManagedLedger ledger = factory.open("my_test_ledger_scan_" + numEntries
+                + "_" + batchSize);
+
+        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < numEntries; i++) {
+            positions.add(ledger.addEntry(("a" + i).getBytes(Encoding)));
+        }
+
+        List<String> contents = new CopyOnWriteArrayList<>();
+
+        assertEquals(c1.scan(Optional.empty(), (entry -> {
+            contents.add(new String(entry.getData(), StandardCharsets.UTF_8));
+            return true;
+        }), batchSize, Long.MAX_VALUE, Long.MAX_VALUE).get(), ScanOutcome.COMPLETED);
+
+        // verify the size first, so that a short result fails with a clear assertion message
+        assertEquals(contents.size(), numEntries);
+        for (int i = 0; i < numEntries; i++) {
+            assertEquals(contents.get(i), ("a" + i));
+        }
+
+        // the remaining checks need some entries in the ledger
+        if (numEntries <= 0) {
+            return;
+        }
+
+        List<String> contentsFromHalf = new CopyOnWriteArrayList<>();
+        int half = numEntries / 2;
+        Position halfPosition = positions.get(half);
+        assertEquals(c1.scan(Optional.of(halfPosition), (entry -> {
+            contentsFromHalf.add(new String(entry.getData(), StandardCharsets.UTF_8));
+            return true;
+        }), batchSize, Long.MAX_VALUE, Long.MAX_VALUE).get(), ScanOutcome.COMPLETED);
+
+        assertEquals(contentsFromHalf.size(), numEntries - half);
+        for (int i = half; i < numEntries; i++) {
+            assertEquals(contentsFromHalf.get(i - half), ("a" + i));
+        }
+
+        // the condition stops the scan
+        assertEquals(c1.scan(Optional.empty(), (entry -> {
+            return false;
+        }), batchSize, Long.MAX_VALUE, Long.MAX_VALUE).get(), ScanOutcome.USER_INTERRUPTED);
+
+        // max entries
+        assertEquals(c1.scan(Optional.empty(), (entry -> {
+            return true;
+        }), batchSize, 1, Long.MAX_VALUE).get(), ScanOutcome.ABORTED);
+
+        // timeout
+        // please note that the timeout is verified
+        // between the reads
+        // so with a big batchSize this test would take too much
+        // we are skipping this check if batchSize is too big
+        if (batchSize <= 5) {
+            assertEquals(c1.scan(Optional.empty(), (entry -> {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException ie) {
+                    // keep the interruption visible to the calling thread instead of swallowing it
+                    Thread.currentThread().interrupt();
+                }
+                return true;
+            }), batchSize, Long.MAX_VALUE, 1000).get(), ScanOutcome.ABORTED);
+        }
+        // user code exception
+        AtomicReference<Throwable> error = new AtomicReference<>();
+        c1.scan(Optional.empty(), (entry -> {
+            throw new RuntimeException("dummy!");
+        }), batchSize, Long.MAX_VALUE, Long.MAX_VALUE).handle((___, err) -> {
+            error.set(err);
+            return null;
+        }).get();
+        assertTrue(error.get() instanceof ManagedLedgerException);
+        assertTrue(error.get().getCause() instanceof RuntimeException);
+        assertEquals(error.get().getCause().getMessage(), "dummy!");
+
+
+        // test deleted entries
+        positions.clear();
+        assertEquals(c1.scan(Optional.empty(), (entry -> {
+            positions.add(entry.getPosition());
+            return true;
+        }), batchSize, Long.MAX_VALUE, Long.MAX_VALUE).get(), ScanOutcome.COMPLETED);
+        assertEquals(positions.size(), numEntries);
+
+        // delete one entry in the middle
+        c1.delete(positions.get(2));
+
+        List<Position> positionsAfterDelete = new ArrayList<>();
+        assertEquals(c1.scan(Optional.empty(), (entry -> {
+            positionsAfterDelete.add(entry.getPosition());
+            return true;
+        }), batchSize, Long.MAX_VALUE, Long.MAX_VALUE).get(), ScanOutcome.COMPLETED);
+        assertEquals(positionsAfterDelete.size(), numEntries - 1);
+
+        // delete all the entries
+        for (Position p : positionsAfterDelete) {
+            c1.delete(p);
+        }
+
+        List<Position> positionsFinal = new ArrayList<>();
+        assertEquals(c1.scan(Optional.empty(), (entry -> {
+            positionsFinal.add(entry.getPosition());
+            return true;
+        }), batchSize, Long.MAX_VALUE, Long.MAX_VALUE).get(), ScanOutcome.COMPLETED);
+        assertEquals(positionsFinal.size(), 0);
+
+    }
+
     @Test(timeOut = 20000)
     void testFindNewestMatchingOdd1() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger");
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a6bd9dde24f..4fb680cf8b4 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -625,6 +625,16 @@ public class ServiceConfiguration implements PulsarConfiguration {
         doc = "How long to delay rewinding cursor and dispatching messages when active consumer is changed"
     )
     private int activeConsumerFailoverDelayTimeMillis = 1000;
+    @FieldContext(
+            category = CATEGORY_POLICIES,
+            doc = "Maximum time to spend while scanning a subscription to calculate the accurate backlog"
+    )
+    private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;
+    @FieldContext(
+            category = CATEGORY_POLICIES,
+            doc = "Maximum number of entries to process while scanning a subscription to calculate the accurate backlog"
+    )
+    private long subscriptionBacklogScanMaxEntries = 10_000;
     @FieldContext(
         category = CATEGORY_POLICIES,
         doc = "How long to delete inactive subscriptions from last consuming."
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 84356d8ee22..6f48145b0ed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -58,6 +58,8 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
@@ -68,6 +70,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
 import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
@@ -131,6 +134,7 @@ import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.apache.pulsar.common.policies.data.stats.PartitionedTopicStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
@@ -1569,6 +1573,63 @@ public class PersistentTopicsBase extends AdminResource {
             });
     }
 
+    /**
+     * Analyzes the backlog of a subscription and resumes the response with an
+     * {@link AnalyzeSubscriptionBacklogResult} built from the raw result of the subscription.
+     *
+     * <p>Topic ownership and the CONSUME permission are validated first. If the subscription does not
+     * exist the response is resumed exceptionally with a NOT_FOUND error.
+     *
+     * @param asyncResponse the response to resume with the result or the failure
+     * @param subName name of the subscription to analyze
+     * @param position optional position to start the analysis from
+     * @param authoritative passed to the topic ownership validation
+     */
+    private void internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(AsyncResponse asyncResponse,
+                                                                          String subName,
+                                                                          Optional<Position> position,
+                                                                          boolean authoritative) {
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.CONSUME))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                            Subscription sub = topic.getSubscription(subName);
+                            if (sub == null) {
+                                throw new RestException(Status.NOT_FOUND,
+                                        getSubNotFoundErrorMessage(topicName.toString(), subName));
+                            }
+                            return sub.analyzeBacklog(position);
+                        })
+                .thenAccept((AnalyzeBacklogResult rawResult) -> {
+
+                        AnalyzeSubscriptionBacklogResult result = new AnalyzeSubscriptionBacklogResult();
+
+                        // positions are exposed as "ledgerId:entryId" strings, and only when available
+                        if (rawResult.getFirstPosition() != null) {
+                            result.setFirstMessageId(
+                                    rawResult.getFirstPosition().getLedgerId()
+                                    + ":"
+                                    + rawResult.getFirstPosition().getEntryId());
+                        }
+
+                        if (rawResult.getLastPosition() != null) {
+                            result.setLastMessageId(rawResult.getLastPosition().getLedgerId()
+                                    + ":"
+                                    + rawResult.getLastPosition().getEntryId());
+                        }
+
+                        result.setEntries(rawResult.getEntries());
+                        result.setMessages(rawResult.getMessages());
+
+                        result.setFilterAcceptedEntries(rawResult.getFilterAcceptedEntries());
+                        result.setFilterRejectedEntries(rawResult.getFilterRejectedEntries());
+                        result.setFilterRescheduledEntries(rawResult.getFilterRescheduledEntries());
+
+                        result.setFilterAcceptedMessages(rawResult.getFilterAcceptedMessages());
+                        result.setFilterRejectedMessages(rawResult.getFilterRejectedMessages());
+                        result.setFilterRescheduledMessages(rawResult.getFilterRescheduledMessages());
+                        // any outcome other than COMPLETED means that the scan did not cover the whole backlog
+                        result.setAborted(rawResult.getScanOutcome() != ScanOutcome.COMPLETED);
+                        // arguments follow the placeholder order: client, topic, subscription, result
+                        log.info("[{}] analyzeBacklog topic {} subscription {} result {}", clientAppId(), topicName,
+                            subName, result);
+                        asyncResponse.resume(result);
+                }).exceptionally(ex -> {
+                    Throwable cause = ex.getCause();
+                    // If the exception is not redirect exception we need to log it.
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to analyze subscription backlog {} {}",
+                                clientAppId(), topicName, subName, cause);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, cause);
+                    return null;
+                });
+    }
     private void internalUpdateSubscriptionPropertiesForNonPartitionedTopic(AsyncResponse asyncResponse,
                                       String subName, Map<String, String> subscriptionProperties,
                                       boolean authoritative) {
@@ -2382,6 +2443,46 @@ public class PersistentTopicsBase extends AdminResource {
         });
     }
 
+    /**
+     * Checks namespace and topic ownership and, unless {@code topicName.isPartitioned()}, fails with
+     * METHOD_NOT_ALLOWED when the topic metadata reports partitions; then delegates to the
+     * non-partitioned implementation. Failures of the chain are reported through {@code asyncResponse}.
+     */
+    protected void internalAnalyzeSubscriptionBacklog(AsyncResponse asyncResponse, String subName,
+                                                      Optional<Position> position,
+                                                      boolean authoritative) {
+        // Only global topics go through the global namespace ownership validation.
+        final CompletableFuture<Void> ownershipCheck = topicName.isGlobal()
+                ? validateGlobalNamespaceOwnershipAsync(namespaceName)
+                : CompletableFuture.completedFuture(null);
+
+        ownershipCheck.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+                .thenCompose(__ -> {
+                    if (topicName.isPartitioned()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    return getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                            .thenAccept(metadata -> {
+                                if (metadata.partitions > 0) {
+                                    throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                            "Analyze backlog on a partitioned topic is not allowed, "
+                                                    + "please try do it on specific topic partition");
+                                }
+                            });
+                })
+                .thenAccept(__ -> internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(
+                        asyncResponse, subName, position, authoritative))
+                .exceptionally(error -> {
+                    // Redirect exceptions are not logged, every other failure is.
+                    if (!isRedirectException(error)) {
+                        log.error("[{}] Failed to analyze back log of subscription {} from topic {}",
+                                clientAppId(), subName, topicName, error);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, error);
+                    return null;
+                });
+    }
+
     protected void internalGetSubscriptionProperties(AsyncResponse asyncResponse, String subName,
                                                         boolean authoritative) {
         CompletableFuture<Void> future;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 28dc8015ff8..854adf98ac3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -45,6 +45,8 @@ import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.RestException;
@@ -1612,7 +1614,7 @@ public class PersistentTopics extends PersistentTopicsBase {
 
     @PUT
     @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/properties")
-    @ApiOperation(value = "Replaces all the properties on the given subscription")
+    @ApiOperation(value = "Replace all the properties on the given subscription")
     @ApiResponses(value = {
             @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
             @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
@@ -1649,7 +1651,7 @@ public class PersistentTopics extends PersistentTopicsBase {
 
     @GET
     @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/properties")
-    @ApiOperation(value = "Replaces all the properties on the given subscription")
+    @ApiOperation(value = "Return all the properties on the given subscription")
     @ApiResponses(value = {
             @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
             @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
@@ -1683,6 +1685,51 @@ public class PersistentTopics extends PersistentTopicsBase {
         }
     }
 
+    /**
+     * REST endpoint that analyzes the backlog of a subscription. A {@code null} position is forwarded
+     * as an empty Optional; failures raised while preparing the call are sent through {@code asyncResponse}.
+     */
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analyzeBacklog")
+    @ApiOperation(value = "Analyse a subscription, by scanning all the unprocessed messages")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+                    + "subscriber is not authorized to access this operation"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
+            @ApiResponse(code = 405, message = "Method Not Allowed"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
+    })
+    public void analyzeSubscriptionBacklog(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Subscription", required = true)
+            @PathParam("subName") String encodedSubName,
+            @ApiParam(name = "position", value = "messageId to start the analysis")
+            ResetCursorData position,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        try {
+            // The start position is optional: null is turned into Optional.empty().
+            final Optional<Position> startPosition = Optional.ofNullable(position)
+                    .<Position>map(p -> new PositionImpl(p.getLedgerId(), p.getEntryId()));
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalAnalyzeSubscriptionBacklog(asyncResponse, decode(encodedSubName),
+                    startPosition, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
     @POST
     @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/resetcursor")
     @ApiOperation(value = "Reset subscription to message position closest to given position.",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index d9f36bf0643..de62df77e98 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -19,7 +19,6 @@
 
 package org.apache.pulsar.broker.service;
 
-import com.google.common.collect.ImmutableList;
 import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -33,7 +32,6 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
@@ -41,8 +39,6 @@ import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
-import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
-import org.apache.pulsar.broker.service.plugin.FilterContext;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -51,31 +47,15 @@ import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
 
 @Slf4j
-public abstract class AbstractBaseDispatcher implements Dispatcher {
-
-    protected final Subscription subscription;
+public abstract class AbstractBaseDispatcher extends EntryFilterSupport implements Dispatcher {
 
     protected final ServiceConfiguration serviceConfig;
     protected final boolean dispatchThrottlingOnBatchMessageEnabled;
-    /**
-     * Entry filters in Broker.
-     * Not set to final, for the convenience of testing mock.
-     */
-    protected ImmutableList<EntryFilterWithClassLoader> entryFilters;
-    protected final FilterContext filterContext;
 
     protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) {
-        this.subscription = subscription;
+        super(subscription);
         this.serviceConfig = serviceConfig;
         this.dispatchThrottlingOnBatchMessageEnabled = serviceConfig.isDispatchThrottlingOnBatchMessageEnabled();
-        if (subscription != null && subscription.getTopic() != null && MapUtils.isNotEmpty(subscription.getTopic()
-                .getBrokerService().getEntryFilters())) {
-            this.entryFilters = subscription.getTopic().getBrokerService().getEntryFilters().values().asList();
-            this.filterContext = new FilterContext();
-        } else {
-            this.entryFilters = ImmutableList.of();
-            this.filterContext = FilterContext.FILTER_CONTEXT_DISABLED;
-        }
     }
 
 
@@ -106,7 +86,6 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
                 isReplayRead, consumer);
     }
 
-
     /**
      * Filter entries with prefetched message metadata range so that there is no need to peek metadata from Entry.
      *
@@ -134,20 +113,17 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
             final int metadataIndex = i + startOffset;
             final MessageMetadata msgMetadata = optMetadataArray.map(metadataArray -> metadataArray[metadataIndex])
                     .orElseGet(() -> Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1));
-            if (CollectionUtils.isNotEmpty(entryFilters)) {
-                fillContext(filterContext, msgMetadata, subscription, consumer);
-                EntryFilter.FilterResult filterResult = getFilterResult(filterContext, entry, entryFilters);
-                if (filterResult == EntryFilter.FilterResult.REJECT) {
-                    entriesToFiltered.add(entry.getPosition());
-                    entries.set(i, null);
-                    entry.release();
-                    continue;
-                } else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
-                    entriesToRedeliver.add((PositionImpl) entry.getPosition());
-                    entries.set(i, null);
-                    entry.release();
-                    continue;
-                }
+            EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer);
+            if (filterResult == EntryFilter.FilterResult.REJECT) {
+                entriesToFiltered.add(entry.getPosition());
+                entries.set(i, null);
+                entry.release();
+                continue;
+            } else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
+                entriesToRedeliver.add((PositionImpl) entry.getPosition());
+                entries.set(i, null);
+                entry.release();
+                continue;
             }
             if (!isReplayRead && msgMetadata != null && msgMetadata.hasTxnidMostBits()
                     && msgMetadata.hasTxnidLeastBits()) {
@@ -238,29 +214,6 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
         }
     }
 
-    private static EntryFilter.FilterResult getFilterResult(FilterContext filterContext, Entry entry,
-                                                            ImmutableList<EntryFilterWithClassLoader> entryFilters) {
-        for (EntryFilter entryFilter : entryFilters) {
-            EntryFilter.FilterResult filterResult =
-                    entryFilter.filterEntry(entry, filterContext);
-            if (filterResult == null) {
-                filterResult = EntryFilter.FilterResult.ACCEPT;
-            }
-            if (filterResult != EntryFilter.FilterResult.ACCEPT) {
-                return filterResult;
-            }
-        }
-        return EntryFilter.FilterResult.ACCEPT;
-    }
-
-    private void fillContext(FilterContext context, MessageMetadata msgMetadata,
-                             Subscription subscription, Consumer consumer) {
-        context.reset();
-        context.setMsgMetadata(msgMetadata);
-        context.setSubscription(subscription);
-        context.setConsumer(consumer);
-    }
-
     /**
      * Determine whether the number of consumers on the subscription reaches the threshold.
      * @return
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java
new file mode 100644
index 00000000000..d2141215333
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import lombok.Data;
+import lombok.ToString;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+
+@Data
+@ToString
+public final class AnalyzeBacklogResult {
+    // Outcome of a subscription backlog scan. Totals of scanned entries and messages:
+    private long entries;
+    private long messages;
+    // Number of entries per entry filter result.
+    private long filterRejectedEntries;
+    private long filterAcceptedEntries;
+    private long filterRescheduledEntries;
+    // Number of messages per entry filter result.
+    private long filterRejectedMessages;
+    private long filterAcceptedMessages;
+    private long filterRescheduledMessages;
+    // How the scan terminated.
+    private ScanOutcome scanOutcome;
+    // Positions of the first and of the last scanned entry.
+    private Position firstPosition;
+    private Position lastPosition;
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java
new file mode 100644
index 00000000000..b0c7385a28f
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.pulsar.broker.service.plugin.EntryFilter;
+import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
+import org.apache.pulsar.broker.service.plugin.FilterContext;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+
+public class EntryFilterSupport {
+    /**
+     * Entry filters in Broker.
+     * Deliberately not final, so that tests can replace it with a mock.
+     */
+    protected ImmutableList<EntryFilterWithClassLoader> entryFilters;
+    protected final FilterContext filterContext;
+    protected final Subscription subscription;
+
+    /**
+     * Picks up the entry filters of the broker service behind the subscription's topic; without a
+     * subscription, a topic or any filter, filtering is disabled.
+     *
+     * @param subscription the subscription the filters are evaluated for, may be {@code null}
+     */
+    public EntryFilterSupport(Subscription subscription) {
+        this.subscription = subscription;
+        final boolean filtersConfigured = subscription != null
+                && subscription.getTopic() != null
+                && MapUtils.isNotEmpty(subscription.getTopic().getBrokerService().getEntryFilters());
+        if (filtersConfigured) {
+            this.entryFilters = subscription.getTopic().getBrokerService().getEntryFilters().values().asList();
+            this.filterContext = new FilterContext();
+        } else {
+            this.entryFilters = ImmutableList.of();
+            this.filterContext = FilterContext.FILTER_CONTEXT_DISABLED;
+        }
+    }
+
+    /**
+     * Runs the configured filters, in order, against one entry.
+     *
+     * @param entry the entry to evaluate
+     * @param msgMetadata metadata of the entry, stored in the filter context
+     * @param consumer consumer stored in the filter context, may be {@code null}
+     * @return the first result that is neither {@code null} nor ACCEPT, otherwise ACCEPT
+     */
+    public EntryFilter.FilterResult runFiltersForEntry(Entry entry, MessageMetadata msgMetadata,
+                                                       Consumer consumer) {
+        if (CollectionUtils.isEmpty(entryFilters)) {
+            return EntryFilter.FilterResult.ACCEPT;
+        }
+        // The context object is reused across calls, so it is reset before every evaluation.
+        filterContext.reset();
+        filterContext.setMsgMetadata(msgMetadata);
+        filterContext.setSubscription(subscription);
+        filterContext.setConsumer(consumer);
+        for (EntryFilter entryFilter : entryFilters) {
+            final EntryFilter.FilterResult verdict = entryFilter.filterEntry(entry, filterContext);
+            if (verdict != null && verdict != EntryFilter.FilterResult.ACCEPT) {
+                return verdict;
+            }
+        }
+        return EntryFilter.FilterResult.ACCEPT;
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 49b906b7959..2736f407ebf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
@@ -111,6 +112,8 @@ public interface Subscription {
 
     CompletableFuture<Void> endTxn(long txnidMostBits, long txnidLeastBits, int txnAction, long lowWaterMark);
 
+    CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position> position);
+
     default int getNumberOfSameAddressConsumers(final String clientAddress) {
         int count = 0;
         if (clientAddress != null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index c3399bda4bd..3d0a313fef1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -22,6 +22,7 @@ import com.google.common.base.MoreObjects;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.bookkeeper.mledger.Entry;
@@ -29,6 +30,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.service.AbstractSubscription;
+import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -512,6 +514,14 @@ public class NonPersistentSubscription extends AbstractSubscription implements S
         return completableFuture;
     }
 
+    @Override
+    public CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position> position) {
+        final CompletableFuture<AnalyzeBacklogResult> failed = new CompletableFuture<>();
+        failed.completeExceptionally(
+                new Exception("Unsupported operation analyzeBacklog for NonPersistentSubscription"));
+        return failed; // always completed exceptionally: the operation is not supported here
+    }
+
     private static final Logger log = LoggerFactory.getLogger(NonPersistentSubscription.class);
 
     public long getLastActive() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 0dc38f9bc2e..22f94ee0545 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.service.persistent;
 import static org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Predicate;
+import io.netty.buffer.ByteBuf;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -30,6 +32,8 @@ import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
@@ -44,6 +48,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentFindCursorPositionException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -52,6 +57,7 @@ import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.service.AbstractSubscription;
+import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -60,8 +66,10 @@ import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFence
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.EntryFilterSupport;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.plugin.EntryFilter;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
 import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleDisabled;
 import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
@@ -70,6 +78,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
 import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.naming.TopicName;
@@ -77,6 +86,7 @@ import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.stats.PositionInPendingAckStats;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
@@ -512,6 +522,109 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
         return "Null";
     }
 
+    /**
+     * Scans the backlog of this subscription and counts entries and messages, grouped by the result of
+     * the broker entry filters (accepted, rejected, rescheduled).
+     * The scan is bounded by the configured maximum number of entries and maximum duration; the
+     * {@link ScanOutcome} reported by the cursor is stored in the result.
+     *
+     * @param position passed unchanged to the cursor scan as the position to start from
+     * @return a future holding the counters, the scan outcome and the first and last scanned positions
+     */
+    @Override
+    public CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position> position) {
+        final long start = System.currentTimeMillis();
+        if (log.isDebugEnabled()) {
+            log.debug("[{}][{}] Starting to analyze backlog", topicName, subName);
+            log.debug("[{}][{}] currentPosition {}", topicName, subName, cursor.getMarkDeletedPosition());
+        }
+
+        final AtomicLong entries = new AtomicLong();
+        final AtomicLong accepted = new AtomicLong();
+        final AtomicLong rejected = new AtomicLong();
+        final AtomicLong rescheduled = new AtomicLong();
+        final AtomicLong messages = new AtomicLong();
+        final AtomicLong acceptedMessages = new AtomicLong();
+        final AtomicLong rejectedMessages = new AtomicLong();
+        final AtomicLong rescheduledMessages = new AtomicLong();
+        final AtomicReference<Position> firstPosition = new AtomicReference<>();
+        final AtomicReference<Position> lastPosition = new AtomicReference<>();
+
+        // Read the dispatcher field once, and check its type instead of casting blindly: a missing
+        // dispatcher or one that is not an EntryFilterSupport gets a dedicated filter support.
+        final Dispatcher currentDispatcher = dispatcher;
+        final EntryFilterSupport entryFilterSupport = currentDispatcher instanceof EntryFilterSupport
+                ? (EntryFilterSupport) currentDispatcher : new EntryFilterSupport(this);
+        // we put some hard limits on the scan, in order to prevent denial of service
+        final ServiceConfiguration configuration = topic.getBrokerService().getPulsar().getConfiguration();
+        final long maxEntries = configuration.getSubscriptionBacklogScanMaxEntries();
+        final long timeOutMs = configuration.getSubscriptionBacklogScanMaxTimeMs();
+        final int batchSize = configuration.getDispatcherMaxReadBatchSize();
+        return cursor.scan(position, new Predicate<Entry>() {
+            @Override
+            public boolean apply(Entry entry) {
+                if (log.isDebugEnabled()) {
+                    log.debug("found {}", entry);
+                }
+                Position entryPosition = entry.getPosition();
+                firstPosition.compareAndSet(null, entryPosition);
+                lastPosition.set(entryPosition);
+                ByteBuf metadataAndPayload = entry.getDataBuffer();
+                MessageMetadata messageMetadata = Commands.peekMessageMetadata(metadataAndPayload, "", -1);
+                // The metadata may be missing (null); such an entry is counted as a single message
+                // instead of failing the scan with a NullPointerException.
+                int numMessages = 1;
+                if (messageMetadata != null && messageMetadata.hasNumMessagesInBatch()) {
+                    numMessages = messageMetadata.getNumMessagesInBatch();
+                }
+                EntryFilter.FilterResult filterResult = entryFilterSupport
+                        .runFiltersForEntry(entry, messageMetadata, null);
+                if (filterResult == null) {
+                    filterResult = EntryFilter.FilterResult.ACCEPT;
+                }
+                switch (filterResult) {
+                    case REJECT:
+                        rejected.incrementAndGet();
+                        rejectedMessages.addAndGet(numMessages);
+                        break;
+                    case RESCHEDULE:
+                        rescheduled.incrementAndGet();
+                        rescheduledMessages.addAndGet(numMessages);
+                        break;
+                    default:
+                        accepted.incrementAndGet();
+                        acceptedMessages.addAndGet(numMessages);
+                        break;
+                }
+                long num = entries.incrementAndGet();
+                messages.addAndGet(numMessages);
+                if (num % 1000 == 0) {
+                    log.info("[{}][{}] scan running since {} ms - scanned {} entries",
+                            topicName, subName, System.currentTimeMillis() - start, num);
+                }
+                return true;
+            }
+        }, batchSize, maxEntries, timeOutMs).thenApply((ScanOutcome outcome) -> {
+            long end = System.currentTimeMillis();
+            AnalyzeBacklogResult result = new AnalyzeBacklogResult();
+            result.setFirstPosition(firstPosition.get());
+            result.setLastPosition(lastPosition.get());
+            result.setEntries(entries.get());
+            result.setMessages(messages.get());
+            result.setFilterAcceptedEntries(accepted.get());
+            result.setFilterAcceptedMessages(acceptedMessages.get());
+            result.setFilterRejectedEntries(rejected.get());
+            result.setFilterRejectedMessages(rejectedMessages.get());
+            result.setFilterRescheduledEntries(rescheduled.get());
+            result.setFilterRescheduledMessages(rescheduledMessages.get());
+            // sometimes we abort the execution due to a timeout or
+            // when we reach a maximum number of entries
+            result.setScanOutcome(outcome);
+            log.info("[{}][{}] scan took {} ms - {}", topicName, subName, end - start, result);
+            return result;
+        });
+    }
+
     @Override
     public CompletableFuture<Void> clearBacklog() {
         CompletableFuture<Void> future = new CompletableFuture<>();
@@ -1192,4 +1305,5 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
     }
 
     private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
new file mode 100644
index 00000000000..fdf6fc562db
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Lists;
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertThrows;
+
+/** Tests for the admin API that analyzes the backlog of a subscription. */
+@Test(groups = "broker-admin")
+public class AnalyzeBacklogSubscriptionTest extends ProducerConsumerBase {
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.internalSetup();
+        producerBaseSetup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void simpleAnalyzeBacklogTest() throws Exception {
+        simpleAnalyzeBacklogTest(false);
+    }
+
+    @Test
+    public void simpleAnalyzeBacklogTestWithBatching() throws Exception {
+        simpleAnalyzeBacklogTest(true);
+    }
+
+    /** Publishes 20 messages, optionally batched by 5, and checks the analysis while they get acknowledged. */
+    private void simpleAnalyzeBacklogTest(boolean batching) throws Exception {
+        int numMessages = 20;
+        int batchSize = batching ? 5 : 1;
+        int numEntries = numMessages / batchSize;
+
+        String topic = "persistent://my-property/my-ns/my-topic-" + batching;
+        String subName = "sub-1";
+        admin.topics().createSubscription(topic, subName, MessageId.latest);
+
+        assertEquals(admin.topics().getSubscriptions(topic), Lists.newArrayList("sub-1"));
+
+        verifyBacklog(topic, subName, 0, 0);
+
+        @Cleanup
+        Producer<byte[]> p = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(batching)
+                .batchingMaxMessages(batchSize)
+                .batchingMaxPublishDelay(Integer.MAX_VALUE, TimeUnit.SECONDS)
+                .create();
+
+        List<CompletableFuture<MessageId>> handles = new ArrayList<>();
+        for (int i = 0; i < numMessages; i++) {
+            CompletableFuture<MessageId> handle
+                    = p.sendAsync(("test-" + i).getBytes());
+            handles.add(handle);
+        }
+        FutureUtil.waitForAll(handles).get();
+
+        MessageId middleMessageId = handles.get(numMessages / 2).get();
+
+        verifyBacklog(topic, subName, numEntries, numMessages);
+
+        // create a second subscription
+        admin.topics().createSubscription(topic, "from-middle", middleMessageId);
+
+        verifyBacklog(topic, "from-middle", numEntries / 2, numMessages / 2);
+
+        try (Consumer<byte[]> consumer = pulsarClient
+                .newConsumer()
+                .topic(topic)
+                // we want to wait for the server to process acks, in order to not have a flaky test
+                .isAckReceiptEnabled(true)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe()) {
+            Message<byte[]> receive1 = consumer.receive();
+            Message<byte[]> receive2 = consumer.receive();
+            Message<byte[]> receive3 = consumer.receive();
+            Message<byte[]> receive4 = consumer.receive();
+            Message<byte[]> receive5 = consumer.receive();
+
+            // nothing has been acknowledged yet: the backlog is expected to be unchanged
+            verifyBacklog(topic, subName, numEntries, numMessages);
+
+            consumer.acknowledge(receive2);
+
+            // one individually deleted message
+            if (batching) {
+                // acknowledging a single message in an entry is not enough
+                // to count -1 for the backlog
+                verifyBacklog(topic, subName, numEntries, numMessages);
+            } else {
+                verifyBacklog(topic, subName, numEntries - 1, numMessages - 1);
+            }
+
+            consumer.acknowledge(receive1);
+            consumer.acknowledge(receive3);
+            consumer.acknowledge(receive4);
+            consumer.acknowledge(receive5);
+
+            verifyBacklog(topic, subName, numEntries - (5 / batchSize), numMessages - 5);
+
+            // receive and acknowledge all the remaining messages
+            for (int remaining = numMessages - 5; remaining > 0; remaining--) {
+                Message<byte[]> m = consumer.receive();
+                consumer.acknowledge(m);
+            }
+
+            verifyBacklog(topic, subName, 0, 0);
+        }
+    }
+
+    /** Analyzes the backlog and expects the given totals, with every entry accepted and the scan not aborted. */
+    private void verifyBacklog(String topic, String subscription, int numEntries, int numMessages) throws Exception {
+        AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklogResult
+                = admin.topics().analyzeSubscriptionBacklog(topic, subscription, Optional.empty());
+
+        assertEquals(analyzeSubscriptionBacklogResult.getEntries(), numEntries);
+        assertEquals(analyzeSubscriptionBacklogResult.getFilterAcceptedEntries(), numEntries);
+        assertEquals(analyzeSubscriptionBacklogResult.getFilterRejectedEntries(), 0);
+        assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledEntries(), 0);
+
+        assertEquals(analyzeSubscriptionBacklogResult.getMessages(), numMessages);
+        assertEquals(analyzeSubscriptionBacklogResult.getFilterAcceptedMessages(), numMessages);
+        assertEquals(analyzeSubscriptionBacklogResult.getFilterRejectedMessages(), 0);
+        assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledMessages(), 0);
+
+        assertFalse(analyzeSubscriptionBacklogResult.isAborted());
+    }
+
+    @Test
+    public void partitionedTopicNotAllowed() throws Exception {
+        String topic = "persistent://my-property/my-ns/my-partitioned-topic";
+        String subName = "sub-1";
+        admin.topics().createPartitionedTopic(topic, 2);
+        admin.topics().createSubscription(topic, subName, MessageId.latest);
+        assertEquals(admin.topics().getSubscriptions(topic), Lists.newArrayList("sub-1"));
+
+        // you cannot use this feature on a partitioned topic
+        assertThrows(PulsarAdminException.NotAllowedException.class, () -> {
+            admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty());
+        });
+
+        // you can access single partitions
+        AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklogResult
+                = admin.topics().analyzeSubscriptionBacklog(topic + "-partition-0", subName, Optional.empty());
+        assertEquals(analyzeSubscriptionBacklogResult.getEntries(), 0);
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
index 12b742a0191..ada26ee6c56 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
@@ -32,6 +32,8 @@ import java.util.concurrent.TimeUnit;
 import java.io.IOException;
 import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.core.Response.Status;
+
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.http.HttpResponse;
@@ -63,6 +65,7 @@ import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-admin")
+@Slf4j
 public class CreateSubscriptionTest extends ProducerConsumerBase {
 
     @BeforeMethod
@@ -480,4 +483,5 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
         ManagedLedgerImpl ml = (ManagedLedgerImpl)(topicRef.getManagedLedger());
         assertEquals(ml.getWaitingCursorsCount(), 0);
     }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
index c8d13c4826e..19175e936c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service.plugin;
 
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -35,7 +36,7 @@ public class EntryFilterTest implements EntryFilter {
             return FilterResult.ACCEPT;
         }
         Consumer consumer = context.getConsumer();
-        Map<String, String> metadata = consumer.getMetadata();
+        Map<String, String> metadata = consumer != null ? consumer.getMetadata() : Collections.emptyMap();
         log.info("filterEntry for {}", metadata);
         String matchValueAccept = metadata.getOrDefault("matchValueAccept", "ACCEPT");
         String matchValueReject = metadata.getOrDefault("matchValueReject", "REJECT");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index 6d77d06da01..ee100f3fdee 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -29,17 +29,18 @@ import com.google.common.collect.ImmutableMap;
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.service.AbstractBaseDispatcher;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.EntryFilterSupport;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -48,7 +49,9 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -68,6 +71,7 @@ public class FilterEntryTest extends BrokerTestBase {
         internalCleanup();
     }
 
+    @Test
     public void testFilter() throws Exception {
         Map<String, String> map = new HashMap<>();
         map.put("1","1");
@@ -76,12 +80,13 @@ public class FilterEntryTest extends BrokerTestBase {
         String subName = "sub";
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
                 .subscriptionProperties(map)
+                .isAckReceiptEnabled(true)
                 .subscriptionName(subName).subscribe();
         // mock entry filters
         PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
                 .getTopicReference(topic).get().getSubscription(subName);
         Dispatcher dispatcher = subscription.getDispatcher();
-        Field field = AbstractBaseDispatcher.class.getDeclaredField("entryFilters");
+        Field field = EntryFilterSupport.class.getDeclaredField("entryFilters");
         field.setAccessible(true);
         NarClassLoader narClassLoader = mock(NarClassLoader.class);
         EntryFilter filter1 = new EntryFilterTest();
@@ -96,6 +101,8 @@ public class FilterEntryTest extends BrokerTestBase {
             producer.send("test");
         }
 
+        verifyBacklog(topic, subName, 10, 10, 10, 10, 0, 0, 0, 0);
+
         int counter = 0;
         while (true) {
             Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
@@ -108,10 +115,28 @@ public class FilterEntryTest extends BrokerTestBase {
         }
         // All normal messages can be received
         assertEquals(10, counter);
+
+        verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0);
+
+        // stop the consumer
+        consumer.close();
+
         MessageIdImpl lastMsgId = null;
         for (int i = 0; i < 10; i++) {
             lastMsgId = (MessageIdImpl) producer.newMessage().property("REJECT", "").value("1").send();
         }
+
+        // analyze the subscription and predict that
+        // 10 messages will be rejected by the filter
+        verifyBacklog(topic, subName, 10, 10, 0, 0, 10, 10, 0, 0);
+
+        consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .isAckReceiptEnabled(true)
+                .subscriptionProperties(map)
+                .subscriptionName(subName)
+                .subscribe();
+
         counter = 0;
         while (true) {
             Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
@@ -125,6 +150,10 @@ public class FilterEntryTest extends BrokerTestBase {
         // REJECT messages are filtered out
         assertEquals(0, counter);
 
+        // now the Filter acknowledged the messages on behalf of the Consumer
+        // backlog is now zero again
+        verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0);
+
         // All messages should be acked, check the MarkDeletedPosition
         assertNotNull(lastMsgId);
         MessageIdImpl finalLastMsgId = lastMsgId;
@@ -180,7 +209,7 @@ public class FilterEntryTest extends BrokerTestBase {
             PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
                     .getTopicReference(topic).get().getSubscription(subName);
             Dispatcher dispatcher = subscription.getDispatcher();
-            Field field = AbstractBaseDispatcher.class.getDeclaredField("entryFilters");
+            Field field = EntryFilterSupport.class.getDeclaredField("entryFilters");
             field.setAccessible(true);
             NarClassLoader narClassLoader = mock(NarClassLoader.class);
             EntryFilter filter1 = new EntryFilterTest();
@@ -253,7 +282,7 @@ public class FilterEntryTest extends BrokerTestBase {
             PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
                     .getTopicReference(topic).get().getSubscription(subName);
             Dispatcher dispatcher = subscription.getDispatcher();
-            Field field = AbstractBaseDispatcher.class.getDeclaredField("entryFilters");
+            Field field = EntryFilterSupport.class.getDeclaredField("entryFilters");
             field.setAccessible(true);
             NarClassLoader narClassLoader = mock(NarClassLoader.class);
             EntryFilter filter1 = new EntryFilterTest();
@@ -328,4 +357,25 @@ public class FilterEntryTest extends BrokerTestBase {
             resultConsume2.get(1, TimeUnit.MINUTES);
         }
     }
+
+
+    /** Runs the backlog analysis on the subscription and compares every counter with the expected value. */
+    private void verifyBacklog(String topic, String subscription,
+                               int numEntries, int numMessages,
+                               int numEntriesAccepted, int numMessagesAccepted,
+                               int numEntriesRejected, int numMessagesRejected,
+                               int numEntriesRescheduled, int numMessagesRescheduled) throws Exception {
+        AnalyzeSubscriptionBacklogResult a1
+                = admin.topics().analyzeSubscriptionBacklog(topic, subscription, Optional.empty());
+        // TestNG's assertEquals takes (actual, expected): this order keeps failure messages truthful
+        Assert.assertEquals(a1.getEntries(), numEntries);
+        Assert.assertEquals(a1.getFilterAcceptedEntries(), numEntriesAccepted);
+        Assert.assertEquals(a1.getFilterRejectedEntries(), numEntriesRejected);
+        Assert.assertEquals(a1.getFilterRescheduledEntries(), numEntriesRescheduled);
+
+        Assert.assertEquals(a1.getMessages(), numMessages);
+        Assert.assertEquals(a1.getFilterAcceptedMessages(), numMessagesAccepted);
+        Assert.assertEquals(a1.getFilterRejectedMessages(), numMessagesRejected);
+        Assert.assertEquals(a1.getFilterRescheduledMessages(), numMessagesRescheduled);
+    }
 }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 71ca31e5903..7e88b3fbd14 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.admin;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
@@ -46,6 +47,8 @@ import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
+
 /**
  * Admin interface for Topics management.
  */
@@ -2049,6 +2052,46 @@ public interface Topics {
     Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic, boolean applied)
             throws PulsarAdminException;
 
+    /**
+     * Analyze subscription backlog.
+     * This is a potentially expensive operation, because it has to
+     * read the messages from storage.
+     * The analysis takes into consideration batch messages
+     * and also Subscription filters.
+     * @param topic
+     *            Topic name
+     * @param subscriptionName
+     *            the subscription
+     * @param startPosition
+     *           the position to start the scan from (empty means the last processed message)
+     * @return an accurate analysis of the backlog
+     * @throws PulsarAdminException
+     *            Unexpected error
+     */
+    AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName,
+                                                                Optional<MessageId> startPosition)
+            throws PulsarAdminException;
+
+    /**
+     * Analyze subscription backlog asynchronously.
+     * This is a potentially expensive operation, because it has to
+     * read the messages from storage.
+     * The analysis takes into consideration batch messages
+     * and also Subscription filters.
+     * @param topic
+     *            Topic name
+     * @param subscriptionName
+     *            the subscription
+     * @param startPosition
+     *           the position to start the scan from (empty means the last processed message)
+     * @return a future completed with an accurate analysis of the backlog,
+     *         or completed exceptionally if the request fails
+     *         (blocking variant: {@link #analyzeSubscriptionBacklog})
+     */
+    CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic,
+                                                                           String subscriptionName,
+                                                                           Optional<MessageId> startPosition);
+
     /**
      * Get backlog size by a message ID.
      * @param topic
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java
new file mode 100644
index 00000000000..a97ccb0bb96
--- /dev/null
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.stats;
+
+import lombok.Data;
+import lombok.ToString;
+
+/** Result of a subscription backlog analysis: totals are reported both in entries and in messages. */
+@Data
+@ToString
+public class AnalyzeSubscriptionBacklogResult {
+    private long entries;
+    private long messages;
+    // entry filter outcome, counted in entries
+    private long filterRejectedEntries;
+    private long filterAcceptedEntries;
+    private long filterRescheduledEntries;
+    // entry filter outcome, counted in messages
+    private long filterRejectedMessages;
+    private long filterAcceptedMessages;
+    private long filterRescheduledMessages;
+    // NOTE(review): presumably true when the scan stopped before the end of the backlog - confirm with the broker
+    private boolean aborted;
+    private String firstMessageId;
+    private String lastMessageId;
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index 48cb74d5952..d0c55c01957 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -141,6 +141,15 @@ public abstract class BaseResource {
         return future;
     }
 
+    public <T, R> void asyncPostRequestWithResponse(final WebTarget target, Entity<T> entity,
+                                                                    InvocationCallback<R> callback) {
+        try {
+            request(target).async().post(entity, callback); // the outcome is delivered to the callback
+        } catch (PulsarAdminException cae) {
+            callback.failed(cae); // report it through the same callback instead of throwing
+        }
+    }
+
     public <T> CompletableFuture<Void> asyncPostRequest(final WebTarget target, Entity<T> entity) {
         final CompletableFuture<Void> future = new CompletableFuture<>();
         try {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index d7363597459..0b3f4ea7c7a 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
@@ -88,6 +89,7 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.slf4j.Logger;
@@ -1662,6 +1664,46 @@ public class TopicsImpl extends BaseResource implements Topics {
         }
     }
 
+    // Blocking variant: delegates to analyzeSubscriptionBacklogAsync(...) through sync(...).
+    @Override
+    public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic,
+                                                                       String subscriptionName,
+                                                                       Optional<MessageId> startPosition)
+            throws PulsarAdminException {
+        return sync(() -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition));
+    }
+
+    @Override
+    public CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic,
+                                                                                String subscriptionName,
+                                                                                Optional<MessageId> startPosition) {
+        TopicName topicName = validateTopic(topic);
+        String encodedSubName = Codec.encode(subscriptionName);
+        WebTarget path = topicPath(topicName, "subscription", encodedSubName, "analyzeBacklog");
+
+        // The start position is optional: without it the request is posted with a null entity.
+        Entity<ResetCursorData> entity = startPosition
+                .map(position -> Entity.entity(new ResetCursorData(position), MediaType.APPLICATION_JSON))
+                .orElse(null);
+
+        final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new CompletableFuture<>();
+        asyncPostRequestWithResponse(path, entity, new InvocationCallback<AnalyzeSubscriptionBacklogResult>() {
+            @Override
+            public void completed(AnalyzeSubscriptionBacklogResult res) {
+                future.complete(res);
+            }
+
+            @Override
+            public void failed(Throwable throwable) {
+                // asyncPostRequestWithResponse may hand over an exception directly, which may carry
+                // no cause to unwrap: fall back to the throwable itself instead of passing null.
+                Throwable cause = throwable.getCause() != null ? throwable.getCause() : throwable;
+                future.completeExceptionally(getApiException(cause));
+            }
+        });
+        return future;
+    }
+
     @Override
     public Long getBacklogSizeByMessageId(String topic, MessageId messageId)
             throws PulsarAdminException {
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 79865fae759..e3e9157233c 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -40,6 +40,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -1481,6 +1482,9 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest"));
         verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, false, null);
 
+        cmdTopics.run(split("analyze-backlog persistent://myprop/clust/ns1/ds1 -s sub1"));
+        verify(mockTopics).analyzeSubscriptionBacklog("persistent://myprop/clust/ns1/ds1", "sub1", Optional.empty());
+        
         // jcommander is stateful, you cannot parse the same command twice
         cmdTopics = new CmdTopics(() -> admin);
         cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest --property a=b -p x=y,z"));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 2913b5112bc..4b73703395e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -39,6 +39,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -243,6 +244,7 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("set-replicated-subscription-status", new SetReplicatedSubscriptionStatus());
         jcommander.addCommand("get-replicated-subscription-status", new GetReplicatedSubscriptionStatus());
         jcommander.addCommand("get-backlog-size", new GetBacklogSizeByMessageId());
+        jcommander.addCommand("analyze-backlog", new AnalyzeBacklog());
 
         jcommander.addCommand("get-replication-clusters", new GetReplicationClusters());
         jcommander.addCommand("set-replication-clusters", new SetReplicationClusters());
@@ -2922,6 +2924,33 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+
+    /** {@code analyze-backlog} command: prints the backlog analysis of a single subscription. */
+    @Parameters(commandDescription = "Analyze the backlog of a subscription.")
+    private class AnalyzeBacklog extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private List<String> params;
+
+        @Parameter(names = { "-s", "--subscription" }, description = "Subscription to be analyzed", required = true)
+        private String subName;
+
+        @Parameter(names = { "--position", "-p" },
+                description = "message position to start the scan from (ledgerId:entryId)", required = false)
+        private String messagePosition;
+
+        @Override
+        void run() throws PulsarAdminException {
+            final String topic = validatePersistentTopic(params);
+
+            // the scan start is optional; when given it is parsed together with the topic's partition index
+            final Optional<MessageId> scanStart = isNotBlank(messagePosition)
+                    ? Optional.of(validateMessageIdString(messagePosition,
+                            TopicName.get(topic).getPartitionIndex()))
+                    : Optional.empty();
+            print(getTopics().analyzeSubscriptionBacklog(topic, subName, scanStart));
+        }
+    }
+
     @Parameters(commandDescription = "Get the schema validation enforced")
     private class GetSchemaValidationEnforced extends CliCommand {
         @Parameter(description = "tenant/namespace", required = true)