You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2021/05/05 14:57:28 UTC
[kafka] branch 2.8 updated: KAFKA-12730; Avoid duplicate logout if Kerberos login fails (#10611)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new f732aaa KAFKA-12730; Avoid duplicate logout if Kerberos login fails (#10611)
f732aaa is described below
commit f732aaae7fd4a1f43731184b9c3c1a2e433d393a
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Thu Apr 29 14:32:50 2021 +0100
KAFKA-12730; Avoid duplicate logout if Kerberos login fails (#10611)
From Java 9 onwards, LoginContext#logout() throws an NPE if invoked multiple times due to https://bugs.openjdk.java.net/browse/JDK-8173069. KerberosLogin currently attempts logout followed by login in a background refresh thread. If login fails, we retry the same sequence. As a result, a single login failure prevents subsequent re-login, and clients will never be able to authenticate successfully after the first failure until the process is restarted. The commit checks if logout is necessary [...]
Reviewers: Manikumar Reddy <ma...@gmail.com>
---
.../common/security/kerberos/KerberosLogin.java | 14 ++++-
.../kafka/server/GssapiAuthenticationTest.scala | 64 ++++++++++++++++++----
2 files changed, 63 insertions(+), 15 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
index 9626da8..3f0a46e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
@@ -362,17 +362,25 @@ public class KerberosLogin extends AbstractLogin {
lastLogin = currentElapsedTime();
//clear up the kerberos state. But the tokens are not cleared! As per
//the Java kerberos login module code, only the kerberos credentials
- //are cleared
- logout();
+ //are cleared. If previous logout succeeded but login failed, we shouldn't
+ //logout again since duplicate logout causes NPE from Java 9 onwards.
+ if (subject != null && !subject.getPrincipals().isEmpty()) {
+ logout();
+ }
//login and also update the subject field of this instance to
//have the new credentials (pass it to the LoginContext constructor)
loginContext = new LoginContext(contextName(), subject, null, configuration());
log.info("Initiating re-login for {}", principal);
- loginContext.login();
+ login(loginContext);
}
}
// Visibility to override for testing
+ protected void login(LoginContext loginContext) throws LoginException {
+ loginContext.login();
+ }
+
+ // Visibility to override for testing
protected void logout() throws LoginException {
loginContext.logout();
}
diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index 90454bb..5c75507 100644
--- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -23,6 +23,7 @@ import java.time.Duration
import java.util.{Collections, Properties}
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
+import javax.security.auth.login.LoginContext
import kafka.api.{Both, IntegrationTestHarness, SaslSetup}
import kafka.utils.TestUtils
import org.apache.kafka.clients.CommonClientConfigs
@@ -102,6 +103,29 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
}
/**
+ * Verifies that if login fails, subsequent re-login without failures works and clients
+ * are able to connect after the second re-login. Verifies that logout is performed only once
+ * since duplicate logouts without a successful login result in an NPE from Java 9 onwards.
+ */
+ @Test
+ def testLoginFailure(): Unit = {
+ val selector = createSelectorWithRelogin()
+ try {
+ val login = TestableKerberosLogin.instance
+ assertNotNull(login)
+ login.loginException = Some(new RuntimeException("Test exception to fail login"))
+ executor.submit(() => login.reLogin(), 0)
+ executor.submit(() => login.reLogin(), 0)
+
+ verifyRelogin(selector, login)
+ assertEquals(2, login.loginAttempts)
+ assertEquals(1, login.logoutAttempts)
+ } finally {
+ selector.close()
+ }
+ }
+
+ /**
* Verifies that there are no authentication failures during Kerberos re-login. If authentication
* is performed when credentials are unavailable between logout and login, we handle it as a
* transient error and not an authentication failure so that clients may retry.
@@ -113,23 +137,26 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
val login = TestableKerberosLogin.instance
assertNotNull(login)
executor.submit(() => login.reLogin(), 0)
-
- val node1 = "1"
- selector.connect(node1, serverAddr, 1024, 1024)
- login.logoutResumeLatch.countDown()
- login.logoutCompleteLatch.await(15, TimeUnit.SECONDS)
- assertFalse(pollUntilReadyOrDisconnected(selector, node1), "Authenticated during re-login")
-
- login.reLoginResumeLatch.countDown()
- login.reLoginCompleteLatch.await(15, TimeUnit.SECONDS)
- val node2 = "2"
- selector.connect(node2, serverAddr, 1024, 1024)
- assertTrue(pollUntilReadyOrDisconnected(selector, node2), "Authenticated failed after re-login")
+ verifyRelogin(selector, login)
} finally {
selector.close()
}
}
+ private def verifyRelogin(selector: Selector, login: TestableKerberosLogin): Unit = {
+ val node1 = "1"
+ selector.connect(node1, serverAddr, 1024, 1024)
+ login.logoutResumeLatch.countDown()
+ login.logoutCompleteLatch.await(15, TimeUnit.SECONDS)
+ assertFalse(pollUntilReadyOrDisconnected(selector, node1), "Authenticated during re-login")
+
+ login.reLoginResumeLatch.countDown()
+ login.reLoginCompleteLatch.await(15, TimeUnit.SECONDS)
+ val node2 = "2"
+ selector.connect(node2, serverAddr, 1024, 1024)
+ assertTrue(pollUntilReadyOrDisconnected(selector, node2), "Authenticated failed after re-login")
+ }
+
/**
* Tests that Kerberos error `Server not found in Kerberos database (7)` is handled
* as a fatal authentication failure.
@@ -256,6 +283,9 @@ class TestableKerberosLogin extends KerberosLogin {
val logoutCompleteLatch = new CountDownLatch(1)
val reLoginResumeLatch = new CountDownLatch(1)
val reLoginCompleteLatch = new CountDownLatch(1)
+ @volatile var loginException: Option[RuntimeException] = None
+ @volatile var loginAttempts = 0
+ @volatile var logoutAttempts = 0
assertNull(TestableKerberosLogin.instance)
TestableKerberosLogin.instance = this
@@ -265,7 +295,17 @@ class TestableKerberosLogin extends KerberosLogin {
reLoginCompleteLatch.countDown()
}
+ override protected def login(loginContext: LoginContext): Unit = {
+ loginAttempts += 1
+ loginException.foreach { e =>
+ loginException = None
+ throw e
+ }
+ super.login(loginContext)
+ }
+
override protected def logout(): Unit = {
+ logoutAttempts += 1
logoutResumeLatch.await(15, TimeUnit.SECONDS)
super.logout()
logoutCompleteLatch.countDown()