You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/12 19:07:14 UTC

kafka git commit: KAFKA-2690: Hide passwords while logging the config.

Repository: kafka
Updated Branches:
  refs/heads/trunk 370ce2b4b -> ab5ac264a


KAFKA-2690: Hide passwords while logging the config.

Added PASSWORD_STRING in ConfigDef that returns "[hidden]" when method toString is invoked.

Author: Jakub Nowak <ja...@interia.pl>

Reviewers: Ismael Juma, Gwen Shapira, Jun Rao

Closes #371 from Mszak/ssl-password-protection


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ab5ac264
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ab5ac264
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ab5ac264

Branch: refs/heads/trunk
Commit: ab5ac264a71d7f895b21b4acfd93d9581dabd7c1
Parents: 370ce2b
Author: Jakub Nowak <ja...@interia.pl>
Authored: Thu Nov 12 10:07:04 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Thu Nov 12 10:07:04 2015 -0800

----------------------------------------------------------------------
 .../kafka/common/config/AbstractConfig.java     |  5 ++
 .../apache/kafka/common/config/ConfigDef.java   | 10 ++-
 .../apache/kafka/common/config/SslConfigs.java  |  6 +-
 .../kafka/common/config/types/Password.java     | 68 ++++++++++++++++++++
 .../kafka/common/security/ssl/SslFactory.java   | 23 +++----
 .../kafka/common/config/ConfigDefTest.java      | 26 +++++++-
 .../common/network/SslTransportLayerTest.java   |  3 +-
 .../org/apache/kafka/test/TestSslUtils.java     | 31 ++++-----
 .../main/scala/kafka/server/KafkaConfig.scala   | 12 ++--
 .../test/scala/unit/kafka/KafkaConfigTest.scala | 17 +++++
 10 files changed, 163 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ab5ac264/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 1029356..afb3b3f 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -16,6 +16,7 @@ import java.util.*;
 
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,6 +96,10 @@ public class AbstractConfig {
         return (String) get(key);
     }
 
+    public Password getPassword(String key) {
+        return (Password) get(key);
+    }
+
     public Class<?> getClass(String key) {
         return (Class<?>) get(key);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab5ac264/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 13fb829..fe7bcce 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.utils.Utils;
 
 /**
@@ -184,6 +185,13 @@ public class ConfigDef {
                         return value;
                     else
                         throw new ConfigException(name, value, "Expected value to be either true or false");
+                case PASSWORD:
+                    if (value instanceof Password)
+                        return value;
+                    else if (value instanceof String)
+                        return new Password(trimmed);
+                    else
+                        throw new ConfigException(name, value, "Expected value to be a string, but it was a " + value.getClass().getName());
                 case STRING:
                     if (value instanceof String)
                         return trimmed;
@@ -252,7 +260,7 @@ public class ConfigDef {
      * The config types
      */
     public enum Type {
-        BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS;
+        BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD
     }
 
     public enum Importance {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab5ac264/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
index d257e35..ae4667a 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
@@ -104,11 +104,11 @@ public class SslConfigs {
                 .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
                 .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
                 .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, null,  ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC)
-                .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC)
-                .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC)
+                .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC)
+                .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC)
                 .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
                 .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC)
-                .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC)
+                .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC)
                 .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
                 .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
                 .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab5ac264/clients/src/main/java/org/apache/kafka/common/config/types/Password.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/types/Password.java b/clients/src/main/java/org/apache/kafka/common/config/types/Password.java
