You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/11/23 09:05:29 UTC

[kafka] branch trunk updated: KAFKA-10727; Handle Kerberos error during re-login as transient failure in clients (#9605)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ed8659b  KAFKA-10727; Handle Kerberos error during re-login as transient failure in clients (#9605)
ed8659b is described below

commit ed8659b4a09a4affff6798b8077ed2d8fb94b6da
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Mon Nov 23 09:04:16 2020 +0000

    KAFKA-10727; Handle Kerberos error during re-login as transient failure in clients (#9605)
    
    We use a background thread for Kerberos to perform re-login before tickets expire. The thread performs logout() followed by login(), relying on the Java library to clear and then populate credentials in Subject. This leaves a timing window where clients fail to authenticate because credentials are not available. We cannot introduce any form of locking since authentication is performed on the network thread. So this commit treats NO_CRED as a transient failure rather than a fatal authentication failure [...]
    
    Reviewers: Ron Dagostino <rd...@confluent.io>, Manikumar Reddy <ma...@gmail.com>
---
 .../kafka/common/network/SaslChannelBuilder.java   |   2 +-
 .../authenticator/SaslClientAuthenticator.java     |   8 +-
 .../common/security/kerberos/KerberosError.java    |  19 ++++
 .../common/security/kerberos/KerberosLogin.java    |   9 +-
 .../kafka/server/GssapiAuthenticationTest.scala    | 107 ++++++++++++++++++---
 5 files changed, 126 insertions(+), 19 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index a29148a..f01c4ef 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -327,7 +327,7 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
         }
     }
 
-    private Class<? extends Login> defaultLoginClass() {
+    protected Class<? extends Login> defaultLoginClass() {
         if (jaasContexts.containsKey(SaslConfigs.GSSAPI_MECHANISM))
             return KerberosLogin.class;
         if (OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(clientSaslMechanism))
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 9c99dab..00a4bfc 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -539,15 +539,17 @@ public class SaslClientAuthenticator implements Authenticator {
                     " Users must configure FQDN of kafka brokers when authenticating using SASL and" +
                     " `socketChannel.socket().getInetAddress().getHostName()` must match the hostname in `principal/hostname@realm`";
             }
-            error += " Kafka Client will go to AUTHENTICATION_FAILED state.";
             //Unwrap the SaslException inside `PrivilegedActionException`
             Throwable cause = e.getCause();
             // Treat transient Kerberos errors as non-fatal SaslExceptions that are processed as I/O exceptions
             // and all other failures as fatal SaslAuthenticationException.
-            if (kerberosError != null && kerberosError.retriable())
+            if ((kerberosError != null && kerberosError.retriable()) || (kerberosError == null && KerberosError.isRetriableClientGssException(e))) {
+                error += " Kafka Client will retry.";
                 throw new SaslException(error, cause);
-            else
+            } else {
+                error += " Kafka Client will go to AUTHENTICATION_FAILED state.";
                 throw new SaslAuthenticationException(error, cause);
+            }
         }
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java
index 9c76482..4b8e8e0 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.security.kerberos;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
 import org.apache.kafka.common.utils.Java;
+import org.ietf.jgss.GSSException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,4 +110,22 @@ public enum KerberosError {
         }
         return null;
     }
