You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2021/12/02 22:24:36 UTC

[kafka] branch 3.0 updated: KAFKA-13461: Don't re-initialize ZK client session after auth failure if connection still alive (#11563)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new ac923b0  KAFKA-13461: Don't re-initialize ZK client session after auth failure if connection still alive (#11563)
ac923b0 is described below

commit ac923b0611817c9400e98cac5e0e19c18833de56
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Thu Dec 2 22:10:37 2021 +0000

    KAFKA-13461: Don't re-initialize ZK client session after auth failure if connection still alive (#11563)
    
    If the JAAS configuration does not contain a Client section for ZK clients, an auth failure event is generated. If this occurs after the connection has been set up in the controller, we schedule reinitialize(), which causes the controller to resign. In the case where SASL is not mandatory and the connection is still alive, the controller maintains the current session and doesn't register its watchers, leaving it in a bad state. This change therefore schedules reinitialize() on an auth failure only when the connection is no longer alive.
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala     |  4 ++--
 .../test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala | 11 ++++++++++-
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 091b401..bc634a8 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -433,14 +433,14 @@ class ZooKeeperClient(connectString: String,
             isConnectedOrExpiredCondition.signalAll()
           }
           if (state == KeeperState.AuthFailed) {
-            error("Auth failed.")
+            error(s"Auth failed, initialized=$isFirstConnectionEstablished connectionState=$connectionState")
             stateChangeHandlers.values.foreach(_.onAuthFailure())
 
             // If this is during initial startup, we fail fast. Otherwise, schedule retry.
             val initialized = inLock(isConnectedOrExpiredLock) {
               isFirstConnectionEstablished
             }
-            if (initialized)
+            if (initialized && !connectionState.isAlive)
               scheduleReinitialize("auth-failed", "Reinitializing due to auth failure.", RetryBackoffMs)
           } else if (state == KeeperState.Expired) {
             scheduleReinitialize("session-expired", "Session expired.", delayMs = 0L)
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index a0eb1ea..37954e6 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -648,9 +648,18 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     }
 
     zooKeeperClient.close()
-    zooKeeperClient = newZooKeeperClient()
+    @volatile var connectionStateOverride: Option[States] = None
+    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
+      zkMaxInFlightRequests, time, "testMetricGroup", "testMetricType", new ZKClientConfig, "ZooKeeperClientTest") {
+      override def connectionState: States = connectionStateOverride.getOrElse(super.connectionState)
+    }
     zooKeeperClient.registerStateChangeHandler(changeHandler)
 
+    connectionStateOverride = Some(States.CONNECTED)
+    zooKeeperClient.ZooKeeperClientWatcher.process(new WatchedEvent(EventType.None, KeeperState.AuthFailed, null))
+    assertFalse(sessionInitializedCountDownLatch.await(10, TimeUnit.MILLISECONDS), "Unexpected session initialization when connection is alive")
+
+    connectionStateOverride = Some(States.AUTH_FAILED)
     zooKeeperClient.ZooKeeperClientWatcher.process(new WatchedEvent(EventType.None, KeeperState.AuthFailed, null))
     assertTrue(sessionInitializedCountDownLatch.await(5, TimeUnit.SECONDS), "Failed to receive session initializing notification")
   }