You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2015/10/24 18:45:07 UTC
[4/4] kafka git commit: KAFKA-2460; Fix capitalisation in SSL classes
KAFKA-2460; Fix capitalisation in SSL classes
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Sriharsha Chintalapani <ha...@hortonworks.com>
Closes #355 from ijuma/kafka-2460-fix-capitalisation-in-ssl-classes
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/16f194b2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/16f194b2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/16f194b2
Branch: refs/heads/trunk
Commit: 16f194b20ad9795188f1d7781e7cbca1cd2a6a2d
Parents: 6f2f1f9
Author: Ismael Juma <is...@juma.me.uk>
Authored: Sat Oct 24 09:42:19 2015 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Sat Oct 24 09:42:19 2015 -0700
----------------------------------------------------------------------
.../kafka/clients/consumer/ConsumerConfig.java | 32 +-
.../kafka/clients/producer/ProducerConfig.java | 32 +-
.../apache/kafka/common/config/SSLConfigs.java | 99 ---
.../apache/kafka/common/config/SslConfigs.java | 99 +++
.../kafka/common/network/ChannelBuilders.java | 2 +-
.../common/network/PlaintextChannelBuilder.java | 5 +-
.../kafka/common/network/SSLChannelBuilder.java | 75 --
.../kafka/common/network/SSLTransportLayer.java | 734 -------------------
.../common/network/SaslChannelBuilder.java | 14 +-
.../kafka/common/network/SslChannelBuilder.java | 75 ++
.../kafka/common/network/SslTransportLayer.java | 733 ++++++++++++++++++
.../kafka/common/network/TransportLayer.java | 3 +-
.../kafka/common/security/ssl/SSLFactory.java | 198 -----
.../kafka/common/security/ssl/SslFactory.java | 198 +++++
.../clients/producer/KafkaProducerTest.java | 4 +-
.../apache/kafka/common/network/EchoServer.java | 6 +-
.../kafka/common/network/SSLSelectorTest.java | 170 -----
.../common/network/SSLTransportLayerTest.java | 652 ----------------
.../kafka/common/network/SelectorTest.java | 4 +-
.../kafka/common/network/SslSelectorTest.java | 170 +++++
.../common/network/SslTransportLayerTest.java | 651 ++++++++++++++++
.../common/security/ssl/SSLFactoryTest.java | 61 --
.../common/security/ssl/SslFactoryTest.java | 61 ++
.../org/apache/kafka/test/TestSSLUtils.java | 243 ------
.../org/apache/kafka/test/TestSslUtils.java | 242 ++++++
.../distributed/DistributedHerderConfig.java | 32 +-
.../main/scala/kafka/api/FetchResponse.scala | 2 +-
core/src/main/scala/kafka/cluster/Broker.scala | 6 +-
.../main/scala/kafka/server/KafkaConfig.scala | 146 ++--
.../kafka/server/ReplicaFetcherThread.scala | 2 +-
.../unit/kafka/network/SocketServerTest.scala | 2 +-
.../unit/kafka/server/KafkaConfigTest.scala | 30 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 8 +-
33 files changed, 2392 insertions(+), 2399 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 5cc0419..14c54c2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -17,7 +17,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.Deserializer;
@@ -287,21 +287,21 @@ public class ConsumerConfig extends AbstractConfig {
Importance.HIGH,
VALUE_DESERIALIZER_CLASS_DOC)
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
- .define(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
- .define(SSLConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SSLConfigs.SSL_PROTOCOL_DOC)
- .define(SSLConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SSLConfigs.SSL_PROVIDER_DOC, false)
- .define(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SSLConfigs.SSL_CIPHER_SUITES_DOC, false)
- .define(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SSLConfigs.DEFAULT_ENABLED_PROTOCOLS, Importance.MEDIUM, SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC)
- .define(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_KEYSTORE_TYPE_DOC)
- .define(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
- .define(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
- .define(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEY_PASSWORD_DOC, false)
- .define(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC)
- .define(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
- .define(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
- .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
- .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
- .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
+ .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
+ .define(SslConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
+ .define(SslConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
+ .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
+ .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
+ .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
+ .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
+ .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
+ .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
+ .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
+ .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
+ .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
+ .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
+ .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
+ .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
.define(SaslConfigs.SASL_KAFKA_SERVER_REALM, Type.STRING, Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false)
http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index b3cfe70..6d40b77 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -22,7 +22,7 @@ import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -265,21 +265,21 @@ public class ProducerConfig extends AbstractConfig {
.define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
.define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
- .define(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
- .define(SSLConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SSLConfigs.SSL_PROTOCOL_DOC)
- .define(SSLConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SSLConfigs.SSL_PROVIDER_DOC, false)
- .define(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SSLConfigs.SSL_CIPHER_SUITES_DOC, false)
- .define(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SSLConfigs.DEFAULT_ENABLED_PROTOCOLS, Importance.MEDIUM, SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC)
- .define(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_KEYSTORE_TYPE_DOC)
- .define(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
- .define(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
- .define(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEY_PASSWORD_DOC, false)
- .define(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC)
- .define(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
- .define(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
- .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
- .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
- .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
+ .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
+ .define(SslConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
+ .define(SslConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
+ .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
+ .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
+ .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
+ .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
+ .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
+ .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
+ .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
+ .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
+ .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
+ .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
+ .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
+ .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
.define(SaslConfigs.SASL_KAFKA_SERVER_REALM, Type.STRING, Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false)
http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java
deleted file mode 100644
index 207a349..0000000
--- a/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.common.config;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.TrustManagerFactory;
-
-public class SSLConfigs {
- /*
- * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
- */
-
- public static final String PRINCIPAL_BUILDER_CLASS_CONFIG = "principal.builder.class";
- public static final String PRINCIPAL_BUILDER_CLASS_DOC = "principal builder to generate a java Principal. This config is optional for client.";
- public static final String DEFAULT_PRINCIPAL_BUILDER_CLASS = "org.apache.kafka.common.security.auth.DefaultPrincipalBuilder";
-
- public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol";
- public static final String SSL_PROTOCOL_DOC = "The SSL protocol used to generate the SSLContext. "
- + "Default setting is TLS, which is fine for most cases. "
- + "Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 "
- + "may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.";
-
- public static final String DEFAULT_SSL_PROTOCOL = "TLS";
-
- public static final String SSL_PROVIDER_CONFIG = "ssl.provider";
- public static final String SSL_PROVIDER_DOC = "The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.";
-
- public static final String SSL_CIPHER_SUITES_CONFIG = "ssl.cipher.suites";
- public static final String SSL_CIPHER_SUITES_DOC = "A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol."
- + "By default all the available cipher suites are supported.";
-
- public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols";
- public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections. "
- + "TLSv1.2, TLSv1.1 and TLSv1 are enabled by default.";
- public static final String DEFAULT_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.1,TLSv1";
-
- public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type";
- public static final String SSL_KEYSTORE_TYPE_DOC = "The file format of the key store file. "
- + "This is optional for client. Default value is JKS";
- public static final String DEFAULT_SSL_KEYSTORE_TYPE = "JKS";
-
- public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location";
- public static final String SSL_KEYSTORE_LOCATION_DOC = "The location of the key store file. "
- + "This is optional for client and can be used for two-way authentication for client.";
-
- public static final String SSL_KEYSTORE_PASSWORD_CONFIG = "ssl.keystore.password";
- public static final String SSL_KEYSTORE_PASSWORD_DOC = "The store password for the key store file."
- + "This is optional for client and only needed if ssl.keystore.location is configured. ";
-
- public static final String SSL_KEY_PASSWORD_CONFIG = "ssl.key.password";
- public static final String SSL_KEY_PASSWORD_DOC = "The password of the private key in the key store file. "
- + "This is optional for client.";
-
- public static final String SSL_TRUSTSTORE_TYPE_CONFIG = "ssl.truststore.type";
- public static final String SSL_TRUSTSTORE_TYPE_DOC = "The file format of the trust store file. "
- + "Default value is JKS.";
- public static final String DEFAULT_SSL_TRUSTSTORE_TYPE = "JKS";
-
- public static final String SSL_TRUSTSTORE_LOCATION_CONFIG = "ssl.truststore.location";
- public static final String SSL_TRUSTSTORE_LOCATION_DOC = "The location of the trust store file. ";
-
- public static final String SSL_TRUSTSTORE_PASSWORD_CONFIG = "ssl.truststore.password";
- public static final String SSL_TRUSTSTORE_PASSWORD_DOC = "The password for the trust store file. ";
-
- public static final String SSL_KEYMANAGER_ALGORITHM_CONFIG = "ssl.keymanager.algorithm";
- public static final String SSL_KEYMANAGER_ALGORITHM_DOC = "The algorithm used by key manager factory for SSL connections. "
- + "Default value is the key manager factory algorithm configured for the Java Virtual Machine.";
- public static final String DEFAULT_SSL_KEYMANGER_ALGORITHM = KeyManagerFactory.getDefaultAlgorithm();
-
- public static final String SSL_TRUSTMANAGER_ALGORITHM_CONFIG = "ssl.trustmanager.algorithm";
- public static final String SSL_TRUSTMANAGER_ALGORITHM_DOC = "The algorithm used by trust manager factory for SSL connections. "
- + "Default value is the trust manager factory algorithm configured for the Java Virtual Machine.";
- public static final String DEFAULT_SSL_TRUSTMANAGER_ALGORITHM = TrustManagerFactory.getDefaultAlgorithm();
-
- public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG = "ssl.endpoint.identification.algorithm";
- public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC = "The endpoint identification algorithm to validate server hostname using server certificate. ";
-
- public static final String SSL_CLIENT_AUTH_CONFIG = "ssl.client.auth";
- public static final String SSL_CLIENT_AUTH_DOC = "Configures kafka broker to request client authentication."
- + " The following settings are common: "
- + " <ul>"
- + " <li><code>ssl.want.client.auth=required</code> If set to required"
- + " client authentication is required."
- + " <li><code>ssl.client.auth=requested</code> This means client authentication is optional."
- + " unlike requested , if this option is set client can choose not to provide authentication information about itself"
- + " <li><code>ssl.client.auth=none</code> This means client authentication is not needed.";
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
new file mode 100644
index 0000000..60e1eb3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.config;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
+
+public class SslConfigs {
+ /*
+ * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
+ */
+
+ public static final String PRINCIPAL_BUILDER_CLASS_CONFIG = "principal.builder.class";
+ public static final String PRINCIPAL_BUILDER_CLASS_DOC = "principal builder to generate a java Principal. This config is optional for client.";
+ public static final String DEFAULT_PRINCIPAL_BUILDER_CLASS = "org.apache.kafka.common.security.auth.DefaultPrincipalBuilder";
+
+ public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol";
+ public static final String SSL_PROTOCOL_DOC = "The SSL protocol used to generate the SSLContext. "
+ + "Default setting is TLS, which is fine for most cases. "
+ + "Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 "
+ + "may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.";
+
+ public static final String DEFAULT_SSL_PROTOCOL = "TLS";
+
+ public static final String SSL_PROVIDER_CONFIG = "ssl.provider";
+ public static final String SSL_PROVIDER_DOC = "The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.";
+
+ public static final String SSL_CIPHER_SUITES_CONFIG = "ssl.cipher.suites";
+ public static final String SSL_CIPHER_SUITES_DOC = "A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol."
+ + "By default all the available cipher suites are supported.";
+
+ public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols";
+ public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections. "
+ + "TLSv1.2, TLSv1.1 and TLSv1 are enabled by default.";
+ public static final String DEFAULT_SSL_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.1,TLSv1";
+
+ public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type";
+ public static final String SSL_KEYSTORE_TYPE_DOC = "The file format of the key store file. "
+ + "This is optional for client. Default value is JKS";
+ public static final String DEFAULT_SSL_KEYSTORE_TYPE = "JKS";
+
+ public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location";
+ public static final String SSL_KEYSTORE_LOCATION_DOC = "The location of the key store file. "
+ + "This is optional for client and can be used for two-way authentication for client.";
+
+ public static final String SSL_KEYSTORE_PASSWORD_CONFIG = "ssl.keystore.password";
+ public static final String SSL_KEYSTORE_PASSWORD_DOC = "The store password for the key store file."
+ + "This is optional for client and only needed if ssl.keystore.location is configured. ";
+
+ public static final String SSL_KEY_PASSWORD_CONFIG = "ssl.key.password";
+ public static final String SSL_KEY_PASSWORD_DOC = "The password of the private key in the key store file. "
+ + "This is optional for client.";
+
+ public static final String SSL_TRUSTSTORE_TYPE_CONFIG = "ssl.truststore.type";
+ public static final String SSL_TRUSTSTORE_TYPE_DOC = "The file format of the trust store file. "
+ + "Default value is JKS.";
+ public static final String DEFAULT_SSL_TRUSTSTORE_TYPE = "JKS";
+
+ public static final String SSL_TRUSTSTORE_LOCATION_CONFIG = "ssl.truststore.location";
+ public static final String SSL_TRUSTSTORE_LOCATION_DOC = "The location of the trust store file. ";
+
+ public static final String SSL_TRUSTSTORE_PASSWORD_CONFIG = "ssl.truststore.password";
+ public static final String SSL_TRUSTSTORE_PASSWORD_DOC = "The password for the trust store file. ";
+
+ public static final String SSL_KEYMANAGER_ALGORITHM_CONFIG = "ssl.keymanager.algorithm";
+ public static final String SSL_KEYMANAGER_ALGORITHM_DOC = "The algorithm used by key manager factory for SSL connections. "
+ + "Default value is the key manager factory algorithm configured for the Java Virtual Machine.";
+ public static final String DEFAULT_SSL_KEYMANGER_ALGORITHM = KeyManagerFactory.getDefaultAlgorithm();
+
+ public static final String SSL_TRUSTMANAGER_ALGORITHM_CONFIG = "ssl.trustmanager.algorithm";
+ public static final String SSL_TRUSTMANAGER_ALGORITHM_DOC = "The algorithm used by trust manager factory for SSL connections. "
+ + "Default value is the trust manager factory algorithm configured for the Java Virtual Machine.";
+ public static final String DEFAULT_SSL_TRUSTMANAGER_ALGORITHM = TrustManagerFactory.getDefaultAlgorithm();
+
+ public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG = "ssl.endpoint.identification.algorithm";
+ public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC = "The endpoint identification algorithm to validate server hostname using server certificate. ";
+
+ public static final String SSL_CLIENT_AUTH_CONFIG = "ssl.client.auth";
+ public static final String SSL_CLIENT_AUTH_DOC = "Configures kafka broker to request client authentication."
+ + " The following settings are common: "
+ + " <ul>"
+ + " <li><code>ssl.want.client.auth=required</code> If set to required"
+ + " client authentication is required."
+ + " <li><code>ssl.client.auth=requested</code> This means client authentication is optional."
+ + " unlike requested , if this option is set client can choose not to provide authentication information about itself"
+ + " <li><code>ssl.client.auth=none</code> This means client authentication is not needed.";
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 1e5d840..03c663d 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -36,7 +36,7 @@ public class ChannelBuilders {
switch (securityProtocol) {
case SSL:
requireNonNullMode(mode, securityProtocol);
- channelBuilder = new SSLChannelBuilder(mode);
+ channelBuilder = new SslChannelBuilder(mode);
break;
case SASL_SSL:
case SASL_PLAINTEXT:
http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
index a028159..bc1536a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
@@ -15,15 +15,14 @@ package org.apache.kafka.common.network;
import java.nio.channels.SelectionKey;
import java.util.Map;
+import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.PrincipalBuilder;
-import org.apache.kafka.common.config.SSLConfigs;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class PlaintextChannelBuilder implements ChannelBuilder {
private static final Logger log = LoggerFactory.getLogger(PlaintextChannelBuilder.class);
private PrincipalBuilder principalBuilder;
@@ -32,7 +31,7 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
public void configure(Map<String, ?> configs) throws KafkaException {
try {
this.configs = configs;
- this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
+ this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
this.principalBuilder.configure(this.configs);
} catch (Exception e) {
throw new KafkaException(e);
http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java
deleted file mode 100644
index 1dd1ecd..0000000
--- a/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.network;
-
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.Map;
-
-import org.apache.kafka.common.security.auth.PrincipalBuilder;
-import org.apache.kafka.common.security.ssl.SSLFactory;
-import org.apache.kafka.common.config.SSLConfigs;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.common.KafkaException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SSLChannelBuilder implements ChannelBuilder {
- private static final Logger log = LoggerFactory.getLogger(SSLChannelBuilder.class);
- private SSLFactory sslFactory;
- private PrincipalBuilder principalBuilder;
- private Mode mode;
- private Map<String, ?> configs;
-
- public SSLChannelBuilder(Mode mode) {
- this.mode = mode;
- }
-
- public void configure(Map<String, ?> configs) throws KafkaException {
- try {
- this.configs = configs;
- this.sslFactory = new SSLFactory(mode);
- this.sslFactory.configure(this.configs);
- this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
- this.principalBuilder.configure(this.configs);
- } catch (Exception e) {
- throw new KafkaException(e);
- }
- }
-
- public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
- KafkaChannel channel = null;
- try {
- SSLTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key);
- Authenticator authenticator = new DefaultAuthenticator();
- authenticator.configure(transportLayer, this.principalBuilder, this.configs);
- channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
- } catch (Exception e) {
- log.info("Failed to create channel due to ", e);
- throw new KafkaException(e);
- }
- return channel;
- }
-
- public void close() {
- this.principalBuilder.close();
- }
-
- protected SSLTransportLayer buildTransportLayer(SSLFactory sslFactory, String id, SelectionKey key) throws IOException {
- SocketChannel socketChannel = (SocketChannel) key.channel();
- return SSLTransportLayer.create(id, key,
- sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(),
- socketChannel.socket().getPort()));
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
deleted file mode 100644
index 813f0b1..0000000
--- a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
+++ /dev/null
@@ -1,734 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.network;
-
-import java.io.IOException;
-import java.io.EOFException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.CancelledKeyException;
-
-import java.security.Principal;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLEngineResult.HandshakeStatus;
-import javax.net.ssl.SSLEngineResult.Status;
-import javax.net.ssl.SSLException;
-import javax.net.ssl.SSLHandshakeException;
-import javax.net.ssl.SSLSession;
-import javax.net.ssl.SSLPeerUnverifiedException;
-
-import org.apache.kafka.common.security.auth.KafkaPrincipal;
-import org.apache.kafka.common.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/*
- * Transport layer for SSL communication
- */
-
-public class SSLTransportLayer implements TransportLayer {
- private static final Logger log = LoggerFactory.getLogger(SSLTransportLayer.class);
- private final String channelId;
- private final SSLEngine sslEngine;
- private final SelectionKey key;
- private final SocketChannel socketChannel;
- private final boolean enableRenegotiation;
-
- private HandshakeStatus handshakeStatus;
- private SSLEngineResult handshakeResult;
- private boolean handshakeComplete = false;
- private boolean closing = false;
- private ByteBuffer netReadBuffer;
- private ByteBuffer netWriteBuffer;
- private ByteBuffer appReadBuffer;
- private ByteBuffer emptyBuf = ByteBuffer.allocate(0);
-
- public static SSLTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
- // Disable renegotiation by default until we have fixed the known issues with the existing implementation
- SSLTransportLayer transportLayer = new SSLTransportLayer(channelId, key, sslEngine, false);
- transportLayer.startHandshake();
- return transportLayer;
- }
-
- // Prefer `create`, only use this in tests
- SSLTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, boolean enableRenegotiation) throws IOException {
- this.channelId = channelId;
- this.key = key;
- this.socketChannel = (SocketChannel) key.channel();
- this.sslEngine = sslEngine;
- this.enableRenegotiation = enableRenegotiation;
- }
-
- /**
- * starts sslEngine handshake process
- */
- protected void startHandshake() throws IOException {
-
- this.netReadBuffer = ByteBuffer.allocate(netReadBufferSize());
- this.netWriteBuffer = ByteBuffer.allocate(netWriteBufferSize());
- this.appReadBuffer = ByteBuffer.allocate(applicationBufferSize());
-
- //clear & set netRead & netWrite buffers
- netWriteBuffer.position(0);
- netWriteBuffer.limit(0);
- netReadBuffer.position(0);
- netReadBuffer.limit(0);
- handshakeComplete = false;
- closing = false;
- //initiate handshake
- sslEngine.beginHandshake();
- handshakeStatus = sslEngine.getHandshakeStatus();
- }
-
- @Override
- public boolean ready() {
- return handshakeComplete;
- }
-
- /**
- * does socketChannel.finishConnect()
- */
- @Override
- public void finishConnect() throws IOException {
- socketChannel.finishConnect();
- key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
- }
-
- /**
- * disconnects selectionKey.
- */
- @Override
- public void disconnect() {
- key.cancel();
- }
-
- @Override
- public SocketChannel socketChannel() {
- return socketChannel;
- }
-
- @Override
- public boolean isOpen() {
- return socketChannel.isOpen();
- }
-
- @Override
- public boolean isConnected() {
- return socketChannel.isConnected();
- }
-
-
- /**
- * Sends a SSL close message and closes socketChannel.
- * @throws IOException if an I/O error occurs
- * @throws IOException if there is data on the outgoing network buffer and we are unable to flush it
- */
- @Override
- public void close() throws IOException {
- if (closing) return;
- closing = true;
- sslEngine.closeOutbound();
- try {
- if (!flush(netWriteBuffer)) {
- throw new IOException("Remaining data in the network buffer, can't send SSL close message.");
- }
- //prep the buffer for the close message
- netWriteBuffer.clear();
- //perform the close, since we called sslEngine.closeOutbound
- SSLEngineResult handshake = sslEngine.wrap(emptyBuf, netWriteBuffer);
- //we should be in a close state
- if (handshake.getStatus() != SSLEngineResult.Status.CLOSED) {
- throw new IOException("Invalid close state, will not send network data.");
- }
- netWriteBuffer.flip();
- flush(netWriteBuffer);
- socketChannel.socket().close();
- socketChannel.close();
- } catch (IOException ie) {
- log.warn("Failed to send SSL Close message ", ie);
- }
- key.attach(null);
- key.cancel();
- }
-
- /**
- * returns true if there are any pending contents in netWriteBuffer
- */
- @Override
- public boolean hasPendingWrites() {
- return netWriteBuffer.hasRemaining();
- }
-
- /**
- * Flushes the buffer to the network, non blocking
- * @param buf ByteBuffer
- * @return boolean true if the buffer has been emptied out, false otherwise
- * @throws IOException
- */
- private boolean flush(ByteBuffer buf) throws IOException {
- int remaining = buf.remaining();
- if (remaining > 0) {
- int written = socketChannel.write(buf);
- return written >= remaining;
- }
- return true;
- }
-
- /**
- * Performs SSL handshake, non blocking.
- * Before application data (kafka protocols) can be sent client & kafka broker must
- * perform ssl handshake.
- * During the handshake SSLEngine generates encrypted data that will be transported over socketChannel.
- * Each SSLEngine operation generates SSLEngineResult , of which SSLEngineResult.handshakeStatus field is used to
- * determine what operation needs to occur to move handshake along.
- * A typical handshake might look like this.
- * +-------------+----------------------------------+-------------+
- * | client | SSL/TLS message | HSStatus |
- * +-------------+----------------------------------+-------------+
- * | wrap() | ClientHello | NEED_UNWRAP |
- * | unwrap() | ServerHello/Cert/ServerHelloDone | NEED_WRAP |
- * | wrap() | ClientKeyExchange | NEED_WRAP |
- * | wrap() | ChangeCipherSpec | NEED_WRAP |
- * | wrap() | Finished | NEED_UNWRAP |
- * | unwrap() | ChangeCipherSpec | NEED_UNWRAP |
- * | unwrap() | Finished | FINISHED |
- * +-------------+----------------------------------+-------------+
- *
- * @throws IOException
- */
- @Override
- public void handshake() throws IOException {
- boolean read = key.isReadable();
- boolean write = key.isWritable();
- handshakeComplete = false;
- handshakeStatus = sslEngine.getHandshakeStatus();
- if (!flush(netWriteBuffer)) {
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- return;
- }
- try {
- switch (handshakeStatus) {
- case NEED_TASK:
- log.trace("SSLHandshake NEED_TASK channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
- channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
- handshakeStatus = runDelegatedTasks();
- break;
- case NEED_WRAP:
- log.trace("SSLHandshake NEED_WRAP channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
- channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
- handshakeResult = handshakeWrap(write);
- if (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) {
- int currentNetWriteBufferSize = netWriteBufferSize();
- netWriteBuffer.compact();
- netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, currentNetWriteBufferSize);
- netWriteBuffer.flip();
- if (netWriteBuffer.limit() >= currentNetWriteBufferSize) {
- throw new IllegalStateException("Buffer overflow when available data size (" + netWriteBuffer.limit() +
- ") >= network buffer size (" + currentNetWriteBufferSize + ")");
- }
- } else if (handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) {
- throw new IllegalStateException("Should not have received BUFFER_UNDERFLOW during handshake WRAP.");
- } else if (handshakeResult.getStatus() == Status.CLOSED) {
- throw new EOFException();
- }
- log.trace("SSLHandshake NEED_WRAP channelId {}, handshakeResult {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
- channelId, handshakeResult, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
- //if handshake status is not NEED_UNWRAP or unable to flush netWriteBuffer contents
- //we will break here otherwise we can do need_unwrap in the same call.
- if (handshakeStatus != HandshakeStatus.NEED_UNWRAP || !flush(netWriteBuffer)) {
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- break;
- }
- case NEED_UNWRAP:
- log.trace("SSLHandshake NEED_UNWRAP channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
- channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
- do {
- handshakeResult = handshakeUnwrap(read);
- if (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) {
- int currentAppBufferSize = applicationBufferSize();
- appReadBuffer = Utils.ensureCapacity(appReadBuffer, currentAppBufferSize);
- if (appReadBuffer.position() > currentAppBufferSize) {
- throw new IllegalStateException("Buffer underflow when available data size (" + appReadBuffer.position() +
- ") > packet buffer size (" + currentAppBufferSize + ")");
- }
- }
- } while (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW);
- if (handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) {
- int currentNetReadBufferSize = netReadBufferSize();
- netReadBuffer = Utils.ensureCapacity(netReadBuffer, currentNetReadBufferSize);
- if (netReadBuffer.position() >= currentNetReadBufferSize) {
- throw new IllegalStateException("Buffer underflow when there is available data");
- }
- } else if (handshakeResult.getStatus() == Status.CLOSED) {
- throw new EOFException("SSL handshake status CLOSED during handshake UNWRAP");
- }
- log.trace("SSLHandshake NEED_UNWRAP channelId {}, handshakeResult {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
- channelId, handshakeResult, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
-
- //if handshakeStatus completed than fall-through to finished status.
- //after handshake is finished there is no data left to read/write in socketChannel.
- //so the selector won't invoke this channel if we don't go through the handshakeFinished here.
- if (handshakeStatus != HandshakeStatus.FINISHED) {
- if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- } else if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
- key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
- }
- break;
- }
- case FINISHED:
- handshakeFinished();
- break;
- case NOT_HANDSHAKING:
- handshakeFinished();
- break;
- default:
- throw new IllegalStateException(String.format("Unexpected status [%s]", handshakeStatus));
- }
-
- } catch (SSLException e) {
- handshakeFailure();
- throw e;
- }
- }
-
- private void renegotiate() throws IOException {
- if (!enableRenegotiation)
- throw new SSLHandshakeException("Renegotiation is not supported");
- handshake();
- }
-
-
- /**
- * Executes the SSLEngine tasks needed.
- * @return HandshakeStatus
- */
- private HandshakeStatus runDelegatedTasks() {
- for (;;) {
- Runnable task = delegatedTask();
- if (task == null) {
- break;
- }
- task.run();
- }
- return sslEngine.getHandshakeStatus();
- }
-
- /**
- * Checks if the handshake status is finished
- * Sets the interestOps for the selectionKey.
- */
- private void handshakeFinished() throws IOException {
- // SSLEngine.getHandshakeStatus is transient and it doesn't record FINISHED status properly.
- // It can move from FINISHED status to NOT_HANDSHAKING after the handshake is completed.
- // Hence we also need to check handshakeResult.getHandshakeStatus() if the handshake finished or not
- if (handshakeResult.getHandshakeStatus() == HandshakeStatus.FINISHED) {
- //we are complete if we have delivered the last package
- handshakeComplete = !netWriteBuffer.hasRemaining();
- //remove OP_WRITE if we are complete, otherwise we still have data to write
- if (!handshakeComplete)
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- else
- key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-
- log.trace("SSLHandshake FINISHED channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {} ",
- channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
- } else {
- throw new IOException("NOT_HANDSHAKING during handshake");
- }
- }
-
- /**
- * Performs the WRAP function
- * @param doWrite boolean
- * @return SSLEngineResult
- * @throws IOException
- */
- private SSLEngineResult handshakeWrap(boolean doWrite) throws IOException {
- log.trace("SSLHandshake handshakeWrap {}", channelId);
- if (netWriteBuffer.hasRemaining())
- throw new IllegalStateException("handshakeWrap called with netWriteBuffer not empty");
- //this should never be called with a network buffer that contains data
- //so we can clear it here.
- netWriteBuffer.clear();
- SSLEngineResult result = sslEngine.wrap(emptyBuf, netWriteBuffer);
- //prepare the results to be written
- netWriteBuffer.flip();
- handshakeStatus = result.getHandshakeStatus();
- if (result.getStatus() == SSLEngineResult.Status.OK &&
- result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
- handshakeStatus = runDelegatedTasks();
- }
-
- if (doWrite) flush(netWriteBuffer);
- return result;
- }
-
- /**
- * Perform handshake unwrap
- * @param doRead boolean
- * @return SSLEngineResult
- * @throws IOException
- */
- private SSLEngineResult handshakeUnwrap(boolean doRead) throws IOException {
- log.trace("SSLHandshake handshakeUnwrap {}", channelId);
- SSLEngineResult result;
- boolean cont = false;
- int read = 0;
- if (doRead) {
- read = socketChannel.read(netReadBuffer);
- if (read == -1) throw new EOFException("EOF during handshake.");
- }
- do {
- //prepare the buffer with the incoming data
- netReadBuffer.flip();
- result = sslEngine.unwrap(netReadBuffer, appReadBuffer);
- netReadBuffer.compact();
- handshakeStatus = result.getHandshakeStatus();
- if (result.getStatus() == SSLEngineResult.Status.OK &&
- result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
- handshakeStatus = runDelegatedTasks();
- }
- cont = result.getStatus() == SSLEngineResult.Status.OK &&
- handshakeStatus == HandshakeStatus.NEED_UNWRAP;
- log.trace("SSLHandshake handshakeUnwrap: handshakeStatus {} status {}", handshakeStatus, result.getStatus());
- } while (netReadBuffer.position() != 0 && cont);
-
- return result;
- }
-
-
- /**
- * Reads a sequence of bytes from this channel into the given buffer.
- *
- * @param dst The buffer into which bytes are to be transferred
- * @return The number of bytes read, possible zero or -1 if the channel has reached end-of-stream
- * @throws IOException if some other I/O error occurs
- */
- @Override
- public int read(ByteBuffer dst) throws IOException {
- if (closing) return -1;
- int read = 0;
- if (!handshakeComplete) return read;
-
- //if we have unread decrypted data in appReadBuffer read that into dst buffer.
- if (appReadBuffer.position() > 0) {
- read = readFromAppBuffer(dst);
- }
-
- if (dst.remaining() > 0) {
- netReadBuffer = Utils.ensureCapacity(netReadBuffer, netReadBufferSize());
- if (netReadBuffer.remaining() > 0) {
- int netread = socketChannel.read(netReadBuffer);
- if (netread == 0) return netread;
- else if (netread < 0) throw new EOFException("EOF during read");
- }
- do {
- netReadBuffer.flip();
- SSLEngineResult unwrapResult = sslEngine.unwrap(netReadBuffer, appReadBuffer);
- netReadBuffer.compact();
- // handle ssl renegotiation.
- if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && unwrapResult.getStatus() == Status.OK) {
- log.trace("SSLChannel Read begin renegotiation channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
- channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
- renegotiate();
- break;
- }
-
- if (unwrapResult.getStatus() == Status.OK) {
- read += readFromAppBuffer(dst);
- } else if (unwrapResult.getStatus() == Status.BUFFER_OVERFLOW) {
- int currentApplicationBufferSize = applicationBufferSize();
- appReadBuffer = Utils.ensureCapacity(appReadBuffer, currentApplicationBufferSize);
- if (appReadBuffer.position() >= currentApplicationBufferSize) {
- throw new IllegalStateException("Buffer overflow when available data size (" + appReadBuffer.position() +
- ") >= application buffer size (" + currentApplicationBufferSize + ")");
- }
-
- // appReadBuffer will extended upto currentApplicationBufferSize
- // we need to read the existing content into dst before we can do unwrap again. If there are no space in dst
- // we can break here.
- if (dst.hasRemaining())
- read += readFromAppBuffer(dst);
- else
- break;
- } else if (unwrapResult.getStatus() == Status.BUFFER_UNDERFLOW) {
- int currentNetReadBufferSize = netReadBufferSize();
- netReadBuffer = Utils.ensureCapacity(netReadBuffer, currentNetReadBufferSize);
- if (netReadBuffer.position() >= currentNetReadBufferSize) {
- throw new IllegalStateException("Buffer underflow when available data size (" + netReadBuffer.position() +
- ") > packet buffer size (" + currentNetReadBufferSize + ")");
- }
- break;
- } else if (unwrapResult.getStatus() == Status.CLOSED) {
- throw new EOFException();
- }
- } while (netReadBuffer.position() != 0);
- }
- return read;
- }
-
-
- /**
- * Reads a sequence of bytes from this channel into the given buffers.
- *
- * @param dsts - The buffers into which bytes are to be transferred.
- * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
- * @throws IOException if some other I/O error occurs
- */
- @Override
- public long read(ByteBuffer[] dsts) throws IOException {
- return read(dsts, 0, dsts.length);
- }
-
-
- /**
- * Reads a sequence of bytes from this channel into a subsequence of the given buffers.
- * @param dsts - The buffers into which bytes are to be transferred
- * @param offset - The offset within the buffer array of the first buffer into which bytes are to be transferred; must be non-negative and no larger than dsts.length.
- * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than dsts.length - offset
- * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
- * @throws IOException if some other I/O error occurs
- */
- @Override
- public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
- if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
- throw new IndexOutOfBoundsException();
-
- int totalRead = 0;
- int i = offset;
- while (i < length) {
- if (dsts[i].hasRemaining()) {
- int read = read(dsts[i]);
- if (read > 0)
- totalRead += read;
- else
- break;
- }
- if (!dsts[i].hasRemaining()) {
- i++;
- }
- }
- return totalRead;
- }
-
-
- /**
- * Writes a sequence of bytes to this channel from the given buffer.
- *
- * @param src The buffer from which bytes are to be retrieved
- * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
- * @throws IOException If some other I/O error occurs
- */
- @Override
- public int write(ByteBuffer src) throws IOException {
- int written = 0;
- if (closing) throw new IllegalStateException("Channel is in closing state");
- if (!handshakeComplete) return written;
-
- if (!flush(netWriteBuffer))
- return written;
-
- netWriteBuffer.clear();
- SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer);
- netWriteBuffer.flip();
-
- //handle ssl renegotiation
- if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && wrapResult.getStatus() == Status.OK) {
- renegotiate();
- return written;
- }
-
- if (wrapResult.getStatus() == Status.OK) {
- written = wrapResult.bytesConsumed();
- flush(netWriteBuffer);
- } else if (wrapResult.getStatus() == Status.BUFFER_OVERFLOW) {
- int currentNetWriteBufferSize = netWriteBufferSize();
- netWriteBuffer.compact();
- netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, currentNetWriteBufferSize);
- netWriteBuffer.flip();
- if (netWriteBuffer.limit() >= currentNetWriteBufferSize)
- throw new IllegalStateException("SSL BUFFER_OVERFLOW when available data size (" + netWriteBuffer.limit() + ") >= network buffer size (" + currentNetWriteBufferSize + ")");
- } else if (wrapResult.getStatus() == Status.BUFFER_UNDERFLOW) {
- throw new IllegalStateException("SSL BUFFER_UNDERFLOW during write");
- } else if (wrapResult.getStatus() == Status.CLOSED) {
- throw new EOFException();
- }
- return written;
- }
-
- /**
- * Writes a sequence of bytes to this channel from the subsequence of the given buffers.
- *
- * @param srcs The buffers from which bytes are to be retrieved
- * @param offset The offset within the buffer array of the first buffer from which bytes are to be retrieved; must be non-negative and no larger than srcs.length.
- * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than srcs.length - offset.
- * @return returns no.of bytes written , possibly zero.
- * @throws IOException If some other I/O error occurs
- */
- @Override
- public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
- if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
- throw new IndexOutOfBoundsException();
- int totalWritten = 0;
- int i = offset;
- while (i < length) {
- if (srcs[i].hasRemaining() || hasPendingWrites()) {
- int written = write(srcs[i]);
- if (written > 0) {
- totalWritten += written;
- }
- }
- if (!srcs[i].hasRemaining() && !hasPendingWrites()) {
- i++;
- } else {
- // if we are unable to write the current buffer to socketChannel we should break,
- // as we might have reached max socket send buffer size.
- break;
- }
- }
- return totalWritten;
- }
-
- /**
- * Writes a sequence of bytes to this channel from the given buffers.
- *
- * @param srcs The buffers from which bytes are to be retrieved
- * @return returns no.of bytes consumed by SSLEngine.wrap , possibly zero.
- * @throws IOException If some other I/O error occurs
- */
- @Override
- public long write(ByteBuffer[] srcs) throws IOException {
- return write(srcs, 0, srcs.length);
- }
-
-
- /**
- * SSLSession's peerPrincipal for the remote host.
- * @return Principal
- */
- public Principal peerPrincipal() throws IOException {
- try {
- return sslEngine.getSession().getPeerPrincipal();
- } catch (SSLPeerUnverifiedException se) {
- log.warn("SSL peer is not authenticated, returning ANONYMOUS instead");
- return KafkaPrincipal.ANONYMOUS;
- }
- }
-
- /**
- * returns a SSL Session after the handshake is established
- * throws IllegalStateException if the handshake is not established
- */
- public SSLSession sslSession() throws IllegalStateException {
- return sslEngine.getSession();
- }
-
- /**
- * Adds interestOps to SelectionKey of the TransportLayer
- * @param ops SelectionKey interestOps
- */
- @Override
- public void addInterestOps(int ops) {
- if (!key.isValid())
- throw new CancelledKeyException();
- else if (!handshakeComplete)
- throw new IllegalStateException("handshake is not completed");
-
- key.interestOps(key.interestOps() | ops);
- }
-
- /**
- * removes interestOps to SelectionKey of the TransportLayer
- * @param ops SelectionKey interestOps
- */
- @Override
- public void removeInterestOps(int ops) {
- if (!key.isValid())
- throw new CancelledKeyException();
- else if (!handshakeComplete)
- throw new IllegalStateException("handshake is not completed");
-
- key.interestOps(key.interestOps() & ~ops);
- }
-
-
- /**
- * returns delegatedTask for the SSLEngine.
- */
- protected Runnable delegatedTask() {
- return sslEngine.getDelegatedTask();
- }
-
- /**
- * transfers appReadBuffer contents (decrypted data) into dst bytebuffer
- * @param dst ByteBuffer
- */
- private int readFromAppBuffer(ByteBuffer dst) {
- appReadBuffer.flip();
- int remaining = Math.min(appReadBuffer.remaining(), dst.remaining());
- if (remaining > 0) {
- int limit = appReadBuffer.limit();
- appReadBuffer.limit(appReadBuffer.position() + remaining);
- dst.put(appReadBuffer);
- appReadBuffer.limit(limit);
- }
- appReadBuffer.compact();
- return remaining;
- }
-
- protected int netReadBufferSize() {
- return sslEngine.getSession().getPacketBufferSize();
- }
-
- protected int netWriteBufferSize() {
- return sslEngine.getSession().getPacketBufferSize();
- }
-
- protected int applicationBufferSize() {
- return sslEngine.getSession().getApplicationBufferSize();
- }
-
- protected ByteBuffer netReadBuffer() {
- return netReadBuffer;
- }
-
- private void handshakeFailure() {
- //Release all resources such as internal buffers that SSLEngine is managing
- sslEngine.closeOutbound();
- try {
- sslEngine.closeInbound();
- } catch (SSLException e) {
- log.debug("SSLEngine.closeInBound() raised an exception.", e);
- }
- }
-
- @Override
- public boolean isMute() {
- return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) == 0;
- }
-
- @Override
- public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
- return fileChannel.transferTo(position, count, this);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 53953c5..148e549 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -25,9 +25,9 @@ import org.apache.kafka.common.security.kerberos.KerberosNameParser;
import org.apache.kafka.common.security.kerberos.LoginManager;
import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator;
-import org.apache.kafka.common.security.ssl.SSLFactory;
+import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.KafkaException;
@@ -43,7 +43,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
private LoginManager loginManager;
private PrincipalBuilder principalBuilder;
- private SSLFactory sslFactory;
+ private SslFactory sslFactory;
private Map<String, ?> configs;
private KerberosNameParser kerberosNameParser;
@@ -57,7 +57,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
try {
this.configs = configs;
this.loginManager = LoginManager.acquireLoginManager(loginType, configs);
- this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
+ this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
this.principalBuilder.configure(configs);
String defaultRealm;
@@ -69,7 +69,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
kerberosNameParser = new KerberosNameParser(defaultRealm, (List<String>) configs.get(SaslConfigs.AUTH_TO_LOCAL));
if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
- this.sslFactory = new SSLFactory(mode);
+ this.sslFactory = new SslFactory(mode);
this.sslFactory.configure(this.configs);
}
} catch (Exception e) {
@@ -102,8 +102,8 @@ public class SaslChannelBuilder implements ChannelBuilder {
protected TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel) throws IOException {
if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
- return SSLTransportLayer.create(id, key,
- sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(),
+ return SslTransportLayer.create(id, key,
+ sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(),
socketChannel.socket().getPort()));
} else {
return new PlaintextTransportLayer(key);
http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
new file mode 100644
index 0000000..8edd37e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+
+import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.common.security.ssl.SslFactory;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.KafkaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SslChannelBuilder implements ChannelBuilder {
+ private static final Logger log = LoggerFactory.getLogger(SslChannelBuilder.class);
+ private SslFactory sslFactory;
+ private PrincipalBuilder principalBuilder;
+ private Mode mode;
+ private Map<String, ?> configs;
+
+ public SslChannelBuilder(Mode mode) {
+ this.mode = mode;
+ }
+
+ public void configure(Map<String, ?> configs) throws KafkaException {
+ try {
+ this.configs = configs;
+ this.sslFactory = new SslFactory(mode);
+ this.sslFactory.configure(this.configs);
+ this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
+ this.principalBuilder.configure(this.configs);
+ } catch (Exception e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
+ KafkaChannel channel = null;
+ try {
+ SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key);
+ Authenticator authenticator = new DefaultAuthenticator();
+ authenticator.configure(transportLayer, this.principalBuilder, this.configs);
+ channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+ } catch (Exception e) {
+ log.info("Failed to create channel due to ", e);
+ throw new KafkaException(e);
+ }
+ return channel;
+ }
+
+ public void close() {
+ this.principalBuilder.close();
+ }
+
+ protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key) throws IOException {
+ SocketChannel socketChannel = (SocketChannel) key.channel();
+ return SslTransportLayer.create(id, key,
+ sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(),
+ socketChannel.socket().getPort()));
+ }
+}