You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2023/03/05 17:43:45 UTC

[bookkeeper] branch master updated: Pass BookieRequestHandler instead of Channel to the request processors (#3835)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new dfde3d6836 Pass BookieRequestHandler instead of Channel to the request processors (#3835)
dfde3d6836 is described below

commit dfde3d6836094cc39d8a3603cb268ca0f633350b
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun Mar 5 09:43:37 2023 -0800

    Pass BookieRequestHandler instead of Channel to the request processors (#3835)
    
    * Pass BookieRequestHandler instead of Channel to the request processors
    
    * Fixed checkstyle
    
    * Fixed ForceLedgerProcessorV3Test
    
    * Fixed TestBookieRequestProcessor
    
    * Fixed line length
---
 .../bookkeeper/processor/RequestProcessor.java     |  4 +-
 .../bookkeeper/proto/BookieRequestHandler.java     | 11 ++-
 .../bookkeeper/proto/BookieRequestProcessor.java   | 94 ++++++++++++----------
 .../bookkeeper/proto/ForceLedgerProcessorV3.java   |  7 +-
 .../bookkeeper/proto/GetBookieInfoProcessorV3.java |  5 +-
 .../proto/GetListOfEntriesOfLedgerProcessorV3.java |  5 +-
 .../proto/LongPollReadEntryProcessorV3.java        |  5 +-
 .../bookkeeper/proto/PacketProcessorBase.java      | 28 ++++---
 .../bookkeeper/proto/PacketProcessorBaseV3.java    |  7 +-
 .../bookkeeper/proto/ReadEntryProcessor.java       | 14 ++--
 .../bookkeeper/proto/ReadEntryProcessorV3.java     | 11 +--
 .../bookkeeper/proto/ReadLacProcessorV3.java       |  5 +-
 .../bookkeeper/proto/WriteEntryProcessor.java      | 12 +--
 .../bookkeeper/proto/WriteEntryProcessorV3.java    | 13 +--
 .../bookkeeper/proto/WriteLacProcessorV3.java      |  8 +-
 .../proto/ForceLedgerProcessorV3Test.java          | 19 ++++-
 .../proto/GetBookieInfoProcessorV3Test.java        | 12 ++-
 .../proto/LongPollReadEntryProcessorV3Test.java    |  9 ++-
 .../bookkeeper/proto/ReadEntryProcessorTest.java   | 15 +++-
 .../proto/TestBookieRequestProcessor.java          | 14 +++-
 .../bookkeeper/proto/WriteEntryProcessorTest.java  | 45 ++++++-----
 .../proto/WriteEntryProcessorV3Test.java           | 13 ++-
 22 files changed, 223 insertions(+), 133 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
index 50b97e906b..5a4238e64d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
@@ -20,7 +20,7 @@
  */
 package org.apache.bookkeeper.processor;
 
-import io.netty.channel.Channel;
+import org.apache.bookkeeper.proto.BookieRequestHandler;
 
 /**
  * A request processor that is used for processing requests at bookie side.
@@ -41,5 +41,5 @@ public interface RequestProcessor extends AutoCloseable {
      * @param channel
      *          channel received the given request <i>r</i>
      */
-    void processRequest(Object r, Channel channel);
+    void processRequest(Object r, BookieRequestHandler channel);
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
index 93be69cd37..c9d65a7317 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
@@ -32,20 +32,27 @@ import org.slf4j.LoggerFactory;
 /**
  * Serverside handler for bookkeeper requests.
  */
-class BookieRequestHandler extends ChannelInboundHandlerAdapter {
+public class BookieRequestHandler extends ChannelInboundHandlerAdapter {
 
     private static final Logger LOG = LoggerFactory.getLogger(BookieRequestHandler.class);
     private final RequestProcessor requestProcessor;
     private final ChannelGroup allChannels;
 
+    private ChannelHandlerContext ctx;
+
     BookieRequestHandler(ServerConfiguration conf, RequestProcessor processor, ChannelGroup allChannels) {
         this.requestProcessor = processor;
         this.allChannels = allChannels;
     }
 
+    public ChannelHandlerContext ctx() {
+        return ctx;
+    }
+
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         LOG.info("Channel connected  {}", ctx.channel());
+        this.ctx = ctx;
         super.channelActive(ctx);
     }
 
@@ -75,6 +82,6 @@ class BookieRequestHandler extends ChannelInboundHandlerAdapter {
             ctx.fireChannelRead(msg);
             return;
         }
-        requestProcessor.processRequest(msg, ctx.channel());
+        requestProcessor.processRequest(msg, this);
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index f7f4eceda3..9237c451ed 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -299,7 +299,8 @@ public class BookieRequestProcessor implements RequestProcessor {
     }
 
     @Override
-    public void processRequest(Object msg, Channel c) {
+    public void processRequest(Object msg, BookieRequestHandler requestHandler) {
+        Channel channel = requestHandler.ctx().channel();
         // If we can decode this packet as a Request protobuf packet, process
         // it as a version 3 packet. Else, just use the old protocol.
         if (msg instanceof BookkeeperProtocol.Request) {
@@ -309,16 +310,16 @@ public class BookieRequestProcessor implements RequestProcessor {
                 BookkeeperProtocol.BKPacketHeader header = r.getHeader();
                 switch (header.getOperation()) {
                     case ADD_ENTRY:
-                        processAddRequestV3(r, c);
+                        processAddRequestV3(r, requestHandler);
                         break;
                     case READ_ENTRY:
-                        processReadRequestV3(r, c);
+                        processReadRequestV3(r, requestHandler);
                         break;
                     case FORCE_LEDGER:
-                        processForceLedgerRequestV3(r, c);
+                        processForceLedgerRequestV3(r, requestHandler);
                         break;
                     case AUTH:
-                        LOG.info("Ignoring auth operation from client {}", c.remoteAddress());
+                        LOG.info("Ignoring auth operation from client {}", channel.remoteAddress());
                         BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage
                                 .newBuilder()
                                 .setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME)
@@ -328,29 +329,29 @@ public class BookieRequestProcessor implements RequestProcessor {
                                 .newBuilder().setHeader(r.getHeader())
                                 .setStatus(BookkeeperProtocol.StatusCode.EOK)
                                 .setAuthResponse(message);
-                        c.writeAndFlush(authResponse.build());
+                        channel.writeAndFlush(authResponse.build());
                         break;
                     case WRITE_LAC:
-                        processWriteLacRequestV3(r, c);
+                        processWriteLacRequestV3(r, requestHandler);
                         break;
                     case READ_LAC:
-                        processReadLacRequestV3(r, c);
+                        processReadLacRequestV3(r, requestHandler);
                         break;
                     case GET_BOOKIE_INFO:
-                        processGetBookieInfoRequestV3(r, c);
+                        processGetBookieInfoRequestV3(r, requestHandler);
                         break;
                     case START_TLS:
-                        processStartTLSRequestV3(r, c);
+                        processStartTLSRequestV3(r, requestHandler);
                         break;
                     case GET_LIST_OF_ENTRIES_OF_LEDGER:
-                        processGetListOfEntriesOfLedgerProcessorV3(r, c);
+                        processGetListOfEntriesOfLedgerProcessorV3(r, requestHandler);
                         break;
                     default:
                         LOG.info("Unknown operation type {}", header.getOperation());
                         BookkeeperProtocol.Response.Builder response =
                                 BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader())
                                         .setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
-                        c.writeAndFlush(response.build());
+                        channel.writeAndFlush(response.build());
                         if (statsEnabled) {
                             bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
                         }
@@ -365,26 +366,27 @@ public class BookieRequestProcessor implements RequestProcessor {
             switch (r.getOpCode()) {
                 case BookieProtocol.ADDENTRY:
                     checkArgument(r instanceof BookieProtocol.ParsedAddRequest);
-                    processAddRequest((BookieProtocol.ParsedAddRequest) r, c);
+                    processAddRequest((BookieProtocol.ParsedAddRequest) r, requestHandler);
                     break;
                 case BookieProtocol.READENTRY:
                     checkArgument(r instanceof BookieProtocol.ReadRequest);
-                    processReadRequest((BookieProtocol.ReadRequest) r, c);
+                    processReadRequest((BookieProtocol.ReadRequest) r, requestHandler);
                     break;
                 case BookieProtocol.AUTH:
-                    LOG.info("Ignoring auth operation from client {}", c.remoteAddress());
+                    LOG.info("Ignoring auth operation from client {}",
+                            requestHandler.ctx().channel().remoteAddress());
                     BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage
                             .newBuilder()
                             .setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME)
                             .setPayload(ByteString.copyFrom(AuthToken.NULL.getData()))
                             .build();
 
-                    c.writeAndFlush(new BookieProtocol.AuthResponse(
+                    channel.writeAndFlush(new BookieProtocol.AuthResponse(
                             BookieProtocol.CURRENT_PROTOCOL_VERSION, message));
                     break;
                 default:
                     LOG.error("Unknown op type {}, sending error", r.getOpCode());
-                    c.writeAndFlush(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r));
+                    channel.writeAndFlush(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r));
                     if (statsEnabled) {
                         bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
                     }
@@ -402,8 +404,9 @@ public class BookieRequestProcessor implements RequestProcessor {
         }
     }
 
-    private void processWriteLacRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
-        WriteLacProcessorV3 writeLac = new WriteLacProcessorV3(r, c, this);
+    private void processWriteLacRequestV3(final BookkeeperProtocol.Request r,
+                                          final BookieRequestHandler requestHandler) {
+        WriteLacProcessorV3 writeLac = new WriteLacProcessorV3(r, requestHandler, this);
         if (null == writeThreadPool) {
             writeLac.run();
         } else {
@@ -411,8 +414,9 @@ public class BookieRequestProcessor implements RequestProcessor {
         }
     }
 
-    private void processReadLacRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
-        ReadLacProcessorV3 readLac = new ReadLacProcessorV3(r, c, this);
+    private void processReadLacRequestV3(final BookkeeperProtocol.Request r,
+                                         final BookieRequestHandler requestHandler) {
+        ReadLacProcessorV3 readLac = new ReadLacProcessorV3(r, requestHandler, this);
         if (null == readThreadPool) {
             readLac.run();
         } else {
@@ -420,8 +424,8 @@ public class BookieRequestProcessor implements RequestProcessor {
         }
     }
 
-    private void processAddRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
-        WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, c, this);
+    private void processAddRequestV3(final BookkeeperProtocol.Request r, final BookieRequestHandler requestHandler) {
+        WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, requestHandler, this);
 
         final OrderedExecutor threadPool;
         if (RequestUtils.isHighPriority(r)) {
@@ -455,8 +459,9 @@ public class BookieRequestProcessor implements RequestProcessor {
         }
     }
 
-    private void processForceLedgerRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
-        ForceLedgerProcessorV3 forceLedger = new ForceLedgerProcessorV3(r, c, this);
+    private void processForceLedgerRequestV3(final BookkeeperProtocol.Request r,
+                                             final BookieRequestHandler requestHandler) {
+        ForceLedgerProcessorV3 forceLedger = new ForceLedgerProcessorV3(r, requestHandler, this);
 
         final OrderedExecutor threadPool;
         if (RequestUtils.isHighPriority(r)) {
@@ -492,19 +497,20 @@ public class BookieRequestProcessor implements RequestProcessor {
         }
     }
 
-    private void processReadRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
-        ExecutorService fenceThread = null == highPriorityThreadPool ? null : highPriorityThreadPool.chooseThread(c);
+    private void processReadRequestV3(final BookkeeperProtocol.Request r, final BookieRequestHandler requestHandler) {
+        ExecutorService fenceThread = null == highPriorityThreadPool ? null :
+                highPriorityThreadPool.chooseThread(requestHandler.ctx());
 
         final ReadEntryProcessorV3 read;
         final OrderedExecutor threadPool;
         if (RequestUtils.isLongPollReadRequest(r.getReadRequest())) {
-            ExecutorService lpThread = longPollThreadPool.chooseThread(c);
+            ExecutorService lpThread = longPollThreadPool.chooseThread(requestHandler.ctx());
 
-            read = new LongPollReadEntryProcessorV3(r, c, this, fenceThread,
+            read = new LongPollReadEntryProcessorV3(r, requestHandler, this, fenceThread,
                                                     lpThread, requestTimer);
             threadPool = longPollThreadPool;
         } else {
-            read = new ReadEntryProcessorV3(r, c, this, fenceThread);
+            read = new ReadEntryProcessorV3(r, requestHandler, this, fenceThread);
 
             // If it's a high priority read (fencing or as part of recovery process), we want to make sure it
             // gets executed as fast as possible, so bypass the normal readThreadPool
@@ -544,13 +550,16 @@ public class BookieRequestProcessor implements RequestProcessor {
         }
     }
 
-    private void processStartTLSRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
+    private void processStartTLSRequestV3(final BookkeeperProtocol.Request r,
+                                          final BookieRequestHandler requestHandler) {
         BookkeeperProtocol.Response.Builder response = BookkeeperProtocol.Response.newBuilder();
         BookkeeperProtocol.BKPacketHeader.Builder header = BookkeeperProtocol.BKPacketHeader.newBuilder();
         header.setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE);
         header.setOperation(r.getHeader().getOperation());
         header.setTxnId(r.getHeader().getTxnId());
         response.setHeader(header.build());
+        final Channel c = requestHandler.ctx().channel();
+
         if (shFactory == null) {
             LOG.error("Got StartTLS request but TLS not configured");
             response.setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
@@ -596,8 +605,9 @@ public class BookieRequestProcessor implements RequestProcessor {
         }
     }
 
-    private void processGetBookieInfoRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
-        GetBookieInfoProcessorV3 getBookieInfo = new GetBookieInfoProcessorV3(r, c, this);
+    private void processGetBookieInfoRequestV3(final BookkeeperProtocol.Request r,
+                                               final BookieRequestHandler requestHandler) {
+        GetBookieInfoProcessorV3 getBookieInfo = new GetBookieInfoProcessorV3(r, requestHandler, this);
         if (null == readThreadPool) {
             getBookieInfo.run();
         } else {
@@ -605,9 +615,10 @@ public class BookieRequestProcessor implements RequestProcessor {
         }
     }
 
-    private void processGetListOfEntriesOfLedgerProcessorV3(final BookkeeperProtocol.Request r, final Channel c) {
-        GetListOfEntriesOfLedgerProcessorV3 getListOfEntriesOfLedger = new GetListOfEntriesOfLedgerProcessorV3(r, c,
-                this);
+    private void processGetListOfEntriesOfLedgerProcessorV3(final BookkeeperProtocol.Request r,
+                                                            final BookieRequestHandler requestHandler) {
+        GetListOfEntriesOfLedgerProcessorV3 getListOfEntriesOfLedger =
+                new GetListOfEntriesOfLedgerProcessorV3(r, requestHandler, this);
         if (null == readThreadPool) {
             getListOfEntriesOfLedger.run();
         } else {
@@ -615,8 +626,8 @@ public class BookieRequestProcessor implements RequestProcessor {
         }
     }
 
-    private void processAddRequest(final BookieProtocol.ParsedAddRequest r, final Channel c) {
-        WriteEntryProcessor write = WriteEntryProcessor.create(r, c, this);
+    private void processAddRequest(final BookieProtocol.ParsedAddRequest r, final BookieRequestHandler requestHandler) {
+        WriteEntryProcessor write = WriteEntryProcessor.create(r, requestHandler, this);
 
         // If it's a high priority add (usually as part of recovery process), we want to make sure it gets
         // executed as fast as possible, so bypass the normal writeThreadPool and execute in highPriorityThreadPool
@@ -647,10 +658,11 @@ public class BookieRequestProcessor implements RequestProcessor {
         }
     }
 
-    private void processReadRequest(final BookieProtocol.ReadRequest r, final Channel c) {
+    private void processReadRequest(final BookieProtocol.ReadRequest r, final BookieRequestHandler requestHandler) {
         ExecutorService fenceThreadPool =
-                null == highPriorityThreadPool ? null : highPriorityThreadPool.chooseThread(c);
-        ReadEntryProcessor read = ReadEntryProcessor.create(r, c, this, fenceThreadPool, throttleReadResponses);
+                null == highPriorityThreadPool ? null : highPriorityThreadPool.chooseThread(requestHandler.ctx());
+        ReadEntryProcessor read = ReadEntryProcessor.create(r, requestHandler,
+                this, fenceThreadPool, throttleReadResponses);
 
         // If it's a high priority read (fencing or as part of recovery process), we want to make sure it
         // gets executed as fast as possible, so bypass the normal readThreadPool
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
index de73f95011..c1627579c6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
@@ -22,7 +22,6 @@ package org.apache.bookkeeper.proto;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import io.netty.channel.Channel;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.bookie.BookieImpl;
 import org.apache.bookkeeper.net.BookieId;
@@ -39,9 +38,9 @@ import org.slf4j.LoggerFactory;
 class ForceLedgerProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
     private static final Logger logger = LoggerFactory.getLogger(ForceLedgerProcessorV3.class);
 
-    public ForceLedgerProcessorV3(Request request, Channel channel,
+    public ForceLedgerProcessorV3(Request request, BookieRequestHandler requestHandler,
                              BookieRequestProcessor requestProcessor) {
-        super(request, channel, requestProcessor);
+        super(request, requestHandler, requestProcessor);
     }
 
     // Returns null if there is no exception thrown
@@ -98,7 +97,7 @@ class ForceLedgerProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
         };
         StatusCode status = null;
         try {
-            requestProcessor.getBookie().forceLedger(ledgerId, wcb, channel);
+            requestProcessor.getBookie().forceLedger(ledgerId, wcb, requestHandler);
             status = StatusCode.EOK;
         } catch (Throwable t) {
             logger.error("Unexpected exception while forcing ledger {} : ", ledgerId, t);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
index 6f24255586..8795263a5b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
@@ -20,7 +20,6 @@
  */
 package org.apache.bookkeeper.proto;
 
-import io.netty.channel.Channel;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest;
@@ -38,9 +37,9 @@ import org.slf4j.LoggerFactory;
 public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(GetBookieInfoProcessorV3.class);
 
-    public GetBookieInfoProcessorV3(Request request, Channel channel,
+    public GetBookieInfoProcessorV3(Request request, BookieRequestHandler requestHandler,
                                      BookieRequestProcessor requestProcessor) {
-        super(request, channel, requestProcessor);
+        super(request, requestHandler, requestProcessor);
     }
 
     private GetBookieInfoResponse getGetBookieInfoResponse() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java
index 57f72208d8..90c850841d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java
@@ -21,7 +21,6 @@
 package org.apache.bookkeeper.proto;
 
 import com.google.protobuf.ByteString;
-import io.netty.channel.Channel;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.bookie.Bookie;
@@ -44,9 +43,9 @@ public class GetListOfEntriesOfLedgerProcessorV3 extends PacketProcessorBaseV3 i
     protected final GetListOfEntriesOfLedgerRequest getListOfEntriesOfLedgerRequest;
     protected final long ledgerId;
 
-    public GetListOfEntriesOfLedgerProcessorV3(Request request, Channel channel,
+    public GetListOfEntriesOfLedgerProcessorV3(Request request, BookieRequestHandler requestHandler,
             BookieRequestProcessor requestProcessor) {
-        super(request, channel, requestProcessor);
+        super(request, requestHandler, requestProcessor);
         this.getListOfEntriesOfLedgerRequest = request.getGetListOfEntriesOfLedgerRequest();
         this.ledgerId = getListOfEntriesOfLedgerRequest.getLedgerId();
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
index f61a2688ba..658c37c594 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
@@ -18,7 +18,6 @@
 package org.apache.bookkeeper.proto;
 
 import com.google.common.base.Stopwatch;
-import io.netty.channel.Channel;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import java.io.IOException;
@@ -55,12 +54,12 @@ class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Watch
     private boolean shouldReadEntry = false;
 
     LongPollReadEntryProcessorV3(Request request,
-                                 Channel channel,
+                                 BookieRequestHandler requestHandler,
                                  BookieRequestProcessor requestProcessor,
                                  ExecutorService fenceThreadPool,
                                  ExecutorService longPollThreadPool,
                                  HashedWheelTimer requestTimer) {
-        super(request, channel, requestProcessor, fenceThreadPool);
+        super(request, requestHandler, requestProcessor, fenceThreadPool);
         this.previousLAC = readRequest.getPreviousLAC();
         this.longPollThreadPool = longPollThreadPool;
         this.requestTimer = requestTimer;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
index 8d079504b1..c9798156c2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
@@ -35,20 +35,20 @@ import org.slf4j.LoggerFactory;
 abstract class PacketProcessorBase<T extends Request> implements Runnable {
     private static final Logger logger = LoggerFactory.getLogger(PacketProcessorBase.class);
     T request;
-    Channel channel;
+    BookieRequestHandler requestHandler;
     BookieRequestProcessor requestProcessor;
     long enqueueNanos;
 
-    protected void init(T request, Channel channel, BookieRequestProcessor requestProcessor) {
+    protected void init(T request, BookieRequestHandler requestHandler, BookieRequestProcessor requestProcessor) {
         this.request = request;
-        this.channel = channel;
+        this.requestHandler = requestHandler;
         this.requestProcessor = requestProcessor;
         this.enqueueNanos = MathUtils.nowInNano();
     }
 
     protected void reset() {
         request = null;
-        channel = null;
+        requestHandler = null;
         requestProcessor = null;
         enqueueNanos = -1;
     }
@@ -82,8 +82,10 @@ abstract class PacketProcessorBase<T extends Request> implements Runnable {
 
     protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) {
         final long writeNanos = MathUtils.nowInNano();
-
         final long timeOut = requestProcessor.getWaitTimeoutOnBackpressureMillis();
+
+        Channel channel = requestHandler.ctx().channel();
+
         if (timeOut >= 0 && !channel.isWritable()) {
             if (!requestProcessor.isBlacklisted(channel)) {
                 synchronized (channel) {
@@ -120,18 +122,23 @@ abstract class PacketProcessorBase<T extends Request> implements Runnable {
         }
 
         if (channel.isActive()) {
-            ChannelPromise promise = channel.newPromise().addListener(future -> {
-                if (!future.isSuccess()) {
-                    logger.debug("Netty channel write exception. ", future.cause());
-                }
-            });
+            ChannelPromise promise = channel.voidPromise();
+            if (logger.isDebugEnabled()) {
+                promise = channel.newPromise().addListener(future -> {
+                    if (!future.isSuccess()) {
+                        logger.debug("Netty channel write exception. ", future.cause());
+                    }
+                });
+            }
             channel.writeAndFlush(response, promise);
         } else {
             if (response instanceof BookieProtocol.Response) {
                 ((BookieProtocol.Response) response).release();
             }
+            if (logger.isDebugEnabled()) {
             logger.debug("Netty channel {} is inactive, "
                     + "hence bypassing netty channel writeAndFlush during sendResponse", channel);
+            }
         }
         if (BookieProtocol.EOK == rc) {
             statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
@@ -149,6 +156,7 @@ abstract class PacketProcessorBase<T extends Request> implements Runnable {
      */
     protected void sendResponseAndWait(int rc, Object response, OpStatsLogger statsLogger) {
         try {
+            Channel channel = requestHandler.ctx().channel();
             ChannelFuture future = channel.writeAndFlush(response);
             if (!channel.eventLoop().inEventLoop()) {
                 future.get();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
index ccc452ae60..dac454933c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
@@ -40,14 +40,14 @@ import org.apache.bookkeeper.util.StringUtils;
 public abstract class PacketProcessorBaseV3 implements Runnable {
 
     final Request request;
-    final Channel channel;
+    final BookieRequestHandler requestHandler;
     final BookieRequestProcessor requestProcessor;
     final long enqueueNanos;
 
-    public PacketProcessorBaseV3(Request request, Channel channel,
+    public PacketProcessorBaseV3(Request request, BookieRequestHandler requestHandler,
                                  BookieRequestProcessor requestProcessor) {
         this.request = request;
-        this.channel = channel;
+        this.requestHandler = requestHandler;
         this.requestProcessor = requestProcessor;
         this.enqueueNanos = MathUtils.nowInNano();
     }
@@ -55,6 +55,7 @@ public abstract class PacketProcessorBaseV3 implements Runnable {
     protected void sendResponse(StatusCode code, Object response, OpStatsLogger statsLogger) {
         final long writeNanos = MathUtils.nowInNano();
 
+        Channel channel = requestHandler.ctx().channel();
         final long timeOut = requestProcessor.getWaitTimeoutOnBackpressureMillis();
         if (timeOut >= 0 && !channel.isWritable()) {
             if (!requestProcessor.isBlacklisted(channel)) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index 71ee51f3fa..6935ca8be6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -18,7 +18,6 @@
 package org.apache.bookkeeper.proto;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
 import io.netty.util.Recycler;
 import io.netty.util.ReferenceCountUtil;
 import java.io.IOException;
@@ -44,15 +43,15 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
     private boolean throttleReadResponses;
 
     public static ReadEntryProcessor create(ReadRequest request,
-                                            Channel channel,
+                                            BookieRequestHandler requestHandler,
                                             BookieRequestProcessor requestProcessor,
                                             ExecutorService fenceThreadPool,
                                             boolean throttleReadResponses) {
         ReadEntryProcessor rep = RECYCLER.get();
-        rep.init(request, channel, requestProcessor);
+        rep.init(request, requestHandler, requestProcessor);
         rep.fenceThreadPool = fenceThreadPool;
         rep.throttleReadResponses = throttleReadResponses;
-        requestProcessor.onReadRequestStart(channel);
+        requestProcessor.onReadRequestStart(requestHandler.ctx().channel());
         return rep;
     }
 
@@ -61,9 +60,9 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Received new read request: {}", request);
         }
-        if (!channel.isOpen()) {
+        if (!requestHandler.ctx().channel().isOpen()) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Dropping read request for closed channel: {}", channel);
+                LOG.debug("Dropping read request for closed channel: {}", requestHandler.ctx().channel());
             }
             requestProcessor.onReadRequestFinish();
             return;
@@ -74,7 +73,8 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
         try {
             CompletableFuture<Boolean> fenceResult = null;
             if (request.isFencing()) {
-                LOG.warn("Ledger: {}  fenced by: {}", request.getLedgerId(), channel.remoteAddress());
+                LOG.warn("Ledger: {}  fenced by: {}", request.getLedgerId(),
+                        requestHandler.ctx().channel().remoteAddress());
 
                 if (request.hasMasterKey()) {
                     fenceResult = requestProcessor.getBookie().fenceLedger(request.getLedgerId(),
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index 4672d592d8..999b8095db 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -57,11 +57,11 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
     protected final OpStatsLogger reqStats;
 
     public ReadEntryProcessorV3(Request request,
-                                Channel channel,
+                                BookieRequestHandler requestHandler,
                                 BookieRequestProcessor requestProcessor,
                                 ExecutorService fenceThreadPool) {
-        super(request, channel, requestProcessor);
-        requestProcessor.onReadRequestStart(channel);
+        super(request, requestHandler, requestProcessor);
+        requestProcessor.onReadRequestStart(requestHandler.ctx().channel());
 
         this.readRequest = request.getReadRequest();
         this.ledgerId = readRequest.getLedgerId();
@@ -194,6 +194,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
 
     protected ReadResponse getReadResponse() {
         final Stopwatch startTimeSw = Stopwatch.createStarted();
+        final Channel channel = requestHandler.ctx().channel();
 
         final ReadResponse.Builder readResponse = ReadResponse.newBuilder()
             .setLedgerId(ledgerId)
@@ -249,9 +250,9 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
     public void run() {
         requestProcessor.getRequestStats().getReadEntrySchedulingDelayStats().registerSuccessfulEvent(
             MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
-        if (!channel.isOpen()) {
+        if (!requestHandler.ctx().channel().isOpen()) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Dropping read request for closed channel: {}", channel);
+                LOG.debug("Dropping read request for closed channel: {}", requestHandler.ctx().channel());
             }
             requestProcessor.onReadRequestFinish();
             return;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
index abb7d61646..25fe9530ad 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
@@ -22,7 +22,6 @@ package org.apache.bookkeeper.proto;
 
 import com.google.protobuf.ByteString;
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
 import io.netty.util.ReferenceCountUtil;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
@@ -43,9 +42,9 @@ import org.slf4j.LoggerFactory;
 class ReadLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
     private static final Logger logger = LoggerFactory.getLogger(ReadLacProcessorV3.class);
 
-    public ReadLacProcessorV3(Request request, Channel channel,
+    public ReadLacProcessorV3(Request request, BookieRequestHandler requestHandler,
                              BookieRequestProcessor requestProcessor) {
-        super(request, channel, requestProcessor);
+        super(request, requestHandler, requestProcessor);
     }
 
     // Returns null if there is no exception thrown
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index 531dfecccc..7e8f9fa768 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -19,7 +19,6 @@ package org.apache.bookkeeper.proto;
 
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
 import io.netty.util.Recycler;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
@@ -47,11 +46,11 @@ class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest> implemen
         startTimeNanos = -1L;
     }
 
-    public static WriteEntryProcessor create(ParsedAddRequest request, Channel channel,
+    public static WriteEntryProcessor create(ParsedAddRequest request, BookieRequestHandler requestHandler,
                                              BookieRequestProcessor requestProcessor) {
         WriteEntryProcessor wep = RECYCLER.get();
-        wep.init(request, channel, requestProcessor);
-        requestProcessor.onAddRequestStart(channel);
+        wep.init(request, requestHandler, requestProcessor);
+        requestProcessor.onAddRequestStart(requestHandler.ctx().channel());
         return wep;
     }
 
@@ -74,9 +73,10 @@ class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest> implemen
         ByteBuf addData = request.getData();
         try {
             if (request.isRecoveryAdd()) {
-                requestProcessor.getBookie().recoveryAddEntry(addData, this, channel, request.getMasterKey());
+                requestProcessor.getBookie().recoveryAddEntry(addData, this, requestHandler, request.getMasterKey());
             } else {
-                requestProcessor.getBookie().addEntry(addData, false, this, channel, request.getMasterKey());
+                requestProcessor.getBookie().addEntry(addData, false, this,
+                        requestHandler, request.getMasterKey());
             }
         } catch (OperationRejectedException e) {
             requestProcessor.getRequestStats().getAddEntryRejectedCounter().inc();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index 7d59858732..36aff7ad92 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -22,7 +22,6 @@ package org.apache.bookkeeper.proto;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.concurrent.TimeUnit;
@@ -43,10 +42,10 @@ import org.slf4j.LoggerFactory;
 class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
     private static final Logger logger = LoggerFactory.getLogger(WriteEntryProcessorV3.class);
 
-    public WriteEntryProcessorV3(Request request, Channel channel,
+    public WriteEntryProcessorV3(Request request, BookieRequestHandler requestHandler,
                                  BookieRequestProcessor requestProcessor) {
-        super(request, channel, requestProcessor);
-        requestProcessor.onAddRequestStart(channel);
+        super(request, requestHandler, requestProcessor);
+        requestProcessor.onAddRequestStart(requestHandler.ctx().channel());
     }
 
     // Returns null if there is no exception thrown
@@ -118,9 +117,11 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
         ByteBuf entryToAdd = Unpooled.wrappedBuffer(addRequest.getBody().asReadOnlyByteBuffer());
         try {
             if (RequestUtils.hasFlag(addRequest, AddRequest.Flag.RECOVERY_ADD)) {
-                requestProcessor.getBookie().recoveryAddEntry(entryToAdd, wcb, channel, masterKey);
+                requestProcessor.getBookie().recoveryAddEntry(entryToAdd, wcb,
+                        requestHandler.ctx().channel(), masterKey);
             } else {
-                requestProcessor.getBookie().addEntry(entryToAdd, ackBeforeSync, wcb, channel, masterKey);
+                requestProcessor.getBookie().addEntry(entryToAdd, ackBeforeSync, wcb,
+                        requestHandler.ctx().channel(), masterKey);
             }
             status = StatusCode.EOK;
         } catch (OperationRejectedException e) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
index d8a427c905..293cea3bb0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
@@ -21,7 +21,6 @@
 package org.apache.bookkeeper.proto;
 
 import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
@@ -40,9 +39,9 @@ import org.slf4j.LoggerFactory;
 class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
     private static final Logger logger = LoggerFactory.getLogger(WriteLacProcessorV3.class);
 
-    public WriteLacProcessorV3(Request request, Channel channel,
+    public WriteLacProcessorV3(Request request, BookieRequestHandler requestHandler,
                              BookieRequestProcessor requestProcessor) {
-        super(request, channel, requestProcessor);
+        super(request, requestHandler, requestProcessor);
     }
 
     // Returns null if there is no exception thrown
@@ -103,7 +102,8 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
         byte[] masterKey = writeLacRequest.getMasterKey().toByteArray();
 
         try {
-            requestProcessor.bookie.setExplicitLac(Unpooled.wrappedBuffer(lacToAdd), writeCallback, channel, masterKey);
+            requestProcessor.bookie.setExplicitLac(Unpooled.wrappedBuffer(lacToAdd),
+                    writeCallback, requestHandler, masterKey);
             status = StatusCode.EOK;
         } catch (IOException e) {
             logger.error("Error saving lac {} for ledger:{}",
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
index 54460770f1..3bc9cbee42 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.DefaultChannelPromise;
 import java.util.concurrent.CountDownLatch;
@@ -55,6 +56,8 @@ public class ForceLedgerProcessorV3Test {
 
     private Request request;
     private ForceLedgerProcessorV3 processor;
+
+    private BookieRequestHandler requestHandler;
     private Channel channel;
     private BookieRequestProcessor requestProcessor;
     private Bookie bookie;
@@ -71,17 +74,25 @@ public class ForceLedgerProcessorV3Test {
                 .setLedgerId(System.currentTimeMillis())
                 .build())
             .build();
+
+
         channel = mock(Channel.class);
         when(channel.isOpen()).thenReturn(true);
+        when(channel.isActive()).thenReturn(true);
+
+        requestHandler = mock(BookieRequestHandler.class);
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        when(ctx.channel()).thenReturn(channel);
+        when(requestHandler.ctx()).thenReturn(ctx);
+
         bookie = mock(Bookie.class);
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
         when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L);
         when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE));
-        when(channel.isActive()).thenReturn(true);
         processor = new ForceLedgerProcessorV3(
             request,
-            channel,
+            requestHandler,
             requestProcessor);
     }
 
@@ -102,7 +113,7 @@ public class ForceLedgerProcessorV3Test {
         }).when(bookie).forceLedger(
             eq(request.getForceLedgerRequest().getLedgerId()),
             any(WriteCallback.class),
-            same(channel));
+            same(requestHandler));
 
         ChannelPromise promise = new DefaultChannelPromise(channel);
         AtomicReference<Object> writtenObject = new AtomicReference<>();
@@ -117,7 +128,7 @@ public class ForceLedgerProcessorV3Test {
 
         verify(bookie, times(1))
             .forceLedger(eq(request.getForceLedgerRequest().getLedgerId()),
-                    any(WriteCallback.class), same(channel));
+                    any(WriteCallback.class), same(requestHandler));
         verify(channel, times(1)).writeAndFlush(any(Response.class));
 
         latch.await();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3Test.java
index e294a55ccf..5e986515e9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3Test.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.bookie.Bookie;
@@ -38,6 +39,7 @@ import org.junit.Test;
  */
 public class GetBookieInfoProcessorV3Test {
 
+    private BookieRequestHandler requestHandler;
     private Channel channel;
     private BookieRequestProcessor requestProcessor;
     private Bookie bookie;
@@ -55,9 +57,17 @@ public class GetBookieInfoProcessorV3Test {
         requestProcessor = mock(BookieRequestProcessor.class);
         bookie = mock(Bookie.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
+
+        requestHandler = mock(BookieRequestHandler.class);
+
         channel = mock(Channel.class);
         when(channel.isOpen()).thenReturn(true);
         when(channel.isActive()).thenReturn(true);
+
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        when(ctx.channel()).thenReturn(channel);
+        when(requestHandler.ctx()).thenReturn(ctx);
+
         when(requestProcessor.getRequestStats()).thenReturn(requestStats);
         when(requestProcessor.getRequestStats().getGetBookieInfoStats())
                 .thenReturn(getBookieInfoStats);
@@ -85,7 +95,7 @@ public class GetBookieInfoProcessorV3Test {
                 .build();
 
         GetBookieInfoProcessorV3 getBookieInfo = new GetBookieInfoProcessorV3(
-                getBookieInfoRequest, channel, requestProcessor);
+                getBookieInfoRequest, requestHandler, requestProcessor);
         getBookieInfo.run();
 
         // get BookieInfo succeeded.
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
index 9eae1b9c0d..33a4fdc829 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.when;
 import com.google.protobuf.ByteString;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.HashedWheelTimer;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -84,6 +85,12 @@ public class LongPollReadEntryProcessorV3Test {
 
         Channel channel = mock(Channel.class);
         when(channel.isOpen()).thenReturn(true);
+
+        BookieRequestHandler requestHandler = mock(BookieRequestHandler.class);
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        when(ctx.channel()).thenReturn(channel);
+        when(requestHandler.ctx()).thenReturn(ctx);
+
         Bookie bookie = mock(Bookie.class);
 
         BookieRequestProcessor requestProcessor = mock(BookieRequestProcessor.class);
@@ -104,7 +111,7 @@ public class LongPollReadEntryProcessorV3Test {
 
         LongPollReadEntryProcessorV3 processor = new LongPollReadEntryProcessorV3(
             request,
-            channel,
+            requestHandler,
             requestProcessor,
             executor, executor, timer);
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
index f3e9764e71..91e100d809 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.DefaultChannelPromise;
 import io.netty.channel.EventLoop;
@@ -53,6 +54,7 @@ import org.junit.Test;
 public class ReadEntryProcessorTest {
 
     private Channel channel;
+    private BookieRequestHandler requestHandler;
     private BookieRequestProcessor requestProcessor;
     private Bookie bookie;
 
@@ -60,6 +62,12 @@ public class ReadEntryProcessorTest {
     public void setup() throws IOException, BookieException {
         channel = mock(Channel.class);
         when(channel.isOpen()).thenReturn(true);
+
+        requestHandler = mock(BookieRequestHandler.class);
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        when(ctx.channel()).thenReturn(channel);
+        when(requestHandler.ctx()).thenReturn(ctx);
+
         bookie = mock(Bookie.class);
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
@@ -101,7 +109,8 @@ public class ReadEntryProcessorTest {
         long ledgerId = System.currentTimeMillis();
         ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
                 1, BookieProtocol.FLAG_DO_FENCING, new byte[]{});
-        ReadEntryProcessor processor = ReadEntryProcessor.create(request, channel, requestProcessor, service, true);
+        ReadEntryProcessor processor = ReadEntryProcessor.create(
+                request, requestHandler, requestProcessor, service, true);
         processor.run();
 
         fenceResult.complete(result);
@@ -143,7 +152,7 @@ public class ReadEntryProcessorTest {
         long ledgerId = System.currentTimeMillis();
         ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
                 1, BookieProtocol.FLAG_DO_FENCING, new byte[]{});
-        ReadEntryProcessor processor = ReadEntryProcessor.create(request, channel, requestProcessor, null, true);
+        ReadEntryProcessor processor = ReadEntryProcessor.create(request, requestHandler, requestProcessor, null, true);
         fenceResult.complete(result);
         processor.run();
 
@@ -173,7 +182,7 @@ public class ReadEntryProcessorTest {
         long ledgerId = System.currentTimeMillis();
         ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
                 1, (short) 0, new byte[]{});
-        ReadEntryProcessor processor = ReadEntryProcessor.create(request, channel, requestProcessor, null, true);
+        ReadEntryProcessor processor = ReadEntryProcessor.create(request, requestHandler, requestProcessor, null, true);
         processor.run();
 
         latch.await();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
index b88f819949..d5ee8f5275 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
@@ -27,9 +27,12 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import com.google.protobuf.ByteString;
 import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
@@ -137,7 +140,14 @@ public class TestBookieRequestProcessor {
                 .setBody(ByteString.copyFrom("entrydata".getBytes())).build();
         Request request = Request.newBuilder().setHeader(header).setAddRequest(addRequest).build();
 
-        WriteEntryProcessorV3 writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, requestProcessor);
+        Channel channel = mock(Channel.class);
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        when(ctx.channel()).thenReturn(channel);
+        BookieRequestHandler requestHandler = mock(BookieRequestHandler.class);
+        when(requestHandler.ctx()).thenReturn(ctx);
+
+        WriteEntryProcessorV3 writeEntryProcessorV3 = new WriteEntryProcessorV3(request, requestHandler,
+                requestProcessor);
         String toString = writeEntryProcessorV3.toString();
         assertFalse("writeEntryProcessorV3's toString should have filtered out body", toString.contains("body"));
         assertFalse("writeEntryProcessorV3's toString should have filtered out masterKey",
@@ -155,7 +165,7 @@ public class TestBookieRequestProcessor {
                 .setBody(ByteString.copyFrom("entrydata".getBytes())).setFlag(Flag.RECOVERY_ADD).setWriteFlags(0)
                 .build();
         request = Request.newBuilder().setHeader(header).setAddRequest(addRequest).build();
-        writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, requestProcessor);
+        writeEntryProcessorV3 = new WriteEntryProcessorV3(request, requestHandler, requestProcessor);
         toString = writeEntryProcessorV3.toString();
         assertFalse("writeEntryProcessorV3's toString should have filtered out body", toString.contains("body"));
         assertFalse("writeEntryProcessorV3's toString should have filtered out masterKey",
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
index 4479ea4a7a..8fc3a89f00 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.when;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.DefaultChannelPromise;
 import java.util.concurrent.CountDownLatch;
@@ -53,6 +54,8 @@ public class WriteEntryProcessorTest {
     private ParsedAddRequest request;
     private WriteEntryProcessor processor;
     private Channel channel;
+    private ChannelHandlerContext ctx;
+    private BookieRequestHandler requestHandler;
     private BookieRequestProcessor requestProcessor;
     private Bookie bookie;
 
@@ -67,6 +70,12 @@ public class WriteEntryProcessorTest {
             Unpooled.wrappedBuffer("test-entry-data".getBytes(UTF_8)));
         channel = mock(Channel.class);
         when(channel.isOpen()).thenReturn(true);
+
+        requestHandler = mock(BookieRequestHandler.class);
+        ctx = mock(ChannelHandlerContext.class);
+        when(ctx.channel()).thenReturn(channel);
+        when(requestHandler.ctx()).thenReturn(ctx);
+
         bookie = mock(Bookie.class);
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
@@ -75,7 +84,7 @@ public class WriteEntryProcessorTest {
         when(channel.isWritable()).thenReturn(true);
         processor = WriteEntryProcessor.create(
             request,
-            channel,
+            requestHandler,
             requestProcessor);
     }
 
@@ -93,7 +102,7 @@ public class WriteEntryProcessorTest {
             Unpooled.wrappedBuffer("test-entry-data".getBytes(UTF_8)));
         processor = WriteEntryProcessor.create(
             request,
-            channel,
+            requestHandler,
             requestProcessor);
     }
 
@@ -110,11 +119,11 @@ public class WriteEntryProcessorTest {
             writtenObject.set(invocationOnMock.getArgument(0));
             latch.countDown();
             return null;
-        }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+        }).when(channel).writeAndFlush(any(), any());
 
         processor.run();
 
-        verify(channel, times(1)).writeAndFlush(any(), any(ChannelPromise.class));
+        verify(channel, times(1)).writeAndFlush(any(), any());
 
         latch.await();
 
@@ -142,11 +151,11 @@ public class WriteEntryProcessorTest {
             writtenObject.set(invocationOnMock.getArgument(0));
             latch.countDown();
             return null;
-        }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+        }).when(channel).writeAndFlush(any(), any());
 
         processor.run();
 
-        verify(channel, times(1)).writeAndFlush(any(), any(ChannelPromise.class));
+        verify(channel, times(1)).writeAndFlush(any(), any());
 
         latch.await();
 
@@ -170,7 +179,7 @@ public class WriteEntryProcessorTest {
         doAnswer(invocationOnMock -> {
             processor.writeComplete(0, request.ledgerId, request.entryId, null, null);
             return null;
-        }).when(bookie).addEntry(any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0]));
+        }).when(bookie).addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0]));
 
         AtomicReference<Object> writtenObject = new AtomicReference<>();
         CountDownLatch latch = new CountDownLatch(1);
@@ -178,13 +187,13 @@ public class WriteEntryProcessorTest {
             writtenObject.set(invocationOnMock.getArgument(0));
             latch.countDown();
             return null;
-        }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+        }).when(channel).writeAndFlush(any(), any());
 
         processor.run();
 
         verify(bookie, times(1))
-            .addEntry(any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0]));
-        verify(channel, times(1)).writeAndFlush(any(), any(ChannelPromise.class));
+            .addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0]));
+        verify(channel, times(1)).writeAndFlush(any(), any());
 
         latch.await();
 
@@ -205,7 +214,7 @@ public class WriteEntryProcessorTest {
         doAnswer(invocationOnMock -> {
             processor.writeComplete(0, request.ledgerId, request.entryId, null, null);
             return null;
-        }).when(bookie).addEntry(any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0]));
+        }).when(bookie).addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0]));
 
         AtomicReference<Object> writtenObject = new AtomicReference<>();
         CountDownLatch latch = new CountDownLatch(1);
@@ -213,13 +222,13 @@ public class WriteEntryProcessorTest {
             writtenObject.set(invocationOnMock.getArgument(0));
             latch.countDown();
             return null;
-        }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+        }).when(channel).writeAndFlush(any(), any());
 
         processor.run();
 
         verify(bookie, times(1))
-            .addEntry(any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0]));
-        verify(channel, times(1)).writeAndFlush(any(), any(ChannelPromise.class));
+            .addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0]));
+        verify(channel, times(1)).writeAndFlush(any(), any());
 
         latch.await();
 
@@ -241,7 +250,7 @@ public class WriteEntryProcessorTest {
         doAnswer(invocationOnMock -> {
             throw new BookieException.OperationRejectedException();
         }).when(bookie).addEntry(
-                any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0]));
+                any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0]));
 
         ChannelPromise promise = new DefaultChannelPromise(channel);
         AtomicReference<Object> writtenObject = new AtomicReference<>();
@@ -250,13 +259,13 @@ public class WriteEntryProcessorTest {
             writtenObject.set(invocationOnMock.getArgument(0));
             latch.countDown();
             return promise;
-        }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+        }).when(channel).writeAndFlush(any(), any());
 
         processor.run();
 
         verify(bookie, times(1))
-                .addEntry(any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0]));
-        verify(channel, times(1)).writeAndFlush(any(Response.class), any(ChannelPromise.class));
+                .addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0]));
+        verify(channel, times(1)).writeAndFlush(any(Response.class), any());
 
         latch.await();
         assertTrue(writtenObject.get() instanceof Response);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
index 2175b26a5b..40eb662cc5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
@@ -32,6 +32,7 @@ import static org.mockito.Mockito.when;
 import com.google.protobuf.ByteString;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.DefaultChannelPromise;
 import java.util.concurrent.CountDownLatch;
@@ -57,7 +58,9 @@ public class WriteEntryProcessorV3Test {
 
     private Request request;
     private WriteEntryProcessorV3 processor;
+
     private Channel channel;
+    private BookieRequestHandler requestHandler;
     private BookieRequestProcessor requestProcessor;
     private Bookie bookie;
 
@@ -78,6 +81,12 @@ public class WriteEntryProcessorV3Test {
             .build();
         channel = mock(Channel.class);
         when(channel.isOpen()).thenReturn(true);
+
+        requestHandler = mock(BookieRequestHandler.class);
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        when(ctx.channel()).thenReturn(channel);
+        when(requestHandler.ctx()).thenReturn(ctx);
+
         bookie = mock(Bookie.class);
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
@@ -86,7 +95,7 @@ public class WriteEntryProcessorV3Test {
         when(channel.isActive()).thenReturn(true);
         processor = new WriteEntryProcessorV3(
             request,
-            channel,
+            requestHandler,
             requestProcessor);
     }
 
@@ -99,7 +108,7 @@ public class WriteEntryProcessorV3Test {
 
         processor = new WriteEntryProcessorV3(
             request,
-            channel,
+            requestHandler,
             requestProcessor);
     }