Posted to jira@kafka.apache.org by "Andrei Lakhmanets (Jira)" <ji...@apache.org> on 2021/12/20 09:36:00 UTC

[jira] [Updated] (KAFKA-13556) Kafka cluster loses the controller

     [ https://issues.apache.org/jira/browse/KAFKA-13556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrei Lakhmanets updated KAFKA-13556:
--------------------------------------
    Description: 
Hi team,

*Kafka version:* 2.8.1
*Configuration:* 3 Kafka brokers in different availability zones and 3 ZooKeeper nodes in different availability zones.

I ran into a bug in which the Kafka cluster loses its controller; if any non-controller broker is restarted after that, it stops processing data.

*Context:*
The Kafka cluster is configured to use SASL on its listeners.
{code:java}
# Listeners config
listeners=SASL_SSL://[::]:9091,SASL_PLAINTEXT://[::]:9092
advertised.listeners=SASL_SSL://---DOMAIN---:9091,SASL_PLAINTEXT://---DOMAIN---:9092
sasl.enabled.mechanisms=SCRAM-SHA-512
inter.broker.listener.name=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
inter.broker.protocol.version=2.8-IV0
# Keystore config
ssl.keystore.location=/etc/kafka/ssl/server.keystore.jks
ssl.keystore.password=---PASSWORD---
ssl.key.password=---PASSWORD---
ssl.truststore.location=/etc/kafka/ssl/server.truststore.jks
ssl.truststore.password=---PASSWORD--- {code}

ZooKeeper has no SASL configuration and accepts connections without authentication.
{code:java}
zookeeper.connect=---ZK_HOST_1---:2181,---ZK_HOST_2---:2181,---ZK_HOST_3---:2181
zookeeper.connection.timeout.ms=6000 {code}
So when I start the Kafka brokers, the logs show an "auth failed" error, but the ZK client then falls back to an unauthenticated connection and everything works fine.
{code:java}
[2021-12-16 14:04:08,451] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,462] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:04:08,466] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,467] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,470] INFO Opening socket connection to server ===ZOOKEEPER_HOST_1===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:04:08,478] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:48022, server: ===ZOOKEEPER_HOST_1===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:04:08,496] INFO Session establishment complete on server ===ZOOKEEPER_HOST_1===:2181, sessionid = 0x10028c111930125, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:04:08,497] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:SyncConnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,498] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,753] INFO [feature-zk-node-event-process-thread]: Starting (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread) {code}
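The WARN line in this startup log says that the JAAS file has no section named 'Client'. A minimal sketch of that kind of lookup, using only the JDK's own JAAS API, is shown here. It is not the ZooKeeper client code; the class name, the helper names, and the sample file contents (a single KafkaServer section with placeholder credentials) are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

// Hypothetical helper, not part of Kafka or ZooKeeper.
final class JaasSectionCheck {

    /** Returns true if the JAAS file defines a section with the given name. */
    static boolean hasSection(Path jaasFile, String sectionName) {
        try {
            // "JavaLoginConfig" is the JDK's standard file-based JAAS configuration type.
            Configuration config =
                Configuration.getInstance("JavaLoginConfig", new URIParameter(jaasFile.toUri()));
            AppConfigurationEntry[] entries = config.getAppConfigurationEntry(sectionName);
            return entries != null && entries.length > 0;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("Could not load JAAS file " + jaasFile, e);
        }
    }

    /** Writes an assumed sample JAAS file that has a KafkaServer section but no Client section. */
    static Path sampleJaasFile() {
        String content = String.join("\n",
            "KafkaServer {",
            "  org.apache.kafka.common.security.scram.ScramLoginModule required",
            "  username=\"admin\"",
            "  password=\"placeholder\";",
            "};",
            "");
        try {
            Path file = Files.createTempFile("kafka_server_jaas", ".conf");
            Files.write(file, content.getBytes(StandardCharsets.UTF_8));
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path jaas = sampleJaasFile();
        System.out.println("KafkaServer section present: " + hasSection(jaas, "KafkaServer"));
        System.out.println("Client section present: " + hasSection(jaas, "Client"));
    }
}
```

With a file shaped like the sample, the lookup for 'Client' finds nothing, which corresponds to the condition the WARN message describes.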
Then I restart the ZK node that holds the connection with the Kafka controller node (===ZOOKEEPER_HOST_1===), or it restarts for some other reason, and the logs show the following:
{code:java}
[2021-12-16 14:08:42,951] INFO Unable to read additional data from server sessionid 0x10028c111930126, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:42,951] INFO Unable to read additional data from server sessionid 0x10028c111930125, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,052] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:43,053] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:43,583] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,583] INFO Opening socket connection to server ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,583] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:43,584] ERROR [ZooKeeperClient ACL authorizer] Auth failed. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:43,593] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:48560, server: ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,808] INFO Unable to read additional data from server sessionid 0x10028c111930126, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,910] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,019] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,019] INFO Opening socket connection to server ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,019] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,019] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,022] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:37734, server: ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,023] INFO Unable to read additional data from server sessionid 0x10028c111930125, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,123] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,124] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,125] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,586] INFO [ZooKeeperClient ACL authorizer] Reinitializing due to auth failure. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,704] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,704] INFO Opening socket connection to server ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,705] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,705] ERROR [ZooKeeperClient ACL authorizer] Auth failed. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,707] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:37738, server: ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,709] INFO Session establishment complete on server ===ZOOKEEPER_HOST_2===:2181, sessionid = 0x10028c111930126, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,710] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:SyncConnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,798] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,798] INFO Opening socket connection to server ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,798] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,798] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,807] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:48568, server: ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,824] INFO Session establishment complete on server ===ZOOKEEPER_HOST_3===:2181, sessionid = 0x10028c111930125, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,824] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:SyncConnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,824] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,824] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:45,020] INFO [ZooKeeperClient Kafka server] Reinitializing due to auth failure. (kafka.zookeeper.ZooKeeperClient) {code}
The key points are:
1. The log above is from the Kafka controller node.
2. The connection to ZK was lost: _"Received event: WatchedEvent state:Disconnected"_
3. The connection was re-established with the live ZK nodes: _"Opening socket connection to server ===ZOOKEEPER_HOST_3===:2181"_ and _"Received event: WatchedEvent state:SyncConnected"_
4. During reconnection we get the same _"Auth failed."_ error as at broker startup.
5. The "Auth failed" event is handled in *kafka.zookeeper.ZooKeeperClient.ZooKeeperClientWatcher.process*, which runs:
{code:java}
if (initialized)
    scheduleReinitialize("auth-failed", "Reinitializing due to auth failure.", RetryBackoffMs) {code}
where RetryBackoffMs is 1000.
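
As a quick check that this backoff matches the log excerpt above, the gap between the first "Auth failed." line and the "Reinitializing due to auth failure." line can be computed for each of the two ZooKeeper clients. This is only a sketch: the class and method names are hypothetical, and the timestamps are copied from the log lines above.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for comparing two timestamps from the broker log.
final class LogGap {
    // Timestamp layout used by the log lines above, e.g. "2021-12-16 14:08:44,019".
    private static final DateTimeFormatter LOG_TS =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Milliseconds elapsed between two log timestamps. */
    static long millisBetween(String from, String to) {
        return Duration.between(LocalDateTime.parse(from, LOG_TS),
                                LocalDateTime.parse(to, LOG_TS)).toMillis();
    }

    public static void main(String[] args) {
        // "Kafka server" client: first "Auth failed." to "Reinitializing due to auth failure."
        System.out.println(millisBetween("2021-12-16 14:08:44,019", "2021-12-16 14:08:45,020")); // 1001
        // "ACL authorizer" client: first "Auth failed." to "Reinitializing due to auth failure."
        System.out.println(millisBetween("2021-12-16 14:08:43,584", "2021-12-16 14:08:44,586")); // 1002
    }
}
```

Both gaps are just over 1000 ms, which is consistent with a reinitialization scheduled RetryBackoffMs after the first auth failure.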

One second after the "Auth failed" message, the log shows "{_}Reinitializing due to auth failure.{_}".
The method *ZooKeeperClient.scheduleReinitialize* calls {*}ZooKeeperClient.reinitialize{*}.

 
{code:java}
  private def reinitialize(): Unit = {
    // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
    // may require additional Zookeeper requests, which will block to acquire the initialization lock
    stateChangeHandlers.values.foreach(callBeforeInitializingSession _)

    inWriteLock(initializationLock) {
      if (!connectionState.isAlive) {
        zooKeeper.close()
        info(s"Initializing a new session to $connectString.")
        // retry forever until ZooKeeper can be instantiated
        var connected = false
        while (!connected) {
          try {
            zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher, clientConfig)
            connected = true
          } catch {
            case e: Exception =>
              info("Error when recreating ZooKeeper, retrying after a short sleep", e)
              Thread.sleep(RetryBackoffMs)
          }
        }
      }
    }

    stateChangeHandlers.values.foreach(callAfterInitializingSession _)
  } {code}
The session re-creation code inside the "{_}inWriteLock(initializationLock){_}" block does not run, because the connection had already been established with the live ZK nodes (the message s"{_}Initializing a new session to $connectString.{_}" does not appear in the logs).
However, "{*}callBeforeInitializingSession{*}" still calls "{*}KafkaController.startup.beforeInitializingSession{*}", which fires the "{*}ControllerEvent.Expire{*}" event, and "{*}callAfterInitializingSession{*}" calls "{*}KafkaController.startup.afterInitializingSession{*}", which fires the "{*}ControllerEvent.RegisterBrokerAndReelect{*}" event.
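
The control flow described here can be reduced to a small model. The sketch below is not Kafka code: the class, the method, and the "NewSession" marker are hypothetical, and the boolean parameter stands in for connectionState.isAlive. It only illustrates that the two callbacks fire whether or not a new session is created.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the reinitialize() flow quoted above.
final class ReinitializeModel {

    /** Returns, in order, the events produced by one reinitialize() pass. */
    static List<String> reinitialize(boolean sessionAlive) {
        List<String> events = new ArrayList<>();
        // beforeInitializingSession callback: runs unconditionally.
        events.add("Expire");
        // Session re-creation: only when the existing session is no longer alive.
        if (!sessionAlive) {
            events.add("NewSession");
        }
        // afterInitializingSession callback: runs unconditionally.
        events.add("RegisterBrokerAndReelect");
        return events;
    }

    public static void main(String[] args) {
        // Case from this report: the client had already reconnected, so the session is alive.
        System.out.println(reinitialize(true));   // [Expire, RegisterBrokerAndReelect]
        // Case the callbacks appear to be designed for: the old session is gone.
        System.out.println(reinitialize(false));  // [Expire, NewSession, RegisterBrokerAndReelect]
    }
}
```

In the first case the controller is told that its session expired even though no new session was created.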

The "{*}ControllerEvent.Expire{*}" event calls the "{*}KafkaController.processExpire{*}" method, which shuts down the current controller, and the logs show:
{code:java}
[2021-12-16 14:08:45,022] DEBUG [Controller id=2] Resigning (kafka.controller.KafkaController)
[2021-12-16 14:08:45,022] DEBUG [Controller id=2] Unregister BrokerModifications handler for Set(1, 2, 3) (kafka.controller.KafkaController)
[2021-12-16 14:08:45,023] INFO [PartitionStateMachine controllerId=2] Stopped partition state machine (kafka.controller.ZkPartitionStateMachine)
[2021-12-16 14:08:45,025] INFO [ReplicaStateMachine controllerId=2] Stopped replica state machine (kafka.controller.ZkReplicaStateMachine)
[2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,031] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,032] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,033] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,040] INFO [Controller id=2] Resigned (kafka.controller.KafkaController) {code}
Then the "{*}ControllerEvent.RegisterBrokerAndReelect{*}" event calls the method "{*}KafkaController.processRegisterBrokerAndReelect{*}" => "{*}KafkaController.elect{*}". But the logs show:

 
{code:java}
[2021-12-16 14:08:45,129] DEBUG [Controller id=2] Broker 2 has been elected as the controller, so stopping the election process. (kafka.controller.KafkaController) {code}
This message is written by the following part of the *"KafkaController.elect"* method:

{code:java}
  private def elect(): Unit = {
    activeControllerId = zkClient.getControllerId.getOrElse(-1)
    /*
     * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
     * it's possible that the controller has already been elected when we get here. This check will prevent the following
     * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
     */
    if (activeControllerId != -1) {
      debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
      return
    } {code}
The problem is:
The current controller shut down because of the "{*}KafkaController.processExpire{*}" event, but a new one was not elected because ZK did not remove the /controller node, presumably because the ZK session itself never expired (the log shows the same sessionid 0x10028c111930125 being re-established).

So the logs show that Controller id=2 resigned and then stopped the election because it thinks it is still the live controller, even though all of its ZK listeners have already been shut down.
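
The resulting state can be illustrated with a toy model. This is a sketch under simplifying assumptions, not Kafka code: the class and its members are hypothetical, the OptionalInt field stands in for the ephemeral /controller znode, and "dutiesRunning" stands in for the controller's handlers and state machines.

```java
import java.util.OptionalInt;

// Hypothetical toy model of the resign-then-elect sequence described above.
final class StaleControllerModel {
    private final int brokerId;
    private OptionalInt controllerZnode;  // stand-in for the ephemeral /controller znode
    private boolean dutiesRunning;        // stand-in for handlers and state machines

    StaleControllerModel(int brokerId, OptionalInt controllerZnode, boolean dutiesRunning) {
        this.brokerId = brokerId;
        this.controllerZnode = controllerZnode;
        this.dutiesRunning = dutiesRunning;
    }

    /** Resignation: controller duties stop; the znode is untouched because the session is still alive. */
    void processExpire() {
        dutiesRunning = false;
    }

    /** Election: returns early if a controller znode already exists, as in the elect() excerpt above. */
    void elect() {
        if (controllerZnode.isPresent()) {
            return; // "... has been elected as the controller, so stopping the election process."
        }
        controllerZnode = OptionalInt.of(brokerId);
        dutiesRunning = true;
    }

    /** What a real session expiry would do: ZooKeeper deletes the ephemeral znode. */
    void expireSession() {
        controllerZnode = OptionalInt.empty();
    }

    boolean dutiesRunning() { return dutiesRunning; }
    OptionalInt controllerZnode() { return controllerZnode; }

    public static void main(String[] args) {
        // Sequence from this report: session still alive, znode still points at broker 2.
        StaleControllerModel stuck = new StaleControllerModel(2, OptionalInt.of(2), true);
        stuck.processExpire();
        stuck.elect();
        System.out.println(stuck.controllerZnode() + " dutiesRunning=" + stuck.dutiesRunning());

        // Contrast: a real session expiry removes the znode, so the election proceeds.
        StaleControllerModel recovered = new StaleControllerModel(2, OptionalInt.of(2), true);
        recovered.expireSession();
        recovered.processExpire();
        recovered.elect();
        System.out.println(recovered.controllerZnode() + " dutiesRunning=" + recovered.dutiesRunning());
    }
}
```

In the first sequence the znode still names broker 2 while nothing is performing controller duties, which is the state the logs above show.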

If any non-controller broker is restarted after that (by me or for any other reason), it does not get metadata because there is no controller in the cluster.
And if we try to consume data from such a broker, we get the following errors:
{code:java}
/opt/kafka/bin/kafka-console-consumer.sh --topic test_topic --from-beginning --bootstrap-server  $(hostname):9091 --consumer.config ~/connect.properties 

WARN [Consumer clientId=consumer-console-consumer-57293-1, groupId=console-consumer-57293] Error while fetching metadata with correlation id 2 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
WARN [Consumer clientId=consumer-console-consumer-57293-1, groupId=console-consumer-57293] Error while fetching metadata with correlation id 3 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient){code}
 

 

 

 

  was:
Hi team,

*Kafka version:* 2.8.1
*Configuration:* 3 kafka brokers in different availability zones and 3 zookeeper brokers in different availability zones.


I faced with a bug when kafka cluster loses the controller and if after that restart any none controller broker then it stops processing data.

*Context:*
My kafka cluster has SASL configuration for connection.
Zookeeper doesn't have SASL configuration and uses connection without any authentication.


So, when I start kafka brokers I see in logs error about auth failed but then ZK client switches to connection without authentication and all works fine.


{code:java}
[2021-12-16 14:04:08,451] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,462] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:04:08,466] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,467] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,470] INFO Opening socket connection to server ===ZOOKEEPER_HOST_1===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:04:08,478] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:48022, server: ===ZOOKEEPER_HOST_1===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:04:08,496] INFO Session establishment complete on server ===ZOOKEEPER_HOST_1===:2181, sessionid = 0x10028c111930125, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:04:08,497] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:SyncConnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,498] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,753] INFO [feature-zk-node-event-process-thread]: Starting (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread) {code}
Then I restart ZK node (or it restarts somehow) which holds connection with kafka controller node (===ZOOKEEPER_HOST_1===) and in logs I see next:
{code:java}
[2021-12-16 14:08:42,951] INFO Unable to read additional data from server sessionid 0x10028c111930126, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:42,951] INFO Unable to read additional data from server sessionid 0x10028c111930125, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)[2021-12-16 14:08:43,052] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:43,053] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)[2021-12-16 14:08:43,583] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,583] INFO Opening socket connection to server ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,583] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:43,584] ERROR [ZooKeeperClient ACL authorizer] Auth failed. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:43,593] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:48560, server: ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,808] INFO Unable to read additional data from server sessionid 0x10028c111930126, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,910] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,019] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,019] INFO Opening socket connection to server ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,019] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,019] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,022] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:37734, server: ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,023] INFO Unable to read additional data from server sessionid 0x10028c111930125, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,123] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)[2021-12-16 14:08:44,124] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,125] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)[2021-12-16 14:08:44,586] INFO [ZooKeeperClient ACL authorizer] Reinitializing due to auth failure. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,704] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,704] INFO Opening socket connection to server ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,705] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,705] ERROR [ZooKeeperClient ACL authorizer] Auth failed. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,707] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:37738, server: ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,709] INFO Session establishment complete on server ===ZOOKEEPER_HOST_2===:2181, sessionid = 0x10028c111930126, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,710] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:SyncConnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,798] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,798] INFO Opening socket connection to server ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,798] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,798] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,807] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:48568, server: ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,824] INFO Session establishment complete on server ===ZOOKEEPER_HOST_3===:2181, sessionid = 0x10028c111930125, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,824] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:SyncConnected type:None path:null (kafka.zookeeper.ZooKeeperClient)[2021-12-16 14:08:44,824] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,824] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)[2021-12-16 14:08:45,020] INFO [ZooKeeperClient Kafka server] Reinitializing due to auth failure. (kafka.zookeeper.ZooKeeperClient) {code}
The main points here are next:
1. The log above is from kafka controller node.
2. Connection to ZK was lost _"Received event: WatchedEvent state:Disconnected"_
3. Connection was established with live ZK nodes _"Opening socket connection to server ===ZOOKEEPER_HOST_3===:2181"_ and _"Received event: WatchedEvent state:SyncConnected"_
4. During connection process we get _"Auth failed."_ error as it was during starting of brokers.
5. The error "Auth failed" is catched in the code: *kafka.zookeeper.ZooKeeperClient.ZooKeeperClientWatcher.process* and call code:
{code:java}
if (initialized)
    scheduleReinitialize("auth-failed", "Reinitializing due to auth failure.", RetryBackoffMs) {code}
where RetryBackoffMs is 1000.



In a second after "Auth failed" message we see message "{_}Reinitializing due to auth failure.{_}" in the log.
The method *ZooKeeperClient.scheduleReinitialize* calls {*}ZooKeeperClient.reinitialize{*}.

 
{code:java}
  private def reinitialize(): Unit = {
    // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
    // may require additional Zookeeper requests, which will block to acquire the initialization lock
    stateChangeHandlers.values.foreach(callBeforeInitializingSession _)    inWriteLock(initializationLock) {
      if (!connectionState.isAlive) {
        zooKeeper.close()
        info(s"Initializing a new session to $connectString.")
        // retry forever until ZooKeeper can be instantiated
        var connected = false
        while (!connected) {
          try {
            zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher, clientConfig)
            connected = true
          } catch {
            case e: Exception =>
              info("Error when recreating ZooKeeper, retrying after a short sleep", e)
              Thread.sleep(RetryBackoffMs)
          }
        }
      }
    }    stateChangeHandlers.values.foreach(callAfterInitializingSession _)
  } {code}
The code inside "{_}inWriteLock(initializationLock){_}" block is not running, because the connection already was establised with live ZK nodes (in logs I didn't find the message s"{_}Initializing a new session to $connectString.{_}").
But the code "{*}callBeforeInitializingSession{*}" calls "{*}KafkaController.startup.beforeInitializingSession{*}" which fires "{*}ControllerEvent.Expire{*}" event and the code "{*}callAfterInitializingSession"{*} calls "{*}KafkaController.startup.afterInitializingSession{*}" and fires "{*}ControllerEvent.RegisterBrokerAndReelect{*}" event.

The event "{*}ControllerEvent.Expire{*}" call "{*}KafkaController.processExpire"{*} method which shutting down the current controller and in logs we see next:


{code:java}
[2021-12-16 14:08:45,022] DEBUG [Controller id=2] Resigning (kafka.controller.KafkaController)
[2021-12-16 14:08:45,022] DEBUG [Controller id=2] Unregister BrokerModifications handler for Set(1, 2, 3) (kafka.controller.KafkaController)
[2021-12-16 14:08:45,023] INFO [PartitionStateMachine controllerId=2] Stopped partition state machine (kafka.controller.ZkPartitionStateMachine)
[2021-12-16 14:08:45,025] INFO [ReplicaStateMachine controllerId=2] Stopped replica state machine (kafka.controller.ZkReplicaStateMachine)
[2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,031] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,032] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,033] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,040] INFO [Controller id=2] Resigned (kafka.controller.KafkaController) {code}
And then "{*}ControllerEvent.RegisterBrokerAndReelect{*}" event calls method "{*}KafkaController.processRegisterBrokerAndReelect{*}" => "{*}KafkaController.elect{*}". But in the logs there is next:

 
{code:java}
[2021-12-16 14:08:45,129] DEBUG [Controller id=2] Broker 2 has been elected as the controller, so stopping the election process. (kafka.controller.KafkaController) {code}
There is a part of the method *"KafkaController.elect"* which write the message above

 

 
{code:java}
  private def elect(): Unit = {
    activeControllerId = zkClient.getControllerId.getOrElse(-1)
    /*
     * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
     * it's possible that the controller has already been elected when we get here. This check will prevent the following
     * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
     */
    if (activeControllerId != -1) {
      debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
      return
    } {code}
The problem is:
The current controller shut itself down because of the "{*}KafkaController.processExpire{*}" event, but a new one wasn't elected because ZK didn't remove the /controller node (it is an ephemeral node, and the ZK session that owns it never actually expired).

So in the logs we see that Controller id=2 resigned and then stopped the election because it thinks it is still the live controller, but all of its ZK listeners have already been shut down.
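
The sequence above can be illustrated with a toy model. This is only a minimal sketch under stated assumptions, not Kafka code: the class ControllerElectionSketch, the FakeZk stand-in for the /controller node, and all field names are hypothetical. Only the early-return check in elect() is modelled on the excerpt quoted above.

```java
import java.util.Optional;

/** Toy model (hypothetical, not Kafka code) of the resign-then-reelect sequence. */
class ControllerElectionSketch {

    /** Stand-in for ZooKeeper: holds the id stored in /controller, empty if the node is absent. */
    static final class FakeZk {
        Optional<Integer> controller = Optional.empty();
    }

    final int brokerId;
    final FakeZk zk;
    int activeControllerId = -1;
    boolean controllerDutiesRunning = false;

    ControllerElectionSketch(int brokerId, FakeZk zk) {
        this.brokerId = brokerId;
        this.zk = zk;
    }

    /** Same shape as the quoted check: stop if /controller already holds any id. */
    void elect() {
        activeControllerId = zk.controller.orElse(-1);
        if (activeControllerId != -1) {
            return; // a controller is already registered, so the election stops here
        }
        zk.controller = Optional.of(brokerId); // create the (ephemeral) node
        activeControllerId = brokerId;
        controllerDutiesRunning = true;        // state machines, send threads, listeners
    }

    /** Models the Expire event: local resignation only. The node in FakeZk is untouched
     *  because, in the scenario described, the ZK session itself is still alive. */
    void resign() {
        activeControllerId = -1;
        controllerDutiesRunning = false;
    }

    /** True when /controller names this broker but it no longer does controller work. */
    boolean isStuck() {
        return zk.controller.isPresent()
            && zk.controller.get() == brokerId
            && !controllerDutiesRunning;
    }

    public static void main(String[] args) {
        FakeZk zk = new FakeZk();
        ControllerElectionSketch broker2 = new ControllerElectionSketch(2, zk);
        broker2.elect();  // initial election: broker 2 registers itself
        broker2.resign(); // auth failure handling fires Expire
        broker2.elect();  // RegisterBrokerAndReelect: finds its own id and returns early
        System.out.println("stuck=" + broker2.isStuck());
    }
}
```

In this model the second elect() call leaves the broker registered in /controller with no controller duties running, which matches the state described above.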

If I then restart any non-controller broker (or one restarts for any other reason), it doesn't receive metadata because there is no controller in the cluster.
And if we try to consume data from these brokers, we get the following errors:
{code:java}
/opt/kafka/bin/kafka-console-consumer.sh --topic test_topic --from-beginning --bootstrap-server  $(hostname):9091 --consumer.config ~/connect.properties 

WARN [Consumer clientId=consumer-console-consumer-57293-1, groupId=console-consumer-57293] Error while fetching metadata with correlation id 2 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
WARN [Consumer clientId=consumer-console-consumer-57293-1, groupId=console-consumer-57293] Error while fetching metadata with correlation id 3 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient){code}

> Kafka cluster loses the controller
> ----------------------------------
>
>                 Key: KAFKA-13556
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13556
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 2.8.1
>            Reporter: Andrei Lakhmanets
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)