You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/01/04 21:49:29 UTC

kafka git commit: KAFKA-3051 KAFKA-3048; Security config docs improvements

Repository: kafka
Updated Branches:
  refs/heads/trunk b93f48f74 -> 57df460f8


KAFKA-3051 KAFKA-3048; Security config docs improvements

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #724 from ijuma/minor-security-fixes


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/57df460f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/57df460f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/57df460f

Branch: refs/heads/trunk
Commit: 57df460f8d7b225509dd8c061a5b6efa65c8ac9c
Parents: b93f48f
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon Jan 4 12:49:23 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Jan 4 12:49:23 2016 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/clients/ClientUtils.java   |  3 +-
 .../kafka/clients/CommonClientConfigs.java      | 16 ++++++-
 .../apache/kafka/common/config/SaslConfigs.java |  5 +--
 .../apache/kafka/common/config/SslConfigs.java  | 13 +++---
 .../kafka/common/network/KafkaChannel.java      | 18 +++-----
 .../kafka/common/protocol/SecurityProtocol.java | 47 +++++++++++++++-----
 .../kafka/common/network/SslSelectorTest.java   |  2 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |  6 ++-
 8 files changed, 69 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/57df460f/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index 41e5d74..b614198 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
 
-
 public class ClientUtils {
     private static final Logger log = LoggerFactory.getLogger(ClientUtils.class);
 
@@ -75,7 +74,7 @@ public class ClientUtils {
      */
     public static ChannelBuilder createChannelBuilder(Map<String, ?> configs) {
         SecurityProtocol securityProtocol = SecurityProtocol.valueOf((String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
-        if (securityProtocol == SecurityProtocol.TRACE)
+        if (!SecurityProtocol.nonTestingValues().contains(securityProtocol))
             throw new ConfigException("Invalid SecurityProtocol " + securityProtocol);
         return ChannelBuilders.create(securityProtocol, Mode.CLIENT, LoginType.CLIENT, configs);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/57df460f/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 48e4919..2c5e67c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -13,6 +13,12 @@
 
 package org.apache.kafka.clients;
 
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Some configurations shared by both producer and consumer
  */
@@ -56,7 +62,7 @@ public class CommonClientConfigs {
     public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
 
     public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";
-    public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Currently only PLAINTEXT and SSL are supported.";
+    public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Valid values are: " + Utils.mkString(nonTestingSecurityProtocolNames(), ", ") + ".";
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
@@ -67,4 +73,12 @@ public class CommonClientConfigs {
                                                          + "for the response of a request. If the response is not received before the timeout "
                                                          + "elapses the client will resend the request if necessary or fail the request if "
                                                          + "retries are exhausted.";
+
+    private static List<String> nonTestingSecurityProtocolNames() {
+        List<String> names = new ArrayList<>();
+        for (SecurityProtocol protocol : SecurityProtocol.nonTestingValues())
+            names.add(protocol.name);
+        return names;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/57df460f/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
index 78ae06a..ef29743 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
@@ -26,8 +26,7 @@ public class SaslConfigs {
         + "This can be defined either in Kafka's JAAS config or in Kafka's config.";
 
     public static final String SASL_KERBEROS_KINIT_CMD = "sasl.kerberos.kinit.cmd";
-    public static final String SASL_KERBEROS_KINIT_CMD_DOC = "Kerberos kinit command path. "
-        + "Default is /usr/bin/kinit";
+    public static final String SASL_KERBEROS_KINIT_CMD_DOC = "Kerberos kinit command path.";
     public static final String DEFAULT_KERBEROS_KINIT_CMD = "/usr/bin/kinit";
 
     public static final String SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR = "sasl.kerberos.ticket.renew.window.factor";
@@ -46,7 +45,7 @@ public class SaslConfigs {
     public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = "sasl.kerberos.principal.to.local.rules";
     public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC = "A list of rules for mapping from principal names to short names (typically operating system usernames). " +
             "The rules are evaluated in order and the first rule that matches a principal name is used to map it to a short name. Any later rules in the list are ignored. " +
-            "By default, principal names of the form <username>/<hostname>@<REALM> are mapped to <username>.";
+            "By default, principal names of the form {username}/{hostname}@{REALM} are mapped to {username}.";
     public static final List<String> DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = Collections.singletonList("DEFAULT");
 
     public static void addClientSaslSupport(ConfigDef config) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/57df460f/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
index a893b75..1ccd039 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
@@ -23,8 +23,7 @@ public class SslConfigs {
 
     public static final String PRINCIPAL_BUILDER_CLASS_CONFIG = "principal.builder.class";
     public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the PrincipalBuilder interface, " +
-            "which is currently used to build the Principal for connections with the SSL SecurityProtocol. " +
-            "Default is DefaultPrincipalBuilder.";
+            "which is currently used to build the Principal for connections with the SSL SecurityProtocol.";
     public static final String DEFAULT_PRINCIPAL_BUILDER_CLASS = "org.apache.kafka.common.security.auth.DefaultPrincipalBuilder";
 
     public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol";
@@ -43,13 +42,12 @@ public class SslConfigs {
             + "By default all the available cipher suites are supported.";
 
     public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols";
-    public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections. "
-            + "TLSv1.2, TLSv1.1 and TLSv1 are enabled by default.";
+    public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections.";
     public static final String DEFAULT_SSL_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.1,TLSv1";
 
     public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type";
     public static final String SSL_KEYSTORE_TYPE_DOC = "The file format of the key store file. "
-            + "This is optional for client. Default value is JKS";
+            + "This is optional for client.";
     public static final String DEFAULT_SSL_KEYSTORE_TYPE = "JKS";
 
     public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location";
@@ -65,8 +63,7 @@ public class SslConfigs {
             + "This is optional for client.";
 
     public static final String SSL_TRUSTSTORE_TYPE_CONFIG = "ssl.truststore.type";
-    public static final String SSL_TRUSTSTORE_TYPE_DOC = "The file format of the trust store file. "
-            + "Default value is JKS.";
+    public static final String SSL_TRUSTSTORE_TYPE_DOC = "The file format of the trust store file.";
     public static final String DEFAULT_SSL_TRUSTSTORE_TYPE = "JKS";
 
     public static final String SSL_TRUSTSTORE_LOCATION_CONFIG = "ssl.truststore.location";
@@ -92,7 +89,7 @@ public class SslConfigs {
     public static final String SSL_CLIENT_AUTH_DOC = "Configures kafka broker to request client authentication."
                                            + " The following settings are common: "
                                            + " <ul>"
-                                           + " <li><code>ssl.want.client.auth=required</code> If set to required"
+                                           + " <li><code>ssl.client.auth=required</code> If set to required"
                                            + " client authentication is required."
                                            + " <li><code>ssl.client.auth=requested</code> This means client authentication is optional."
                                            + " unlike requested , if this option is set client can choose not to provide authentication information about itself"

http://git-wip-us.apache.org/repos/asf/kafka/blob/57df460f/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index ac436c3..defcc24 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -26,18 +26,13 @@ import java.nio.channels.SelectionKey;
 
 import java.security.Principal;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
 public class KafkaChannel {
-    private static final Logger log = LoggerFactory.getLogger(KafkaChannel.class);
     private final String id;
-    private TransportLayer transportLayer;
-    private Authenticator authenticator;
+    private final TransportLayer transportLayer;
+    private final Authenticator authenticator;
+    private final int maxReceiveSize;
     private NetworkReceive receive;
     private Send send;
-    private int maxReceiveSize;
 
     public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize) throws IOException {
         this.id = id;
@@ -52,9 +47,7 @@ public class KafkaChannel {
     }
 
     /**
-     * returns user principal for the session
-     * In case of PLAINTEXT and No Authentication returns ANONYMOUS as the userPrincipal
-     * If SSL used without any SASL Authentication returns SSLSession.peerPrincipal
+     * Returns the principal returned by `authenticator.principal()`.
      */
     public Principal principal() throws IOException {
         return authenticator.principal();
@@ -157,8 +150,7 @@ public class KafkaChannel {
     }
 
     private long receive(NetworkReceive receive) throws IOException {
-        long result = receive.readFrom(transportLayer);
-        return result;
+        return receive.readFrom(transportLayer);
     }
 
     private boolean send(Send send) throws IOException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/57df460f/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
index 70130a4..cbd0c42 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
@@ -17,30 +17,43 @@
 package org.apache.kafka.common.protocol;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public enum SecurityProtocol {
     /** Un-authenticated, non-encrypted channel */
-    PLAINTEXT(0, "PLAINTEXT"),
+    PLAINTEXT(0, "PLAINTEXT", false),
     /** SSL channel */
-    SSL(1, "SSL"),
+    SSL(1, "SSL", false),
     /** SASL authenticated, non-encrypted channel */
-    SASL_PLAINTEXT(2, "SASL_PLAINTEXT"),
+    SASL_PLAINTEXT(2, "SASL_PLAINTEXT", false),
     /** SASL authenticated, SSL channel */
-    SASL_SSL(3, "SASL_SSL"),
+    SASL_SSL(3, "SASL_SSL", false),
     /** Currently identical to PLAINTEXT and used for testing only. We may implement extra instrumentation when testing channel code. */
-    TRACE(Short.MAX_VALUE, "TRACE");
+    TRACE(Short.MAX_VALUE, "TRACE", true);
 
-    private static final Map<Short, SecurityProtocol> CODE_TO_SECURITY_PROTOCOL = new HashMap<Short, SecurityProtocol>();
-    private static final List<String> NAMES = new ArrayList<String>();
+    private static final Map<Short, SecurityProtocol> CODE_TO_SECURITY_PROTOCOL;
+    private static final List<String> NAMES;
+    private static final Set<SecurityProtocol> NON_TESTING_VALUES;
 
     static {
-        for (SecurityProtocol proto: SecurityProtocol.values()) {
-            CODE_TO_SECURITY_PROTOCOL.put(proto.id, proto);
-            NAMES.add(proto.name);
+        SecurityProtocol[] protocols = SecurityProtocol.values();
+        List<String> names = new ArrayList<>(protocols.length);
+        Map<Short, SecurityProtocol> codeToSecurityProtocol = new HashMap<>(protocols.length);
+        Set<SecurityProtocol> nonTestingValues = EnumSet.noneOf(SecurityProtocol.class);
+        for (SecurityProtocol proto : protocols) {
+            codeToSecurityProtocol.put(proto.id, proto);
+            names.add(proto.name);
+            if (!proto.isTesting)
+                nonTestingValues.add(proto);
         }
+        CODE_TO_SECURITY_PROTOCOL = Collections.unmodifiableMap(codeToSecurityProtocol);
+        NAMES = Collections.unmodifiableList(names);
+        NON_TESTING_VALUES = Collections.unmodifiableSet(nonTestingValues);
     }
 
     /** The permanent and immutable id of a security protocol -- this can't change, and must match kafka.cluster.SecurityProtocol  */
@@ -49,9 +62,13 @@ public enum SecurityProtocol {
     /** Name of the security protocol. This may be used by client configuration. */
     public final String name;
 
-    private SecurityProtocol(int id, String name) {
+    /* Whether this security protocol is for testing/debugging */
+    private final boolean isTesting;
+
+    private SecurityProtocol(int id, String name, boolean isTesting) {
         this.id = (short) id;
         this.name = name;
+        this.isTesting = isTesting;
     }
 
     public static String getName(int id) {
@@ -66,4 +83,12 @@ public enum SecurityProtocol {
         return CODE_TO_SECURITY_PROTOCOL.get(id);
     }
 
+    /**
+     * Returns the set of non-testing SecurityProtocol instances, that is, SecurityProtocol instances that are suitable
+     * for production usage.
+     */
+    public static Set<SecurityProtocol> nonTestingValues() {
+        return NON_TESTING_VALUES;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/57df460f/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index bbc8fe1..2c098ea 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -157,7 +157,7 @@ public class SslSelectorTest extends SelectorTest {
     }
 
     /**
-     * Connects and waits for handshake to complete. This is required since SSLTransportLayer
+     * Connects and waits for handshake to complete. This is required since SslTransportLayer
      * implementation requires the channel to be ready before send is invoked (unlike plaintext
      * where send can be invoked straight after connect)
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/57df460f/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9556799..0cb11ab 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -32,7 +32,8 @@ import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SslConfigs}
 import org.apache.kafka.common.metrics.MetricsReporter
 import org.apache.kafka.common.protocol.SecurityProtocol
 
-import scala.collection.{Map, immutable}
+import scala.collection.{Map, immutable, JavaConverters}
+import JavaConverters._
 
 object Defaults {
   /** ********* Zookeeper Configuration ***********/
@@ -435,7 +436,8 @@ object KafkaConfig {
   val LeaderImbalancePerBrokerPercentageDoc = "The ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above this value per broker. The value is specified in percentage."
   val LeaderImbalanceCheckIntervalSecondsDoc = "The frequency with which the partition rebalance check is triggered by the controller"
   val UncleanLeaderElectionEnableDoc = "Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss"
-  val InterBrokerSecurityProtocolDoc = "Security protocol used to communicate between brokers. Defaults to plain text."
+  val InterBrokerSecurityProtocolDoc = "Security protocol used to communicate between brokers. Valid values are: " +
+    s"${SecurityProtocol.nonTestingValues.asScala.toSeq.map(_.name).mkString(", ")}."
   val InterBrokerProtocolVersionDoc = "Specify which version of the inter-broker protocol will be used.\n" +
   " This is typically bumped after all brokers were upgraded to a new version.\n" +
   " Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1 Check ApiVersion for the full list."