new file mode 100644
index 0000000..db8f821
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/types/Password.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.config.types;
+
+/**
+ * A wrapper class for passwords to hide them while logging a config
+ */
+public class Password {
+
+    public static final String HIDDEN = "[hidden]";
+
+    private final String value;
+
+    /**
+     * Construct a new Password object
+     * @param value The value of a password
+     */
+    public Password(String value) {
+        this.value = value;
+    }
+
+    @Override
+    public int hashCode() {
+        return value.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof Password))
+            return false;
+        Password other = (Password) obj;
+        return value.equals(other.value);
+    }
+
+    /**
+     * Returns hidden password string
+     *
+     * @return hidden password string
+     */
+    @Override
+    public String toString() {
+        return HIDDEN;
+    }
+
+    /**
+     * Returns real password string
+     *
+     * @return real password string
+     */
+    public String value() {
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab5ac264/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index 0984ba0..a7cf9a2 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.config.types.Password;
 
 import javax.net.ssl.*;
 import java.io.FileInputStream;
@@ -37,7 +38,7 @@ public class SslFactory implements Configurable {
     private String kmfAlgorithm;
     private String tmfAlgorithm;
     private SecurityStore keystore = null;
-    private String keyPassword;
+    private Password keyPassword;
     private SecurityStore truststore;
     private String[] cipherSuites;
     private String[] enabledProtocols;
@@ -82,12 +83,12 @@ public class SslFactory implements Configurable {
 
         createKeystore((String) configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
                        (String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
-                       (String) configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
-                       (String) configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
+                       (Password) configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
+                       (Password) configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
 
         createTruststore((String) configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
                          (String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG),
-                         (String) configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
+                         (Password) configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
         try {
             this.sslContext = createSSLContext();
         } catch (Exception e) {
@@ -108,8 +109,8 @@ public class SslFactory implements Configurable {
             String kmfAlgorithm = this.kmfAlgorithm != null ? this.kmfAlgorithm : KeyManagerFactory.getDefaultAlgorithm();
             KeyManagerFactory kmf = KeyManagerFactory.getInstance(kmfAlgorithm);
             KeyStore ks = keystore.load();
-            String keyPassword = this.keyPassword != null ? this.keyPassword : keystore.password;
-            kmf.init(ks, keyPassword.toCharArray());
+            Password keyPassword = this.keyPassword != null ? this.keyPassword : keystore.password;
+            kmf.init(ks, keyPassword.value().toCharArray());
             keyManagers = kmf.getKeyManagers();
         }
 
@@ -150,7 +151,7 @@ public class SslFactory implements Configurable {
         return sslContext;
     }
 
-    private void createKeystore(String type, String path, String password, String keyPassword) {
+    private void createKeystore(String type, String path, Password password, Password keyPassword) {
         if (path == null && password != null) {
             throw new KafkaException("SSL key store is not specified, but key store password is specified.");
         } else if (path != null && password == null) {
@@ -161,7 +162,7 @@ public class SslFactory implements Configurable {
         }
     }
 
-    private void createTruststore(String type, String path, String password) {
+    private void createTruststore(String type, String path, Password password) {
         if (path == null && password != null) {
             throw new KafkaException("SSL trust store is not specified, but trust store password is specified.");
         } else if (path != null && password == null) {
@@ -174,9 +175,9 @@ public class SslFactory implements Configurable {
     private class SecurityStore {
         private final String type;
         private final String path;
-        private final String password;
+        private final Password password;
 
-        private SecurityStore(String type, String path, String password) {
+        private SecurityStore(String type, String path, Password password) {
             this.type = type == null ? KeyStore.getDefaultType() : type;
             this.path = path;
             this.password = password;
@@ -187,7 +188,7 @@ public class SslFactory implements Configurable {
             try {
                 KeyStore ks = KeyStore.getInstance(type);
                 in = new FileInputStream(path);
-                ks.load(in, password.toCharArray());
+                ks.load(in, password.value().toCharArray());
                 return ks;
             } finally {
                 if (in != null) in.close();

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab5ac264/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index cb22ce1..cb6de24 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.config.ConfigDef.Validator;
 import org.apache.kafka.common.config.ConfigDef.Range;
 import org.apache.kafka.common.config.ConfigDef.ValidString;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.types.Password;
 import org.junit.Test;
 
 public class ConfigDefTest {
@@ -39,7 +40,8 @@ public class ConfigDefTest {
                                        .define("f", Type.CLASS, Importance.HIGH, "docs")
                                        .define("g", Type.BOOLEAN, Importance.HIGH, "docs")
                                        .define("h", Type.BOOLEAN, Importance.HIGH, "docs")
-                                       .define("i", Type.BOOLEAN, Importance.HIGH, "docs");
+                                       .define("i", Type.BOOLEAN, Importance.HIGH, "docs")
+                                       .define("j", Type.PASSWORD, Importance.HIGH, "docs");
 
         Properties props = new Properties();
         props.put("a", "1   ");
@@ -50,6 +52,7 @@ public class ConfigDefTest {
         props.put("g", "true");
         props.put("h", "FalSE");
         props.put("i", "TRUE");
+        props.put("j", "password");
 
         Map<String, Object> vals = def.parse(props);
         assertEquals(1, vals.get("a"));
@@ -61,6 +64,8 @@ public class ConfigDefTest {
         assertEquals(true, vals.get("g"));
         assertEquals(false, vals.get("h"));
         assertEquals(true, vals.get("i"));
+        assertEquals(new Password("password"), vals.get("j"));
+        assertEquals(Password.HIDDEN, vals.get("j").toString());
     }
 
     @Test(expected = ConfigException.class)
@@ -127,6 +132,25 @@ public class ConfigDefTest {
                 new Object[]{"good", "values", "default"}, new Object[]{"bad", "inputs"});
     }
 
+    @Test
+    public void testSslPasswords() {
+        ConfigDef def = new ConfigDef();
+        SslConfigs.addClientSslSupport(def);
+
+        Properties props = new Properties();
+        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "key_password");
+        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "keystore_password");
+        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "truststore_password");
+
+        Map<String, Object> vals = def.parse(props);
+        assertEquals(new Password("key_password"), vals.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
+        assertEquals(Password.HIDDEN, vals.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG).toString());
+        assertEquals(new Password("keystore_password"), vals.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG));
+        assertEquals(Password.HIDDEN, vals.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG).toString());
+        assertEquals(new Password("truststore_password"), vals.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
+        assertEquals(Password.HIDDEN, vals.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG).toString());
+    }
+
     private void testValidators(Type type, Validator validator, Object defaultVal, Object[] okValues, Object[] badValues) {
         ConfigDef def = new ConfigDef().define("name", type, defaultVal, validator, Importance.HIGH, "docs");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab5ac264/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 91bd47c..282ff8b 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestSslUtils;
 import org.apache.kafka.test.TestUtils;
@@ -301,7 +302,7 @@ public class SslTransportLayerTest {
     @Test
     public void testInvalidKeyPassword() throws Exception {
         String node = "0";
-        sslServerConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "invalid");
+        sslServerConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, new Password("invalid"));
         createEchoServer(sslServerConfigs);        
         createSelector(sslClientConfigs);
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab5ac264/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
index 30bdb6d..5420b26 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
@@ -33,6 +33,7 @@ import java.security.cert.Certificate;
 import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
 
+import org.apache.kafka.common.config.types.Password;
 import org.bouncycastle.asn1.x500.X500Name;
 import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
 import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
@@ -105,20 +106,20 @@ public class TestSslUtils {
     }
 
     private static void saveKeyStore(KeyStore ks, String filename,
-                                     String password) throws GeneralSecurityException, IOException {
+                                     Password password) throws GeneralSecurityException, IOException {
         FileOutputStream out = new FileOutputStream(filename);
         try {
-            ks.store(out, password.toCharArray());
+            ks.store(out, password.value().toCharArray());
         } finally {
             out.close();
         }
     }
 
     public static void createKeyStore(String filename,
-                                      String password, String alias,
+                                      Password password, String alias,
                                       Key privateKey, Certificate cert) throws GeneralSecurityException, IOException {
         KeyStore ks = createEmptyKeyStore();
-        ks.setKeyEntry(alias, privateKey, password.toCharArray(),
+        ks.setKeyEntry(alias, privateKey, password.value().toCharArray(),
                 new Certificate[]{cert});
         saveKeyStore(ks, filename, password);
     }
@@ -136,16 +137,16 @@ public class TestSslUtils {
      * @throws IOException if there is an I/O error saving the file
      */
     public static void createKeyStore(String filename,
-                                      String password, String keyPassword, String alias,
+                                      Password password, Password keyPassword, String alias,
                                       Key privateKey, Certificate cert) throws GeneralSecurityException, IOException {
         KeyStore ks = createEmptyKeyStore();
-        ks.setKeyEntry(alias, privateKey, keyPassword.toCharArray(),
+        ks.setKeyEntry(alias, privateKey, keyPassword.value().toCharArray(),
                 new Certificate[]{cert});
         saveKeyStore(ks, filename, password);
     }
 
     public static void createTrustStore(String filename,
-                                        String password, String alias,
+                                        Password password, String alias,
                                         Certificate cert) throws GeneralSecurityException, IOException {
         KeyStore ks = createEmptyKeyStore();
         ks.setCertificateEntry(alias, cert);
@@ -153,11 +154,11 @@ public class TestSslUtils {
     }
 
     public static <T extends Certificate> void createTrustStore(
-            String filename, String password, Map<String, T> certs) throws GeneralSecurityException, IOException {
+            String filename, Password password, Map<String, T> certs) throws GeneralSecurityException, IOException {
         KeyStore ks = KeyStore.getInstance("JKS");
         try {
             FileInputStream in = new FileInputStream(filename);
-            ks.load(in, password.toCharArray());
+            ks.load(in, password.value().toCharArray());
             in.close();
         } catch (EOFException e) {
             ks = createEmptyKeyStore();
@@ -176,8 +177,8 @@ public class TestSslUtils {
         return certs;
     }
 
-    public static Map<String, Object> createSslConfig(Mode mode, File keyStoreFile, String password, String keyPassword,
-                                                      File trustStoreFile, String trustStorePassword) {
+    public static Map<String, Object> createSslConfig(Mode mode, File keyStoreFile, Password password, Password keyPassword,
+                                                      File trustStoreFile, Password trustStorePassword) {
         Map<String, Object> sslConfigs = new HashMap<>();
         sslConfigs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); // kafka security protocol
         sslConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext
@@ -206,14 +207,14 @@ public class TestSslUtils {
         throws IOException, GeneralSecurityException {
         Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
         File keyStoreFile;
-        String password;
+        Password password;
 
         if (mode == Mode.SERVER)
-            password = "ServerPassword";
+            password = new Password("ServerPassword");
         else
-            password = "ClientPassword";
+            password = new Password("ClientPassword");
 
-        String trustStorePassword = "TrustStorePassword";
+        Password trustStorePassword = new Password("TrustStorePassword");
 
         if (useClientCert) {
             keyStoreFile = File.createTempFile("clientKS", ".jks");

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab5ac264/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index bcedfaf..1d25959 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -651,11 +651,11 @@ object KafkaConfig {
       .define(SslEnabledProtocolsProp, LIST, Defaults.SslEnabledProtocols, MEDIUM, SslEnabledProtocolsDoc)
       .define(SslKeystoreTypeProp, STRING, Defaults.SslKeystoreType, MEDIUM, SslKeystoreTypeDoc)
       .define(SslKeystoreLocationProp, STRING, null, MEDIUM, SslKeystoreLocationDoc)
-      .define(SslKeystorePasswordProp, STRING, null, MEDIUM, SslKeystorePasswordDoc)
-      .define(SslKeyPasswordProp, STRING, null, MEDIUM, SslKeyPasswordDoc)
+      .define(SslKeystorePasswordProp, PASSWORD, null, MEDIUM, SslKeystorePasswordDoc)
+      .define(SslKeyPasswordProp, PASSWORD, null, MEDIUM, SslKeyPasswordDoc)
       .define(SslTruststoreTypeProp, STRING, Defaults.SslTruststoreType, MEDIUM, SslTruststoreTypeDoc)
       .define(SslTruststoreLocationProp, STRING, null, MEDIUM, SslTruststoreLocationDoc)
-      .define(SslTruststorePasswordProp, STRING, null, MEDIUM, SslTruststorePasswordDoc)
+      .define(SslTruststorePasswordProp, PASSWORD, null, MEDIUM, SslTruststorePasswordDoc)
       .define(SslKeyManagerAlgorithmProp, STRING, Defaults.SslKeyManagerAlgorithm, MEDIUM, SslKeyManagerAlgorithmDoc)
       .define(SslTrustManagerAlgorithmProp, STRING, Defaults.SslTrustManagerAlgorithm, MEDIUM, SslTrustManagerAlgorithmDoc)
       .define(SslEndpointIdentificationAlgorithmProp, STRING, null, LOW, SslEndpointIdentificationAlgorithmDoc)
@@ -821,11 +821,11 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
   val sslEnabledProtocols = getList(KafkaConfig.SslEnabledProtocolsProp)
   val sslKeystoreType = getString(KafkaConfig.SslKeystoreTypeProp)
   val sslKeystoreLocation = getString(KafkaConfig.SslKeystoreLocationProp)
-  val sslKeystorePassword = getString(KafkaConfig.SslKeystorePasswordProp)
-  val sslKeyPassword = getString(KafkaConfig.SslKeyPasswordProp)
+  val sslKeystorePassword = getPassword(KafkaConfig.SslKeystorePasswordProp)
+  val sslKeyPassword = getPassword(KafkaConfig.SslKeyPasswordProp)
   val sslTruststoreType = getString(KafkaConfig.SslTruststoreTypeProp)
   val sslTruststoreLocation = getString(KafkaConfig.SslTruststoreLocationProp)
-  val sslTruststorePassword = getString(KafkaConfig.SslTruststorePasswordProp)
+  val sslTruststorePassword = getPassword(KafkaConfig.SslTruststorePasswordProp)
   val sslKeyManagerAlgorithm = getString(KafkaConfig.SslKeyManagerAlgorithmProp)
   val sslTrustManagerAlgorithm = getString(KafkaConfig.SslTrustManagerAlgorithmProp)
   val sslClientAuth = getString(KafkaConfig.SslClientAuthProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab5ac264/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index 1233104..806c704 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -20,6 +20,8 @@ import java.io.{FileOutputStream, File}
 import java.security.Permission
 
 import kafka.server.KafkaConfig
+import org.apache.kafka.common.config.SslConfigs
+import org.apache.kafka.common.config.types.Password
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
 
@@ -99,6 +101,21 @@ class KafkaTest {
     KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "broker.id=1", "--override", "broker.id=2")))
   }
 
+  @Test
+  def testKafkaSslPasswords(): Unit = {
+    val propertiesFile = prepareDefaultConfig()
+    val config = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "ssl.keystore.password=keystore_password",
+                                                                                    "--override", "ssl.key.password=key_password",
+                                                                                    "--override", "ssl.truststore.password=truststore_password")))
+    assertEquals(Password.HIDDEN, config.sslKeyPassword.toString)
+    assertEquals(Password.HIDDEN, config.sslKeystorePassword.toString)
+    assertEquals(Password.HIDDEN, config.sslTruststorePassword.toString)
+
+    assertEquals("key_password", config.sslKeyPassword.value)
+    assertEquals("keystore_password", config.sslKeystorePassword.value)
+    assertEquals("truststore_password", config.sslTruststorePassword.value)
+  }
+
   def prepareDefaultConfig(): String = {
     prepareConfig(Array("broker.id=1", "zookeeper.connect=somewhere"))
   }