Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2023/06/19 07:42:49 UTC

[bookkeeper] 25/31: clear channel when channelInactive (#3966)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.16
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit ab90a14b725a69261fb5912c5da458de75a67d46
Author: wenbingshen <ol...@gmail.com>
AuthorDate: Wed May 31 12:10:13 2023 +0800

    clear channel when channelInactive (#3966)
    
    ### Motivation
    
    While stopping a Pulsar broker, I observed many entries like the following in the broker log:
    
    ```
    17:48:35.894 [shutdown-thread-43-1] INFO  org.apache.bookkeeper.proto.PerChannelBookieClient - Closing the per channel bookie client for 10.184.xx.xx:3181
    17:48:35.894 [shutdown-thread-43-1] ERROR io.netty.util.concurrent.DefaultPromise.rejectedExecution - Failed to submit a listener notification task. Event loop shut down?
    java.util.concurrent.RejectedExecutionException: event executor terminated
            at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
            at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
            at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
            at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
            at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:815) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
            at io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:842) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
            at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:499) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
            at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:184) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
            at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95) ~[io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
            at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30) ~[io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
            at org.apache.bookkeeper.proto.PerChannelBookieClient.closeChannel(PerChannelBookieClient.java:1090) ~[org.apache.bookkeeper-bookkeeper-server-4.14.2.jar:4.14.2]
            at org.apache.bookkeeper.proto.PerChannelBookieClient.closeInternal(PerChannelBookieClient.java:1079) ~[org.apache.bookkeeper-bookkeeper-server-4.14.2.jar:4.14.2]
            at org.apache.bookkeeper.proto.PerChannelBookieClient.close(PerChannelBookieClient.java:1063) ~[org.apache.bookkeeper-bookkeeper-server-4.14.2.jar:4.14.2]
            at org.apache.bookkeeper.proto.DefaultPerChannelBookieClientPool.close(DefaultPerChannelBookieClientPool.java:157) ~[org.apache.bookkeeper-bookkeeper-server-4.14.2.jar:4.14.2]
            at org.apache.bookkeeper.proto.BookieClientImpl.close(BookieClientImpl.java:587) ~[org.apache.bookkeeper-bookkeeper-server-4.14.2.jar:4.14.2]
            at org.apache.bookkeeper.client.BookKeeper.close(BookKeeper.java:1435) ~[org.apache.bookkeeper-bookkeeper-server-4.14.2.jar:4.14.2]
            at org.apache.pulsar.broker.ManagedLedgerClientFactory.close(ManagedLedgerClientFactory.java:142) ~[org.apache.pulsar-pulsar-broker-2.8.1.jar:2.8.1]
            at org.apache.pulsar.broker.PulsarService.closeAsync(PulsarService.java:417) ~[org.apache.pulsar-pulsar-broker-2.8.1.jar:2.8.1]
            at org.apache.pulsar.broker.MessagingServiceShutdownHook.lambda$run$1(MessagingServiceShutdownHook.java:62) ~[org.apache.pulsar-pulsar-broker-2.8.1.jar:2.8.1]
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_131]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_131]
            at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
            at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
    ```
    
    When I stopped the broker, these entries produced more than 30,000 log lines, which seriously polluted the normal broker log.
    <img width="1180" alt="image" src="https://github.com/apache/bookkeeper/assets/35599757/0ec3c423-14f1-41c4-85c3-d978ece15044">
    
    My investigation found the following:
    1. When the broker stops, it closes the ManagedLedgerClientFactory, which in turn closes all BookKeeper connections:
    
    https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java#L154
    ```java
        @Override
        public void close(boolean wait) {
            for (int i = 0; i < clients.length; i++) {
                clients[i].close(wait);
                if (clients != clientsV3Enforced) {
                    clientsV3Enforced[i].close(wait);
                }
            }
        }
    
        private void closeInternal(boolean permanent, boolean wait) {
            Channel toClose = null;
            synchronized (this) {
                if (permanent) {
                    state = ConnectionState.CLOSED;
                } else if (state != ConnectionState.CLOSED) {
                    state = ConnectionState.DISCONNECTED;
                }
                toClose = channel;
                channel = null;
                makeWritable();
            }
            if (toClose != null) {   // <== a. toClose is not null
                ChannelFuture cf = closeChannel(toClose);
                if (wait) {
                    cf.awaitUninterruptibly();
                }
            }
        }
    
        private ChannelFuture closeChannel(Channel c) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Closing channel {}", c);
            }
            return c.close().addListener(x -> makeWritable());  // <== b. if the channel is already inactive, adding the listener triggers the exception above
        }
    ```
    
    2. When the bookie server disconnects the client first, the client triggers channelInactive:
    ```java
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            LOG.info("Disconnected from bookie channel {}", ctx.channel());
            if (ctx.channel() != null) {
                closeChannel(ctx.channel());  // <== c. the channel is closed here
                if (ctx.channel().pipeline().get(SslHandler.class) != null) {
                    activeTlsChannelCounter.dec();
                } else {
                    activeNonTlsChannelCounter.dec();
                }
            }
    
            errorOutOutstandingEntries(BKException.Code.BookieHandleNotAvailableException);
            errorOutPendingOps(BKException.Code.BookieHandleNotAvailableException);
    
            synchronized (this) {
                if (this.channel == ctx.channel()
                    && state != ConnectionState.CLOSED) {
                    state = ConnectionState.DISCONNECTED;
                }
            }
    
            // we don't want to reconnect right away. If someone sends a request to
            // this address, we will reconnect.
        }
    ```
    
    If step c happens before step b, then step b triggers the exception above.
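    
    The rejection itself can be illustrated outside Netty. The following is a minimal sketch that uses a plain JDK ExecutorService as a stand-in for the Netty event loop (an assumption for illustration only; the class and method names are hypothetical and are not part of BookKeeper or Netty). Once an executor has been shut down, any further task submitted to it is rejected, which is roughly what happens to the listener notification task in step b.
    
    ```java
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.RejectedExecutionException;

    // Hypothetical demo class; a JDK executor stands in for the Netty event loop.
    public class RejectedAfterShutdown {

        // Returns true if a task submitted after shutdown() is rejected.
        static boolean isRejectedAfterShutdown() {
            ExecutorService loop = Executors.newSingleThreadExecutor();
            loop.shutdown(); // stand-in for the event loop having terminated
            try {
                loop.execute(() -> { }); // stand-in for the listener notification task
                return false;
            } catch (RejectedExecutionException e) {
                return true;
            }
        }

        public static void main(String[] args) {
            System.out.println("rejected: " + isRejectedAfterShutdown());
        }
    }
    ```
    
    In the log above, Netty's DefaultPromise does not propagate the rejection to the caller; it logs it at ERROR level, which is why each affected channel adds a full stack trace to the broker log.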
    
    step c
    <img width="1400" alt="image" src="https://github.com/apache/bookkeeper/assets/35599757/4a33855c-5682-4812-ae0e-1a02972ef502">
    <img width="1407" alt="image" src="https://github.com/apache/bookkeeper/assets/35599757/1c99bccb-b2bd-4cfb-ac4b-ee10b2d236b0">
    
    step a and b
    <img width="1403" alt="image" src="https://github.com/apache/bookkeeper/assets/35599757/fec372c5-f97b-47e7-b5d7-ebbaa6edf682">
    
    So in channelInactive, we should reset the channel to null after closing it.
    When the channel is null, the client reconnects the next time it is obtained, which is consistent with the current behavior.
    ![image](https://github.com/apache/bookkeeper/assets/35599757/d1178697-27bd-44a5-acd1-33ccec99e7a4)
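    
    The effect of the change can be sketched with a simplified model. This is not the real PerChannelBookieClient: FakeChannel and ChannelResetSketch are hypothetical stand-ins, and the model only tracks how often close() is called on the channel. It assumes the permanent-close path of closeInternal and omits the wait handling.
    
    ```java
    // Hypothetical, simplified model of the channel bookkeeping; not BookKeeper code.
    public class ChannelResetSketch {
        enum ConnectionState { CONNECTED, DISCONNECTED, CLOSED }

        // Stand-in for a Netty Channel; it only counts close() calls.
        static final class FakeChannel {
            int closeCalls = 0;
            void close() { closeCalls++; }
        }

        private FakeChannel channel;
        private ConnectionState state = ConnectionState.CONNECTED;

        ChannelResetSketch(FakeChannel channel) { this.channel = channel; }

        // Mirrors the patched channelInactive: close the channel, then forget it.
        void channelInactive(FakeChannel inactive) {
            inactive.close();
            synchronized (this) {
                if (channel == inactive && state != ConnectionState.CLOSED) {
                    state = ConnectionState.DISCONNECTED;
                    channel = null; // the one-line change from this commit
                }
            }
        }

        // Mirrors closeInternal(permanent = true): closes only a channel that is still held.
        // Returns true if a close was attempted on a held channel.
        boolean close() {
            FakeChannel toClose;
            synchronized (this) {
                state = ConnectionState.CLOSED;
                toClose = channel;
                channel = null;
            }
            if (toClose != null) {
                toClose.close();
                return true;
            }
            return false;
        }

        public static void main(String[] args) {
            FakeChannel ch = new FakeChannel();
            ChannelResetSketch client = new ChannelResetSketch(ch);
            client.channelInactive(ch);
            System.out.println("second close attempted: " + client.close());
            System.out.println("close calls: " + ch.closeCalls);
        }
    }
    ```
    
    In this model, once channelInactive has cleared the reference, the later close() finds no channel and never reaches the step that adds a listener to an already closed channel. Without the `channel = null` line, close() would call close on the same channel a second time.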
    
    (cherry picked from commit 808b909edf597254aea212c287fe17fb6d808198)
---
 .../main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java    | 1 +
 1 file changed, 1 insertion(+)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index f84343a71b..b4cf194e24 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -1263,6 +1263,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
             if (this.channel == ctx.channel()
                 && state != ConnectionState.CLOSED) {
                 state = ConnectionState.DISCONNECTED;
+                channel = null;
             }
         }