You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2021/05/05 14:57:28 UTC

[kafka] branch 2.8 updated: KAFKA-12730; Avoid duplicate logout if Kerberos login fails (#10611)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new f732aaa  KAFKA-12730; Avoid duplicate logout if Kerberos login fails (#10611)
f732aaa is described below

commit f732aaae7fd4a1f43731184b9c3c1a2e433d393a
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Thu Apr 29 14:32:50 2021 +0100

    KAFKA-12730; Avoid duplicate logout if Kerberos login fails (#10611)
    
    From Java 9 onwards, LoginContext#logout() throws an NPE if invoked multiple times due to https://bugs.openjdk.java.net/browse/JDK-8173069. KerberosLogin currently attempts logout followed by login in a background refresh thread. If login fails we retry the same sequence. As a result, a single login failure prevents subsequent re-login. And clients will never be able to authenticate successfully after the first failure, until the process is restarted. The commit checks if logout is ne [...]
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
---
 .../common/security/kerberos/KerberosLogin.java    | 14 ++++-
 .../kafka/server/GssapiAuthenticationTest.scala    | 64 ++++++++++++++++++----
 2 files changed, 63 insertions(+), 15 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
index 9626da8..3f0a46e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
@@ -362,17 +362,25 @@ public class KerberosLogin extends AbstractLogin {
             lastLogin = currentElapsedTime();
             //clear up the kerberos state. But the tokens are not cleared! As per
             //the Java kerberos login module code, only the kerberos credentials
-            //are cleared
-            logout();
+            //are cleared. If previous logout succeeded but login failed, we shouldn't
+            //logout again since duplicate logout causes NPE from Java 9 onwards.
+            if (subject != null && !subject.getPrincipals().isEmpty()) {
+                logout();
+            }
             //login and also update the subject field of this instance to
             //have the new credentials (pass it to the LoginContext constructor)
             loginContext = new LoginContext(contextName(), subject, null, configuration());
             log.info("Initiating re-login for {}", principal);
-            loginContext.login();
+            login(loginContext);
         }
     }
 
     // Visibility to override for testing
+    protected void login(LoginContext loginContext) throws LoginException {
+        loginContext.login();
+    }
+
+    // Visibility to override for testing
     protected void logout() throws LoginException {
         loginContext.logout();
     }
diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index 90454bb..5c75507 100644
--- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -23,6 +23,7 @@ import java.time.Duration
 import java.util.{Collections, Properties}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 
+import javax.security.auth.login.LoginContext
 import kafka.api.{Both, IntegrationTestHarness, SaslSetup}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.CommonClientConfigs
@@ -102,6 +103,29 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
   }
 
   /**
+   * Verifies that if login fails, subsequent re-login without failures works and clients
+   * are able to connect after the second re-login. Verifies that logout is performed only once
+   * since duplicate logouts without successful login results in NPE from Java 9 onwards.
+   */
+  @Test
+  def testLoginFailure(): Unit = {
+    val selector = createSelectorWithRelogin()
+    try {
+      val login = TestableKerberosLogin.instance
+      assertNotNull(login)
+      login.loginException = Some(new RuntimeException("Test exception to fail login"))
+      executor.submit(() => login.reLogin(), 0)
+      executor.submit(() => login.reLogin(), 0)
+
+      verifyRelogin(selector, login)
+      assertEquals(2, login.loginAttempts)
+      assertEquals(1, login.logoutAttempts)
+    } finally {
+      selector.close()
+    }
+  }
+
+  /**
    * Verifies that there are no authentication failures during Kerberos re-login. If authentication
    * is performed when credentials are unavailable between logout and login, we handle it as a
    * transient error and not an authentication failure so that clients may retry.
@@ -113,23 +137,26 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
       val login = TestableKerberosLogin.instance
       assertNotNull(login)
       executor.submit(() => login.reLogin(), 0)
-
-      val node1 = "1"
-      selector.connect(node1, serverAddr, 1024, 1024)
-      login.logoutResumeLatch.countDown()
-      login.logoutCompleteLatch.await(15, TimeUnit.SECONDS)
-      assertFalse(pollUntilReadyOrDisconnected(selector, node1), "Authenticated during re-login")
-
-      login.reLoginResumeLatch.countDown()
-      login.reLoginCompleteLatch.await(15, TimeUnit.SECONDS)
-      val node2 = "2"
-      selector.connect(node2, serverAddr, 1024, 1024)
-      assertTrue(pollUntilReadyOrDisconnected(selector, node2), "Authenticated failed after re-login")
+      verifyRelogin(selector, login)
     } finally {
       selector.close()
     }
   }
 
+  private def verifyRelogin(selector: Selector, login: TestableKerberosLogin): Unit = {
+    val node1 = "1"
+    selector.connect(node1, serverAddr, 1024, 1024)
+    login.logoutResumeLatch.countDown()
+    login.logoutCompleteLatch.await(15, TimeUnit.SECONDS)
+    assertFalse(pollUntilReadyOrDisconnected(selector, node1), "Authenticated during re-login")
+
+    login.reLoginResumeLatch.countDown()
+    login.reLoginCompleteLatch.await(15, TimeUnit.SECONDS)
+    val node2 = "2"
+    selector.connect(node2, serverAddr, 1024, 1024)
+    assertTrue(pollUntilReadyOrDisconnected(selector, node2), "Authenticated failed after re-login")
+  }
+
   /**
    * Tests that Kerberos error `Server not found in Kerberos database (7)` is handled
    * as a fatal authentication failure.
@@ -256,6 +283,9 @@ class TestableKerberosLogin extends KerberosLogin {
   val logoutCompleteLatch = new CountDownLatch(1)
   val reLoginResumeLatch = new CountDownLatch(1)
   val reLoginCompleteLatch = new CountDownLatch(1)
+  @volatile var loginException: Option[RuntimeException] = None
+  @volatile var loginAttempts = 0
+  @volatile var logoutAttempts = 0
 
   assertNull(TestableKerberosLogin.instance)
   TestableKerberosLogin.instance = this
@@ -265,7 +295,17 @@ class TestableKerberosLogin extends KerberosLogin {
     reLoginCompleteLatch.countDown()
   }
 
+  override protected def login(loginContext: LoginContext): Unit = {
+    loginAttempts += 1
+    loginException.foreach { e =>
+      loginException = None
+      throw e
+    }
+    super.login(loginContext)
+  }
+
   override protected def logout(): Unit = {
+    logoutAttempts += 1
     logoutResumeLatch.await(15, TimeUnit.SECONDS)
     super.logout()
     logoutCompleteLatch.countDown()