You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/06/23 18:22:01 UTC
kafka git commit: MINOR: SaslChannelBuilder should be idempotent
Repository: kafka
Updated Branches:
refs/heads/trunk b49036873 -> d65844b52
MINOR: SaslChannelBuilder should be idempotent
After we call `release`, we should null out the reference so
that we neither use it or release it a second time.
This should fix the following exception that has been reported:
```text
[2017-06-23 03:24:02,485] ERROR stream-thread [...] Failed to close consumer: (org.apache.kafka.streams.processor.internals.StreamThread:1054)
org.apache.kafka.common.KafkaException: Failed to close kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1623)
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1573)
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1549)
at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:1052)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:538)
Caused by: java.lang.IllegalStateException: release called on LoginManager with refCount == 0
at org.apache.kafka.common.security.authenticator.LoginManager.release(LoginManager.java:106)
at org.apache.kafka.common.network.SaslChannelBuilder.close(SaslChannelBuilder.java:125)
at org.apache.kafka.common.network.Selector.close(Selector.java:257)
at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:505)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.close(ConsumerNetworkClient.java:439)
at org.apache.kafka.clients.ClientUtils.closeQuietly(ClientUtils.java:71)
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1613)
```
It's worth noting that it's not clear how `SaslChannelBuilder.close()` is called more than
once and it would be good to understand that as well.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Damian Guy <da...@gmail.com>, Jason Gustafson <ja...@confluent.io>
Closes #3422 from ijuma/sasl-channel-builder-idempotent
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d65844b5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d65844b5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d65844b5
Branch: refs/heads/trunk
Commit: d65844b52723c7a093d969c4b2822e3b36389b70
Parents: b490368
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Jun 23 11:19:14 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Jun 23 11:19:14 2017 -0700
----------------------------------------------------------------------
.../kafka/clients/admin/KafkaAdminClient.java | 4 +-
.../common/network/SaslChannelBuilder.java | 12 ++-
.../security/authenticator/LoginManager.java | 16 +++-
.../common/network/SaslChannelBuilderTest.java | 78 ++++++++++++++++++++
4 files changed, 104 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d65844b5/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 881f8d2..2129059 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -272,7 +272,6 @@ public class KafkaAdminClient extends AdminClient {
}
static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcessorFactory timeoutProcessorFactory) {
- Metadata metadata = null;
Metrics metrics = null;
NetworkClient networkClient = null;
Time time = Time.SYSTEM;
@@ -284,7 +283,7 @@ public class KafkaAdminClient extends AdminClient {
try {
// Since we only request node information, it's safe to pass true for allowAutoTopicCreation (and it
// simplifies communication with older brokers)
- metadata = new Metadata(config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
+ Metadata metadata = new Metadata(config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), true);
List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
@@ -312,7 +311,6 @@ public class KafkaAdminClient extends AdminClient {
time,
true,
apiVersions);
- channelBuilder = null;
return new KafkaAdminClient(config, clientId, time, metadata, metrics, networkClient,
timeoutProcessorFactory);
} catch (Throwable exc) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d65844b5/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 9fe200f..33a9f4d 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -95,6 +95,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
this.sslFactory.configure(configs);
}
} catch (Exception e) {
+ close();
throw new KafkaException(e);
}
}
@@ -121,8 +122,10 @@ public class SaslChannelBuilder implements ChannelBuilder {
}
public void close() {
- if (this.loginManager != null)
- this.loginManager.release();
+ if (loginManager != null) {
+ loginManager.release();
+ loginManager = null;
+ }
}
protected TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel) throws IOException {
@@ -134,6 +137,11 @@ public class SaslChannelBuilder implements ChannelBuilder {
}
}
+ // Package private for testing
+ LoginManager loginManager() {
+ return loginManager;
+ }
+
private static String defaultKerberosRealm() throws ClassNotFoundException, NoSuchMethodException,
IllegalArgumentException, IllegalAccessException, InvocationTargetException {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d65844b5/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
index 1730d66..66d5e3b 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
@@ -29,9 +29,13 @@ import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.Login;
import org.apache.kafka.common.security.kerberos.KerberosLogin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class LoginManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(LoginManager.class);
+
// static configs (broker or client)
private static final Map<String, LoginManager> STATIC_INSTANCES = new HashMap<>();
@@ -94,6 +98,7 @@ public class LoginManager {
private LoginManager acquire() {
++refCount;
+ LOGGER.trace("{} acquired", this);
return this;
}
@@ -103,7 +108,7 @@ public class LoginManager {
public void release() {
synchronized (LoginManager.class) {
if (refCount == 0)
- throw new IllegalStateException("release called on LoginManager with refCount == 0");
+ throw new IllegalStateException("release() called on disposed " + this);
else if (refCount == 1) {
if (cacheKey instanceof Password) {
DYNAMIC_INSTANCES.remove(cacheKey);
@@ -113,9 +118,18 @@ public class LoginManager {
login.close();
}
--refCount;
+ LOGGER.trace("{} released", this);
}
}
+ @Override
+ public String toString() {
+ return "LoginManager(serviceName=" + serviceName() +
+ // subject.toString() exposes private credentials, so we can't use it
+ ", publicCredentials=" + subject().getPublicCredentials() +
+ ", refCount=" + refCount + ')';
+ }
+
/* Should only be used in tests. */
public static void closeAll() {
synchronized (LoginManager.class) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d65844b5/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
new file mode 100644
index 0000000..2f41c77
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.security.authenticator.TestJaasConfig;
+import org.apache.kafka.common.security.plain.PlainLoginModule;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+public class SaslChannelBuilderTest {
+
+ @Test
+ public void testCloseBeforeConfigureIsIdempotent() {
+ SaslChannelBuilder builder = createChannelBuilder(SecurityProtocol.SASL_PLAINTEXT);
+ builder.close();
+ assertNull(builder.loginManager());
+ builder.close();
+ assertNull(builder.loginManager());
+ }
+
+ @Test
+ public void testCloseAfterConfigIsIdempotent() {
+ SaslChannelBuilder builder = createChannelBuilder(SecurityProtocol.SASL_PLAINTEXT);
+ builder.configure(new HashMap<String, Object>());
+ assertNotNull(builder.loginManager());
+ builder.close();
+ assertNull(builder.loginManager());
+ builder.close();
+ assertNull(builder.loginManager());
+ }
+
+ @Test
+ public void testLoginManagerReleasedIfConfigureThrowsException() {
+ SaslChannelBuilder builder = createChannelBuilder(SecurityProtocol.SASL_SSL);
+ try {
+ // Use invalid config so that an exception is thrown
+ builder.configure(Collections.singletonMap(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "1"));
+ fail("Exception should have been thrown");
+ } catch (KafkaException e) {
+ assertNull(builder.loginManager());
+ }
+ builder.close();
+ assertNull(builder.loginManager());
+ }
+
+ private SaslChannelBuilder createChannelBuilder(SecurityProtocol securityProtocol) {
+ TestJaasConfig jaasConfig = new TestJaasConfig();
+ jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<String, Object>());
+ JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig);
+ return new SaslChannelBuilder(Mode.CLIENT, jaasContext, securityProtocol, "PLAIN",
+ true, null);
+ }
+
+}