+
+    /**
+     * Returns true if the exception should be handled as a transient failure on clients.
+     * We handle GSSException.NO_CRED as retriable on the client-side since this may
+     * occur during re-login if a client attempts to authenticate after logout, but
+     * before the subsequent login.
+     */
+    public static boolean isRetriableClientGssException(Exception exception) {
+        Throwable cause = exception.getCause();
+        while (cause != null && !(cause instanceof GSSException)) {
+            cause = cause.getCause();
+        }
+        if (cause != null) {
+            GSSException gssException = (GSSException) cause;
+            return gssException.getMajor() == GSSException.NO_CRED;
+        }
+        return false;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
index 5efd722..9626da8 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
@@ -346,7 +346,7 @@ public class KerberosLogin extends AbstractLogin {
      * Re-login a principal. This method assumes that {@link #login()} has happened already.
      * @throws javax.security.auth.login.LoginException on a failure
      */
-    private void reLogin() throws LoginException {
+    protected void reLogin() throws LoginException {
         if (!isKrbTicket) {
             return;
         }
@@ -363,7 +363,7 @@ public class KerberosLogin extends AbstractLogin {
             //clear up the kerberos state. But the tokens are not cleared! As per
             //the Java kerberos login module code, only the kerberos credentials
             //are cleared
-            loginContext.logout();
+            logout();
             //login and also update the subject field of this instance to
             //have the new credentials (pass it to the LoginContext constructor)
             loginContext = new LoginContext(contextName(), subject, null, configuration());
@@ -372,6 +372,11 @@ public class KerberosLogin extends AbstractLogin {
         }
     }
 
+    // Protected rather than private so that tests can override it
+    protected void logout() throws LoginException {
+        loginContext.logout();
+    }
+
     private long currentElapsedTime() {
         return time.hiResClockMs();
     }
diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index d6414c0..fa21a94 100644
--- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -20,8 +20,8 @@ package kafka.server
 
 import java.net.InetSocketAddress
 import java.time.Duration
-import java.util.Properties
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.{Collections, Properties}
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 
 import kafka.api.{Both, IntegrationTestHarness, SaslSetup}
 import kafka.utils.TestUtils
@@ -31,7 +31,8 @@ import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.errors.SaslAuthenticationException
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.security.{JaasContext, TestSecurityConfig}
-import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.auth.{Login, SecurityProtocol}
+import org.apache.kafka.common.security.kerberos.KerberosLogin
 import org.apache.kafka.common.utils.{LogContext, MockTime}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -57,6 +58,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
 
   @Before
   override def setUp(): Unit = {
+    TestableKerberosLogin.reset()
     startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both))
     serverConfig.put(KafkaConfig.SslClientAuthProp, "required")
     serverConfig.put(KafkaConfig.FailedAuthenticationDelayMsProp, failedAuthenticationDelayMs.toString)
@@ -78,6 +80,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
     executor.shutdownNow()
     super.tearDown()
     closeSasl()
+    TestableKerberosLogin.reset()
   }
 
   /**
@@ -97,6 +100,35 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
   }
 
   /**
+   * Verifies that there are no authentication failures during Kerberos re-login. If authentication
+   * is performed when credentials are unavailable between logout and login, we handle it as a
+   * transient error and not an authentication failure so that clients may retry.
+   */
+  @Test
+  def testReLogin(): Unit = {
+    val selector = createSelectorWithRelogin()
+    try {
+      val login = TestableKerberosLogin.instance
+      assertNotNull(login)
+      executor.submit(() => login.reLogin(), 0)
+
+      val node1 = "1"
+      selector.connect(node1, serverAddr, 1024, 1024)
+      login.logoutResumeLatch.countDown()
+      login.logoutCompleteLatch.await(15, TimeUnit.SECONDS)
+      assertFalse("Authenticated during re-login", pollUntilReadyOrDisconnected(selector, node1))
+
+      login.reLoginResumeLatch.countDown()
+      login.reLoginCompleteLatch.await(15, TimeUnit.SECONDS)
+      val node2 = "2"
+      selector.connect(node2, serverAddr, 1024, 1024)
+      assertTrue("Authenticated failed after re-login", pollUntilReadyOrDisconnected(selector, node2))
+    } finally {
+      selector.close()
+    }
+  }
+
+  /**
    * Tests that Kerberos error `Server not found in Kerberos database (7)` is handled
    * as a fatal authentication failure.
    */
@@ -149,16 +181,8 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
       while (actualSuccessfulAuths < numSuccessfulAuths) {
         val nodeId = actualSuccessfulAuths.toString
         selector.connect(nodeId, serverAddr, 1024, 1024)
-        TestUtils.waitUntilTrue(() => {
-          selector.poll(100)
-          val disconnectState = selector.disconnected().get(nodeId)
-          // Verify that disconnect state is not AUTHENTICATION_FAILED
-          if (disconnectState != null)
-            assertEquals(s"Authentication failed with exception ${disconnectState.exception()}",
-              ChannelState.State.AUTHENTICATE, disconnectState.state())
-          selector.isChannelReady(nodeId) || disconnectState != null
-        }, "Client not ready or disconnected within timeout")
-        if (selector.isChannelReady(nodeId))
+        val isReady = pollUntilReadyOrDisconnected(selector, nodeId)
+        if (isReady)
           actualSuccessfulAuths += 1
         selector.close(nodeId)
       }
@@ -167,6 +191,22 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
     }
   }
 
+  private def pollUntilReadyOrDisconnected(selector: Selector, nodeId: String): Boolean = {
+    TestUtils.waitUntilTrue(() => {
+      selector.poll(100)
+      val disconnectState = selector.disconnected().get(nodeId)
+      // Verify that disconnect state is not AUTHENTICATION_FAILED
+      if (disconnectState != null) {
+        assertEquals(s"Authentication failed with exception ${disconnectState.exception()}",
+          ChannelState.State.AUTHENTICATE, disconnectState.state())
+      }
+      selector.isChannelReady(nodeId) || disconnectState != null
+    }, "Client not ready or disconnected within timeout")
+    val isReady = selector.isChannelReady(nodeId)
+    selector.close(nodeId)
+    isReady
+  }
+
   /**
    * Verifies that authentication with the current `clientConfig` results in disconnection and that
    * the disconnection is notified with disconnect state `AUTHENTICATION_FAILED`. This is to ensure
@@ -192,4 +232,45 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
       time, true, new LogContext())
     NetworkTestUtils.createSelector(channelBuilder, time)
   }
+
+  private def createSelectorWithRelogin(): Selector = {
+    clientConfig.setProperty(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, "0")
+    val config = new TestSecurityConfig(clientConfig)
+    val jaasContexts = Collections.singletonMap("GSSAPI", JaasContext.loadClientContext(config.values()))
+    val channelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContexts, securityProtocol,
+      null, false, kafkaClientSaslMechanism, true, null, null, time, new LogContext()) {
+      override protected def defaultLoginClass(): Class[_ <: Login] = classOf[TestableKerberosLogin]
+    }
+    channelBuilder.configure(config.values())
+    NetworkTestUtils.createSelector(channelBuilder, time)
+  }
+}
+
+object TestableKerberosLogin {
+  @volatile var instance: TestableKerberosLogin = _
+  def reset(): Unit = {
+    instance = null
+  }
+}
+
+class TestableKerberosLogin extends KerberosLogin {
+  val logoutResumeLatch = new CountDownLatch(1)
+  val logoutCompleteLatch = new CountDownLatch(1)
+  val reLoginResumeLatch = new CountDownLatch(1)
+  val reLoginCompleteLatch = new CountDownLatch(1)
+
+  assertNull(TestableKerberosLogin.instance)
+  TestableKerberosLogin.instance = this
+
+  override def reLogin(): Unit = {
+    super.reLogin()
+    reLoginCompleteLatch.countDown()
+  }
+
+  override protected def logout(): Unit = {
+    logoutResumeLatch.await(15, TimeUnit.SECONDS)
+    super.logout()
+    logoutCompleteLatch.countDown()
+    reLoginResumeLatch.await(15, TimeUnit.SECONDS)
+  }
 }