You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2021/05/05 15:13:03 UTC

[kafka] branch 2.7 updated (f98db1e -> adaeb0f)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a change to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from f98db1e  MINOR: Bump latest 2.6 version to 2.6.2 (#10582)
     new 6e23078  KAFKA-10727; Handle Kerberos error during re-login as transient failure in clients (#9605)
     new adaeb0f  KAFKA-12730; Avoid duplicate logout if Kerberos login fails (#10611)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kafka/common/network/SaslChannelBuilder.java   |   2 +-
 .../authenticator/SaslClientAuthenticator.java     |   8 +-
 .../common/security/kerberos/KerberosError.java    |  19 +++
 .../common/security/kerberos/KerberosLogin.java    |  21 ++-
 .../kafka/server/GssapiAuthenticationTest.scala    | 147 +++++++++++++++++++--
 5 files changed, 176 insertions(+), 21 deletions(-)

[kafka] 02/02: KAFKA-12730; Avoid duplicate logout if Kerberos login fails (#10611)

Posted by rs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit adaeb0f6469207e7e2eb44bdc14434cd4d2ee6a7
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Thu Apr 29 14:32:50 2021 +0100

    KAFKA-12730; Avoid duplicate logout if Kerberos login fails (#10611)
    
    From Java 9 onwards, LoginContext#logout() throws an NPE if invoked multiple times due to https://bugs.openjdk.java.net/browse/JDK-8173069. KerberosLogin currently attempts logout followed by login in a background refresh thread. If login fails we retry the same sequence. As a result, a single login failure prevents subsequent re-login. And clients will never be able to authenticate successfully after the first failure, until the process is restarted. The commit checks if logout is ne [...]
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
---
 .../common/security/kerberos/KerberosLogin.java    | 14 ++++-
 .../kafka/server/GssapiAuthenticationTest.scala    | 64 ++++++++++++++++++----
 2 files changed, 63 insertions(+), 15 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
index 9626da8..3f0a46e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
@@ -362,17 +362,25 @@ public class KerberosLogin extends AbstractLogin {
             lastLogin = currentElapsedTime();
             //clear up the kerberos state. But the tokens are not cleared! As per
             //the Java kerberos login module code, only the kerberos credentials
-            //are cleared
-            logout();
+            //are cleared. If previous logout succeeded but login failed, we shouldn't
+            //logout again since duplicate logout causes NPE from Java 9 onwards.
+            if (subject != null && !subject.getPrincipals().isEmpty()) {
+                logout();
+            }
             //login and also update the subject field of this instance to
             //have the new credentials (pass it to the LoginContext constructor)
             loginContext = new LoginContext(contextName(), subject, null, configuration());
             log.info("Initiating re-login for {}", principal);
-            loginContext.login();
+            login(loginContext);
         }
     }
 
     // Visibility to override for testing
+    protected void login(LoginContext loginContext) throws LoginException {
+        loginContext.login();
+    }
+
+    // Visibility to override for testing
     protected void logout() throws LoginException {
         loginContext.logout();
     }
diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index fa21a94..efb51f1 100644
--- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -23,6 +23,7 @@ import java.time.Duration
 import java.util.{Collections, Properties}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 
+import javax.security.auth.login.LoginContext
 import kafka.api.{Both, IntegrationTestHarness, SaslSetup}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.CommonClientConfigs
@@ -100,6 +101,29 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
   }
 
   /**
+   * Verifies that if login fails, subsequent re-login without failures works and clients
+   * are able to connect after the second re-login. Verifies that logout is performed only once
+   * since duplicate logouts without a successful login result in an NPE from Java 9 onwards.
+   */
+  @Test
+  def testLoginFailure(): Unit = {
+    val selector = createSelectorWithRelogin()
+    try {
+      val login = TestableKerberosLogin.instance
+      assertNotNull(login)
+      login.loginException = Some(new RuntimeException("Test exception to fail login"))
+      executor.submit(() => login.reLogin(), 0)
+      executor.submit(() => login.reLogin(), 0)
+
+      verifyRelogin(selector, login)
+      assertEquals(2, login.loginAttempts)
+      assertEquals(1, login.logoutAttempts)
+    } finally {
+      selector.close()
+    }
+  }
+
+  /**
    * Verifies that there are no authentication failures during Kerberos re-login. If authentication
    * is performed when credentials are unavailable between logout and login, we handle it as a
    * transient error and not an authentication failure so that clients may retry.
@@ -111,23 +135,26 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
       val login = TestableKerberosLogin.instance
       assertNotNull(login)
       executor.submit(() => login.reLogin(), 0)
-
-      val node1 = "1"
-      selector.connect(node1, serverAddr, 1024, 1024)
-      login.logoutResumeLatch.countDown()
-      login.logoutCompleteLatch.await(15, TimeUnit.SECONDS)
-      assertFalse("Authenticated during re-login", pollUntilReadyOrDisconnected(selector, node1))
-
-      login.reLoginResumeLatch.countDown()
-      login.reLoginCompleteLatch.await(15, TimeUnit.SECONDS)
-      val node2 = "2"
-      selector.connect(node2, serverAddr, 1024, 1024)
-      assertTrue("Authenticated failed after re-login", pollUntilReadyOrDisconnected(selector, node2))
+      verifyRelogin(selector, login)
     } finally {
       selector.close()
     }
   }
 
+  private def verifyRelogin(selector: Selector, login: TestableKerberosLogin): Unit = {
+    val node1 = "1"
+    selector.connect(node1, serverAddr, 1024, 1024)
+    login.logoutResumeLatch.countDown()
+    login.logoutCompleteLatch.await(15, TimeUnit.SECONDS)
+    assertFalse("Authenticated during re-login", pollUntilReadyOrDisconnected(selector, node1))
+
+    login.reLoginResumeLatch.countDown()
+    login.reLoginCompleteLatch.await(15, TimeUnit.SECONDS)
+    val node2 = "2"
+    selector.connect(node2, serverAddr, 1024, 1024)
+    assertTrue("Authenticated failed after re-login", pollUntilReadyOrDisconnected(selector, node2))
+  }
+
   /**
    * Tests that Kerberos error `Server not found in Kerberos database (7)` is handled
    * as a fatal authentication failure.
@@ -258,6 +285,9 @@ class TestableKerberosLogin extends KerberosLogin {
   val logoutCompleteLatch = new CountDownLatch(1)
   val reLoginResumeLatch = new CountDownLatch(1)
   val reLoginCompleteLatch = new CountDownLatch(1)
+  @volatile var loginException: Option[RuntimeException] = None
+  @volatile var loginAttempts = 0
+  @volatile var logoutAttempts = 0
 
   assertNull(TestableKerberosLogin.instance)
   TestableKerberosLogin.instance = this
@@ -267,7 +297,17 @@ class TestableKerberosLogin extends KerberosLogin {
     reLoginCompleteLatch.countDown()
   }
 
+  override protected def login(loginContext: LoginContext): Unit = {
+    loginAttempts += 1
+    loginException.foreach { e =>
+      loginException = None
+      throw e
+    }
+    super.login(loginContext)
+  }
+
   override protected def logout(): Unit = {
+    logoutAttempts += 1
     logoutResumeLatch.await(15, TimeUnit.SECONDS)
     super.logout()
     logoutCompleteLatch.countDown()

[kafka] 01/02: KAFKA-10727; Handle Kerberos error during re-login as transient failure in clients (#9605)

Posted by rs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 6e230782c54a47231d38e3643323d6d5d2cad311
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Mon Nov 23 09:04:16 2020 +0000

    KAFKA-10727; Handle Kerberos error during re-login as transient failure in clients (#9605)
    
    We use a background thread for Kerberos to perform re-login before tickets expire. The thread performs logout() followed by login(), relying on the Java library to clear and then populate credentials in Subject. This leaves a timing window where clients fail to authenticate because credentials are not available. We cannot introduce any form of locking since authentication is performed on the network thread. So this commit treats NO_CRED as a transient failure rather than a fatal authe [...]
    
    Reviewers: Ron Dagostino <rd...@confluent.io>, Manikumar Reddy <ma...@gmail.com>
---
 .../kafka/common/network/SaslChannelBuilder.java   |   2 +-
 .../authenticator/SaslClientAuthenticator.java     |   8 +-
 .../common/security/kerberos/KerberosError.java    |  19 ++++
 .../common/security/kerberos/KerberosLogin.java    |   9 +-
 .../kafka/server/GssapiAuthenticationTest.scala    | 107 ++++++++++++++++++---
 5 files changed, 126 insertions(+), 19 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index a29148a..f01c4ef 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -327,7 +327,7 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
         }
     }
 
-    private Class<? extends Login> defaultLoginClass() {
+    protected Class<? extends Login> defaultLoginClass() {
         if (jaasContexts.containsKey(SaslConfigs.GSSAPI_MECHANISM))
             return KerberosLogin.class;
         if (OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(clientSaslMechanism))
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index bba1c43..fb30c87 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -533,15 +533,17 @@ public class SaslClientAuthenticator implements Authenticator {
                     " Users must configure FQDN of kafka brokers when authenticating using SASL and" +
                     " `socketChannel.socket().getInetAddress().getHostName()` must match the hostname in `principal/hostname@realm`";
             }
-            error += " Kafka Client will go to AUTHENTICATION_FAILED state.";
             //Unwrap the SaslException inside `PrivilegedActionException`
             Throwable cause = e.getCause();
             // Treat transient Kerberos errors as non-fatal SaslExceptions that are processed as I/O exceptions
             // and all other failures as fatal SaslAuthenticationException.
-            if (kerberosError != null && kerberosError.retriable())
+            if ((kerberosError != null && kerberosError.retriable()) || (kerberosError == null && KerberosError.isRetriableClientGssException(e))) {
+                error += " Kafka Client will retry.";
                 throw new SaslException(error, cause);
-            else
+            } else {
+                error += " Kafka Client will go to AUTHENTICATION_FAILED state.";
                 throw new SaslAuthenticationException(error, cause);
+            }
         }
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java
index 9c76482..4b8e8e0 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.security.kerberos;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
 import org.apache.kafka.common.utils.Java;
+import org.ietf.jgss.GSSException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,4 +110,22 @@ public enum KerberosError {
         }
         return null;
     }
+
+    /**
+     * Returns true if the exception should be handled as a transient failure on clients.
+     * We handle GSSException.NO_CRED as retriable on the client-side since this may
+     * occur during re-login if a client attempts to authenticate after logout, but
+     * before the subsequent login.
+     */
+    public static boolean isRetriableClientGssException(Exception exception) {
+        Throwable cause = exception.getCause();
+        while (cause != null && !(cause instanceof GSSException)) {
+            cause = cause.getCause();
+        }
+        if (cause != null) {
+            GSSException gssException = (GSSException) cause;
+            return gssException.getMajor() == GSSException.NO_CRED;
+        }
+        return false;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
index 5efd722..9626da8 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
@@ -346,7 +346,7 @@ public class KerberosLogin extends AbstractLogin {
      * Re-login a principal. This method assumes that {@link #login()} has happened already.
      * @throws javax.security.auth.login.LoginException on a failure
      */
-    private void reLogin() throws LoginException {
+    protected void reLogin() throws LoginException {
         if (!isKrbTicket) {
             return;
         }
@@ -363,7 +363,7 @@ public class KerberosLogin extends AbstractLogin {
             //clear up the kerberos state. But the tokens are not cleared! As per
             //the Java kerberos login module code, only the kerberos credentials
             //are cleared
-            loginContext.logout();
+            logout();
             //login and also update the subject field of this instance to
             //have the new credentials (pass it to the LoginContext constructor)
             loginContext = new LoginContext(contextName(), subject, null, configuration());
@@ -372,6 +372,11 @@ public class KerberosLogin extends AbstractLogin {
         }
     }
 
+    // Visibility to override for testing
+    protected void logout() throws LoginException {
+        loginContext.logout();
+    }
+
     private long currentElapsedTime() {
         return time.hiResClockMs();
     }
diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index d6414c0..fa21a94 100644
--- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -20,8 +20,8 @@ package kafka.server
 
 import java.net.InetSocketAddress
 import java.time.Duration
-import java.util.Properties
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.{Collections, Properties}
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 
 import kafka.api.{Both, IntegrationTestHarness, SaslSetup}
 import kafka.utils.TestUtils
@@ -31,7 +31,8 @@ import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.errors.SaslAuthenticationException
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.security.{JaasContext, TestSecurityConfig}
-import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.auth.{Login, SecurityProtocol}
+import org.apache.kafka.common.security.kerberos.KerberosLogin
 import org.apache.kafka.common.utils.{LogContext, MockTime}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -57,6 +58,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
 
   @Before
   override def setUp(): Unit = {
+    TestableKerberosLogin.reset()
     startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both))
     serverConfig.put(KafkaConfig.SslClientAuthProp, "required")
     serverConfig.put(KafkaConfig.FailedAuthenticationDelayMsProp, failedAuthenticationDelayMs.toString)
@@ -78,6 +80,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
     executor.shutdownNow()
     super.tearDown()
     closeSasl()
+    TestableKerberosLogin.reset()
   }
 
   /**
@@ -97,6 +100,35 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
   }
 
   /**
+   * Verifies that there are no authentication failures during Kerberos re-login. If authentication
+   * is performed when credentials are unavailable between logout and login, we handle it as a
+   * transient error and not an authentication failure so that clients may retry.
+   */
+  @Test
+  def testReLogin(): Unit = {
+    val selector = createSelectorWithRelogin()
+    try {
+      val login = TestableKerberosLogin.instance
+      assertNotNull(login)
+      executor.submit(() => login.reLogin(), 0)
+
+      val node1 = "1"
+      selector.connect(node1, serverAddr, 1024, 1024)
+      login.logoutResumeLatch.countDown()
+      login.logoutCompleteLatch.await(15, TimeUnit.SECONDS)
+      assertFalse("Authenticated during re-login", pollUntilReadyOrDisconnected(selector, node1))
+
+      login.reLoginResumeLatch.countDown()
+      login.reLoginCompleteLatch.await(15, TimeUnit.SECONDS)
+      val node2 = "2"
+      selector.connect(node2, serverAddr, 1024, 1024)
+      assertTrue("Authenticated failed after re-login", pollUntilReadyOrDisconnected(selector, node2))
+    } finally {
+      selector.close()
+    }
+  }
+
+  /**
    * Tests that Kerberos error `Server not found in Kerberos database (7)` is handled
    * as a fatal authentication failure.
    */
@@ -149,16 +181,8 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
       while (actualSuccessfulAuths < numSuccessfulAuths) {
         val nodeId = actualSuccessfulAuths.toString
         selector.connect(nodeId, serverAddr, 1024, 1024)
-        TestUtils.waitUntilTrue(() => {
-          selector.poll(100)
-          val disconnectState = selector.disconnected().get(nodeId)
-          // Verify that disconnect state is not AUTHENTICATION_FAILED
-          if (disconnectState != null)
-            assertEquals(s"Authentication failed with exception ${disconnectState.exception()}",
-              ChannelState.State.AUTHENTICATE, disconnectState.state())
-          selector.isChannelReady(nodeId) || disconnectState != null
-        }, "Client not ready or disconnected within timeout")
-        if (selector.isChannelReady(nodeId))
+        val isReady = pollUntilReadyOrDisconnected(selector, nodeId)
+        if (isReady)
           actualSuccessfulAuths += 1
         selector.close(nodeId)
       }
@@ -167,6 +191,22 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
     }
   }
 
+  private def pollUntilReadyOrDisconnected(selector: Selector, nodeId: String): Boolean = {
+    TestUtils.waitUntilTrue(() => {
+      selector.poll(100)
+      val disconnectState = selector.disconnected().get(nodeId)
+      // Verify that disconnect state is not AUTHENTICATION_FAILED
+      if (disconnectState != null) {
+        assertEquals(s"Authentication failed with exception ${disconnectState.exception()}",
+          ChannelState.State.AUTHENTICATE, disconnectState.state())
+      }
+      selector.isChannelReady(nodeId) || disconnectState != null
+    }, "Client not ready or disconnected within timeout")
+    val isReady = selector.isChannelReady(nodeId)
+    selector.close(nodeId)
+    isReady
+  }
+
   /**
    * Verifies that authentication with the current `clientConfig` results in disconnection and that
    * the disconnection is notified with disconnect state `AUTHENTICATION_FAILED`. This is to ensure
@@ -192,4 +232,45 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
       time, true, new LogContext())
     NetworkTestUtils.createSelector(channelBuilder, time)
   }
+
+  private def createSelectorWithRelogin(): Selector = {
+    clientConfig.setProperty(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, "0")
+    val config = new TestSecurityConfig(clientConfig)
+    val jaasContexts = Collections.singletonMap("GSSAPI", JaasContext.loadClientContext(config.values()))
+    val channelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContexts, securityProtocol,
+      null, false, kafkaClientSaslMechanism, true, null, null, time, new LogContext()) {
+      override protected def defaultLoginClass(): Class[_ <: Login] = classOf[TestableKerberosLogin]
+    }
+    channelBuilder.configure(config.values())
+    NetworkTestUtils.createSelector(channelBuilder, time)
+  }
+}
+
+object TestableKerberosLogin {
+  @volatile var instance: TestableKerberosLogin = _
+  def reset(): Unit = {
+    instance = null
+  }
+}
+
+class TestableKerberosLogin extends KerberosLogin {
+  val logoutResumeLatch = new CountDownLatch(1)
+  val logoutCompleteLatch = new CountDownLatch(1)
+  val reLoginResumeLatch = new CountDownLatch(1)
+  val reLoginCompleteLatch = new CountDownLatch(1)
+
+  assertNull(TestableKerberosLogin.instance)
+  TestableKerberosLogin.instance = this
+
+  override def reLogin(): Unit = {
+    super.reLogin()
+    reLoginCompleteLatch.countDown()
+  }
+
+  override protected def logout(): Unit = {
+    logoutResumeLatch.await(15, TimeUnit.SECONDS)
+    super.logout()
+    logoutCompleteLatch.countDown()
+    reLoginResumeLatch.await(15, TimeUnit.SECONDS)
+  }
 }