You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2022/06/16 01:19:02 UTC

[bookkeeper] 02/02: Apply the backpressure changes on the V2 requests (#3324)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit b025f7b7a34dbf9ec450007d0acfbc7d6d9793c8
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Thu Jun 16 08:17:19 2022 +0800

    Apply the backpressure changes on the V2 requests (#3324)
    
    ---
    
    *Motivation*
    
    If one bookie is slow (not down, just slow), the BK client
    will the acks to the user that the entries are written after
    the first 2 acks. In the meantime, it will keep waiting for
    the 3rd bookie to respond. If the bookie responds within the
    timeout, the entries can now be dropped from memory, otherwise
    the write will timeout internally and it will get replayed
    to a new bookie.
    
    In the V3 request, we have [server-side backpressure](https://github.com/apache/bookkeeper/pull/1410)
    to impact the client-side behaviors. We should apply the same
    changes to the V2 request. That would help this [issue](https://github.com/apache/pulsar/issues/14861)
    to be resolved.
    
    *Modification*
    
    - Apply the change https://github.com/apache/bookkeeper/pull/1410 to V2 protocol
    
    Descriptions of the changes in this PR:
    
    (cherry picked from commit 62400bd959b781fafa9471915c0cc7133a89dee7)
---
 .../bookkeeper/proto/BookieRequestProcessor.java   |  2 +-
 .../bookkeeper/proto/PacketProcessorBase.java      | 56 ++++++++++++++++++++++
 .../bookkeeper/proto/ReadEntryProcessor.java       |  7 +--
 .../bookkeeper/proto/WriteEntryProcessor.java      |  7 +--
 .../proto/BookieBackpressureForV2Test.java         | 36 ++++++++++++++
 .../bookkeeper/proto/WriteEntryProcessorTest.java  |  1 +
 6 files changed, 100 insertions(+), 9 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 902e2c1b41..2223274882 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -638,7 +638,7 @@ public class BookieRequestProcessor implements RequestProcessor {
                             r.entryId);
                 }
 
-                write.sendResponse(
+                write.sendWriteReqResponse(
                     BookieProtocol.ETOOMANYREQUESTS,
                     ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r),
                     requestStats.getAddRequestStats());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
index d416b9f141..91f054c005 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
@@ -25,6 +25,7 @@ import org.apache.bookkeeper.proto.BookieProtocol.Request;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.bookkeeper.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +66,56 @@ abstract class PacketProcessorBase<T extends Request> extends SafeRunnable {
         return true;
     }
 
+    protected void sendWriteReqResponse(int rc, Object response, OpStatsLogger statsLogger) {
+        sendResponse(rc, response, statsLogger);
+        requestProcessor.onAddRequestFinish();
+    }
+
+    protected void sendReadReqResponse(int rc, Object response, OpStatsLogger statsLogger, boolean throttle) {
+        if (throttle) {
+            sendResponseAndWait(rc, response, statsLogger);
+        } else {
+            sendResponse(rc, response, statsLogger);
+        }
+        requestProcessor.onReadRequestFinish();
+    }
+
     protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) {
+        final long writeNanos = MathUtils.nowInNano();
+
+        final long timeOut = requestProcessor.getWaitTimeoutOnBackpressureMillis();
+        if (timeOut >= 0 && !channel.isWritable()) {
+            if (!requestProcessor.isBlacklisted(channel)) {
+                synchronized (channel) {
+                    if (!channel.isWritable() && !requestProcessor.isBlacklisted(channel)) {
+                        final long waitUntilNanos = writeNanos + TimeUnit.MILLISECONDS.toNanos(timeOut);
+                        while (!channel.isWritable() && MathUtils.nowInNano() < waitUntilNanos) {
+                            try {
+                                TimeUnit.MILLISECONDS.sleep(1);
+                            } catch (InterruptedException e) {
+                                break;
+                            }
+                        }
+                        if (!channel.isWritable()) {
+                            requestProcessor.blacklistChannel(channel);
+                            requestProcessor.handleNonWritableChannel(channel);
+                        }
+                    }
+                }
+            }
+
+            if (!channel.isWritable()) {
+                LOGGER.warn("cannot write response to non-writable channel {} for request {}", channel,
+                    StringUtils.requestToString(request));
+                requestProcessor.getRequestStats().getChannelWriteStats()
+                    .registerFailedEvent(MathUtils.elapsedNanos(writeNanos), TimeUnit.NANOSECONDS);
+                statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
+                return;
+            } else {
+                requestProcessor.invalidateBlacklist(channel);
+            }
+        }
+
         if (channel.isActive()) {
             channel.writeAndFlush(response, channel.voidPromise());
         } else {
@@ -106,6 +156,12 @@ abstract class PacketProcessorBase<T extends Request> extends SafeRunnable {
             sendResponse(BookieProtocol.EBADVERSION,
                          ResponseBuilder.buildErrorResponse(BookieProtocol.EBADVERSION, request),
                          requestProcessor.getRequestStats().getReadRequestStats());
+            if (request instanceof BookieProtocol.ReadRequest) {
+                requestProcessor.onReadRequestFinish();
+            }
+            if (request instanceof BookieProtocol.AddRequest) {
+                requestProcessor.onAddRequestFinish();
+            }
             return;
         }
         processPacket();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index a760383b8d..60de0440c1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -56,6 +56,7 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
         rep.init(request, channel, requestProcessor);
         rep.fenceThreadPool = fenceThreadPool;
         rep.throttleReadResponses = throttleReadResponses;
+        requestProcessor.onReadRequestStart(channel);
         return rep;
     }
 
@@ -133,11 +134,7 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
             response = ResponseBuilder.buildErrorResponse(errorCode, request);
         }
 
-        if (throttleReadResponses) {
-            sendResponseAndWait(errorCode, response, stats.getReadRequestStats());
-        } else {
-            sendResponse(errorCode, response, stats.getReadRequestStats());
-        }
+        sendReadReqResponse(errorCode, response, stats.getReadRequestStats(), throttleReadResponses);
         recycle();
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index a61e0d5024..a4340bc998 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -53,6 +53,7 @@ class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest> implemen
                                              BookieRequestProcessor requestProcessor) {
         WriteEntryProcessor wep = RECYCLER.get();
         wep.init(request, channel, requestProcessor);
+        requestProcessor.onAddRequestStart(channel);
         return wep;
     }
 
@@ -62,7 +63,7 @@ class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest> implemen
             && !(request.isHighPriority() && requestProcessor.getBookie().isAvailableForHighPriorityWrites())) {
             LOG.warn("BookieServer is running in readonly mode,"
                     + " so rejecting the request from the client!");
-            sendResponse(BookieProtocol.EREADONLY,
+            sendWriteReqResponse(BookieProtocol.EREADONLY,
                          ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, request),
                          requestProcessor.getRequestStats().getAddRequestStats());
             request.release();
@@ -107,7 +108,7 @@ class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest> implemen
         if (rc != BookieProtocol.EOK) {
             requestProcessor.getRequestStats().getAddEntryStats()
                 .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
-            sendResponse(rc,
+            sendWriteReqResponse(rc,
                          ResponseBuilder.buildErrorResponse(rc, request),
                          requestProcessor.getRequestStats().getAddRequestStats());
             request.recycle();
@@ -124,7 +125,7 @@ class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest> implemen
             requestProcessor.getRequestStats().getAddEntryStats()
                 .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
         }
-        sendResponse(rc,
+        sendWriteReqResponse(rc,
                      ResponseBuilder.buildAddResponse(request),
                      requestProcessor.getRequestStats().getAddRequestStats());
         request.recycle();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java
new file mode 100644
index 0000000000..775844d05e
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import org.junit.Before;
+
+/**
+ * Tests for bckpressure handling on the server side with V2 protocol.
+ */
+public class BookieBackpressureForV2Test extends BookieBackpressureTest {
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        baseClientConf.setUseV2WireProtocol(true);
+        // the backpressure will bloc the read response, disable it to let it use backpressure mechanism
+        confByIndex(0).setReadWorkerThreadsThrottlingEnabled(false);
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
index 27a4306a6e..a69245d4a9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
@@ -69,6 +69,7 @@ public class WriteEntryProcessorTest {
         when(requestProcessor.getBookie()).thenReturn(bookie);
         when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE));
         when(channel.isActive()).thenReturn(true);
+        when(channel.isWritable()).thenReturn(true);
         processor = WriteEntryProcessor.create(
             request,
             channel,