You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/01/04 21:49:29 UTC
kafka git commit: KAFKA-3051 KAFKA-3048; Security config docs improvements
Repository: kafka
Updated Branches:
refs/heads/trunk b93f48f74 -> 57df460f8
KAFKA-3051 KAFKA-3048; Security config docs improvements
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #724 from ijuma/minor-security-fixes
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/57df460f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/57df460f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/57df460f
Branch: refs/heads/trunk
Commit: 57df460f8d7b225509dd8c061a5b6efa65c8ac9c
Parents: b93f48f
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon Jan 4 12:49:23 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Jan 4 12:49:23 2016 -0800
----------------------------------------------------------------------
.../org/apache/kafka/clients/ClientUtils.java | 3 +-
.../kafka/clients/CommonClientConfigs.java | 16 ++++++-
.../apache/kafka/common/config/SaslConfigs.java | 5 +--
.../apache/kafka/common/config/SslConfigs.java | 13 +++---
.../kafka/common/network/KafkaChannel.java | 18 +++-----
.../kafka/common/protocol/SecurityProtocol.java | 47 +++++++++++++++-----
.../kafka/common/network/SslSelectorTest.java | 2 +-
.../main/scala/kafka/server/KafkaConfig.scala | 6 ++-
8 files changed, 69 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/57df460f/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index 41e5d74..b614198 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
-
public class ClientUtils {
private static final Logger log = LoggerFactory.getLogger(ClientUtils.class);
@@ -75,7 +74,7 @@ public class ClientUtils {
*/
public static ChannelBuilder createChannelBuilder(Map<String, ?> configs) {
SecurityProtocol securityProtocol = SecurityProtocol.valueOf((String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
- if (securityProtocol == SecurityProtocol.TRACE)
+ if (!SecurityProtocol.nonTestingValues().contains(securityProtocol))
throw new ConfigException("Invalid SecurityProtocol " + securityProtocol);
return ChannelBuilders.create(securityProtocol, Mode.CLIENT, LoginType.CLIENT, configs);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57df460f/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 48e4919..2c5e67c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -13,6 +13,12 @@
package org.apache.kafka.clients;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
/**
* Some configurations shared by both producer and consumer
*/
@@ -56,7 +62,7 @@ public class CommonClientConfigs {
public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";
- public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Currently only PLAINTEXT and SSL are supported.";
+ public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Valid values are: " + Utils.mkString(nonTestingSecurityProtocolNames(), ", ") + ".";
public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
@@ -67,4 +73,12 @@ public class CommonClientConfigs {
+ "for the response of a request. If the response is not received before the timeout "
+ "elapses the client will resend the request if necessary or fail the request if "
+ "retries are exhausted.";
+
+ private static List<String> nonTestingSecurityProtocolNames() {
+ List<String> names = new ArrayList<>();
+ for (SecurityProtocol protocol : SecurityProtocol.nonTestingValues())
+ names.add(protocol.name);
+ return names;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57df460f/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
index 78ae06a..ef29743 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
@@ -26,8 +26,7 @@ public class SaslConfigs {
+ "This can be defined either in Kafka's JAAS config or in Kafka's config.";
public static final String SASL_KERBEROS_KINIT_CMD = "sasl.kerberos.kinit.cmd";
- public static final String SASL_KERBEROS_KINIT_CMD_DOC = "Kerberos kinit command path. "
- + "Default is /usr/bin/kinit";
+ public static final String SASL_KERBEROS_KINIT_CMD_DOC = "Kerberos kinit command path.";
public static final String DEFAULT_KERBEROS_KINIT_CMD = "/usr/bin/kinit";
public static final String SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR = "sasl.kerberos.ticket.renew.window.factor";
@@ -46,7 +45,7 @@ public class SaslConfigs {
public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = "sasl.kerberos.principal.to.local.rules";
public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC = "A list of rules for mapping from principal names to short names (typically operating system usernames). " +
"The rules are evaluated in order and the first rule that matches a principal name is used to map it to a short name. Any later rules in the list are ignored. " +
- "By default, principal names of the form <username>/<hostname>@<REALM> are mapped to <username>.";
+ "By default, principal names of the form {username}/{hostname}@{REALM} are mapped to {username}.";
public static final List<String> DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = Collections.singletonList("DEFAULT");
public static void addClientSaslSupport(ConfigDef config) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/57df460f/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
index a893b75..1ccd039 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
@@ -23,8 +23,7 @@ public class SslConfigs {
public static final String PRINCIPAL_BUILDER_CLASS_CONFIG = "principal.builder.class";
public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the PrincipalBuilder interface, " +
- "which is currently used to build the Principal for connections with the SSL SecurityProtocol. " +
- "Default is DefaultPrincipalBuilder.";
+ "which is currently used to build the Principal for connections with the SSL SecurityProtocol.";
public static final String DEFAULT_PRINCIPAL_BUILDER_CLASS = "org.apache.kafka.common.security.auth.DefaultPrincipalBuilder";
public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol";
@@ -43,13 +42,12 @@ public class SslConfigs {
+ "By default all the available cipher suites are supported.";
public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols";
- public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections. "
- + "TLSv1.2, TLSv1.1 and TLSv1 are enabled by default.";
+ public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections.";
public static final String DEFAULT_SSL_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.1,TLSv1";
public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type";
public static final String SSL_KEYSTORE_TYPE_DOC = "The file format of the key store file. "
- + "This is optional for client. Default value is JKS";
+ + "This is optional for client.";
public static final String DEFAULT_SSL_KEYSTORE_TYPE = "JKS";
public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location";
@@ -65,8 +63,7 @@ public class SslConfigs {
+ "This is optional for client.";
public static final String SSL_TRUSTSTORE_TYPE_CONFIG = "ssl.truststore.type";
- public static final String SSL_TRUSTSTORE_TYPE_DOC = "The file format of the trust store file. "
- + "Default value is JKS.";
+ public static final String SSL_TRUSTSTORE_TYPE_DOC = "The file format of the trust store file.";
public static final String DEFAULT_SSL_TRUSTSTORE_TYPE = "JKS";
public static final String SSL_TRUSTSTORE_LOCATION_CONFIG = "ssl.truststore.location";
@@ -92,7 +89,7 @@ public class SslConfigs {
public static final String SSL_CLIENT_AUTH_DOC = "Configures kafka broker to request client authentication."
+ " The following settings are common: "
+ " <ul>"
- + " <li><code>ssl.want.client.auth=required</code> If set to required"
+ + " <li><code>ssl.client.auth=required</code> If set to required"
+ " client authentication is required."
+ " <li><code>ssl.client.auth=requested</code> This means client authentication is optional."
+ " unlike requested , if this option is set client can choose not to provide authentication information about itself"
http://git-wip-us.apache.org/repos/asf/kafka/blob/57df460f/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index ac436c3..defcc24 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -26,18 +26,13 @@ import java.nio.channels.SelectionKey;
import java.security.Principal;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
public class KafkaChannel {
- private static final Logger log = LoggerFactory.getLogger(KafkaChannel.class);
private final String id;
- private TransportLayer transportLayer;
- private Authenticator authenticator;
+ private final TransportLayer transportLayer;
+ private final Authenticator authenticator;
+ private final int maxReceiveSize;
private NetworkReceive receive;
private Send send;
- private int maxReceiveSize;
public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize) throws IOException {
this.id = id;
@@ -52,9 +47,7 @@ public class KafkaChannel {
}
/**
- * returns user principal for the session
- * In case of PLAINTEXT and No Authentication returns ANONYMOUS as the userPrincipal
- * If SSL used without any SASL Authentication returns SSLSession.peerPrincipal
+ * Returns the principal returned by `authenticator.principal()`.
*/
public Principal principal() throws IOException {
return authenticator.principal();
@@ -157,8 +150,7 @@ public class KafkaChannel {
}
private long receive(NetworkReceive receive) throws IOException {
- long result = receive.readFrom(transportLayer);
- return result;
+ return receive.readFrom(transportLayer);
}
private boolean send(Send send) throws IOException {
http://git-wip-us.apache.org/repos/asf/kafka/blob/57df460f/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
index 70130a4..cbd0c42 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
@@ -17,30 +17,43 @@
package org.apache.kafka.common.protocol;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public enum SecurityProtocol {
/** Un-authenticated, non-encrypted channel */
- PLAINTEXT(0, "PLAINTEXT"),
+ PLAINTEXT(0, "PLAINTEXT", false),
/** SSL channel */
- SSL(1, "SSL"),
+ SSL(1, "SSL", false),
/** SASL authenticated, non-encrypted channel */
- SASL_PLAINTEXT(2, "SASL_PLAINTEXT"),
+ SASL_PLAINTEXT(2, "SASL_PLAINTEXT", false),
/** SASL authenticated, SSL channel */
- SASL_SSL(3, "SASL_SSL"),
+ SASL_SSL(3, "SASL_SSL", false),
/** Currently identical to PLAINTEXT and used for testing only. We may implement extra instrumentation when testing channel code. */
- TRACE(Short.MAX_VALUE, "TRACE");
+ TRACE(Short.MAX_VALUE, "TRACE", true);
- private static final Map<Short, SecurityProtocol> CODE_TO_SECURITY_PROTOCOL = new HashMap<Short, SecurityProtocol>();
- private static final List<String> NAMES = new ArrayList<String>();
+ private static final Map<Short, SecurityProtocol> CODE_TO_SECURITY_PROTOCOL;
+ private static final List<String> NAMES;
+ private static final Set<SecurityProtocol> NON_TESTING_VALUES;
static {
- for (SecurityProtocol proto: SecurityProtocol.values()) {
- CODE_TO_SECURITY_PROTOCOL.put(proto.id, proto);
- NAMES.add(proto.name);
+ SecurityProtocol[] protocols = SecurityProtocol.values();
+ List<String> names = new ArrayList<>(protocols.length);
+ Map<Short, SecurityProtocol> codeToSecurityProtocol = new HashMap<>(protocols.length);
+ Set<SecurityProtocol> nonTestingValues = EnumSet.noneOf(SecurityProtocol.class);
+ for (SecurityProtocol proto : protocols) {
+ codeToSecurityProtocol.put(proto.id, proto);
+ names.add(proto.name);
+ if (!proto.isTesting)
+ nonTestingValues.add(proto);
}
+ CODE_TO_SECURITY_PROTOCOL = Collections.unmodifiableMap(codeToSecurityProtocol);
+ NAMES = Collections.unmodifiableList(names);
+ NON_TESTING_VALUES = Collections.unmodifiableSet(nonTestingValues);
}
/** The permanent and immutable id of a security protocol -- this can't change, and must match kafka.cluster.SecurityProtocol */
@@ -49,9 +62,13 @@ public enum SecurityProtocol {
/** Name of the security protocol. This may be used by client configuration. */
public final String name;
- private SecurityProtocol(int id, String name) {
+ /* Whether this security protocol is for testing/debugging */
+ private final boolean isTesting;
+
+ private SecurityProtocol(int id, String name, boolean isTesting) {
this.id = (short) id;
this.name = name;
+ this.isTesting = isTesting;
}
public static String getName(int id) {
@@ -66,4 +83,12 @@ public enum SecurityProtocol {
return CODE_TO_SECURITY_PROTOCOL.get(id);
}
+ /**
+ * Returns the set of non-testing SecurityProtocol instances, that is, SecurityProtocol instances that are suitable
+ * for production usage.
+ */
+ public static Set<SecurityProtocol> nonTestingValues() {
+ return NON_TESTING_VALUES;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57df460f/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index bbc8fe1..2c098ea 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -157,7 +157,7 @@ public class SslSelectorTest extends SelectorTest {
}
/**
- * Connects and waits for handshake to complete. This is required since SSLTransportLayer
+ * Connects and waits for handshake to complete. This is required since SslTransportLayer
* implementation requires the channel to be ready before send is invoked (unlike plaintext
* where send can be invoked straight after connect)
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/57df460f/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9556799..0cb11ab 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -32,7 +32,8 @@ import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SslConfigs}
import org.apache.kafka.common.metrics.MetricsReporter
import org.apache.kafka.common.protocol.SecurityProtocol
-import scala.collection.{Map, immutable}
+import scala.collection.{Map, immutable, JavaConverters}
+import JavaConverters._
object Defaults {
/** ********* Zookeeper Configuration ***********/
@@ -435,7 +436,8 @@ object KafkaConfig {
val LeaderImbalancePerBrokerPercentageDoc = "The ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above this value per broker. The value is specified in percentage."
val LeaderImbalanceCheckIntervalSecondsDoc = "The frequency with which the partition rebalance check is triggered by the controller"
val UncleanLeaderElectionEnableDoc = "Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss"
- val InterBrokerSecurityProtocolDoc = "Security protocol used to communicate between brokers. Defaults to plain text."
+ val InterBrokerSecurityProtocolDoc = "Security protocol used to communicate between brokers. Valid values are: " +
+ s"${SecurityProtocol.nonTestingValues.asScala.toSeq.map(_.name).mkString(", ")}."
val InterBrokerProtocolVersionDoc = "Specify which version of the inter-broker protocol will be used.\n" +
" This is typically bumped after all brokers were upgraded to a new version.\n" +
" Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1 Check ApiVersion for the full list."