You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ni...@apache.org on 2022/09/20 08:12:37 UTC

[bookkeeper] 14/25: Reorder the sequence of the bookkeeper server shutdown (#2888)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch ds-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit f260ff4876ba27ca088d2624942b56cf5c32a575
Author: pradeepbn <pr...@gmail.com>
AuthorDate: Wed Dec 1 09:37:28 2021 -0800

    Reorder the sequence of the bookkeeper server shutdown (#2888)
    
    Reorders the sequence of the bookkeeper server shutdown
    so that any in-progress reads or writes don't hit ledger
    storage after it has been shutdown. Now the request processor
    is shutdown before the bookie.
    
    An additional check if the channel is active is performed in
    the packet processor callbacks before sending response
    to avoid RejectedExecutionException messages within
    Netty from polluting the log.
    
    (cherry picked from commit 7395bb48e2ff41fac93a84ff8bfa9097bd0af8fe)
    (cherry picked from commit f8eb20db466ac434a7402539d0e3d4c362e57e0f)
---
 .../bookkeeper/proto/BookieRequestProcessor.java   |  3 ++
 .../org/apache/bookkeeper/proto/BookieServer.java  |  2 +-
 .../bookkeeper/proto/PacketProcessorBase.java      |  7 +++-
 .../bookkeeper/proto/PacketProcessorBaseV3.java    | 40 ++++++++++++----------
 .../proto/ForceLedgerProcessorV3Test.java          |  1 +
 .../bookkeeper/proto/WriteEntryProcessorTest.java  |  1 +
 .../proto/WriteEntryProcessorV3Test.java           |  1 +
 7 files changed, 35 insertions(+), 20 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 67f83e9ce5..902e2c1b41 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -264,6 +264,7 @@ public class BookieRequestProcessor implements RequestProcessor {
 
     @Override
     public void close() {
+        LOG.info("Closing RequestProcessor");
         shutdownExecutor(writeThreadPool);
         shutdownExecutor(readThreadPool);
         if (serverCfg.getNumLongPollWorkerThreads() > 0 || readThreadPool == null) {
@@ -271,6 +272,7 @@ public class BookieRequestProcessor implements RequestProcessor {
         }
         shutdownExecutor(highPriorityThreadPool);
         requestTimer.stop();
+        LOG.info("Closed RequestProcessor");
     }
 
     private OrderedExecutor createExecutor(
@@ -295,6 +297,7 @@ public class BookieRequestProcessor implements RequestProcessor {
     private void shutdownExecutor(OrderedExecutor service) {
         if (null != service) {
             service.shutdown();
+            service.forceShutdown(10, TimeUnit.SECONDS);
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
index ee83327b2f..69aece207c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
@@ -222,8 +222,8 @@ public class BookieServer {
         if (!running) {
             return;
         }
-        exitCode = bookie.shutdown();
         this.requestProcessor.close();
+        exitCode = bookie.shutdown();
         running = false;
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
index 4cc7176ede..d416b9f141 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
@@ -66,7 +66,12 @@ abstract class PacketProcessorBase<T extends Request> extends SafeRunnable {
     }
 
     protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) {
-        channel.writeAndFlush(response, channel.voidPromise());
+        if (channel.isActive()) {
+            channel.writeAndFlush(response, channel.voidPromise());
+        } else {
+            LOGGER.debug("Netty channel {} is inactive, "
+                    + "hence bypassing netty channel writeAndFlush during sendResponse", channel);
+        }
         if (BookieProtocol.EOK == rc) {
             statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
         } else {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
index 15765a252b..d4ad65ba43 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
@@ -87,25 +87,29 @@ public abstract class PacketProcessorBaseV3 extends SafeRunnable {
                 requestProcessor.invalidateBlacklist(channel);
             }
         }
-
-        channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-                long writeElapsedNanos = MathUtils.elapsedNanos(writeNanos);
-                if (!future.isSuccess()) {
-                    requestProcessor.getRequestStats().getChannelWriteStats()
-                        .registerFailedEvent(writeElapsedNanos, TimeUnit.NANOSECONDS);
-                } else {
-                    requestProcessor.getRequestStats().getChannelWriteStats()
-                        .registerSuccessfulEvent(writeElapsedNanos, TimeUnit.NANOSECONDS);
-                }
-                if (StatusCode.EOK == code) {
-                    statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
-                } else {
-                    statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
+        if (channel.isActive()) {
+            channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    long writeElapsedNanos = MathUtils.elapsedNanos(writeNanos);
+                    if (!future.isSuccess()) {
+                        requestProcessor.getRequestStats().getChannelWriteStats()
+                                .registerFailedEvent(writeElapsedNanos, TimeUnit.NANOSECONDS);
+                    } else {
+                        requestProcessor.getRequestStats().getChannelWriteStats()
+                                .registerSuccessfulEvent(writeElapsedNanos, TimeUnit.NANOSECONDS);
+                    }
+                    if (StatusCode.EOK == code) {
+                        statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
+                    } else {
+                        statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
+                    }
                 }
-            }
-        });
+            });
+        } else {
+            LOGGER.debug("Netty channel {} is inactive, "
+                    + "hence bypassing netty channel writeAndFlush during sendResponse", channel);
+        }
     }
 
     protected boolean isVersionCompatible() {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
index bab83fb326..1b07fbb4d1 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
@@ -76,6 +76,7 @@ public class ForceLedgerProcessorV3Test {
         when(requestProcessor.getBookie()).thenReturn(bookie);
         when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L);
         when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE));
+        when(channel.isActive()).thenReturn(true);
         processor = new ForceLedgerProcessorV3(
             request,
             channel,
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
index bbcffea08c..27a4306a6e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
@@ -68,6 +68,7 @@ public class WriteEntryProcessorTest {
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
         when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE));
+        when(channel.isActive()).thenReturn(true);
         processor = WriteEntryProcessor.create(
             request,
             channel,
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
index 292dc519ca..7abaa100c8 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
@@ -81,6 +81,7 @@ public class WriteEntryProcessorV3Test {
         when(requestProcessor.getBookie()).thenReturn(bookie);
         when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L);
         when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE));
+        when(channel.isActive()).thenReturn(true);
         processor = new WriteEntryProcessorV3(
             request,
             channel,