Posted to jira@kafka.apache.org by "joecqupt (Jira)" <ji...@apache.org> on 2021/12/22 12:27:00 UTC

[jira] [Commented] (KAFKA-13556) Kafka cluster loses the controller

    [ https://issues.apache.org/jira/browse/KAFKA-13556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17463773#comment-17463773 ] 

joecqupt commented on KAFKA-13556:
----------------------------------

This problem has already been reported in [KAFKA-13461|https://issues.apache.org/jira/browse/KAFKA-13461]; see also [#11563|https://github.com/apache/kafka/pull/11563].

> Kafka cluster loses the controller
> ----------------------------------
>
>                 Key: KAFKA-13556
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13556
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 2.8.1
>            Reporter: Andrei Lakhmanets
>            Priority: Major
>
> Hi team,
> *Kafka version:* 2.8.1
> *Configuration:* 3 kafka brokers in different availability zones and 3 zookeeper brokers in different availability zones.
> I faced a bug where the Kafka cluster loses the controller, and if any non-controller broker is restarted after that, it stops processing data.
> *Context:*
> The kafka cluster has SASL configuration for connection.
> {code:java}
> # Listeners config
> listeners=SASL_SSL://[::]:9091,SASL_PLAINTEXT://[::]:9092
> advertised.listeners=SASL_SSL://---DOMAIN---:9091,SASL_PLAINTEXT://---DOMAIN---:9092
> sasl.enabled.mechanisms=SCRAM-SHA-512
> inter.broker.listener.name=SASL_SSL
> sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
> inter.broker.protocol.version=2.8-IV0
> # Keystore config
> ssl.keystore.location=/etc/kafka/ssl/server.keystore.jks
> ssl.keystore.password=---PASSWORD---
> ssl.key.password=---PASSWORD---
> ssl.truststore.location=/etc/kafka/ssl/server.truststore.jks
> ssl.truststore.password=---PASSWORD--- 
> zookeeper.connect=---ZK_HOST_1---:2181,---ZK_HOST_2---:2181,---ZK_HOST_3---:2181
> zookeeper.connection.timeout.ms=6000 {code}
> ZooKeeper does not have a SASL configuration and accepts connections without any authentication.
> So, when I start the Kafka brokers, I see an auth failure error in the logs, but then the ZK client falls back to an unauthenticated connection and everything works fine.
> {code:java}
> [2021-12-16 14:04:08,451] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:04:08,462] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:04:08,466] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:04:08,467] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:04:08,470] INFO Opening socket connection to server ===ZOOKEEPER_HOST_1===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:04:08,478] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:48022, server: ===ZOOKEEPER_HOST_1===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:04:08,496] INFO Session establishment complete on server ===ZOOKEEPER_HOST_1===:2181, sessionid = 0x10028c111930125, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:04:08,497] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:SyncConnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:04:08,498] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:04:08,753] INFO [feature-zk-node-event-process-thread]: Starting (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread) {code}
> Then I restart the ZK node that holds the connection with the Kafka controller node (===ZOOKEEPER_HOST_1===), or it restarts for some other reason, and in the logs I see the following:
> {code:java}
> [2021-12-16 14:08:42,951] INFO Unable to read additional data from server sessionid 0x10028c111930126, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:42,951] INFO Unable to read additional data from server sessionid 0x10028c111930125, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:43,052] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:43,053] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:43,583] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:43,583] INFO Opening socket connection to server ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:43,583] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:43,584] ERROR [ZooKeeperClient ACL authorizer] Auth failed. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:43,593] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:48560, server: ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:43,808] INFO Unable to read additional data from server sessionid 0x10028c111930126, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:43,910] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,019] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,019] INFO Opening socket connection to server ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,019] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,019] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,022] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:37734, server: ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,023] INFO Unable to read additional data from server sessionid 0x10028c111930125, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,123] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,124] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,125] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,586] INFO [ZooKeeperClient ACL authorizer] Reinitializing due to auth failure. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,704] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,704] INFO Opening socket connection to server ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,705] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,705] ERROR [ZooKeeperClient ACL authorizer] Auth failed. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,707] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:37738, server: ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,709] INFO Session establishment complete on server ===ZOOKEEPER_HOST_2===:2181, sessionid = 0x10028c111930126, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,710] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:SyncConnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,798] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,798] INFO Opening socket connection to server ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,798] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,798] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,807] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:48568, server: ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,824] INFO Session establishment complete on server ===ZOOKEEPER_HOST_3===:2181, sessionid = 0x10028c111930125, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,824] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:SyncConnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,824] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:45,020] INFO [ZooKeeperClient Kafka server] Reinitializing due to auth failure. (kafka.zookeeper.ZooKeeperClient) {code}
> The main points here are the following:
> 1. The log above is from the Kafka controller node.
> 2. The connection to ZK was lost: _"Received event: WatchedEvent state:Disconnected"_
> 3. The connection was re-established with live ZK nodes: _"Opening socket connection to server ===ZOOKEEPER_HOST_3===:2181"_ and _"Received event: WatchedEvent state:SyncConnected"_
> 4. During the connection process we get an _"Auth failed."_ error, just as during broker startup.
> 5. The "Auth failed" error is caught in *kafka.zookeeper.ZooKeeperClient.ZooKeeperClientWatcher.process*, which calls:
> {code:java}
> if (initialized)
>     scheduleReinitialize("auth-failed", "Reinitializing due to auth failure.", RetryBackoffMs) {code}
> where RetryBackoffMs is 1000.
> One second after the "Auth failed" message we see the "{_}Reinitializing due to auth failure.{_}" message in the log.
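> For reference, *ZooKeeperClient.scheduleReinitialize* simply defers the work to a background scheduler thread; the following is a rough sketch based on the 2.8 sources (names and parameters recalled from memory, so details may differ):
> {code:java}
> // Sketch of kafka.zookeeper.ZooKeeperClient.scheduleReinitialize (Kafka 2.8, approximate):
> // the actual reinitialization runs asynchronously on a scheduler thread after delayMs.
> private def scheduleReinitialize(name: String, message: String, delayMs: Long): Unit = {
>   reinitializeScheduler.schedule(name, () => {
>     info(message)   // e.g. "Reinitializing due to auth failure."
>     reinitialize()  // runs delayMs (RetryBackoffMs = 1000 ms) after the AuthFailed event
>   }, delayMs, period = -1L, unit = TimeUnit.MILLISECONDS)
> } {code}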
> The method *ZooKeeperClient.scheduleReinitialize* calls {*}ZooKeeperClient.reinitialize{*}.
>  
> {code:java}
>   private def reinitialize(): Unit = {
>     // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
>     // may require additional Zookeeper requests, which will block to acquire the initialization lock
>     stateChangeHandlers.values.foreach(callBeforeInitializingSession _)
>     inWriteLock(initializationLock) {
>       if (!connectionState.isAlive) {
>         zooKeeper.close()
>         info(s"Initializing a new session to $connectString.")
>         // retry forever until ZooKeeper can be instantiated
>         var connected = false
>         while (!connected) {
>           try {
>             zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher, clientConfig)
>             connected = true
>           } catch {
>             case e: Exception =>
>               info("Error when recreating ZooKeeper, retrying after a short sleep", e)
>               Thread.sleep(RetryBackoffMs)
>           }
>         }
>       }
>     }
>     stateChangeHandlers.values.foreach(callAfterInitializingSession _)
>   } {code}
> The code inside the "{_}inWriteLock(initializationLock){_}" block does not run, because the connection has already been re-established with live ZK nodes (I did not find the message s"{_}Initializing a new session to $connectString.{_}" in the logs).
> But "{*}callBeforeInitializingSession{*}" calls "{*}KafkaController.startup.beforeInitializingSession{*}", which fires the "{*}ControllerEvent.Expire{*}" event, and "{*}callAfterInitializingSession{*}" calls "{*}KafkaController.startup.afterInitializingSession{*}", which fires the "{*}ControllerEvent.RegisterBrokerAndReelect{*}" event.
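> For context, the state change handler registered in {*}KafkaController.startup{*} looks roughly like this in the 2.8 sources (a sketch, so the exact code may differ slightly):
> {code:java}
> // Sketch of the handler registered in KafkaController.startup (Kafka 2.8, approximate)
> zkClient.registerStateChangeHandler(new StateChangeHandler {
>   override val name: String = StateChangeHandlers.ControllerHandler
>   override def beforeInitializingSession(): Unit = {
>     // invoked via callBeforeInitializingSession: enqueue Expire and wait until it is processed
>     val queuedEvent = eventManager.clearAndPut(Expire)
>     queuedEvent.awaitProcessing()
>   }
>   override def afterInitializingSession(): Unit = {
>     // invoked via callAfterInitializingSession: re-register the broker and trigger re-election
>     eventManager.put(RegisterBrokerAndReelect)
>   }
> }) {code}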
> The "{*}ControllerEvent.Expire{*}" event invokes the "{*}KafkaController.processExpire{*}" method, which shuts down the current controller, and in the logs we see the following:
> {code:java}
> [2021-12-16 14:08:45,022] DEBUG [Controller id=2] Resigning (kafka.controller.KafkaController)
> [2021-12-16 14:08:45,022] DEBUG [Controller id=2] Unregister BrokerModifications handler for Set(1, 2, 3) (kafka.controller.KafkaController)
> [2021-12-16 14:08:45,023] INFO [PartitionStateMachine controllerId=2] Stopped partition state machine (kafka.controller.ZkPartitionStateMachine)
> [2021-12-16 14:08:45,025] INFO [ReplicaStateMachine controllerId=2] Stopped replica state machine (kafka.controller.ZkReplicaStateMachine)
> [2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,031] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,032] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,033] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,040] INFO [Controller id=2] Resigned (kafka.controller.KafkaController) {code}
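> For reference, *KafkaController.processExpire* itself is tiny; a sketch based on the 2.8 sources (may differ slightly):
> {code:java}
> // Sketch of KafkaController.processExpire (Kafka 2.8, approximate):
> // the broker forgets the active controller and resigns, producing the log lines above.
> private def processExpire(): Unit = {
>   activeControllerId = -1
>   onControllerResignation()
> } {code}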
> Then the "{*}ControllerEvent.RegisterBrokerAndReelect{*}" event calls "{*}KafkaController.processRegisterBrokerAndReelect{*}" => "{*}KafkaController.elect{*}". But in the logs there is the following:
>  
> {code:java}
> [2021-12-16 14:08:45,129] DEBUG [Controller id=2] Broker 2 has been elected as the controller, so stopping the election process. (kafka.controller.KafkaController) {code}
> Here is the part of the *"KafkaController.elect"* method that writes the message above:
> {code:java}
>   private def elect(): Unit = {
>     activeControllerId = zkClient.getControllerId.getOrElse(-1)
>     /*
>      * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
>      * it's possible that the controller has already been elected when we get here. This check will prevent the following
>      * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
>      */
>     if (activeControllerId != -1) {
>       debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
>       return
>     } {code}
> The problem is the following:
> The current controller was shut down by "{*}KafkaController.processExpire{*}", but a new one was not elected because ZK never removed the /controller znode (the session never actually expired).
> So in the logs we see that Controller id=2 resigned and then stopped the election because it thinks it is still the live controller, even though all of its ZK listeners have already been shut down.
> If any non-controller broker is restarted after that (for whatever reason), it does not get metadata because there is no controller in the cluster.
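> For completeness, the re-election path that hits the early return in *elect()* looks roughly like this in the 2.8 sources (a sketch; details may differ):
> {code:java}
> // Sketch of the re-election path (Kafka 2.8, approximate). Because the ZK session was never
> // actually expired, the ephemeral /controller znode written by this broker still exists,
> // so elect() sees activeControllerId != -1 and returns early even though the controller
> // has already resigned.
> private def processRegisterBrokerAndReelect(): Unit = {
>   _brokerEpoch = zkClient.registerBroker(brokerInfo)
>   processReelect()
> }
> 
> private def processReelect(): Unit = {
>   maybeResign()
>   elect()
> } {code}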
> And if we try to consume data from these brokers, we get the following errors:
> {code:java}
> /opt/kafka/bin/kafka-console-consumer.sh --topic test_topic --from-beginning --bootstrap-server  $(hostname):9091 --consumer.config ~/connect.properties 
> WARN [Consumer clientId=consumer-console-consumer-57293-1, groupId=console-consumer-57293] Error while fetching metadata with correlation id 2 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
> WARN [Consumer clientId=consumer-console-consumer-57293-1, groupId=console-consumer-57293] Error while fetching metadata with correlation id 3 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient){code}
> UPD: Workarounds for this bug (two alternatives):
>  # Set up a proper SASL connection to ZK (a hypothetical JAAS example is sketched below)
>  # Disable the SASL connection to ZK via the property "-Dzookeeper.sasl.client=false"
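> For workaround 1, the missing 'Client' section from the warning above would be added to the JAAS file the broker already uses; a hypothetical example (DIGEST-MD5, with placeholder credentials that must match the ZooKeeper server configuration):
> {code:java}
> // Hypothetical 'Client' section in /etc/kafka/kafka_server_jaas.conf (placeholder credentials):
> Client {
>   org.apache.zookeeper.server.auth.DigestLoginModule required
>   username="zkclient"
>   password="zkclient-secret";
> }; {code}
> For workaround 2, the flag can be passed to the broker JVM, e.g. via KAFKA_OPTS="-Dzookeeper.sasl.client=false".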
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)