You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ch...@apache.org on 2023/03/07 03:55:47 UTC

[bookkeeper] branch master updated: Group and flush add-responses after journal sync (#3837)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new d6748f915d Group and flush add-responses after journal sync (#3837)
d6748f915d is described below

commit d6748f915d4801e90f001bc09d65918df85b305f
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Mar 6 19:55:41 2023 -0800

    Group and flush add-responses after journal sync (#3837)
    
    ### Motivation
    
    Note: this is stacked on top of #3830 & #3835
    
    This change improves the way the AddRequests responses are send to client.
    
    The current flow is:
     * The journal-force-thread issues the fsync on the journal file
     * We iterate over all the entries that were just synced and for each of them:
         1. Trigger channel.writeAndFlus()
         2. This will jump on the connection IO thread (Netty will use a `write()` to `eventfd` to post the task and wake the epoll)
         3. Write the object in the connection and trigger the serialization logic
         4. Grab a `ByteBuf` from the pool and write ~20 bytes with the response
         5. Write and flush the buffer on the channel
         6. With the flush consolidator we try to group multiple buffer into a single `writev()` syscall, though each call will have a long list of buffer, making the memcpy inefficient.
         7. Release all the buffers and return them to the pool
    
    All these steps are quite expensive when the bookie is receiving a lot of small requests.
    
    This PR changes the flow into:
    
    1. journal fsync
    2. go through each request and prepare the response into a per-connection `ByteBuf` which is not written on the channel as of yet
    3. after preparing all the responses, we flush them at once: Trigger an event on all the connections that will write the accumulated buffers.
    
    The advantages are:
     1. 1 ByteBuf allocated per connection instead of 1 per request
        1. Less allocations and stress of buffer pool
        2. More efficient socket write() operations
     3. 1 task per connection posted on the Netty IO threads, instead of 1 per request.
---
 .../java/org/apache/bookkeeper/bookie/Bookie.java  |  3 ++
 .../org/apache/bookkeeper/bookie/BookieImpl.java   |  8 ++++
 .../java/org/apache/bookkeeper/bookie/Journal.java | 15 +++++++
 .../bookkeeper/processor/RequestProcessor.java     |  5 +++
 .../bookkeeper/proto/BookieProtoEncoding.java      | 13 +++++-
 .../bookkeeper/proto/BookieRequestHandler.java     | 49 ++++++++++++++++++----
 .../bookkeeper/proto/BookieRequestProcessor.java   | 14 ++++++-
 .../org/apache/bookkeeper/proto/BookieServer.java  |  4 +-
 .../bookkeeper/proto/WriteEntryProcessor.java      |  7 ++--
 .../proto/TestBookieRequestProcessor.java          | 13 ++++--
 .../bookkeeper/proto/WriteEntryProcessorTest.java  | 29 +++++--------
 11 files changed, 126 insertions(+), 34 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 90c8acf5af..ac9df53cd2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.PrimitiveIterator;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.common.util.Watcher;
+import org.apache.bookkeeper.processor.RequestProcessor;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 
 /**
@@ -86,6 +87,8 @@ public interface Bookie {
     // TODO: Should be constructed and passed in as a parameter
     LedgerStorage getLedgerStorage();
 
+    void setRequestProcessor(RequestProcessor requestProcessor);
+
     // TODO: Move this exceptions somewhere else
     /**
      * Exception is thrown when no such a ledger is found in this bookie.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index 0db230d9d3..2b76488cbe 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -69,6 +69,7 @@ import org.apache.bookkeeper.discover.RegistrationManager;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNS;
+import org.apache.bookkeeper.processor.RequestProcessor;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -1281,4 +1282,11 @@ public class BookieImpl extends BookieCriticalThread implements Bookie {
             }
         }
     }
+
+    @Override
+    public void setRequestProcessor(RequestProcessor requestProcessor) {
+        for (Journal journal : journals) {
+            journal.setRequestProcessor(requestProcessor);
+        }
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index eff1673edb..5f6b60f799 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -51,6 +51,7 @@ import org.apache.bookkeeper.common.collections.RecyclableArrayList;
 import org.apache.bookkeeper.common.util.MemoryLimitController;
 import org.apache.bookkeeper.common.util.affinity.CpuAffinity;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.processor.RequestProcessor;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -444,6 +445,8 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
         // This holds the queue entries that should be notified after a
         // successful force write
         Thread threadToNotifyOnEx;
+
+        RequestProcessor requestProcessor;
         // should we group force writes
         private final boolean enableGroupForceWrites;
         private final Counter forceWriteThreadTime;
@@ -499,6 +502,10 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                     journalStats.getForceWriteGroupingCountStats()
                             .registerSuccessfulValue(numReqInLastForceWrite);
 
+                    if (requestProcessor != null) {
+                        requestProcessor.flushPendingResponses();
+                    }
+
                 } catch (IOException ioe) {
                     LOG.error("I/O exception in ForceWrite thread", ioe);
                     running = false;
@@ -1093,6 +1100,10 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                                     numEntriesToFlush--;
                                     entry.run();
                                 }
+
+                                if (forceWriteThread.requestProcessor != null) {
+                                    forceWriteThread.requestProcessor.flushPendingResponses();
+                                }
                             }
 
                             lastFlushPosition = bc.position();
@@ -1211,6 +1222,10 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
         return (FileChannel fc, int capacity) -> new BufferedChannel(allocator, fc, capacity);
     }
 
+    public void setRequestProcessor(RequestProcessor requestProcessor) {
+        forceWriteThread.requestProcessor = requestProcessor;
+    }
+
     /**
      * Shuts down the journal.
      */
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
index 5a4238e64d..9f9a0daf68 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
@@ -42,4 +42,9 @@ public interface RequestProcessor extends AutoCloseable {
      *          channel received the given request <i>r</i>
      */
     void processRequest(Object r, BookieRequestHandler channel);
+
+    /**
+     * Flush any pending response staged on all the client connections.
+     */
+    void flushPendingResponses();
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 3e41a3f5ea..edbffa5f43 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -334,6 +334,14 @@ public class BookieProtoEncoding {
                 throw new IllegalStateException("Received unknown response : op code = " + opCode);
             }
         }
+
+        public static void serializeAddResponseInto(int rc, BookieProtocol.ParsedAddRequest req, ByteBuf buf) {
+            buf.writeInt(RESPONSE_HEADERS_SIZE); // Frame size
+            buf.writeInt(PacketHeader.toInt(req.getProtocolVersion(), req.getOpCode(), (short) 0));
+            buf.writeInt(rc); // rc-code
+            buf.writeLong(req.getLedgerId());
+            buf.writeLong(req.getEntryId());
+        }
     }
 
     /**
@@ -504,7 +512,10 @@ public class BookieProtoEncoding {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Encode response {} to channel {}.", msg, ctx.channel());
             }
-            if (msg instanceof BookkeeperProtocol.Response) {
+
+            if (msg instanceof ByteBuf) {
+                ctx.write(msg, promise);
+            } else if (msg instanceof BookkeeperProtocol.Response) {
                 ctx.write(repV3.encode(msg, ctx.alloc()), promise);
             } else if (msg instanceof BookieProtocol.Response) {
                 ctx.write(repPreV3.encode(msg, ctx.alloc()), promise);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
index c9d65a7317..50b7969023 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
@@ -20,26 +20,31 @@
  */
 package org.apache.bookkeeper.proto;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.group.ChannelGroup;
 import java.nio.channels.ClosedChannelException;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.processor.RequestProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Serverside handler for bookkeeper requests.
  */
+@Slf4j
 public class BookieRequestHandler extends ChannelInboundHandlerAdapter {
 
-    private static final Logger LOG = LoggerFactory.getLogger(BookieRequestHandler.class);
+    static final Object EVENT_FLUSH_ALL_PENDING_RESPONSES = new Object();
+
     private final RequestProcessor requestProcessor;
     private final ChannelGroup allChannels;
 
     private ChannelHandlerContext ctx;
 
+    private ByteBuf pendingSendResponses = null;
+    private int maxPendingResponsesSize;
+
     BookieRequestHandler(ServerConfiguration conf, RequestProcessor processor, ChannelGroup allChannels) {
         this.requestProcessor = processor;
         this.allChannels = allChannels;
@@ -51,7 +56,7 @@ public class BookieRequestHandler extends ChannelInboundHandlerAdapter {
 
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
-        LOG.info("Channel connected  {}", ctx.channel());
+        log.info("Channel connected  {}", ctx.channel());
         this.ctx = ctx;
         super.channelActive(ctx);
     }
@@ -63,16 +68,16 @@ public class BookieRequestHandler extends ChannelInboundHandlerAdapter {
 
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-        LOG.info("Channels disconnected: {}", ctx.channel());
+        log.info("Channels disconnected: {}", ctx.channel());
     }
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         if (cause instanceof ClosedChannelException) {
-            LOG.info("Client died before request could be completed on {}", ctx.channel(), cause);
+            log.info("Client died before request could be completed on {}", ctx.channel(), cause);
             return;
         }
-        LOG.error("Unhandled exception occurred in I/O thread or handler on {}", ctx.channel(), cause);
+        log.error("Unhandled exception occurred in I/O thread or handler on {}", ctx.channel(), cause);
         ctx.close();
     }
 
@@ -84,4 +89,34 @@ public class BookieRequestHandler extends ChannelInboundHandlerAdapter {
         }
         requestProcessor.processRequest(msg, this);
     }
+
+    public synchronized void prepareSendResponseV2(int rc, BookieProtocol.ParsedAddRequest req) {
+        if (pendingSendResponses == null) {
+            pendingSendResponses = ctx.alloc().directBuffer(maxPendingResponsesSize != 0
+                    ? maxPendingResponsesSize : 256);
+        }
+
+        BookieProtoEncoding.ResponseEnDeCoderPreV3.serializeAddResponseInto(rc, req, pendingSendResponses);
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        if (evt == EVENT_FLUSH_ALL_PENDING_RESPONSES) {
+            synchronized (this) {
+                if (pendingSendResponses != null) {
+                    maxPendingResponsesSize = Math.max(maxPendingResponsesSize,
+                            pendingSendResponses.readableBytes());
+                    if (ctx.channel().isActive()) {
+                        ctx.writeAndFlush(pendingSendResponses, ctx.voidPromise());
+                    } else {
+                        pendingSendResponses.release();
+                    }
+
+                    pendingSendResponses = null;
+                }
+            }
+        } else {
+            super.userEventTriggered(ctx, evt);
+        }
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 9237c451ed..d07aa9cffa 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -30,6 +30,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.ByteString;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
+import io.netty.channel.group.ChannelGroup;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.Future;
@@ -118,6 +119,8 @@ public class BookieRequestProcessor implements RequestProcessor {
     final Semaphore addsSemaphore;
     final Semaphore readsSemaphore;
 
+    final ChannelGroup allChannels;
+
     // to temporary blacklist channels
     final Optional<Cache<Channel, Boolean>> blacklistedChannels;
     final Consumer<Channel> onResponseTimeout;
@@ -127,9 +130,11 @@ public class BookieRequestProcessor implements RequestProcessor {
     private final boolean throttleReadResponses;
 
     public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie, StatsLogger statsLogger,
-            SecurityHandlerFactory shFactory, ByteBufAllocator allocator) throws SecurityException {
+                                  SecurityHandlerFactory shFactory, ByteBufAllocator allocator,
+                                  ChannelGroup allChannels) throws SecurityException {
         this.serverCfg = serverCfg;
         this.allocator = allocator;
+        this.allChannels = allChannels;
         this.waitTimeoutOnBackpressureMillis = serverCfg.getWaitTimeoutOnResponseBackpressureMillis();
         this.preserveMdcForTaskExecution = serverCfg.getPreserveMdcForTaskExecution();
         this.bookie = bookie;
@@ -694,6 +699,13 @@ public class BookieRequestProcessor implements RequestProcessor {
         }
     }
 
+    @Override
+    public void flushPendingResponses() {
+        for (Channel c : allChannels) {
+            c.pipeline().fireUserEventTriggered(BookieRequestHandler.EVENT_FLUSH_ALL_PENDING_RESPONSES);
+        }
+    }
+
     public long getWaitTimeoutOnBackpressureMillis() {
         return waitTimeoutOnBackpressureMillis;
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
index 64f439b213..caff467db3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
@@ -102,9 +102,11 @@ public class BookieServer {
 
         shFactory = SecurityProviderFactoryFactory
                 .getSecurityProviderFactory(conf.getTLSProviderFactoryClass());
+
         this.requestProcessor = new BookieRequestProcessor(conf, bookie,
-                statsLogger.scope(SERVER_SCOPE), shFactory, allocator);
+                statsLogger.scope(SERVER_SCOPE), shFactory, allocator, nettyServer.allChannels);
         this.nettyServer.setRequestProcessor(this.requestProcessor);
+        this.bookie.setRequestProcessor(this.requestProcessor);
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index 7e8f9fa768..29b3a5abb7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -122,9 +122,10 @@ class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest> implemen
             requestProcessor.getRequestStats().getAddEntryStats()
                 .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
         }
-        sendWriteReqResponse(rc,
-                     ResponseBuilder.buildAddResponse(request),
-                     requestProcessor.getRequestStats().getAddRequestStats());
+
+        requestHandler.prepareSendResponseV2(rc, request);
+        requestProcessor.onAddRequestFinish();
+
         request.recycle();
         recycle();
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
index d5ee8f5275..4630402343 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
@@ -33,6 +33,8 @@ import com.google.protobuf.ByteString;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
@@ -53,12 +55,15 @@ public class TestBookieRequestProcessor {
 
     final BookieRequestProcessor requestProcessor = mock(BookieRequestProcessor.class);
 
+    private final ChannelGroup channelGroup = new DefaultChannelGroup(null);
+
     @Test
     public void testConstructLongPollThreads() throws Exception {
         // long poll threads == read threads
         ServerConfiguration conf = new ServerConfiguration();
         try (BookieRequestProcessor processor = new BookieRequestProcessor(
-            conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) {
+            conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT,
+                channelGroup)) {
             assertSame(processor.getReadThreadPool(), processor.getLongPollThreadPool());
         }
 
@@ -66,7 +71,8 @@ public class TestBookieRequestProcessor {
         conf = new ServerConfiguration();
         conf.setNumReadWorkerThreads(0);
         try (BookieRequestProcessor processor = new BookieRequestProcessor(
-            conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) {
+            conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT,
+                channelGroup)) {
             assertNull(processor.getReadThreadPool());
             assertNotNull(processor.getLongPollThreadPool());
         }
@@ -76,7 +82,8 @@ public class TestBookieRequestProcessor {
         conf.setNumReadWorkerThreads(2);
         conf.setNumLongPollWorkerThreads(2);
         try (BookieRequestProcessor processor = new BookieRequestProcessor(
-            conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) {
+            conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT,
+                channelGroup)) {
             assertNotNull(processor.getReadThreadPool());
             assertNotNull(processor.getLongPollThreadPool());
             assertNotSame(processor.getReadThreadPool(), processor.getLongPollThreadPool());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
index 8fc3a89f00..a02cde4ab9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.doAnswer;
@@ -181,28 +182,25 @@ public class WriteEntryProcessorTest {
             return null;
         }).when(bookie).addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0]));
 
-        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        AtomicReference<Integer> writtenObject = new AtomicReference<>();
         CountDownLatch latch = new CountDownLatch(1);
         doAnswer(invocationOnMock -> {
             writtenObject.set(invocationOnMock.getArgument(0));
             latch.countDown();
             return null;
-        }).when(channel).writeAndFlush(any(), any());
+        }).when(requestHandler).prepareSendResponseV2(anyInt(), any());
 
         processor.run();
 
         verify(bookie, times(1))
             .addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0]));
