You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/06 02:44:30 UTC
kafka git commit: KAFKA-5325; Avoid and handle exceptions for Kerberos re-login
Repository: kafka
Updated Branches:
refs/heads/trunk 2bb48a4fa -> 9934d28a3
KAFKA-5325; Avoid and handle exceptions for Kerberos re-login
If a producer creates a connection during Kerberos re-login (after logout,
before login), there are no principals in the subject and
`SaslClientAuthenticator.configure` may throw an exception while trying
to determine the principal. A socket channel is created and its key is
registered with the selector, but the `RuntimeException` thrown leaves
the key registered with the selector without adding the channel to the
channel list. This results in an infinite loop of `NullPointerException`s.
The PR applies two fixes:
1. Convert the `RuntimeException` to a meaningful `KafkaException`
2. Handle any exception in `buildChannel`, clean up, and throw `IOException`.
Retries will take care of re-connections.
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #3208 from rajinisivaram/KAFKA-5325
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9934d28a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9934d28a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9934d28a
Branch: refs/heads/trunk
Commit: 9934d28a34351a63738f8e9bac6b53fd699a4905
Parents: 2bb48a4
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Tue Jun 6 03:43:07 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Jun 6 03:43:14 2017 +0100
----------------------------------------------------------------------
.../apache/kafka/common/network/Selector.java | 17 +++++++++-
.../authenticator/SaslClientAuthenticator.java | 35 ++++++++++++++++----
.../authenticator/SaslServerAuthenticator.java | 10 +++---
.../authenticator/SaslAuthenticatorTest.java | 10 ++++--
4 files changed, 56 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9934d28a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 8f85202..5dbe83b 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -198,7 +198,17 @@ public class Selector implements Selectable, AutoCloseable {
throw e;
}
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
- KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
+ KafkaChannel channel;
+ try {
+ channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
+ } catch (Exception e) {
+ try {
+ socketChannel.close();
+ } finally {
+ key.cancel();
+ }
+ throw new IOException("Channel could not be created for socket " + socketChannel, e);
+ }
key.attach(channel);
this.channels.put(id, channel);
@@ -682,6 +692,11 @@ public class Selector implements Selectable, AutoCloseable {
this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
}
+ // only for testing
+ public Set<SelectionKey> keys() {
+ return new HashSet<>(nioSelector.keys());
+ }
+
private class SelectorMetrics {
private final Metrics metrics;
private final String metricGrpPrefix;
http://git-wip-us.apache.org/repos/asf/kafka/blob/9934d28a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 149148e..6dab8f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.security.authenticator;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
@@ -52,7 +53,9 @@ import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
public class SaslClientAuthenticator implements Authenticator {
@@ -106,13 +109,14 @@ public class SaslClientAuthenticator implements Authenticator {
setSaslState(handshakeRequestEnable ? SaslState.SEND_HANDSHAKE_REQUEST : SaslState.INITIAL);
- // determine client principal from subject.
- if (!subject.getPrincipals().isEmpty()) {
- Principal clientPrincipal = subject.getPrincipals().iterator().next();
- this.clientPrincipalName = clientPrincipal.getName();
- } else {
- clientPrincipalName = null;
- }
+ // determine client principal from subject for Kerberos to use as authorization id for the SaslClient.
+ // For other mechanisms, the authenticated principal (username for PLAIN and SCRAM) is used as
+ // authorization id. Hence the principal is not specified for creating the SaslClient.
+ if (mechanism.equals(SaslConfigs.GSSAPI_MECHANISM))
+ this.clientPrincipalName = firstPrincipal(subject);
+ else
+ this.clientPrincipalName = null;
+
callbackHandler = new SaslClientCallbackHandler();
callbackHandler.configure(configs, Mode.CLIENT, subject, mechanism);
@@ -337,4 +341,21 @@ public class SaslClientAuthenticator implements Authenticator {
response.error(), mechanism, response.enabledMechanisms()));
}
}
+
+ /**
+ * Returns the first Principal from Subject.
+ * @throws KafkaException if there are no Principals in the Subject.
+ * During Kerberos re-login, principal is reset on Subject. An exception is
+ * thrown so that the connection is retried after any configured backoff.
+ */
+ static final String firstPrincipal(Subject subject) {
+ Set<Principal> principals = subject.getPrincipals();
+ synchronized (principals) {
+ Iterator<Principal> iterator = principals.iterator();
+ if (iterator.hasNext())
+ return iterator.next().getName();
+ else
+ throw new KafkaException("Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9934d28a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index c249b68..62b4039 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -133,9 +133,7 @@ public class SaslServerAuthenticator implements Authenticator {
callbackHandler = new ScramServerCallbackHandler(credentialCache.cache(mechanism, ScramCredential.class));
callbackHandler.configure(configs, Mode.SERVER, subject, saslMechanism);
if (mechanism.equals(SaslConfigs.GSSAPI_MECHANISM)) {
- if (subject.getPrincipals().isEmpty())
- throw new IllegalArgumentException("subject must have at least one principal");
- saslServer = createSaslKerberosServer(callbackHandler, configs);
+ saslServer = createSaslKerberosServer(callbackHandler, configs, subject);
} else {
try {
saslServer = Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
@@ -149,12 +147,12 @@ public class SaslServerAuthenticator implements Authenticator {
}
}
- private SaslServer createSaslKerberosServer(final AuthCallbackHandler saslServerCallbackHandler, final Map<String, ?> configs) throws IOException {
+ private SaslServer createSaslKerberosServer(final AuthCallbackHandler saslServerCallbackHandler, final Map<String, ?> configs, Subject subject) throws IOException {
// server is using a JAAS-authenticated subject: determine service principal name and hostname from kafka server's subject.
- final Principal servicePrincipal = subject.getPrincipals().iterator().next();
+ final String servicePrincipal = SaslClientAuthenticator.firstPrincipal(subject);
KerberosName kerberosName;
try {
- kerberosName = KerberosName.parse(servicePrincipal.getName());
+ kerberosName = KerberosName.parse(servicePrincipal);
} catch (IllegalArgumentException e) {
throw new KafkaException("Principal has name with unexpected format " + servicePrincipal);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9934d28a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 631ae08..77cbdbe 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -56,6 +56,7 @@ import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -68,6 +69,8 @@ import java.util.Random;
import javax.security.auth.login.Configuration;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
@@ -171,8 +174,11 @@ public class SaslAuthenticatorTest {
try {
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
fail("SASL/PLAIN channel created without username");
- } catch (KafkaException e) {
+ } catch (IOException e) {
// Expected exception
+ assertTrue("Channels not closed", selector.channels().isEmpty());
+ for (SelectionKey key : selector.keys())
+ assertFalse("Key not cancelled", key.isValid());
}
}
@@ -192,7 +198,7 @@ public class SaslAuthenticatorTest {
try {
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
fail("SASL/PLAIN channel created without password");
- } catch (KafkaException e) {
+ } catch (IOException e) {
// Expected exception
}
}