You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/06 02:44:30 UTC

kafka git commit: KAFKA-5325; Avoid and handle exceptions for Kerberos re-login

Repository: kafka
Updated Branches:
  refs/heads/trunk 2bb48a4fa -> 9934d28a3


KAFKA-5325; Avoid and handle exceptions for Kerberos re-login

If a producer creates a connection during Kerberos re-login (after logout,
before login), there are no principals in the subject and
`SaslClientAuthenticator.configure` may throw an exception while trying
to determine the principal. A socket channel is created and its key
registered with the selector, but the `RuntimeException` thrown leaves
the key registered with the selector without adding the channel to the
channel list. This results in an infinite loop of `NullPointerException`s.
The PR applies two fixes:

1. Convert the `RuntimeException` to a meaningful `KafkaException`
2. Handle any exception in `buildChannel`, clean up and throw an `IOException`.
Retries will take care of re-connections.

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #3208 from rajinisivaram/KAFKA-5325


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9934d28a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9934d28a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9934d28a

Branch: refs/heads/trunk
Commit: 9934d28a34351a63738f8e9bac6b53fd699a4905
Parents: 2bb48a4
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Tue Jun 6 03:43:07 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Jun 6 03:43:14 2017 +0100

----------------------------------------------------------------------
 .../apache/kafka/common/network/Selector.java   | 17 +++++++++-
 .../authenticator/SaslClientAuthenticator.java  | 35 ++++++++++++++++----
 .../authenticator/SaslServerAuthenticator.java  | 10 +++---
 .../authenticator/SaslAuthenticatorTest.java    | 10 ++++--
 4 files changed, 56 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9934d28a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 8f85202..5dbe83b 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -198,7 +198,17 @@ public class Selector implements Selectable, AutoCloseable {
             throw e;
         }
         SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
-        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
+        KafkaChannel channel;
+        try {
+            channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
+        } catch (Exception e) {
+            try {
+                socketChannel.close();
+            } finally {
+                key.cancel();
+            }
+            throw new IOException("Channel could not be created for socket " + socketChannel, e);
+        }
         key.attach(channel);
         this.channels.put(id, channel);
 
@@ -682,6 +692,11 @@ public class Selector implements Selectable, AutoCloseable {
         this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
     }
 
+    // only for testing
+    public Set<SelectionKey> keys() {
+        return new HashSet<>(nioSelector.keys());
+    }
+
     private class SelectorMetrics {
         private final Metrics metrics;
         private final String metricGrpPrefix;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9934d28a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 149148e..6dab8f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.security.authenticator;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
@@ -52,7 +53,9 @@ import java.security.Principal;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 
 public class SaslClientAuthenticator implements Authenticator {
 
@@ -106,13 +109,14 @@ public class SaslClientAuthenticator implements Authenticator {
 
             setSaslState(handshakeRequestEnable ? SaslState.SEND_HANDSHAKE_REQUEST : SaslState.INITIAL);
 
-            // determine client principal from subject.
-            if (!subject.getPrincipals().isEmpty()) {
-                Principal clientPrincipal = subject.getPrincipals().iterator().next();
-                this.clientPrincipalName = clientPrincipal.getName();
-            } else {
-                clientPrincipalName = null;
-            }
+            // determine client principal from subject for Kerberos to use as authorization id for the SaslClient.
+            // For other mechanisms, the authenticated principal (username for PLAIN and SCRAM) is used as
+            // authorization id. Hence the principal is not specified for creating the SaslClient.
+            if (mechanism.equals(SaslConfigs.GSSAPI_MECHANISM))
+                this.clientPrincipalName = firstPrincipal(subject);
+            else
+                this.clientPrincipalName = null;
+
             callbackHandler = new SaslClientCallbackHandler();
             callbackHandler.configure(configs, Mode.CLIENT, subject, mechanism);
 
@@ -337,4 +341,21 @@ public class SaslClientAuthenticator implements Authenticator {
                     response.error(), mechanism, response.enabledMechanisms()));
         }
     }
+
+    /**
+     * Returns the first Principal from Subject.
+     * @throws KafkaException if there are no Principals in the Subject.
+     *     During Kerberos re-login, principal is reset on Subject. An exception is
+     *     thrown so that the connection is retried after any configured backoff.
+     */
+    static final String firstPrincipal(Subject subject) {
+        Set<Principal> principals = subject.getPrincipals();
+        synchronized (principals) {
+            Iterator<Principal> iterator = principals.iterator();
+            if (iterator.hasNext())
+                return iterator.next().getName();
+            else
+                throw new KafkaException("Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9934d28a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index c249b68..62b4039 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -133,9 +133,7 @@ public class SaslServerAuthenticator implements Authenticator {
             callbackHandler = new ScramServerCallbackHandler(credentialCache.cache(mechanism, ScramCredential.class));
         callbackHandler.configure(configs, Mode.SERVER, subject, saslMechanism);
         if (mechanism.equals(SaslConfigs.GSSAPI_MECHANISM)) {
-            if (subject.getPrincipals().isEmpty())
-                throw new IllegalArgumentException("subject must have at least one principal");
-            saslServer = createSaslKerberosServer(callbackHandler, configs);
+            saslServer = createSaslKerberosServer(callbackHandler, configs, subject);
         } else {
             try {
                 saslServer = Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
@@ -149,12 +147,12 @@ public class SaslServerAuthenticator implements Authenticator {
         }
     }
 
-    private SaslServer createSaslKerberosServer(final AuthCallbackHandler saslServerCallbackHandler, final Map<String, ?> configs) throws IOException {
+    private SaslServer createSaslKerberosServer(final AuthCallbackHandler saslServerCallbackHandler, final Map<String, ?> configs, Subject subject) throws IOException {
         // server is using a JAAS-authenticated subject: determine service principal name and hostname from kafka server's subject.
-        final Principal servicePrincipal = subject.getPrincipals().iterator().next();
+        final String servicePrincipal = SaslClientAuthenticator.firstPrincipal(subject);
         KerberosName kerberosName;
         try {
-            kerberosName = KerberosName.parse(servicePrincipal.getName());
+            kerberosName = KerberosName.parse(servicePrincipal);
         } catch (IllegalArgumentException e) {
             throw new KafkaException("Principal has name with unexpected format " + servicePrincipal);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9934d28a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 631ae08..77cbdbe 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -56,6 +56,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -68,6 +69,8 @@ import java.util.Random;
 import javax.security.auth.login.Configuration;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -171,8 +174,11 @@ public class SaslAuthenticatorTest {
         try {
             selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
             fail("SASL/PLAIN channel created without username");
-        } catch (KafkaException e) {
+        } catch (IOException e) {
             // Expected exception
+            assertTrue("Channels not closed", selector.channels().isEmpty());
+            for (SelectionKey key : selector.keys())
+                assertFalse("Key not cancelled", key.isValid());
         }
     }
 
@@ -192,7 +198,7 @@ public class SaslAuthenticatorTest {
         try {
             selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
             fail("SASL/PLAIN channel created without password");
-        } catch (KafkaException e) {
+        } catch (IOException e) {
             // Expected exception
         }
     }