You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/06/23 18:22:01 UTC

kafka git commit: MINOR: SaslChannelBuilder should be idempotent

Repository: kafka
Updated Branches:
  refs/heads/trunk b49036873 -> d65844b52


MINOR: SaslChannelBuilder should be idempotent

After we call `release`, we should null out the reference so
that we neither use it or release it a second time.

This should fix the following exception that has been reported:

```text
[2017-06-23 03:24:02,485] ERROR stream-thread [...] Failed to close consumer:  (org.apache.kafka.streams.processor.internals.StreamThread:1054)
org.apache.kafka.common.KafkaException: Failed to close kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1623)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1573)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1549)
        at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:1052)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:538)
Caused by: java.lang.IllegalStateException: release called on LoginManager with refCount == 0
        at org.apache.kafka.common.security.authenticator.LoginManager.release(LoginManager.java:106)
        at org.apache.kafka.common.network.SaslChannelBuilder.close(SaslChannelBuilder.java:125)
        at org.apache.kafka.common.network.Selector.close(Selector.java:257)
        at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:505)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.close(ConsumerNetworkClient.java:439)
        at org.apache.kafka.clients.ClientUtils.closeQuietly(ClientUtils.java:71)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1613)
```

It's worth noting that it's not clear how `SaslChannelBuilder.close()` is called more than
once and it would be good to understand that as well.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Damian Guy <da...@gmail.com>, Jason Gustafson <ja...@confluent.io>

Closes #3422 from ijuma/sasl-channel-builder-idempotent


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d65844b5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d65844b5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d65844b5

Branch: refs/heads/trunk
Commit: d65844b52723c7a093d969c4b2822e3b36389b70
Parents: b490368
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Jun 23 11:19:14 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Jun 23 11:19:14 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/admin/KafkaAdminClient.java   |  4 +-
 .../common/network/SaslChannelBuilder.java      | 12 ++-
 .../security/authenticator/LoginManager.java    | 16 +++-
 .../common/network/SaslChannelBuilderTest.java  | 78 ++++++++++++++++++++
 4 files changed, 104 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d65844b5/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 881f8d2..2129059 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -272,7 +272,6 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcessorFactory timeoutProcessorFactory) {
-        Metadata metadata = null;
         Metrics metrics = null;
         NetworkClient networkClient = null;
         Time time = Time.SYSTEM;
@@ -284,7 +283,7 @@ public class KafkaAdminClient extends AdminClient {
         try {
             // Since we only request node information, it's safe to pass true for allowAutoTopicCreation (and it
             // simplifies communication with older brokers)
-            metadata = new Metadata(config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
+            Metadata metadata = new Metadata(config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
                     config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), true);
             List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
                 MetricsReporter.class);
@@ -312,7 +311,6 @@ public class KafkaAdminClient extends AdminClient {
                 time,
                 true,
                 apiVersions);
-            channelBuilder = null;
             return new KafkaAdminClient(config, clientId, time, metadata, metrics, networkClient,
                 timeoutProcessorFactory);
         } catch (Throwable exc) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d65844b5/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 9fe200f..33a9f4d 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -95,6 +95,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
                 this.sslFactory.configure(configs);
             }
         } catch (Exception e) {
+            close();
             throw new KafkaException(e);
         }
     }
@@ -121,8 +122,10 @@ public class SaslChannelBuilder implements ChannelBuilder {
     }
 
     public void close()  {
-        if (this.loginManager != null)
-            this.loginManager.release();
+        if (loginManager != null) {
+            loginManager.release();
+            loginManager = null;
+        }
     }
 
     protected TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel) throws IOException {
@@ -134,6 +137,11 @@ public class SaslChannelBuilder implements ChannelBuilder {
         }
     }
 
+    // Package private for testing
+    LoginManager loginManager() {
+        return loginManager;
+    }
+
     private static String defaultKerberosRealm() throws ClassNotFoundException, NoSuchMethodException,
             IllegalArgumentException, IllegalAccessException, InvocationTargetException {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d65844b5/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
index 1730d66..66d5e3b 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
@@ -29,9 +29,13 @@ import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.auth.Login;
 import org.apache.kafka.common.security.kerberos.KerberosLogin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class LoginManager {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(LoginManager.class);
+
     // static configs (broker or client)
     private static final Map<String, LoginManager> STATIC_INSTANCES = new HashMap<>();
 
@@ -94,6 +98,7 @@ public class LoginManager {
 
     private LoginManager acquire() {
         ++refCount;
+        LOGGER.trace("{} acquired", this);
         return this;
     }
 
@@ -103,7 +108,7 @@ public class LoginManager {
     public void release() {
         synchronized (LoginManager.class) {
             if (refCount == 0)
-                throw new IllegalStateException("release called on LoginManager with refCount == 0");
+                throw new IllegalStateException("release() called on disposed " + this);
             else if (refCount == 1) {
                 if (cacheKey instanceof Password) {
                     DYNAMIC_INSTANCES.remove(cacheKey);
@@ -113,9 +118,18 @@ public class LoginManager {
                 login.close();
             }
             --refCount;
+            LOGGER.trace("{} released", this);
         }
     }
 
+    @Override
+    public String toString() {
+        return "LoginManager(serviceName=" + serviceName() +
+                // subject.toString() exposes private credentials, so we can't use it
+                ", publicCredentials=" + subject().getPublicCredentials() +
+                ", refCount=" + refCount + ')';
+    }
+
     /* Should only be used in tests. */
     public static void closeAll() {
         synchronized (LoginManager.class) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d65844b5/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
new file mode 100644
index 0000000..2f41c77
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.security.authenticator.TestJaasConfig;
+import org.apache.kafka.common.security.plain.PlainLoginModule;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+public class SaslChannelBuilderTest {
+
+    @Test
+    public void testCloseBeforeConfigureIsIdempotent() {
+        SaslChannelBuilder builder = createChannelBuilder(SecurityProtocol.SASL_PLAINTEXT);
+        builder.close();
+        assertNull(builder.loginManager());
+        builder.close();
+        assertNull(builder.loginManager());
+    }
+
+    @Test
+    public void testCloseAfterConfigIsIdempotent() {
+        SaslChannelBuilder builder = createChannelBuilder(SecurityProtocol.SASL_PLAINTEXT);
+        builder.configure(new HashMap<String, Object>());
+        assertNotNull(builder.loginManager());
+        builder.close();
+        assertNull(builder.loginManager());
+        builder.close();
+        assertNull(builder.loginManager());
+    }
+
+    @Test
+    public void testLoginManagerReleasedIfConfigureThrowsException() {
+        SaslChannelBuilder builder = createChannelBuilder(SecurityProtocol.SASL_SSL);
+        try {
+            // Use invalid config so that an exception is thrown
+            builder.configure(Collections.singletonMap(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "1"));
+            fail("Exception should have been thrown");
+        } catch (KafkaException e) {
+            assertNull(builder.loginManager());
+        }
+        builder.close();
+        assertNull(builder.loginManager());
+    }
+
+    private SaslChannelBuilder createChannelBuilder(SecurityProtocol securityProtocol) {
+        TestJaasConfig jaasConfig = new TestJaasConfig();
+        jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<String, Object>());
+        JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig);
+        return new SaslChannelBuilder(Mode.CLIENT, jaasContext, securityProtocol, "PLAIN",
+                true, null);
+    }
+
+}