-        verify(channel, times(1)).writeAndFlush(any(), any());
+        verify(requestHandler, times(1)).prepareSendResponseV2(anyInt(), any());
+//        verify(channel, times(1)).writeAndFlush(any(), any());
 
         latch.await();
 
-        assertTrue(writtenObject.get() instanceof Response);
-        Response response = (Response) writtenObject.get();
-        assertEquals(BookieProtocol.EOK, response.getErrorCode());
-
-        response.release();
-        response.recycle();
+        assertTrue(writtenObject.get() instanceof Integer);
+        assertEquals(BookieProtocol.EOK, (int) writtenObject.get());
     }
 
     @Test
@@ -216,28 +214,23 @@ public class WriteEntryProcessorTest {
             return null;
         }).when(bookie).addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0]));
 
-        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        AtomicReference<Integer> writtenObject = new AtomicReference<>();
         CountDownLatch latch = new CountDownLatch(1);
         doAnswer(invocationOnMock -> {
             writtenObject.set(invocationOnMock.getArgument(0));
             latch.countDown();
             return null;
-        }).when(channel).writeAndFlush(any(), any());
+        }).when(requestHandler).prepareSendResponseV2(anyInt(), any());
 
         processor.run();
 
         verify(bookie, times(1))
             .addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0]));
-        verify(channel, times(1)).writeAndFlush(any(), any());
+        verify(requestHandler, times(1)).prepareSendResponseV2(anyInt(), any());
 
         latch.await();
 
-        assertTrue(writtenObject.get() instanceof Response);
-        Response response = (Response) writtenObject.get();
-        assertEquals(BookieProtocol.EOK, response.getErrorCode());
-
-        response.release();
-        response.recycle();
+        assertEquals(BookieProtocol.EOK, (int) writtenObject.get());
     }
 
     @Test