Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/02 04:03:20 UTC

[pulsar] branch master updated: Add streaming dispatcher. (#9056)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cfaf48  Add streaming dispatcher. (#9056)
8cfaf48 is described below

commit 8cfaf48e1dc97e30050cb29c106b614d6b9bc69c
Author: Marvin Cai <zx...@streamnative.io>
AuthorDate: Mon Feb 1 20:02:50 2021 -0800

    Add streaming dispatcher. (#9056)
    
    Related to #3804
    
    ### Motivation
    
    Stream the dispatcher's read requests to the managed ledger instead of issuing them as micro batches.
    
    ### Modifications
    
    Created a StreamingEntryReader that can stream read requests to the managed ledger.
    Created a StreamingDispatcher interface with the methods needed to interact with the StreamingEntryReader.
    Created PersistentStreamingDispatcherSingleActiveConsumer and PersistentStreamingDispatcherMultipleConsumers, which use the StreamingEntryReader to read entries from the managed ledger.
    Added a config option to enable the streaming dispatcher (see the example below).
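    
    To try it, set the new flag in broker.conf. A minimal example (the key name
    follows the streamingDispatch field this change adds to ServiceConfiguration;
    the feature is in preview, off by default, and may change in a subsequent
    release):
    
        # broker.conf
        streamingDispatch=true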
---
 build/run_unit_group.sh                            |   5 +
 .../bookkeeper/mledger/WaitingEntryCallBack.java   |  30 ++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  26 +-
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java |   2 +
 .../apache/pulsar/broker/ServiceConfiguration.java |   7 +
 .../AbstractDispatcherMultipleConsumers.java       |   2 +
 .../PersistentDispatcherMultipleConsumers.java     | 185 +++++----
 .../PersistentDispatcherSingleActiveConsumer.java  | 276 ++++++-------
 ...istentStreamingDispatcherMultipleConsumers.java | 191 +++++++++
 ...entStreamingDispatcherSingleActiveConsumer.java | 208 ++++++++++
 .../service/persistent/PersistentSubscription.java |  19 +-
 .../streamingdispatch/PendingReadEntryRequest.java |  76 ++++
 .../streamingdispatch/StreamingDispatcher.java     |  53 +++
 .../streamingdispatch/StreamingEntryReader.java    | 338 ++++++++++++++++
 .../service/streamingdispatch/package-info.java    |  19 +
 .../PersistentDispatcherFailoverConsumerTest.java  |   5 +-
 .../broker/service/PersistentFailoverE2ETest.java  |   5 +
 .../pulsar/broker/service/PersistentTopicTest.java |   4 +-
 ...herFailoverConsumerStreamingDispatcherTest.java |  35 ++
 ...rsistentFailoverStreamingDispatcherE2ETest.java |  36 ++
 ...istentStreamingDispatcherBlockConsumerTest.java |  35 ++
 ...eDispatchStreamingDispatcherThrottlingTest.java |  36 ++
 .../PersistentTopicStreamingDispatcherE2ETest.java |  35 ++
 .../PersistentTopicStreamingDispatcherTest.java    |  35 ++
 ...roducerConsumerTestStreamingDispatcherTest.java |  35 ++
 .../StreamingEntryReaderTests.java                 | 433 +++++++++++++++++++++
 .../client/api/DispatcherBlockConsumerTest.java    |   1 +
 .../SubscriptionMessageDispatchThrottlingTest.java |   9 +-
 site2/docs/reference-configuration.md              |   1 +
 29 files changed, 1905 insertions(+), 237 deletions(-)

diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh
index aed0ca4..b4e5530 100755
--- a/build/run_unit_group.sh
+++ b/build/run_unit_group.sh
@@ -56,12 +56,17 @@ function broker_group_2() {
                                       -DtestForkCount=1 \
                                       -DtestReuseFork=true
 
+  $MVN_TEST_COMMAND -pl pulsar-broker -Dinclude="**/*StreamingDispatcher*Test.java" \
+                                      -DtestForkCount=1 \
+                                      -DtestReuseFork=true
+
   $MVN_TEST_COMMAND -pl pulsar-broker -Dinclude="org/apache/pulsar/broker/zookeeper/**/*.java,
                                                  org/apache/pulsar/broker/loadbalance/**/*.java,
                                                  org/apache/pulsar/broker/service/**/*.java" \
                                       -Dexclude="**/ReplicatorTest.java,
                                                  **/MessagePublishBufferThrottleTest.java,
                                                  **/TopicOwnerTest.java,
+                                                 **/*StreamingDispatcher*Test.java,
                                                  **/AntiAffinityNamespaceGroupTest.java"
 }
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/WaitingEntryCallBack.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/WaitingEntryCallBack.java
new file mode 100644
index 0000000..fc1f3b4
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/WaitingEntryCallBack.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+/**
+ * A callback that can be registered with a {@link ManagedLedger} to be notified when new entries are available.
+ */
+public interface WaitingEntryCallBack {
+
+    /**
+     * The callback that the {@link ManagedLedger} will trigger when new entries are available.
+     */
+    void entriesAvailable();
+}
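
A minimal sketch (not part of this commit) of how a reader can use the new
callback: it registers with the managed ledger and resumes its read loop once
new entries are persisted. The Runnable-based read loop here is a hypothetical
stand-in for whatever issues the next read.

    import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
    import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;

    class ResumeOnNewEntries implements WaitingEntryCallBack {
        private final ManagedLedgerImpl ledger;
        private final Runnable readLoop; // hypothetical: issues the next read

        ResumeOnNewEntries(ManagedLedgerImpl ledger, Runnable readLoop) {
            this.ledger = ledger;
            this.readLoop = readLoop;
        }

        // Invoked through ManagedLedgerImpl.notifyWaitingEntryCallBacks() once
        // an add-entry completes. Registration is one-shot (the ledger polls the
        // callback off its queue), so re-register before each wait.
        @Override
        public void entriesAvailable() {
            readLoop.run();
        }

        void waitForEntries() {
            ledger.addWaitingEntryCallBack(this);
        }
    }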
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a36b1c4..232bf5b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -110,6 +110,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedger
 import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.ManagedLedgerMXBean;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
@@ -170,6 +171,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     // Cursors that are waiting to be notified when new entries are persisted
     final ConcurrentLinkedQueue<ManagedCursorImpl> waitingCursors;
 
+    // Objects that are waiting to be notified when new entries are persisted
+    final ConcurrentLinkedQueue<WaitingEntryCallBack> waitingEntryCallBacks;
+
     // This map is used for concurrent open cursor requests, where the 2nd request will attach a listener to the
     // uninitialized cursor future from the 1st request
     final Map<String, CompletableFuture<ManagedCursor>> uninitializedCursors;
@@ -290,6 +294,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         this.mbean = new ManagedLedgerMBeanImpl(this);
         this.entryCache = factory.getEntryCacheManager().getEntryCache(this);
         this.waitingCursors = Queues.newConcurrentLinkedQueue();
+        this.waitingEntryCallBacks = Queues.newConcurrentLinkedQueue();
         this.uninitializedCursors = Maps.newHashMap();
         this.clock = config.getClock();
 
@@ -2109,6 +2114,21 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
+    void notifyWaitingEntryCallBacks() {
+        while (true) {
+            final WaitingEntryCallBack cb = waitingEntryCallBacks.poll();
+            if (cb == null) {
+                break;
+            }
+
+            executor.execute(safeRun(cb::entriesAvailable));
+        }
+    }
+
+    public void addWaitingEntryCallBack(WaitingEntryCallBack cb) {
+        this.waitingEntryCallBacks.add(cb);
+    }
+
     private void trimConsumedLedgersInBackground() {
         trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
     }
@@ -3086,7 +3106,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
      *            the position to validate
      * @return true if the position is valid, false otherwise
      */
-    boolean isValidPosition(PositionImpl position) {
+    public boolean isValidPosition(PositionImpl position) {
         PositionImpl last = lastConfirmedEntry;
         if (log.isDebugEnabled()) {
             log.debug("IsValid position: {} -- last: {}", position, last);
@@ -3130,7 +3150,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             next = getNextValidPositionInternal(position);
         } catch (NullPointerException e) {
             next = lastConfirmedEntry.getNext();
-            log.error("[{}] Can't find next valid position, fail back to the next position of the last position.", name, e);
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Can't find next valid position, fall back to the next position of the last position.", name, e);
+            }
         }
         return next;
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 742ed1e..fc3054e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -207,6 +207,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
                 cb.addComplete(lastEntry, data.asReadOnly(), ctx);
                 ReferenceCountUtil.release(data);
                 ml.notifyCursors();
+                ml.notifyWaitingEntryCallBacks();
                 this.recycle();
             } else {
                 ReferenceCountUtil.release(data);
@@ -232,6 +233,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
         if (cb != null) {
             cb.addComplete(PositionImpl.get(lh.getId(), entryId), null, ctx);
             ml.notifyCursors();
+            ml.notifyWaitingEntryCallBacks();
             this.recycle();
         }
     }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 0d38279..44381bd 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -686,6 +686,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private boolean preciseDispatcherFlowControl = false;
 
     @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Whether to use streaming read dispatcher. Currently is in preview and can be changed " +
+                "in subsequent release."
+    )
+    private boolean streamingDispatch = false;
+
+    @FieldContext(
         dynamic = true,
         category = CATEGORY_SERVER,
         doc = "Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
index 7cd9e03..2f6b9a6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
@@ -72,6 +72,8 @@ public abstract class AbstractDispatcherMultipleConsumers extends AbstractBaseDi
 
     public abstract boolean isConsumerAvailable(Consumer consumer);
 
+    protected void cancelPendingRead() {}
+
     /**
      * <pre>
      * Broker gives more priority while dispatching messages. Here, broker follows descending priorities. (eg:
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 26f328d..2816f9c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -80,14 +80,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     protected volatile Range<PositionImpl> lastIndividualDeletedRangeFromCursorRecovery;
 
     private CompletableFuture<Void> closeFuture = null;
-    LongPairSet messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
+    protected LongPairSet messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
     protected final RedeliveryTracker redeliveryTracker;
 
     private Optional<DelayedDeliveryTracker> delayedDeliveryTracker = Optional.empty();
 
-    private volatile boolean havePendingRead = false;
-    private volatile boolean havePendingReplayRead = false;
-    private boolean shouldRewindBeforeReadingOrReplaying = false;
+    protected volatile boolean havePendingRead = false;
+    protected volatile boolean havePendingReplayRead = false;
+    protected boolean shouldRewindBeforeReadingOrReplaying = false;
     protected final String name;
 
     protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
@@ -95,23 +95,23 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
                     "totalAvailablePermits");
     protected volatile int totalAvailablePermits = 0;
-    private volatile int readBatchSize;
-    private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS,
+    protected volatile int readBatchSize;
+    protected final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS,
             1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
     private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
             TOTAL_UNACKED_MESSAGES_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
                     "totalUnackedMessages");
-    private volatile int totalUnackedMessages = 0;
+    protected volatile int totalUnackedMessages = 0;
     private volatile int blockedDispatcherOnUnackedMsgs = FALSE;
-    private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
+    protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
             BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
                     "blockedDispatcherOnUnackedMsgs");
     protected final ServiceConfiguration serviceConfig;
     protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
 
-    enum ReadType {
+    protected enum ReadType {
         Normal, Replay
     }
 
@@ -199,9 +199,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             consumerList.remove(consumer);
             log.info("Removed consumer {} with pending {} acks", consumer, consumer.getPendingAcks().size());
             if (consumerList.isEmpty()) {
-                if (havePendingRead && cursor.cancelPendingReadRequest()) {
-                    havePendingRead = false;
-                }
+                cancelPendingRead();
 
                 messagesToRedeliver.clear();
                 redeliveryTracker.clear();
@@ -248,81 +246,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         // totalAvailablePermits may be updated by other threads
         int currentTotalAvailablePermits = totalAvailablePermits;
         if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
-            int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);
-
-            Consumer c = getRandomConsumer();
-            // if turn on precise dispatcher flow control, adjust the record to read
-            if (c != null && c.isPreciseDispatcherFlowControl()) {
-                messagesToRead = Math.min(
-                        (int) Math.ceil(currentTotalAvailablePermits * 1.0 / c.getAvgMessagesPerEntry()),
-                        readBatchSize);
-            }
-
-            if (!isConsumerWritable()) {
-                // If the connection is not currently writable, we issue the read request anyway, but for a single
-                // message. The intent here is to keep use the request as a notification mechanism while avoiding to
-                // read and dispatch a big batch of messages which will need to wait before getting written to the
-                // socket.
-                messagesToRead = 1;
-            }
-
-            // throttle only if: (1) cursor is not active (or flag for throttle-nonBacklogConsumer is enabled) bcz
-            // active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
-            // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
-            if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
-                if (topic.getDispatchRateLimiter().isPresent()
-                        && topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
-                    DispatchRateLimiter topicRateLimiter = topic.getDispatchRateLimiter().get();
-                    if (!topicRateLimiter.hasMessageDispatchPermit()) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", name,
-                                    topicRateLimiter.getDispatchRateOnMsg(), topicRateLimiter.getDispatchRateOnByte(),
-                                    MESSAGE_RATE_BACKOFF_MS);
-                        }
-                        topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
-                                TimeUnit.MILLISECONDS);
-                        return;
-                    } else {
-                        // if dispatch-rate is in msg then read only msg according to available permit
-                        long availablePermitsOnMsg = topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
-                        if (availablePermitsOnMsg > 0) {
-                            messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
-                        }
-                    }
-                }
-
-                if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
-                    if (!dispatchRateLimiter.get().hasMessageDispatchPermit()) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] message-read exceeded subscription message-rate {}/{},"
-                                            + " schedule after a {}", name,
-                                    dispatchRateLimiter.get().getDispatchRateOnMsg(),
-                                    dispatchRateLimiter.get().getDispatchRateOnByte(),
-                                    MESSAGE_RATE_BACKOFF_MS);
-                        }
-                        topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
-                            TimeUnit.MILLISECONDS);
-                        return;
-                    } else {
-                        // if dispatch-rate is in msg then read only msg according to available permit
-                        long availablePermitsOnMsg = dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
-                        if (availablePermitsOnMsg > 0) {
-                            messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
-                        }
-                    }
-                }
+            int messagesToRead = calculateNumOfMessageToRead(currentTotalAvailablePermits);
 
-            }
-
-            if (havePendingReplayRead) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Skipping replay while awaiting previous read to complete", name);
-                }
+            if (-1 == messagesToRead) {
+                // Skip the read: the topic/dispatcher has exceeded the dispatch rate, or the previous pending read hasn't completed.
                 return;
             }
 
-            // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
-            messagesToRead = Math.max(messagesToRead, 1);
             Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
 
             if (!messagesToReplayNow.isEmpty()) {
@@ -366,6 +296,84 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         }
     }
 
+    protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) {
+        int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);
+
+        Consumer c = getRandomConsumer();
+        // if precise dispatcher flow control is turned on, adjust the number of records to read
+        if (c != null && c.isPreciseDispatcherFlowControl()) {
+            messagesToRead = Math.min(
+                    (int) Math.ceil(currentTotalAvailablePermits * 1.0 / c.getAvgMessagesPerEntry()),
+                    readBatchSize);
+        }
+
+        if (!isConsumerWritable()) {
+            // If the connection is not currently writable, we issue the read request anyway, but for a single
+            // message. The intent here is to keep using the read request as a notification mechanism while avoiding
+            // reading and dispatching a big batch of messages that would have to wait before being written to the
+            // socket.
+            messagesToRead = 1;
+        }
+
+        // Throttle only if: (1) the cursor is not active (or the throttle-nonBacklogConsumer flag is enabled),
+        // because an active cursor reads messages from the cache rather than from BookKeeper; (2) the topic has
+        // reached the message-rate threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS.
+        if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
+            if (topic.getDispatchRateLimiter().isPresent()
+                    && topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
+                DispatchRateLimiter topicRateLimiter = topic.getDispatchRateLimiter().get();
+                if (!topicRateLimiter.hasMessageDispatchPermit()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", name,
+                                topicRateLimiter.getDispatchRateOnMsg(), topicRateLimiter.getDispatchRateOnByte(),
+                                MESSAGE_RATE_BACKOFF_MS);
+                    }
+                    topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
+                            TimeUnit.MILLISECONDS);
+                    return -1;
+                } else {
+                    // if dispatch-rate is in msg then read only msg according to available permit
+                    long availablePermitsOnMsg = topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
+                    if (availablePermitsOnMsg > 0) {
+                        messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
+                    }
+                }
+            }
+
+            if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
+                if (!dispatchRateLimiter.get().hasMessageDispatchPermit()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] message-read exceeded subscription message-rate {}/{},"
+                                        + " schedule after a {}", name,
+                                dispatchRateLimiter.get().getDispatchRateOnMsg(),
+                                dispatchRateLimiter.get().getDispatchRateOnByte(),
+                                MESSAGE_RATE_BACKOFF_MS);
+                    }
+                    topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
+                            TimeUnit.MILLISECONDS);
+                    return -1;
+                } else {
+                    // if dispatch-rate is in msg then read only msg according to available permit
+                    long availablePermitsOnMsg = dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
+                    if (availablePermitsOnMsg > 0) {
+                        messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
+                    }
+                }
+            }
+
+        }
+
+        if (havePendingReplayRead) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Skipping replay while awaiting previous read to complete", name);
+            }
+            return -1;
+        }
+
+        // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
+        return Math.max(messagesToRead, 1);
+    }
+
     protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions) {
         return cursor.asyncReplayEntries(positions, this, ReadType.Replay);
     }
@@ -413,14 +421,19 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             closeFuture.complete(null);
         } else {
             consumerList.forEach(consumer -> consumer.disconnect(isResetCursor));
-            if (havePendingRead && cursor.cancelPendingReadRequest()) {
-                havePendingRead = false;
-            }
+            cancelPendingRead();
         }
         return closeFuture;
     }
 
     @Override
+    protected void cancelPendingRead() {
+        if (havePendingRead && cursor.cancelPendingReadRequest()) {
+            havePendingRead = false;
+        }
+    }
+
+    @Override
     public CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor) {
         return disconnectAllConsumers(isResetCursor);
     }
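
A note on the contract introduced by this refactor: calculateNumOfMessageToRead()
returns -1 as a sentinel meaning "skip this read" (either a rate limit was hit and
a retry is already scheduled after MESSAGE_RATE_BACKOFF_MS, or a replay read is
still pending); otherwise it returns at least 1. The caller pattern, sketched from
readMoreEntries() above:

    int messagesToRead = calculateNumOfMessageToRead(currentTotalAvailablePermits);
    if (messagesToRead == -1) {
        // Throttled, or a replay read is in flight; when throttled, a retry has
        // already been scheduled, so just return.
        return;
    }
    // Otherwise messagesToRead >= 1 and can be handed to the cursor (or, in the
    // streaming subclass, to the StreamingEntryReader).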
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 5501a81..4bc0728 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -58,20 +58,20 @@ import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer
+public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer
         implements Dispatcher, ReadEntriesCallback {
 
-    private final PersistentTopic topic;
-    private final ManagedCursor cursor;
-    private final String name;
+    protected final PersistentTopic topic;
+    protected final ManagedCursor cursor;
+    protected final String name;
     private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
 
-    private volatile boolean havePendingRead = false;
+    protected volatile boolean havePendingRead = false;
 
-    private volatile int readBatchSize;
-    private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS,
+    protected volatile int readBatchSize;
+    protected final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS,
             1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
-    private final ServiceConfiguration serviceConfig;
+    protected final ServiceConfiguration serviceConfig;
     private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
 
     private final RedeliveryTracker redeliveryTracker;
@@ -90,9 +90,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
     }
 
     protected void scheduleReadOnActiveConsumer() {
-        if (havePendingRead && cursor.cancelPendingReadRequest()) {
-            havePendingRead = false;
-        }
+        cancelPendingRead();
 
         if (havePendingRead) {
             return;
@@ -166,6 +164,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
         return false;
     }
 
+    @Override
     protected void cancelPendingRead() {
         if (havePendingRead && cursor.cancelPendingReadRequest()) {
             havePendingRead = false;
@@ -230,45 +229,48 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
             SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
             EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entries.size());
             filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, batchIndexesAcks, cursor, false);
+            dispatchEntriesToConsumer(currentConsumer, entries, batchSizes, batchIndexesAcks, sendMessageInfo);
+        }
+    }
 
-            int totalMessages = sendMessageInfo.getTotalMessages();
-            long totalBytes = sendMessageInfo.getTotalBytes();
-
-            currentConsumer
-                    .sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
-                            sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
-                            redeliveryTracker)
-                    .addListener(future -> {
-                        if (future.isSuccess()) {
-                            // acquire message-dispatch permits for already delivered messages
-                            if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
-                                if (topic.getDispatchRateLimiter().isPresent()) {
-                                    topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessages, totalBytes);
-                                }
+    protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> entries,
+                                             EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
+                                             SendMessageInfo sendMessageInfo) {
+        currentConsumer
+            .sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
+                    sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
+                    redeliveryTracker)
+            .addListener(future -> {
+                if (future.isSuccess()) {
+                    // acquire message-dispatch permits for already delivered messages
+                    if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
+                        if (topic.getDispatchRateLimiter().isPresent()) {
+                            topic.getDispatchRateLimiter().get().tryDispatchPermit(sendMessageInfo.getTotalMessages(),
+                                    sendMessageInfo.getTotalBytes());
+                        }
 
-                                dispatchRateLimiter.ifPresent(rateLimiter ->
-                                        rateLimiter.tryDispatchPermit(totalMessages, totalBytes));
-                            }
+                        dispatchRateLimiter.ifPresent(rateLimiter ->
+                                rateLimiter.tryDispatchPermit(sendMessageInfo.getTotalMessages(),
+                                        sendMessageInfo.getTotalBytes()));
+                    }
 
-                            // Schedule a new read batch operation only after the previous batch has been written to the
-                            // socket
-                            topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
-                                    SafeRun.safeRun(() -> {
-                                        synchronized (PersistentDispatcherSingleActiveConsumer.this) {
-                                            Consumer newConsumer = getActiveConsumer();
-                                            if (newConsumer != null && !havePendingRead) {
-                                                readMoreEntries(newConsumer);
-                                            } else {
-                                                log.debug(
-                                                        "[{}-{}] Ignoring write future complete."
-                                                                + " consumerAvailable={} havePendingRead={}",
-                                                        name, newConsumer, newConsumer != null, havePendingRead);
-                                            }
-                                        }
-                                    }));
-                        }
-                    });
-        }
+                    // Schedule a new read batch operation only after the previous batch has been written to the socket.
+                    topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
+                        SafeRun.safeRun(() -> {
+                            synchronized (PersistentDispatcherSingleActiveConsumer.this) {
+                                Consumer newConsumer = getActiveConsumer();
+                                if (newConsumer != null && !havePendingRead) {
+                                    readMoreEntries(newConsumer);
+                                } else {
+                                    log.debug(
+                                            "[{}-{}] Ignoring write future complete."
+                                                    + " consumerAvailable={} havePendingRead={}",
+                                            name, newConsumer, newConsumer != null, havePendingRead);
+                                }
+                            }
+                        }));
+                }
+            });
     }
 
     @Override
@@ -322,9 +324,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
             return;
         }
 
-        if (havePendingRead && cursor.cancelPendingReadRequest()) {
-            havePendingRead = false;
-        }
+        cancelPendingRead();
 
         if (!havePendingRead) {
             cursor.rewind();
@@ -354,93 +354,14 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
             return;
         }
 
-        int availablePermits = consumer.getAvailablePermits();
-
-        if (availablePermits > 0) {
-            if (!consumer.isWritable()) {
-                // If the connection is not currently writable, we issue the read request anyway, but for a single
-                // message. The intent here is to keep use the request as a notification mechanism while avoiding to
-                // read and dispatch a big batch of messages which will need to wait before getting written to the
-                // socket.
-                availablePermits = 1;
-            }
+        if (consumer.getAvailablePermits() > 0) {
+            int messagesToRead = calculateNumOfMessageToRead(consumer);
 
-            int messagesToRead = Math.min(availablePermits, readBatchSize);
-            // if turn of precise dispatcher flow control, adjust the records to read
-            if (consumer.isPreciseDispatcherFlowControl()) {
-                int avgMessagesPerEntry = consumer.getAvgMessagesPerEntry();
-                messagesToRead = Math.min((int) Math.ceil(availablePermits * 1.0 / avgMessagesPerEntry), readBatchSize);
+            if (-1 == messagesToRead) {
+                // Skip the read: the topic/dispatcher has exceeded the dispatch rate.
+                return;
             }
 
-            // throttle only if: (1) cursor is not active (or flag for throttle-nonBacklogConsumer is enabled) bcz
-            // active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
-            // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
-            if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
-                if (topic.getDispatchRateLimiter().isPresent()
-                        && topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
-                    DispatchRateLimiter topicRateLimiter = topic.getDispatchRateLimiter().get();
-                    if (!topicRateLimiter.hasMessageDispatchPermit()) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", name,
-                                topicRateLimiter.getDispatchRateOnMsg(), topicRateLimiter.getDispatchRateOnByte(),
-                                MESSAGE_RATE_BACKOFF_MS);
-                        }
-                        topic.getBrokerService().executor().schedule(() -> {
-                            Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-                            if (currentConsumer != null && !havePendingRead) {
-                                readMoreEntries(currentConsumer);
-                            } else {
-                                if (log.isDebugEnabled()) {
-                                    log.debug("[{}] Skipping read retry for topic: Current Consumer {},"
-                                                    + " havePendingRead {}",
-                                            topic.getName(), currentConsumer, havePendingRead);
-                                }
-                            }
-                        }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
-                        return;
-                    } else {
-                        // if dispatch-rate is in msg then read only msg according to available permit
-                        long availablePermitsOnMsg = topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
-                        if (availablePermitsOnMsg > 0) {
-                            messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
-                        }
-                    }
-                }
-
-                if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
-                    if (!dispatchRateLimiter.get().hasMessageDispatchPermit()) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] message-read exceeded subscription message-rate {}/{},"
-                                            + " schedule after a {}",
-                                    name, dispatchRateLimiter.get().getDispatchRateOnMsg(),
-                                    dispatchRateLimiter.get().getDispatchRateOnByte(),
-                                    MESSAGE_RATE_BACKOFF_MS);
-                        }
-                        topic.getBrokerService().executor().schedule(() -> {
-                            Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-                            if (currentConsumer != null && !havePendingRead) {
-                                readMoreEntries(currentConsumer);
-                            } else {
-                                if (log.isDebugEnabled()) {
-                                    log.debug("[{}] Skipping read retry: Current Consumer {}, havePendingRead {}",
-                                        topic.getName(), currentConsumer, havePendingRead);
-                                }
-                            }
-                        }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
-                        return;
-                    } else {
-                        // if dispatch-rate is in msg then read only msg according to available permit
-                        long subPermitsOnMsg = dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
-                        if (subPermitsOnMsg > 0) {
-                            messagesToRead = Math.min(messagesToRead, (int) subPermitsOnMsg);
-                        }
-                    }
-                }
-            }
-
-            // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
-            messagesToRead = Math.max(messagesToRead, 1);
-
             // Schedule read
             if (log.isDebugEnabled()) {
                 log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
@@ -459,6 +380,93 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
         }
     }
 
+    protected int calculateNumOfMessageToRead(Consumer consumer) {
+        int availablePermits = consumer.getAvailablePermits();
+        if (!consumer.isWritable()) {
+            // If the connection is not currently writable, we issue the read request anyway, but for a single
+            // message. The intent here is to keep using the read request as a notification mechanism while avoiding
+            // reading and dispatching a big batch of messages that would have to wait before being written to the
+            // socket.
+            availablePermits = 1;
+        }
+
+        int messagesToRead = Math.min(availablePermits, readBatchSize);
+        // if precise dispatcher flow control is turned on, adjust the number of records to read
+        if (consumer.isPreciseDispatcherFlowControl()) {
+            int avgMessagesPerEntry = consumer.getAvgMessagesPerEntry();
+            messagesToRead = Math.min((int) Math.ceil(availablePermits * 1.0 / avgMessagesPerEntry), readBatchSize);
+        }
+
+        // Throttle only if: (1) the cursor is not active (or the throttle-nonBacklogConsumer flag is enabled),
+        // because an active cursor reads messages from the cache rather than from BookKeeper; (2) the topic has
+        // reached the message-rate threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS.
+        if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
+            if (topic.getDispatchRateLimiter().isPresent()
+                    && topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
+                DispatchRateLimiter topicRateLimiter = topic.getDispatchRateLimiter().get();
+                if (!topicRateLimiter.hasMessageDispatchPermit()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", name,
+                                topicRateLimiter.getDispatchRateOnMsg(), topicRateLimiter.getDispatchRateOnByte(),
+                                MESSAGE_RATE_BACKOFF_MS);
+                    }
+                    topic.getBrokerService().executor().schedule(() -> {
+                        Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+                        if (currentConsumer != null && !havePendingRead) {
+                            readMoreEntries(currentConsumer);
+                        } else {
+                            if (log.isDebugEnabled()) {
+                                log.debug("[{}] Skipping read retry for topic: Current Consumer {},"
+                                                + " havePendingRead {}",
+                                        topic.getName(), currentConsumer, havePendingRead);
+                            }
+                        }
+                    }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
+                    return -1;
+                } else {
+                    // if dispatch-rate is in msg then read only msg according to available permit
+                    long availablePermitsOnMsg = topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
+                    if (availablePermitsOnMsg > 0) {
+                        messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
+                    }
+                }
+            }
+
+            if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
+                if (!dispatchRateLimiter.get().hasMessageDispatchPermit()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] message-read exceeded subscription message-rate {}/{},"
+                                        + " schedule after a {}",
+                                name, dispatchRateLimiter.get().getDispatchRateOnMsg(),
+                                dispatchRateLimiter.get().getDispatchRateOnByte(),
+                                MESSAGE_RATE_BACKOFF_MS);
+                    }
+                    topic.getBrokerService().executor().schedule(() -> {
+                        Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+                        if (currentConsumer != null && !havePendingRead) {
+                            readMoreEntries(currentConsumer);
+                        } else {
+                            if (log.isDebugEnabled()) {
+                                log.debug("[{}] Skipping read retry: Current Consumer {}, havePendingRead {}",
+                                        topic.getName(), currentConsumer, havePendingRead);
+                            }
+                        }
+                    }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
+                    return -1;
+                } else {
+                    // if dispatch-rate is in msg then read only msg according to available permit
+                    long subPermitsOnMsg = dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
+                    if (subPermitsOnMsg > 0) {
+                        messagesToRead = Math.min(messagesToRead, (int) subPermitsOnMsg);
+                    }
+                }
+            }
+        }
+
+        // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
+        return Math.max(messagesToRead, 1);
+    }
+
     @Override
     public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
         topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
new file mode 100644
index 0000000..9340e17
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.Lists;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.util.SafeRun;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingEntryReader;
+
+/**
+ * A {@link PersistentDispatcherMultipleConsumers} that implements {@link StreamingDispatcher}.
+ * It uses a {@link StreamingEntryReader} to stream new entries instead of reading micro batches from the managed ledger.
+ */
+@Slf4j
+public class PersistentStreamingDispatcherMultipleConsumers extends PersistentDispatcherMultipleConsumers
+    implements StreamingDispatcher {
+
+    private final StreamingEntryReader streamingEntryReader = new StreamingEntryReader((ManagedCursorImpl) cursor,
+            this, topic);
+
+    public PersistentStreamingDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
+                                                          Subscription subscription) {
+        super(topic, cursor, subscription);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public synchronized void readEntryComplete(Entry entry, PendingReadEntryRequest ctx) {
+
+        ReadType readType = (ReadType) ctx.ctx;
+        if (ctx.isLast()) {
+            readFailureBackoff.reduceToHalf();
+            if (readType == ReadType.Normal) {
+                havePendingRead = false;
+            } else {
+                havePendingReplayRead = false;
+            }
+        }
+
+        if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
+            int newReadBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize());
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Increasing read batch size from {} to {}", name, readBatchSize, newReadBatchSize);
+            }
+            readBatchSize = newReadBatchSize;
+        }
+
+        if (shouldRewindBeforeReadingOrReplaying && readType == ReadType.Normal) {
+            // All consumers got disconnected before the completion of the read operation
+            entry.release();
+            cursor.rewind();
+            shouldRewindBeforeReadingOrReplaying = false;
+            readMoreEntries();
+            return;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Distributing a messages to {} consumers", name, consumerList.size());
+        }
+
+        cursor.seek(((ManagedLedgerImpl) cursor.getManagedLedger())
+                .getNextValidPosition((PositionImpl) entry.getPosition()));
+        sendMessagesToConsumers(readType, Lists.newArrayList(entry));
+        ctx.recycle();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void canReadMoreEntries(boolean withBackoff) {
+        havePendingRead = false;
+        topic.getBrokerService().executor().schedule(() -> {
+            topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topic.getName(), SafeRun.safeRun(() -> {
+                synchronized (PersistentStreamingDispatcherMultipleConsumers.this) {
+                    if (!havePendingRead) {
+                        log.info("[{}] Scheduling read operation", name);
+                        readMoreEntries();
+                    } else {
+                        log.info("[{}] Skipping read since we have pendingRead", name);
+                    }
+                }
+            }));
+        }, withBackoff ? readFailureBackoff.next() : 0, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void notifyConsumersEndOfTopic() {
+        if (cursor.getNumberOfEntriesInBacklog(false) == 0) {
+            // Topic has been terminated and there are no more entries to read
+            // Notify the consumer only if all the messages were already acknowledged
+            consumerList.forEach(Consumer::reachedEndOfTopic);
+        }
+    }
+
+    @Override
+    protected void cancelPendingRead() {
+        if (havePendingRead && streamingEntryReader.cancelReadRequests()) {
+            havePendingRead = false;
+        }
+    }
+
+    @Override
+    public void readMoreEntries() {
+        // totalAvailablePermits may be updated by other threads
+        int currentTotalAvailablePermits = totalAvailablePermits;
+        if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
+            int messagesToRead = calculateNumOfMessageToRead(currentTotalAvailablePermits);
+
+            if (-1 == messagesToRead) {
+                // Skip the read: the topic/dispatcher has exceeded the dispatch rate, or the previous pending read hasn't completed.
+                return;
+            }
+
+            Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
+
+            if (!messagesToReplayNow.isEmpty()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayNow.size(),
+                            consumerList.size());
+                }
+
+                havePendingReplayRead = true;
+                Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled()
+                        ? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
+                // clear already acked positions from replay bucket
+                deletedMessages.forEach(position -> messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(),
+                        ((PositionImpl) position).getEntryId()));
+                // if all the entries are acked-entries and cleared up from messagesToRedeliver, try to read
+                // next entries as readCompletedEntries-callback was never called
+                if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
+                    havePendingReplayRead = false;
+                    readMoreEntries();
+                }
+            } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
+                log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
+                        totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription());
+            } else if (!havePendingRead) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
+                            consumerList.size());
+                }
+                havePendingRead = true;
+                streamingEntryReader.asyncReadEntries(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(),
+                        ReadType.Normal);
+            } else {
+                log.debug("[{}] Cannot schedule next read until previous one is done", name);
+            }
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Consumer buffer is full, pause reading", name);
+            }
+        }
+    }
+
+}
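
Dispatcher selection happens in PersistentSubscription (also modified by this
commit). A hedged sketch of that choice for the shared-subscription path, assuming
the Lombok-generated isStreamingDispatch() getter; the constructor signature
matches the class above:

    // Illustrative only; the real wiring lives in PersistentSubscription.java.
    Dispatcher dispatcher = config.isStreamingDispatch()
            ? new PersistentStreamingDispatcherMultipleConsumers(topic, cursor, this)
            : new PersistentDispatcherMultipleConsumers(topic, cursor, this);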
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
new file mode 100644
index 0000000..b4e4ed3
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import com.google.common.collect.Lists;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.util.SafeRun;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
+import org.apache.pulsar.broker.service.EntryBatchSizes;
+import org.apache.pulsar.broker.service.SendMessageInfo;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingEntryReader;
+import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
+
+/**
+ * A {@link PersistentDispatcherSingleActiveConsumer} that implements {@link StreamingDispatcher}.
+ * It uses a {@link StreamingEntryReader} to read new entries instead of reading micro batches from the
+ * managed ledger.
+ */
+@Slf4j
+public class PersistentStreamingDispatcherSingleActiveConsumer extends PersistentDispatcherSingleActiveConsumer
+        implements StreamingDispatcher {
+
+    private final StreamingEntryReader streamingEntryReader = new StreamingEntryReader((ManagedCursorImpl) cursor,
+            this, topic);
+
+    public PersistentStreamingDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType,
+                                                             int partitionIndex, PersistentTopic topic,
+                                                             Subscription subscription) {
+        super(cursor, subscriptionType, partitionIndex, topic, subscription);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void canReadMoreEntries(boolean withBackoff) {
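+        // Clear the pending-read flag up front; the read itself is re-scheduled on the topic's ordered executor,
+        // optionally delayed by the read-failure backoff.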
+        havePendingRead = false;
+        topic.getBrokerService().executor().schedule(() -> {
+            topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
+                synchronized (PersistentStreamingDispatcherSingleActiveConsumer.this) {
+                    Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+                    if (currentConsumer != null && !havePendingRead) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}-{}] Scheduling read", name, currentConsumer);
+                        }
+                        readMoreEntries(currentConsumer);
+                    } else {
+                        log.info("[{}-{}] Skipping read as we still havePendingRead {}", name,
+                                currentConsumer);
+                    }
+                }
+            }));
+        }, withBackoff
+                ? readFailureBackoff.next() : 0, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    protected void cancelPendingRead() {
+        if (havePendingRead && streamingEntryReader.cancelReadRequests()) {
+            havePendingRead = false;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public synchronized void notifyConsumersEndOfTopic() {
+        if (cursor.getNumberOfEntriesInBacklog(false) == 0) {
+            // Topic has been terminated and there are no more entries to read
+            // Notify the consumer only if all the messages were already acknowledged
+            consumers.forEach(Consumer::reachedEndOfTopic);
+        }
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void readEntryComplete(Entry entry, PendingReadEntryRequest ctx) {
+        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(name, safeRun(() -> {
+            internalReadEntryComplete(entry, ctx);
+        }));
+    }
+
+    public synchronized void internalReadEntryComplete(Entry entry, PendingReadEntryRequest ctx) {
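+        // Entries of a read set complete one by one through this callback, so only the last request of the set
+        // clears the pending-read flag and resets the failure backoff.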
+        if (ctx.isLast()) {
+            readFailureBackoff.reduceToHalf();
+            havePendingRead = false;
+        }
+
+        if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
+            int newReadBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize());
+            if (log.isDebugEnabled()) {
+                log.debug("[{}-{}] Increasing read batch size from {} to {}", name,
+                        ((Consumer) ctx.ctx).consumerName(), readBatchSize, newReadBatchSize);
+            }
+            readBatchSize = newReadBatchSize;
+        }
+
+        Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+
+        if (isKeyHashRangeFiltered) {
+            byte[] key = peekStickyKey(entry.getDataBuffer());
+            Consumer consumer = stickyKeyConsumerSelector.select(key);
+            // Skip the entry if it's not for the current active consumer.
+            if (consumer == null || currentConsumer != consumer) {
+                entry.release();
+                return;
+            }
+        }
+        Consumer consumer = (Consumer) ctx.ctx;
+        ctx.recycle();
+        if (currentConsumer == null || consumer != currentConsumer) {
+            // The active consumer has changed since the read request was issued. We need to rewind the cursor and
+            // re-issue the read request for the new consumer.
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Rewind because no available consumer found to dispatch message to.", name);
+            }
+
+            entry.release();
+            streamingEntryReader.cancelReadRequests();
+            havePendingRead = false;
+            if (currentConsumer != null) {
+                notifyActiveConsumerChanged(currentConsumer);
+                readMoreEntries(currentConsumer);
+            }
+        } else {
+            EntryBatchSizes batchSizes = EntryBatchSizes.get(1);
+            SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
+            EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(1);
+            filterEntriesForConsumer(Lists.newArrayList(entry), batchSizes, sendMessageInfo, batchIndexesAcks,
+                    cursor, false);
+            // Update the cursor's read position: the streaming reader reads directly from the managed ledger,
+            // so the cursor does not advance automatically.
+            cursor.seek(((ManagedLedgerImpl) cursor.getManagedLedger())
+                    .getNextValidPosition((PositionImpl) entry.getPosition()));
+            dispatchEntriesToConsumer(currentConsumer, Lists.newArrayList(entry), batchSizes,
+                    batchIndexesAcks, sendMessageInfo);
+        }
+    }
+
+    @Override
+    protected void readMoreEntries(Consumer consumer) {
+        // The consumer can be null when all consumers are disconnected from the broker,
+        // so skip reading more entries if there is currently no active consumer.
+        if (null == consumer) {
+            return;
+        }
+
+        if (!havePendingRead && consumer.getAvailablePermits() > 0) {
+            int messagesToRead = calculateNumOfMessageToRead(consumer);
+
+            if (-1 == messagesToRead) {
+                // Skip the read as the topic/dispatcher has exceeded the dispatch rate.
+                return;
+            }
+
+            // Schedule read
+            if (log.isDebugEnabled()) {
+                log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
+            }
+            havePendingRead = true;
+
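+            // Only the regular ledger read path is replaced by the streaming entry reader; compacted reads
+            // still go through the compacted-topic path.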
+            if (consumer.readCompacted()) {
+                topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer);
+            } else {
+                streamingEntryReader.asyncReadEntries(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(),
+                        consumer);
+            }
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}-{}] Consumer buffer is full, pause reading", name, consumer);
+            }
+        }
+    }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index a95692f..068ae87 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -181,19 +181,24 @@ public class PersistentSubscription implements Subscription {
 
         if (dispatcher == null || !dispatcher.isConsumerConnected()) {
             Dispatcher previousDispatcher = null;
-
+            boolean useStreamingDispatcher = topic.getBrokerService().getPulsar()
+                                                    .getConfiguration().isStreamingDispatch();
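+            // The streaming dispatcher variants are selected by the broker-level streamingDispatch flag and act
+            // as drop-in replacements for the corresponding batch dispatchers.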
             switch (consumer.subType()) {
             case Exclusive:
                 if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) {
                     previousDispatcher = dispatcher;
-                    dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor,
-                            SubType.Exclusive, 0, topic, this);
+                    dispatcher = useStreamingDispatcher ? new PersistentStreamingDispatcherSingleActiveConsumer(cursor,
+                            SubType.Exclusive, 0, topic, this) :
+                            new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Exclusive, 0,
+                                    topic, this);
                 }
                 break;
             case Shared:
                 if (dispatcher == null || dispatcher.getType() != SubType.Shared) {
                     previousDispatcher = dispatcher;
-                    dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, this);
+                    dispatcher = useStreamingDispatcher ? new PersistentStreamingDispatcherMultipleConsumers(topic,
+                            cursor, this) : new PersistentDispatcherMultipleConsumers(topic,
+                            cursor, this);
                 }
                 break;
             case Failover:
@@ -206,8 +211,10 @@ public class PersistentSubscription implements Subscription {
 
                 if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
                     previousDispatcher = dispatcher;
-                    dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor,
-                            SubType.Failover, partitionIndex, topic, this);
+                    dispatcher = useStreamingDispatcher ? new PersistentStreamingDispatcherSingleActiveConsumer(cursor,
+                            SubType.Failover, partitionIndex, topic, this) :
+                            new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover,
+                                    partitionIndex, topic, this);
                 }
                 break;
             case Key_Shared:
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/PendingReadEntryRequest.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/PendingReadEntryRequest.java
new file mode 100644
index 0000000..7989bbc
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/PendingReadEntryRequest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.streamingdispatch;
+
+import io.netty.util.Recycler;
+import lombok.Data;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+
+/**
+ * Represents a pending request to read an entry from a {@link ManagedLedger}, carrying the necessary context.
+ */
+@Data
+public class PendingReadEntryRequest {
+
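+    // Instances are pooled via Netty's Recycler, so a new request object is not allocated for every entry read.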
+    private static final Recycler<PendingReadEntryRequest> RECYCLER = new Recycler<PendingReadEntryRequest>() {
+        protected PendingReadEntryRequest newObject(Recycler.Handle<PendingReadEntryRequest> handle) {
+            return new PendingReadEntryRequest(handle);
+        }
+    };
+
+    public static PendingReadEntryRequest create(Object ctx, PositionImpl position) {
+        PendingReadEntryRequest pendingReadEntryRequest = RECYCLER.get();
+        pendingReadEntryRequest.ctx = ctx;
+        pendingReadEntryRequest.position = position;
+        pendingReadEntryRequest.retry = 0;
+        pendingReadEntryRequest.isLast = false;
+        return pendingReadEntryRequest;
+    }
+
+    public void recycle() {
+        entry = null;
+        ctx = null;
+        position = null;
+        retry = -1;
+        recyclerHandle.recycle(this);
+    }
+
+    public boolean isLastRequest() {
+        return isLast;
+    }
+
+    private final Recycler.Handle<PendingReadEntryRequest> recyclerHandle;
+
+    // Entry read from ledger
+    public Entry entry;
+
+    // Passed-in context that'll be passed to the callback.
+    public Object ctx;
+
+    // Position of entry to be read
+    public PositionImpl position;
+
+    // Number of times the request has been retried.
+    int retry;
+
+    // Whether the request is the last one of a set of requests.
+    boolean isLast;
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingDispatcher.java
new file mode 100644
index 0000000..814a381
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingDispatcher.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.streamingdispatch;
+
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+/**
+ * A {@link Dispatcher} that uses a {@link StreamingEntryReader} to read entries from a {@link ManagedLedger}.
+ */
+@InterfaceStability.Unstable
+public interface StreamingDispatcher extends Dispatcher {
+
+    /**
+     * Notify the dispatcher that an issued read entry request has completed.
+     * @param entry Entry read.
+     * @param ctx   Context passed in when issuing read entries request.
+     */
+    void readEntryComplete(Entry entry, PendingReadEntryRequest ctx);
+
+    /**
+     * Notify the dispatcher that it can issue the next read request.
+     * @param withBackoff whether to delay the next read with the read-failure backoff.
+     */
+    void canReadMoreEntries(boolean withBackoff);
+
+    /**
+     * Notify the dispatcher to inform consumers that they have reached the end of the topic.
+     */
+    void notifyConsumersEndOfTopic();
+
+    /**
+     * @return Name of the dispatcher.
+     */
+    String getName();
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
new file mode 100644
index 0000000..24f9bcc
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.streamingdispatch;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.util.SafeRun;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
+import org.apache.pulsar.client.impl.Backoff;
+
+/**
+ * Entry reader that fulfills read requests by streaming individual reads instead of reading in micro batches.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, WaitingEntryCallBack {
+
+    private final int maxRetry = 3;
+
+    // Queue for read requests that have been issued but are still waiting for completion from the managed ledger.
+    private ConcurrentLinkedQueue<PendingReadEntryRequest> issuedReads = new ConcurrentLinkedQueue<>();
+
+    // Queue for read requests that are waiting for new entries from the managed ledger.
+    private ConcurrentLinkedQueue<PendingReadEntryRequest> pendingReads = new ConcurrentLinkedQueue<>();
+
+    private final ManagedCursorImpl cursor;
+
+    private final StreamingDispatcher dispatcher;
+
+    private final PersistentTopic topic;
+
+    private AtomicInteger currentReadSizeByte = new AtomicInteger(0);
+
+    private volatile State state;
+
+    private static final AtomicReferenceFieldUpdater<StreamingEntryReader, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(StreamingEntryReader.class, State.class, "state");
+
+    private volatile int maxReadSizeByte;
+
+    private final Backoff readFailureBackoff = new Backoff(10, TimeUnit.MILLISECONDS,
+            1, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS);
+
+    /**
+     * Read entries in a streaming way: instead of reading a micro batch and sending the entries to the consumer
+     * only after all entries in the batch have been read from the ledger, this method fires numEntriesToRead read
+     * requests at the managed ledger and sends each entry to the consumer as soon as it is read and all entries
+     * before it have been sent.
+     * @param numEntriesToRead number of entries to read from the ledger.
+     * @param maxReadSizeByte maximum number of bytes to read from the ledger.
+     * @param ctx context sent along with the read request.
+     */
+    public synchronized void asyncReadEntries(int numEntriesToRead, int maxReadSizeByte, Object ctx) {
+        if (STATE_UPDATER.compareAndSet(this, State.Canceling, State.Canceled)) {
+            internalCancelReadRequests();
+        }
+
+        if (!issuedReads.isEmpty() || !pendingReads.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] There's pending streaming read not completed yet. Not scheduling next read request.",
+                        cursor.getName());
+            }
+            return;
+        }
+
+        PositionImpl nextReadPosition = (PositionImpl) cursor.getReadPosition();
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) cursor.getManagedLedger();
+        // Edge case: when an old ledger is full and the new ledger is not yet opened, the position can point to
+        // the next position after the last confirmed position, which is invalid. So try to update the position.
+        if (!managedLedger.isValidPosition(nextReadPosition)) {
+            nextReadPosition = managedLedger.getNextValidPosition(nextReadPosition);
+        }
+        boolean hasEntriesToRead = managedLedger.hasMoreEntries(nextReadPosition);
+        currentReadSizeByte.set(0);
+        STATE_UPDATER.set(this, State.Issued);
+        this.maxReadSizeByte = maxReadSizeByte;
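+        // Create one request per entry to read: positions that already have entries go onto the issued queue and
+        // are read immediately; positions past the last available entry go onto the pending queue until the
+        // managed ledger signals that new entries have arrived.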
+        for (int c = 0; c < numEntriesToRead; c++) {
+            PendingReadEntryRequest pendingReadEntryRequest = PendingReadEntryRequest.create(ctx, nextReadPosition);
+            // Make sure that once we start putting requests into the pending queue, no following request goes
+            // into the issued queue, in order to guarantee ordering.
+            if (hasEntriesToRead && managedLedger.hasMoreEntries(nextReadPosition)) {
+                issuedReads.offer(pendingReadEntryRequest);
+            } else {
+                pendingReads.offer(pendingReadEntryRequest);
+            }
+            nextReadPosition = managedLedger.getNextValidPosition(nextReadPosition);
+        }
+
+        // Issue requests.
+        for (PendingReadEntryRequest request : issuedReads) {
+            managedLedger.asyncReadEntry(request.position, this, request);
+        }
+
+        if (!pendingReads.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}} Streaming entry reader has {} pending read requests waiting on new entry."
+                        , cursor.getName(), pendingReads.size());
+            }
+            // If new entries became available after we put the requests into the pending queue, fire the read.
+            // Otherwise register a callback with the managed ledger to get notified when new entries arrive.
+            if (managedLedger.hasMoreEntries(pendingReads.peek().position)) {
+                entriesAvailable();
+            } else if (managedLedger.isTerminated()) {
+                dispatcher.notifyConsumersEndOfTopic();
+                cleanQueue(pendingReads);
+                if (issuedReads.size() == 0) {
+                    dispatcher.canReadMoreEntries(true);
+                }
+            } else {
+                managedLedger.addWaitingEntryCallBack(this);
+            }
+        }
+    }
+
+    @Override
+    public void readEntryComplete(Entry entry, Object ctx) {
+        // Don't block the caller thread; complete the read on the dispatcher's dedicated thread.
+        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(), SafeRun.safeRun(() -> {
+            internalReadEntryComplete(entry, ctx);
+        }));
+    }
+
+    private void internalReadEntryComplete(Entry entry, Object ctx) {
+        PendingReadEntryRequest pendingReadEntryRequest = (PendingReadEntryRequest) ctx;
+        pendingReadEntryRequest.entry = entry;
+        readFailureBackoff.reduceToHalf();
+        Entry readEntry;
+        // If the completed entry is at the head of the issued queue, drain all consecutive completed entries
+        // so they are delivered to the dispatcher in order.
+        if (!issuedReads.isEmpty() && issuedReads.peek() == pendingReadEntryRequest) {
+            while (!issuedReads.isEmpty() && issuedReads.peek().entry != null) {
+                PendingReadEntryRequest firstPendingReadEntryRequest = issuedReads.poll();
+                readEntry = firstPendingReadEntryRequest.entry;
+                currentReadSizeByte.addAndGet(readEntry.getLength());
+                // Cancel the remaining requests and reset the cursor if maxReadSizeByte is exceeded.
+                if (currentReadSizeByte.get() > maxReadSizeByte) {
+                    cancelReadRequests(readEntry.getPosition());
+                    dispatcher.canReadMoreEntries(false);
+                    STATE_UPDATER.set(this, State.Completed);
+                    return;
+                } else {
+                    // All requests have been completed; mark the returned entry as the last one.
+                    if (issuedReads.isEmpty() && pendingReads.isEmpty()) {
+                        firstPendingReadEntryRequest.isLast = true;
+                        STATE_UPDATER.set(this, State.Completed);
+                    }
+                    dispatcher.readEntryComplete(readEntry, firstPendingReadEntryRequest);
+                }
+            }
+        } else if (!issuedReads.isEmpty() && issuedReads.peek().retry > maxRetry) {
+            cancelReadRequests(issuedReads.peek().position);
+            dispatcher.canReadMoreEntries(true);
+            STATE_UPDATER.set(this, State.Completed);
+        }
+    }
+
+    @Override
+    public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+        // Don't block the caller thread; handle the read failure on the dispatcher's dedicated thread.
+        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(), SafeRun.safeRun(() -> {
+            internalReadEntryFailed(exception, ctx);
+        }));
+    }
+
+    private void internalReadEntryFailed(ManagedLedgerException exception, Object ctx) {
+        PendingReadEntryRequest pendingReadEntryRequest = (PendingReadEntryRequest) ctx;
+        PositionImpl readPosition = pendingReadEntryRequest.position;
+        pendingReadEntryRequest.retry++;
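+        // Back off before retrying; once the head request exceeds maxRetry, all outstanding requests are
+        // cancelled below and the dispatcher is asked to re-schedule the read with backoff.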
+        long waitTimeMillis = readFailureBackoff.next();
+        if (exception.getCause() instanceof TransactionNotSealedException) {
+            waitTimeMillis = 1;
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Error reading transaction entries : {}, - Retrying to read in {} seconds",
+                        cursor.getName(), exception.getMessage(), waitTimeMillis / 1000.0);
+            }
+        } else if (!(exception instanceof ManagedLedgerException.TooManyRequestsException)) {
+            log.error("[{} Error reading entries at {} : {} - Retrying to read in {} seconds", cursor.getName(),
+                    readPosition, exception.getMessage(), waitTimeMillis / 1000.0);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Got throttled by bookies while reading at {} : {} - Retrying to read in {} seconds",
+                        cursor.getName(), readPosition, exception.getMessage(), waitTimeMillis / 1000.0);
+            }
+        }
+        if (!issuedReads.isEmpty()) {
+            if (issuedReads.peek().retry > maxRetry) {
+                cancelReadRequests(issuedReads.peek().position);
+                dispatcher.canReadMoreEntries(true);
+                STATE_UPDATER.set(this, State.Completed);
+                return;
+            }
+            if (pendingReadEntryRequest.retry <= maxRetry) {
+                retryReadRequest(pendingReadEntryRequest, waitTimeMillis);
+            }
+        }
+    }
+
+    // Cancel all issued and pending requests and update the cursor's read position.
+    private void cancelReadRequests(Position position) {
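+        // Issued reads may already be in flight, so the cursor is rewound to the given position; pending reads
+        // were never issued against the ledger and only need to be dropped.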
+        if (!issuedReads.isEmpty()) {
+            cleanQueue(issuedReads);
+            cursor.seek(position);
+        }
+
+        if (!pendingReads.isEmpty()) {
+            cleanQueue(pendingReads);
+        }
+    }
+
+    private void internalCancelReadRequests() {
+        Position readPosition = !issuedReads.isEmpty() ? issuedReads.peek().position : pendingReads.peek().position;
+        cancelReadRequests(readPosition);
+    }
+
+    public boolean cancelReadRequests() {
+        if (STATE_UPDATER.compareAndSet(this, State.Issued, State.Canceling)) {
+            // Don't block the caller thread; perform the cancellation on the topic's ordered executor.
+            topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topic.getName(), SafeRun.safeRun(() -> {
+                synchronized (StreamingEntryReader.this) {
+                    if (STATE_UPDATER.compareAndSet(this, State.Canceling, State.Canceled)) {
+                        internalCancelReadRequests();
+                    }
+                }
+            }));
+            return true;
+        }
+        return false;
+    }
+
+    private void cleanQueue(Queue<PendingReadEntryRequest> queue) {
+        while (!queue.isEmpty()) {
+            PendingReadEntryRequest pendingReadEntryRequest = queue.poll();
+            if (pendingReadEntryRequest.entry != null) {
+                pendingReadEntryRequest.entry.release();
+                pendingReadEntryRequest.recycle();
+            }
+        }
+    }
+
+    private void retryReadRequest(PendingReadEntryRequest pendingReadEntryRequest, long delay) {
+        topic.getBrokerService().executor().schedule(() -> {
+            // Jump back onto the dispatcher's dedicated thread.
+            topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(),
+                    SafeRun.safeRun(() -> {
+                ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) cursor.getManagedLedger();
+                managedLedger.asyncReadEntry(pendingReadEntryRequest.position, this, pendingReadEntryRequest);
+            }));
+        }, delay, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void entriesAvailable() {
+        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(), SafeRun.safeRun(() -> {
+            internalEntriesAvailable();
+        }));
+    }
+
+    private synchronized void internalEntriesAvailable() {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}} Streaming entry reader get notification of newly added entries from managed ledger,"
+                    + " trying to issued pending read requests.", cursor.getName());
+        }
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) cursor.getManagedLedger();
+        List<PendingReadEntryRequest> newlyIssuedRequests = new ArrayList<>();
+        if (!pendingReads.isEmpty()) {
+            // Edge case: when an old ledger is full and the new ledger is not yet opened, the position can point
+            // to the next position after the last confirmed position, which is invalid. So try to update it.
+            if (!managedLedger.isValidPosition(pendingReads.peek().position)) {
+                pendingReads.peek().position = managedLedger.getNextValidPosition(pendingReads.peek().position);
+            }
+            while (!pendingReads.isEmpty() && managedLedger.hasMoreEntries(pendingReads.peek().position)) {
+                PendingReadEntryRequest next = pendingReads.poll();
+                issuedReads.offer(next);
+                newlyIssuedRequests.add(next);
+                // Need to update the position because, when the PendingReadEntryRequests were created, their
+                // positions could all have been set to the managed ledger's last confirmed position.
+                if (!pendingReads.isEmpty()) {
+                    pendingReads.peek().position = managedLedger.getNextValidPosition(next.position);
+                }
+            }
+
+            for (PendingReadEntryRequest request : newlyIssuedRequests) {
+                managedLedger.asyncReadEntry(request.position, this, request);
+            }
+
+            if (!pendingReads.isEmpty()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}} Streaming entry reader has {} pending read requests waiting on new entry."
+                            , cursor.getName(), pendingReads.size());
+                }
+                if (managedLedger.hasMoreEntries(pendingReads.peek().position)) {
+                    entriesAvailable();
+                } else {
+                    managedLedger.addWaitingEntryCallBack(this);
+                }
+            }
+        }
+    }
+
+    protected State getState() {
+        return STATE_UPDATER.get(this);
+    }
+
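+    // Reader lifecycle: Issued - reads are in flight; Canceling/Canceled - cancellation requested/applied;
+    // Completed - all requests of the current read have finished.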
+    enum State {
+        Issued, Canceling, Canceled, Completed;
+    }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/package-info.java
new file mode 100644
index 0000000..9a205ed
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.streamingdispatch;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 3497b73..a631db5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -61,6 +61,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -103,7 +104,7 @@ public class PersistentDispatcherFailoverConsumerTest {
     private ChannelHandlerContext channelCtx;
     private LinkedBlockingQueue<CommandActiveConsumerChange> consumerChanges;
     private ZooKeeper mockZk;
-    private PulsarService pulsar;
+    protected PulsarService pulsar;
     final String successTopicName = "persistent://part-perf/global/perf.t1/ptopic";
     final String failTopicName = "persistent://part-perf/global/perf.t1/pfailTopic";
 
@@ -208,7 +209,7 @@ public class PersistentDispatcherFailoverConsumerTest {
 
     void setupMLAsyncCallbackMocks() {
         ledgerMock = mock(ManagedLedger.class);
-        cursorMock = mock(ManagedCursor.class);
+        cursorMock = mock(ManagedCursorImpl.class);
 
         doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
         doReturn("mockCursor").when(cursorMock).getName();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index 8313567..dd90017 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -270,6 +270,11 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
 
     @Test
     public void testSimpleConsumerEventsWithPartition() throws Exception {
+        // Reset ActiveConsumerFailoverDelayTimeMillis; otherwise, if testActiveConsumerFailoverWithDelay is
+        // executed first, this test could fail.
+        conf.setActiveConsumerFailoverDelayTimeMillis(0);
+        restartBroker();
+
         int numPartitions = 4;
 
         final String topicName = "persistent://prop/use/ns-abc/testSimpleConsumerEventsWithPartition-" + System.nanoTime();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 52a67cb..7b428a8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -141,7 +141,7 @@ import io.netty.buffer.Unpooled;
 /**
  */
 public class PersistentTopicTest extends MockedBookKeeperTestCase {
-    private PulsarService pulsar;
+    protected PulsarService pulsar;
     private BrokerService brokerService;
     private ManagedLedgerFactory mlFactoryMock;
     private ServerCnx serverCnx;
@@ -1199,7 +1199,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
     @SuppressWarnings("unchecked")
     void setupMLAsyncCallbackMocks() {
         ledgerMock = mock(ManagedLedger.class);
-        cursorMock = mock(ManagedCursor.class);
+        cursorMock = mock(ManagedCursorImpl.class);
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
 
         doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
new file mode 100644
index 0000000..66f49c8
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.pulsar.broker.service.PersistentDispatcherFailoverConsumerTest;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * PersistentDispatcherFailoverConsumerTest with {@link StreamingDispatcher}
+ */
+public class PersistentDispatcherFailoverConsumerStreamingDispatcherTest extends
+        PersistentDispatcherFailoverConsumerTest {
+    @BeforeMethod
+    public void setup() throws Exception {
+        super.setup();
+        pulsar.getConfiguration().setStreamingDispatch(true);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentFailoverStreamingDispatcherE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentFailoverStreamingDispatcherE2ETest.java
new file mode 100644
index 0000000..ee56b5c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentFailoverStreamingDispatcherE2ETest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.pulsar.broker.service.PersistentFailoverE2ETest;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.testng.annotations.BeforeClass;
+
+/**
+ * PersistentFailoverE2ETest with {@link StreamingDispatcher}
+ */
+public class PersistentFailoverStreamingDispatcherE2ETest extends PersistentFailoverE2ETest {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        conf.setStreamingDispatch(true);
+        super.setup();
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherBlockConsumerTest.java
new file mode 100644
index 0000000..bdaf88e
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherBlockConsumerTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.apache.pulsar.client.api.DispatcherBlockConsumerTest;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * DispatcherBlockConsumerTest with {@link StreamingDispatcher}
+ */
+public class PersistentStreamingDispatcherBlockConsumerTest extends DispatcherBlockConsumerTest {
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.setup();
+        conf.setStreamingDispatch(true);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest.java
new file mode 100644
index 0000000..ff170ed
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.apache.pulsar.client.api.SubscriptionMessageDispatchThrottlingTest;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * SubscriptionMessageDispatchThrottlingTest with {@link StreamingDispatcher}
+ */
+public class PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest
+    extends SubscriptionMessageDispatchThrottlingTest {
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.setup();
+        conf.setStreamingDispatch(true);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherE2ETest.java
new file mode 100644
index 0000000..b1efcf6
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherE2ETest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.pulsar.broker.service.PersistentTopicE2ETest;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * PersistentTopicE2ETest with {@link StreamingDispatcher}
+ */
+public class PersistentTopicStreamingDispatcherE2ETest extends PersistentTopicE2ETest {
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        conf.setStreamingDispatch(true);
+        super.baseSetup();
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
new file mode 100644
index 0000000..f0057d9
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.pulsar.broker.service.PersistentTopicTest;
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * PersistentTopicTest with {@link StreamingDispatcher}
+ */
+public class PersistentTopicStreamingDispatcherTest extends PersistentTopicTest {
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        super.setup();
+        pulsar.getConfiguration().setStreamingDispatch(true);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java
new file mode 100644
index 0000000..1bcdec7
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
+import org.apache.pulsar.client.api.SimpleProducerConsumerTest;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * SimpleProducerConsumerTest with {@link StreamingDispatcher}
+ */
+public class SimpleProducerConsumerTestStreamingDispatcherTest extends SimpleProducerConsumerTest {
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.setup();
+        conf.setStreamingDispatch(true);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReaderTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReaderTests.java
new file mode 100644
index 0000000..332f431
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReaderTests.java
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.streamingdispatch;
+
+import com.google.common.base.Charsets;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.reset;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Tests for {@link StreamingEntryReader}
+ */
+@PrepareForTest({ManagedLedgerImpl.class})
+public class StreamingEntryReaderTests extends MockedBookKeeperTestCase {
+
+    private static final Charset Encoding = Charsets.UTF_8;
+    private PersistentTopic mockTopic;
+    private StreamingDispatcher mockDispatcher;
+    private BrokerService mockBrokerService;
+    private ScheduledExecutorService scheduledExecutorService;
+    private OrderedExecutor orderedExecutor;
+    private ManagedLedgerConfig config;
+    private ManagedLedgerImpl ledger;
+    private ManagedCursor cursor;
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+        orderedExecutor = OrderedScheduler.newSchedulerBuilder()
+                .numThreads(1)
+                .name("StreamingEntryReaderTests").build();
+        mockTopic = mock(PersistentTopic.class);
+        mockBrokerService = mock(BrokerService.class);
+        mockDispatcher = mock(StreamingDispatcher.class);
+        config = new ManagedLedgerConfig().setMaxEntriesPerLedger(10);
+        ledger = spy((ManagedLedgerImpl) factory.open("my_test_ledger", config));
+        cursor = ledger.openCursor("test");
+        when(mockTopic.getBrokerService()).thenReturn(mockBrokerService);
+        when(mockBrokerService.executor()).thenReturn(scheduledExecutorService);
+        when(mockBrokerService.getTopicOrderedExecutor()).thenReturn(orderedExecutor);
+        doAnswer(invocationOnMock -> null).when(mockDispatcher).notifyConsumersEndOfTopic();
+    }
+
+    @Test
+    public void testCanReadEntryFromMLedgerHappyPath() throws Exception {
+        AtomicInteger entryCount = new AtomicInteger(0);
+        Stack<Position> positions = new Stack<>();
+
+        for (int i = 0; i < 150; i++) {
+            ledger.addEntry(String.format("message-%d", i).getBytes(Encoding));
+        }
+
+        StreamingEntryReader streamingEntryReader = new StreamingEntryReader((ManagedCursorImpl) cursor,
+                mockDispatcher, mockTopic);
+
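+        // Simulate the dispatcher consuming each entry: record its position, verify ordering, and advance the cursor.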
+        doAnswer((InvocationOnMock invocationOnMock) -> {
+                Entry entry = invocationOnMock.getArgument(0, Entry.class);
+                positions.push(entry.getPosition());
+                assertEquals(new String(entry.getData()), String.format("message-%d", entryCount.getAndIncrement()));
+                cursor.seek(ledger.getNextValidPosition((PositionImpl) entry.getPosition()));
+                return null;
+            }
+        ).when(mockDispatcher).readEntryComplete(any(Entry.class), any(PendingReadEntryRequest.class));
+
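+        // Each call requests up to 50 entries within a 700-byte budget; the trailing argument is an opaque read
+        // context, unused in these tests.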
+        streamingEntryReader.asyncReadEntries(50, 700, null);
+        await().atMost(500, TimeUnit.MILLISECONDS).until(() -> entryCount.get() == 50);
+        // Check cursor's read position has been properly updated
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+        streamingEntryReader.asyncReadEntries(50, 700, null);
+        await().atMost(500, TimeUnit.MILLISECONDS).until(() -> entryCount.get() == 100);
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+        streamingEntryReader.asyncReadEntries(50, 700, null);
+        await().atMost(500, TimeUnit.MILLISECONDS).until(() -> entryCount.get() == 150);
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+    }
+
+    @Test
+    public void testCanReadEntryFromMLedgerSizeExceededLimit() throws Exception {
+        AtomicBoolean readComplete = new AtomicBoolean(false);
+        Stack<Position> positions = new Stack<>();
+        List<String> entries = new ArrayList<>();
+        int size = "mmmmmmmmmmessage-0".getBytes().length;
+        for (int i = 0; i < 15; i++) {
+            ledger.addEntry(String.format("mmmmmmmmmmessage-%d", i).getBytes(Encoding));
+        }
+
+        StreamingEntryReader streamingEntryReader =
+                new StreamingEntryReader((ManagedCursorImpl) cursor, mockDispatcher, mockTopic);
+
+        doAnswer((InvocationOnMock invocationOnMock) -> {
+                Entry entry = invocationOnMock.getArgument(0, Entry.class);
+                positions.push(entry.getPosition());
+                entries.add(new String(entry.getData()));
+                cursor.seek(ledger.getNextValidPosition((PositionImpl) entry.getPosition()));
+                return null;
+            }
+        ).when(mockDispatcher).readEntryComplete(any(Entry.class), any(PendingReadEntryRequest.class));
+
+        doAnswer((InvocationOnMock invocationOnMock) -> {
+                readComplete.set(true);
+                return null;
+            }
+        ).when(mockDispatcher).canReadMoreEntries(anyBoolean());
+
+        PositionImpl position = ledger.getPositionAfterN(ledger.getFirstPosition(), 3, ManagedLedgerImpl.PositionBound.startExcluded);
+        // Delay the read of the third entry so reads from the managed ledger complete out of order.
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+                AsyncCallbacks.ReadEntryCallback cb = invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
+                executor.schedule(() -> {
+                    cb.readEntryComplete(EntryImpl.create(position.getLedgerId(), position.getEntryId(), "mmmmmmmmmmessage-2".getBytes()),
+                            invocationOnMock.getArgument(2));
+                }, 200, TimeUnit.MILLISECONDS);
+                return null;
+            }
+        }).when(ledger).asyncReadEntry(eq(position), any(), any());
+
+        // Only 2 entries should be read with this request.
+        streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
+        await().atMost(300, TimeUnit.MILLISECONDS).until(() -> readComplete.get());
+        assertEquals(entries.size(), 2);
+        // Assert the cursor's read position has been properly updated to the third entry, since the previous
+        // request should only have read 2 entries.
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+        reset(ledger);
+        readComplete.set(false);
+        streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
+        await().atMost(300, TimeUnit.MILLISECONDS).until(() -> readComplete.get());
+        readComplete.set(false);
+        streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
+        await().atMost(300, TimeUnit.MILLISECONDS).until(() -> readComplete.get());
+        readComplete.set(false);
+        streamingEntryReader.asyncReadEntries(6, size * 2 + 1, null);
+        await().atMost(300, TimeUnit.MILLISECONDS).until(() -> readComplete.get());
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+        assertEquals(entries.size(), 8);
+        for (int i = 0; i < entries.size(); i++) {
+            assertEquals(entries.get(i), String.format("mmmmmmmmmmessage-%d", i));
+        }
+    }
+
+    @Test
+    public void testCanReadEntryFromMLedgerWaitingForNewEntry() throws Exception {
+        AtomicInteger entryCount = new AtomicInteger(0);
+        AtomicBoolean entryProcessed = new AtomicBoolean(false);
+        Stack<Position> positions = new Stack<>();
+        List<String> entries = new ArrayList<>();
+        for (int i = 0; i < 7; i++) {
+            ledger.addEntry(String.format("message-%d", i).getBytes(Encoding));
+        }
+
+        StreamingEntryReader streamingEntryReader =
+                new StreamingEntryReader((ManagedCursorImpl) cursor, mockDispatcher, mockTopic);
+
+        doAnswer((InvocationOnMock invocationOnMock) -> {
+                Entry entry = invocationOnMock.getArgument(0, Entry.class);
+                positions.push(entry.getPosition());
+                entries.add(new String(entry.getData()));
+                entryCount.getAndIncrement();
+                cursor.seek(ledger.getNextValidPosition((PositionImpl) entry.getPosition()));
+                entryProcessed.set(true);
+                return null;
+            }
+        ).when(mockDispatcher).readEntryComplete(any(Entry.class), any(PendingReadEntryRequest.class));
+
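+        // Drain the first 5 of the 7 pre-written entries.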
+        streamingEntryReader.asyncReadEntries(5,  100, null);
+        await().atMost(300, TimeUnit.MILLISECONDS).until(() -> entryCount.get() == 5);
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+        streamingEntryReader.asyncReadEntries(5, 100, null);
+        // We only wrote 7 entries initially, so only 7 entries can be read.
+        await().atMost(300, TimeUnit.MILLISECONDS).until(() -> entryCount.get() == 7);
+        // Add a new entry and wait for it to be sent to the reader.
+        entryProcessed.set(false);
+        ledger.addEntry("message-7".getBytes(Encoding));
+        await().atMost(500, TimeUnit.MILLISECONDS).until(() -> entryProcessed.get());
+        assertEquals(entries.size(), 8);
+        entryProcessed.set(false);
+        ledger.addEntry("message-8".getBytes(Encoding));
+        await().atMost(500, TimeUnit.MILLISECONDS).until(() -> entryProcessed.get());
+        assertEquals(entries.size(), 9);
+        entryProcessed.set(false);
+        ledger.addEntry("message-9".getBytes(Encoding));
+        await().atMost(500, TimeUnit.MILLISECONDS).until(() -> entryProcessed.get());
+        assertEquals(entries.size(), 10);
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+        for (int i = 0; i < entries.size(); i++) {
+            assertEquals(entries.get(i), String.format("message-%d", i));
+        }
+    }
+
+    @Test
+    public void testCanCancelReadEntryRequestAndResumeReading() throws Exception {
+        Map<Position, String> messages = new HashMap<>();
+        AtomicInteger count = new AtomicInteger(0);
+        Stack<Position> positions = new Stack<>();
+        List<String> entries = new ArrayList<>();
+
+        for (int i = 0; i < 20; i++) {
+            String msg = String.format("message-%d", i);
+            messages.put(ledger.addEntry(msg.getBytes(Encoding)), msg);
+        }
+
+        StreamingEntryReader streamingEntryReader =
+                new StreamingEntryReader((ManagedCursorImpl) cursor, mockDispatcher, mockTopic);
+
+        doAnswer((InvocationOnMock invocationOnMock) -> {
+                Entry entry = invocationOnMock.getArgument(0, Entry.class);
+                positions.push(entry.getPosition());
+                entries.add(new String(entry.getData()));
+                cursor.seek(ledger.getNextValidPosition((PositionImpl) entry.getPosition()));
+                return null;
+            }
+        ).when(mockDispatcher).readEntryComplete(any(Entry.class), any(PendingReadEntryRequest.class));
+
+        // Complete only the first 5 reads; later read requests never invoke the callback.
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+                AsyncCallbacks.ReadEntryCallback cb = invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
+                PositionImpl position = invocationOnMock.getArgument(0, PositionImpl.class);
+                int c = count.getAndIncrement();
+                if (c < 5) {
+                    cb.readEntryComplete(EntryImpl.create(position.getLedgerId(), position.getEntryId(),
+                            messages.get(position).getBytes()),
+                            invocationOnMock.getArgument(2));
+                }
+                return null;
+            }
+        }).when(ledger).asyncReadEntry(any(), any(), any());
+
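+        // Issue a read for all 20 entries and cancel immediately; only the 5 reads the mock completes are delivered.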
+        streamingEntryReader.asyncReadEntries(20,  200, null);
+        streamingEntryReader.cancelReadRequests();
+        await().atMost(10000, TimeUnit.MILLISECONDS).until(() -> streamingEntryReader.getState() == StreamingEntryReader.State.Canceled);
+        // Only 5 entries are delivered, since the mocked ledger completes just 5 reads before the request is canceled.
+        assertEquals(entries.size(), 5);
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+        // Clear mock and try to read remaining entries
+        reset(ledger);
+        streamingEntryReader.asyncReadEntries(15,  200, null);
+        streamingEntryReader.cancelReadRequests();
+        await().atMost(10000, TimeUnit.MILLISECONDS).until(() -> streamingEntryReader.getState() == StreamingEntryReader.State.Completed);
+        // All 20 entries should have been delivered after reading the remaining 15.
+        assertEquals(entries.size(), 20);
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+        // Make sure messages are still returned in order.
+        for (int i = 0; i < entries.size(); i++) {
+            assertEquals(entries.get(i), String.format("message-%d", i));
+        }
+    }
+
+    @Test
+    public void testCanHandleExceptionAndRetry() throws Exception {
+        Map<Position, String> messages = new HashMap<>();
+        AtomicBoolean entryProcessed = new AtomicBoolean(false);
+        AtomicInteger count = new AtomicInteger(0);
+        Stack<Position> positions = new Stack<>();
+        List<String> entries = new ArrayList<>();
+        for (int i = 0; i < 12; i++) {
+            String msg = String.format("message-%d", i);
+            messages.put(ledger.addEntry(msg.getBytes(Encoding)), msg);
+        }
+
+        StreamingEntryReader streamingEntryReader =
+                new StreamingEntryReader((ManagedCursorImpl) cursor, mockDispatcher, mockTopic);
+
+        doAnswer((InvocationOnMock invocationOnMock) -> {
+                Entry entry = invocationOnMock.getArgument(0, Entry.class);
+                positions.push(entry.getPosition());
+                entries.add(new String(entry.getData()));
+                cursor.seek(ledger.getNextValidPosition((PositionImpl) entry.getPosition()));
+
+                if (entries.size() == 6 || entries.size() == 12) {
+                    entryProcessed.set(true);
+                }
+                return null;
+            }
+        ).when(mockDispatcher).readEntryComplete(any(Entry.class), any(PendingReadEntryRequest.class));
+
+        // Fail the 4th, 5th, 10th and 11th reads with TooManyRequestsException so the reader has to retry.
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+                AsyncCallbacks.ReadEntryCallback cb = invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
+                PositionImpl position = invocationOnMock.getArgument(0, PositionImpl.class);
+                int c = count.getAndIncrement();
+                if (c >= 3 && c < 5 || c >= 9 && c < 11) {
+                    cb.readEntryFailed(new ManagedLedgerException.TooManyRequestsException("Fake exception."),
+                            invocationOnMock.getArgument(2));
+                } else {
+                    cb.readEntryComplete(EntryImpl.create(position.getLedgerId(), position.getEntryId(),
+                            messages.get(position).getBytes()),
+                            invocationOnMock.getArgument(2));
+                }
+                return null;
+            }
+        }).when(ledger).asyncReadEntry(any(), any(), any());
+
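+        // The injected failures are retried transparently, so each request still delivers all 6 entries in order.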
+        streamingEntryReader.asyncReadEntries(6,  100, null);
+        await().atMost(3000, TimeUnit.MILLISECONDS).until(() -> entryProcessed.get());
+        assertEquals(entries.size(), 6);
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+        entryProcessed.set(false);
+        streamingEntryReader.asyncReadEntries(6, 100, null);
+        await().atMost(3000, TimeUnit.MILLISECONDS).until(() -> entryProcessed.get());
+        assertEquals(entries.size(), 12);
+        assertEquals(cursor.getReadPosition(), ledger.getNextValidPosition((PositionImpl) positions.peek()));
+        // Make sure messages are still returned in order.
+        for (int i = 0; i < entries.size(); i++) {
+            assertEquals(entries.get(i), String.format("message-%d", i));
+        }
+    }
+
+    @Test
+    public void testWillCancelReadAfterExhaustingRetry() throws Exception {
+        Map<Position, String> messages = new HashMap<>();
+        AtomicInteger count = new AtomicInteger(0);
+        Stack<Position> positions = new Stack<>();
+        List<String> entries = new ArrayList<>();
+        for (int i = 0; i < 12; i++) {
+            String msg = String.format("message-%d", i);
+            messages.put(ledger.addEntry(msg.getBytes(Encoding)), msg);
+        }
+
+        StreamingEntryReader streamingEntryReader =
+                new StreamingEntryReader((ManagedCursorImpl) cursor, mockDispatcher, mockTopic);
+
+        doAnswer((InvocationOnMock invocationOnMock) -> {
+                    Entry entry = invocationOnMock.getArgument(0, Entry.class);
+                    positions.push(entry.getPosition());
+                    cursor.seek(ledger.getNextValidPosition((PositionImpl) entry.getPosition()));
+                    entries.add(new String(entry.getData()));
+                    return null;
+                }
+        ).when(mockDispatcher).readEntryComplete(any(Entry.class), any(PendingReadEntryRequest.class));
+
+        // Fail every read after the first 3.
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+                AsyncCallbacks.ReadEntryCallback cb = invocationOnMock.getArgument(1, AsyncCallbacks.ReadEntryCallback.class);
+                PositionImpl position = invocationOnMock.getArgument(0, PositionImpl.class);
+                int c = count.getAndIncrement();
+                if (c >= 3) {
+                    cb.readEntryFailed(new ManagedLedgerException.TooManyRequestsException("Fake exception."),
+                            invocationOnMock.getArgument(2));
+                } else {
+                    cb.readEntryComplete(EntryImpl.create(position.getLedgerId(), position.getEntryId(),
+                            messages.get(position).getBytes()),
+                            invocationOnMock.getArgument(2));
+                }
+                return null;
+            }
+        }).when(ledger).asyncReadEntry(any(), any(), any());
+
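+        // With every read past the third failing, retries are eventually exhausted and the outstanding reads are canceled.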
+        streamingEntryReader.asyncReadEntries(5,  100, null);
+        await().atMost(10, TimeUnit.SECONDS).until(() -> streamingEntryReader.getState() == StreamingEntryReader.State.Completed);
+        // Issued 5 reads; only 3 entries should arrive, as the rest were canceled after exhausting retries.
+        assertEquals(entries.size(), 3);
+        for (int i = 0; i < entries.size(); i++) {
+            assertEquals(entries.get(i), String.format("message-%d", i));
+        }
+        reset(ledger);
+        streamingEntryReader.asyncReadEntries(5,  100, null);
+        await().atMost(500, TimeUnit.MILLISECONDS).until(() -> streamingEntryReader.getState() == StreamingEntryReader.State.Completed);
+        assertEquals(entries.size(), 8);
+        for (int i = 0; i < entries.size(); i++) {
+            assertEquals(entries.get(i), String.format("message-%d", i));
+        }
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index 80bf968..62a6edd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -620,6 +620,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
         int receivedMsgCount = 0;
         for (int i = 0; i < totalProducedMsgs; i++) {
             Message<?> msg = consumer.receive(500, TimeUnit.MILLISECONDS);
+            assertTrue(msg != null);
             if (!unackMessages.contains(i)) {
                 consumer.acknowledge(msg);
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index 7ed88f5..2b63411 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.api;
 import com.google.common.collect.Sets;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.pulsar.broker.service.Dispatcher;
@@ -35,6 +36,8 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.awaitility.Awaitility.await;
+
 public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchThrottlingTest {
     private static final Logger log = LoggerFactory.getLogger(SubscriptionMessageDispatchThrottlingTest.class);
 
@@ -189,17 +192,13 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
         Assert.assertTrue(isMessageRateUpdate);
         Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace), dispatchRate);
 
-        long start = System.currentTimeMillis();
         // Asynchronously produce messages
         for (int i = 0; i < numProducedMessages; i++) {
             final String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
-        latch.await();
+        await().atMost(2500, TimeUnit.MILLISECONDS).until(() -> latch.getCount() == 0);
         Assert.assertEquals(totalReceived.get(), numProducedMessages);
-        long end = System.currentTimeMillis();
-
-        Assert.assertTrue((end - start) >= 2000);
 
         consumer.close();
         producer.close();
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index 52f6d92..f31bd80 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -503,6 +503,7 @@ The value of 0 disables message-byte dispatch-throttling.|0|
 |dispatcherMinReadBatchSize|The minimum number of entries to read from BookKeeper. By default, it is 1 entry. When there is an error occurred on reading entries from bookkeeper, the broker will backoff the batch size to this minimum number.|1|
 |dispatcherMaxRoundRobinBatchSize|The maximum number of entries to dispatch for a shared subscription. By default, it is 20 entries.|20|
 | preciseDispatcherFlowControl | Precise dispatcher flow control according to history message number of each entry. | false |
+| streamingDispatch | Whether to use the streaming read dispatcher. It can be useful when there is a large backlog to drain: instead of reading in micro batches, the broker streams reads from BookKeeper to make the most of consumer capacity until it hits the BookKeeper read limit or the consumers' processing limit, at which point consumer flow control can be used to tune the speed. This feature is currently in preview and may change in subsequent releases. | false |
 | maxConcurrentLookupRequest | Maximum number of concurrent lookup request that the broker allows to throttle heavy incoming lookup traffic. | 50000 |
 | maxConcurrentTopicLoadRequest | Maximum number of concurrent topic loading request that the broker allows to control the number of zk-operations. | 5000 |
 | maxConcurrentNonPersistentMessagePerConnection | Maximum number of concurrent non-persistent message that can be processed per connection. | 1000 |
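
For anyone trying the feature out, enabling it is a one-line change in broker.conf (a sketch showing only the relevant key; all other settings keep their defaults, and brokers typically need a restart to pick up broker.conf changes):

    # Preview feature: stream reads from BookKeeper instead of issuing micro-batch reads
    streamingDispatch=true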