You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2024/01/30 18:08:57 UTC
(kafka) branch trunk updated: KAFKA-15853: Move PasswordEncoder to server-common (#15246)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3e9ef708536 KAFKA-15853: Move PasswordEncoder to server-common (#15246)
3e9ef708536 is described below
commit 3e9ef708536612fdfd581623a74e6366c5aea0f6
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Tue Jan 30 19:08:50 2024 +0100
KAFKA-15853: Move PasswordEncoder to server-common (#15246)
Reviewers: Luke Chen <sh...@gmail.com>, Omnia Ibrahim <o....@gmail.com>
---
checkstyle/import-control-server-common.xml | 8 +
checkstyle/import-control-server.xml | 1 +
.../src/main/scala/kafka/admin/ConfigCommand.scala | 25 +--
.../main/scala/kafka/server/ControllerServer.scala | 5 +-
.../scala/kafka/server/DynamicBrokerConfig.scala | 5 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 22 ++-
core/src/main/scala/kafka/utils/CoreUtils.scala | 19 +-
.../main/scala/kafka/utils/PasswordEncoder.scala | 202 ---------------------
.../scala/kafka/utils/VerifiableProperties.scala | 3 +-
.../main/scala/kafka/zk/ZkMigrationClient.scala | 3 +-
.../zk/migration/ZkConfigMigrationClient.scala | 4 +-
.../server/DynamicBrokerReconfigurationTest.scala | 1 +
.../kafka/zk/ZkMigrationFailoverTest.scala | 5 +-
.../kafka/zk/ZkMigrationIntegrationTest.scala | 7 +-
.../scala/unit/kafka/utils/CoreUtilsTest.scala | 37 +---
.../unit/kafka/utils/PasswordEncoderTest.scala | 123 -------------
.../zk/migration/ZkMigrationTestHarness.scala | 2 +-
.../apache/kafka/security/CipherParamsEncoder.java | 29 +++
.../kafka/security/EncryptingPasswordEncoder.java | 154 ++++++++++++++++
.../apache/kafka/security/GcmParamsEncoder.java | 47 +++++
.../org/apache/kafka/security/IvParamsEncoder.java | 41 +++++
.../org/apache/kafka/security/PasswordEncoder.java | 69 +++++++
.../kafka/security/PasswordEncoderConfigs.java | 30 +++
.../java/org/apache/kafka/server/util/Csv.java | 41 +++++
.../apache/kafka/security/PasswordEncoderTest.java | 124 +++++++++++++
.../java/org/apache/kafka/server/util/CsvTest.java | 63 +++++++
.../org/apache/kafka/server/config/Defaults.java | 7 +-
27 files changed, 660 insertions(+), 417 deletions(-)
diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml
index 43406958711..2c5c652e979 100644
--- a/checkstyle/import-control-server-common.xml
+++ b/checkstyle/import-control-server-common.xml
@@ -58,6 +58,14 @@
<allow pkg="org.apache.kafka.common.protocol" />
</subpackage>
+ <subpackage name="security">
+ <allow pkg="org.apache.kafka.common.config" />
+ <allow pkg="org.apache.kafka.common.config.types" />
+ <allow pkg="org.apache.kafka.server.util" />
+ <allow pkg="javax.crypto" />
+ <allow pkg="javax.crypto.spec" />
+ </subpackage>
+
<subpackage name="server">
<allow pkg="org.apache.kafka.common" />
<allow pkg="joptsimple" />
diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml
index 2f6a7c4c058..c395a5f2214 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -57,6 +57,7 @@
<!-- utilities and reusable classes from server-common -->
<allow pkg="org.apache.kafka.queue" />
+ <allow pkg="org.apache.kafka.security" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.util" />
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 3f75b7d7ceb..0d82d23d7ed 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -21,10 +21,10 @@ import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import java.util.{Collections, Properties}
import joptsimple._
-import kafka.server.DynamicConfig.QuotaConfigs
import kafka.server.{DynamicBrokerConfig, DynamicConfig, KafkaConfig}
-import kafka.utils.{Exit, Logging, PasswordEncoder}
+import kafka.server.DynamicConfig.QuotaConfigs
import kafka.utils.Implicits._
+import kafka.utils.{Exit, Logging}
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig, ScramMechanism => PublicScramMechanism}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
@@ -35,7 +35,8 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism}
import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
-import org.apache.kafka.server.config.{ConfigEntityName, ConfigType, Defaults}
+import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
+import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.zookeeper.client.ZKClientConfig
@@ -211,19 +212,19 @@ object ConfigCommand extends Logging {
}
private[admin] def createPasswordEncoder(encoderConfigs: Map[String, String]): PasswordEncoder = {
- encoderConfigs.get(KafkaConfig.PasswordEncoderSecretProp)
- val encoderSecret = encoderConfigs.getOrElse(KafkaConfig.PasswordEncoderSecretProp,
+ encoderConfigs.get(PasswordEncoderConfigs.SECRET)
+ val encoderSecret = encoderConfigs.getOrElse(PasswordEncoderConfigs.SECRET,
throw new IllegalArgumentException("Password encoder secret not specified"))
PasswordEncoder.encrypting(new Password(encoderSecret),
- None,
- encoderConfigs.getOrElse(KafkaConfig.PasswordEncoderCipherAlgorithmProp, Defaults.PASSWORD_ENCODER_CIPHER_ALGORITHM),
- encoderConfigs.get(KafkaConfig.PasswordEncoderKeyLengthProp).map(_.toInt).getOrElse(Defaults.PASSWORD_ENCODER_KEY_LENGTH),
- encoderConfigs.get(KafkaConfig.PasswordEncoderIterationsProp).map(_.toInt).getOrElse(Defaults.PASSWORD_ENCODER_ITERATIONS))
+ null,
+ encoderConfigs.getOrElse(PasswordEncoderConfigs.CIPHER_ALGORITHM, PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM),
+ encoderConfigs.get(PasswordEncoderConfigs.KEY_LENGTH).map(_.toInt).getOrElse(PasswordEncoderConfigs.DEFAULT_KEY_LENGTH),
+ encoderConfigs.get(PasswordEncoderConfigs.ITERATIONS).map(_.toInt).getOrElse(PasswordEncoderConfigs.DEFAULT_ITERATIONS))
}
/**
* Pre-process broker configs provided to convert them to persistent format.
- * Password configs are encrypted using the secret `KafkaConfig.PasswordEncoderSecretProp`.
+ * Password configs are encrypted using the secret `PasswordEncoderConfigs.SECRET`.
* The secret is removed from `configsToBeAdded` and will not be persisted in ZooKeeper.
*/
private def preProcessBrokerConfigs(configsToBeAdded: Properties, perBrokerConfig: Boolean): Unit = {
@@ -238,8 +239,8 @@ object ConfigCommand extends Logging {
DynamicBrokerConfig.validateConfigs(configsToBeAdded, perBrokerConfig)
val passwordConfigs = configsToBeAdded.asScala.keySet.filter(DynamicBrokerConfig.isPasswordConfig)
if (passwordConfigs.nonEmpty) {
- require(passwordEncoderConfigs.containsKey(KafkaConfig.PasswordEncoderSecretProp),
- s"${KafkaConfig.PasswordEncoderSecretProp} must be specified to update $passwordConfigs." +
+ require(passwordEncoderConfigs.containsKey(PasswordEncoderConfigs.SECRET),
+ s"${PasswordEncoderConfigs.SECRET} must be specified to update $passwordConfigs." +
" Other password encoder configs like cipher algorithm and iterations may also be specified" +
" to override the default encoding parameters. Password encoder configs will not be persisted" +
" in ZooKeeper."
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index f74bb3de7a4..8f14d814b63 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -27,7 +27,7 @@ import kafka.server.QuotaFactory.QuotaManagers
import scala.collection.immutable
import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
-import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
+import kafka.utils.{CoreUtils, Logging}
import kafka.zk.{KafkaZkClient, ZkMigrationClient}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.network.ListenerName
@@ -44,6 +44,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.migration.{KRaftMigrationDriver, LegacyPropagator}
import org.apache.kafka.metadata.publisher.FeaturesPublisher
import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.NodeToControllerChannelManager
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
@@ -285,7 +286,7 @@ class ControllerServer(
config.passwordEncoderCipherAlgorithm,
config.passwordEncoderKeyLength,
config.passwordEncoderIterations)
- case None => PasswordEncoder.noop()
+ case None => PasswordEncoder.NOOP
}
val migrationClient = ZkMigrationClient(zkClient, zkConfigEncoder)
val propagator: LegacyPropagator = new MigrationPropagator(config.nodeId, config)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 7f607d49a4f..74880971bc3 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -25,7 +25,7 @@ import kafka.cluster.EndPoint
import kafka.log.{LogCleaner, LogManager}
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.server.DynamicBrokerConfig._
-import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
+import kafka.utils.{CoreUtils, Logging}
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.Reconfigurable
@@ -35,6 +35,7 @@ import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
import org.apache.kafka.common.security.authenticator.LoginManager
import org.apache.kafka.common.utils.{ConfigUtils, Utils}
+import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.config.{ConfigEntityName, ConfigType, ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
@@ -223,7 +224,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private val dynamicConfigPasswordEncoder = if (kafkaConfig.processRoles.isEmpty) {
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
} else {
- Some(PasswordEncoder.noop())
+ Some(PasswordEncoder.NOOP)
}
private[server] def initialize(zkClientOpt: Option[KafkaZkClient], clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c8910a5ccf8..a9cda78dd0f 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -40,15 +40,17 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor
import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.security.PasswordEncoderConfigs
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{MetadataVersion, MetadataVersionValidator}
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.config.{Defaults, ServerTopicConfigSynonyms}
-import org.apache.kafka.storage.internals.log.{LogConfig, ProducerStateManagerConfig}
-import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.record.BrokerCompressionType
+import org.apache.kafka.server.util.Csv
+import org.apache.kafka.storage.internals.log.{LogConfig, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
import org.apache.zookeeper.client.ZKClientConfig
import scala.annotation.nowarn
@@ -435,12 +437,12 @@ object KafkaConfig {
val DelegationTokenExpiryCheckIntervalMsProp = "delegation.token.expiry.check.interval.ms"
/** ********* Password encryption configuration for dynamic configs *********/
- val PasswordEncoderSecretProp = "password.encoder.secret"
- val PasswordEncoderOldSecretProp = "password.encoder.old.secret"
- val PasswordEncoderKeyFactoryAlgorithmProp = "password.encoder.keyfactory.algorithm"
- val PasswordEncoderCipherAlgorithmProp = "password.encoder.cipher.algorithm"
- val PasswordEncoderKeyLengthProp = "password.encoder.key.length"
- val PasswordEncoderIterationsProp = "password.encoder.iterations"
+ val PasswordEncoderSecretProp = PasswordEncoderConfigs.SECRET
+ val PasswordEncoderOldSecretProp = PasswordEncoderConfigs.OLD_SECRET
+ val PasswordEncoderKeyFactoryAlgorithmProp = PasswordEncoderConfigs.KEYFACTORY_ALGORITHM
+ val PasswordEncoderCipherAlgorithmProp = PasswordEncoderConfigs.CIPHER_ALGORITHM
+ val PasswordEncoderKeyLengthProp = PasswordEncoderConfigs.KEY_LENGTH
+ val PasswordEncoderIterationsProp = PasswordEncoderConfigs.ITERATIONS
/** Internal Configurations **/
val UnstableApiVersionsEnableProp = "unstable.api.versions.enable"
@@ -1873,7 +1875,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
def passwordEncoderSecret = Option(getPassword(KafkaConfig.PasswordEncoderSecretProp))
def passwordEncoderOldSecret = Option(getPassword(KafkaConfig.PasswordEncoderOldSecretProp))
def passwordEncoderCipherAlgorithm = getString(KafkaConfig.PasswordEncoderCipherAlgorithmProp)
- def passwordEncoderKeyFactoryAlgorithm = Option(getString(KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp))
+ def passwordEncoderKeyFactoryAlgorithm = getString(KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp)
def passwordEncoderKeyLength = getInt(KafkaConfig.PasswordEncoderKeyLengthProp)
def passwordEncoderIterations = getInt(KafkaConfig.PasswordEncoderIterationsProp)
@@ -1935,7 +1937,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
private def getMap(propName: String, propValue: String): Map[String, String] = {
try {
- CoreUtils.parseCsvMap(propValue)
+ Csv.parseCsvMap(propValue).asScala
} catch {
case e: Exception => throw new IllegalArgumentException("Error parsing configuration property '%s': %s".format(propName, e.getMessage))
}
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 88e9f8aa2a8..6af1f3594e2 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -26,7 +26,7 @@ import com.typesafe.scalalogging.Logger
import javax.management._
import scala.collection._
-import scala.collection.{Seq, mutable}
+import scala.collection.Seq
import kafka.cluster.EndPoint
import org.apache.commons.validator.routines.InetAddressValidator
import org.apache.kafka.common.network.ListenerName
@@ -109,23 +109,6 @@ object CoreUtils {
}
}
- /**
- * This method gets comma separated values which contains key,value pairs and returns a map of
- * key value pairs. the format of allCSVal is key1:val1, key2:val2 ....
- * Also supports strings with multiple ":" such as IpV6 addresses, taking the last occurrence
- * of the ":" in the pair as the split, eg a:b:c:val1, d:e:f:val2 => a:b:c -> val1, d:e:f -> val2
- */
- def parseCsvMap(str: String): Map[String, String] = {
- val map = new mutable.HashMap[String, String]
- if ("".equals(str))
- return map
- val keyVals = str.split("\\s*,\\s*").map(s => {
- val lio = s.lastIndexOf(":")
- (s.substring(0,lio).trim, s.substring(lio + 1).trim)
- })
- keyVals.toMap
- }
-
/**
* Parse a comma separated string into a sequence of strings.
* Whitespace surrounding the comma will be removed.
diff --git a/core/src/main/scala/kafka/utils/PasswordEncoder.scala b/core/src/main/scala/kafka/utils/PasswordEncoder.scala
deleted file mode 100644
index d4737be08ce..00000000000
--- a/core/src/main/scala/kafka/utils/PasswordEncoder.scala
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package kafka.utils
-
-import java.nio.charset.StandardCharsets
-import java.security.{AlgorithmParameters, NoSuchAlgorithmException, SecureRandom}
-import java.security.spec.AlgorithmParameterSpec
-import java.util.Base64
-
-import javax.crypto.{Cipher, SecretKeyFactory}
-import javax.crypto.spec._
-import kafka.utils.PasswordEncoder._
-import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.config.types.Password
-
-import scala.collection.Map
-
-object PasswordEncoder {
- val KeyFactoryAlgorithmProp = "keyFactoryAlgorithm"
- val CipherAlgorithmProp = "cipherAlgorithm"
- val InitializationVectorProp = "initializationVector"
- val KeyLengthProp = "keyLength"
- val SaltProp = "salt"
- val IterationsProp = "iterations"
- val EncryptedPasswordProp = "encryptedPassword"
- val PasswordLengthProp = "passwordLength"
-
- def encrypting(secret: Password,
- keyFactoryAlgorithm: Option[String],
- cipherAlgorithm: String,
- keyLength: Int,
- iterations: Int): EncryptingPasswordEncoder = {
- new EncryptingPasswordEncoder(secret, keyFactoryAlgorithm, cipherAlgorithm, keyLength, iterations)
- }
-
- def noop(): NoOpPasswordEncoder = {
- new NoOpPasswordEncoder()
- }
-}
-
-trait PasswordEncoder {
- def encode(password: Password): String
- def decode(encodedPassword: String): Password
-
- private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.getDecoder.decode(encoded)
-}
-
-/**
- * A password encoder that does not modify the given password. This is used in KRaft mode only.
- */
-class NoOpPasswordEncoder extends PasswordEncoder {
- override def encode(password: Password): String = password.value()
- override def decode(encodedPassword: String): Password = new Password(encodedPassword)
-}
-
-/**
- * Password encoder and decoder implementation. Encoded passwords are persisted as a CSV map
- * containing the encoded password in base64 and along with the properties used for encryption.
- *
- * @param secret The secret used for encoding and decoding
- * @param keyFactoryAlgorithm Key factory algorithm if configured. By default, PBKDF2WithHmacSHA512 is
- * used if available, PBKDF2WithHmacSHA1 otherwise.
- * @param cipherAlgorithm Cipher algorithm used for encoding.
- * @param keyLength Key length used for encoding. This should be valid for the specified algorithms.
- * @param iterations Iteration count used for encoding.
- *
- * The provided `keyFactoryAlgorithm`, `cipherAlgorithm`, `keyLength` and `iterations` are used for encoding passwords.
- * The values used for encoding are stored along with the encoded password and the stored values are used for decoding.
- *
- */
-class EncryptingPasswordEncoder(
- secret: Password,
- keyFactoryAlgorithm: Option[String],
- cipherAlgorithm: String,
- keyLength: Int,
- iterations: Int
-) extends PasswordEncoder with Logging {
-
- private val secureRandom = new SecureRandom
- private val cipherParamsEncoder = cipherParamsInstance(cipherAlgorithm)
-
- override def encode(password: Password): String = {
- val salt = new Array[Byte](256)
- secureRandom.nextBytes(salt)
- val cipher = Cipher.getInstance(cipherAlgorithm)
- val keyFactory = secretKeyFactory(keyFactoryAlgorithm)
- val keySpec = secretKeySpec(keyFactory, cipherAlgorithm, keyLength, salt, iterations)
- cipher.init(Cipher.ENCRYPT_MODE, keySpec)
- val encryptedPassword = cipher.doFinal(password.value.getBytes(StandardCharsets.UTF_8))
- val encryptedMap = Map(
- KeyFactoryAlgorithmProp -> keyFactory.getAlgorithm,
- CipherAlgorithmProp -> cipherAlgorithm,
- KeyLengthProp -> keyLength,
- SaltProp -> base64Encode(salt),
- IterationsProp -> iterations.toString,
- EncryptedPasswordProp -> base64Encode(encryptedPassword),
- PasswordLengthProp -> password.value.length
- ) ++ cipherParamsEncoder.toMap(cipher.getParameters)
- encryptedMap.map { case (k, v) => s"$k:$v" }.mkString(",")
- }
-
- override def decode(encodedPassword: String): Password = {
- val params = CoreUtils.parseCsvMap(encodedPassword)
- val keyFactoryAlg = params(KeyFactoryAlgorithmProp)
- val cipherAlg = params(CipherAlgorithmProp)
- val keyLength = params(KeyLengthProp).toInt
- val salt = base64Decode(params(SaltProp))
- val iterations = params(IterationsProp).toInt
- val encryptedPassword = base64Decode(params(EncryptedPasswordProp))
- val passwordLengthProp = params(PasswordLengthProp).toInt
- val cipher = Cipher.getInstance(cipherAlg)
- val keyFactory = secretKeyFactory(Some(keyFactoryAlg))
- val keySpec = secretKeySpec(keyFactory, cipherAlg, keyLength, salt, iterations)
- cipher.init(Cipher.DECRYPT_MODE, keySpec, cipherParamsEncoder.toParameterSpec(params))
- val password = try {
- val decrypted = cipher.doFinal(encryptedPassword)
- new String(decrypted, StandardCharsets.UTF_8)
- } catch {
- case e: Exception => throw new ConfigException("Password could not be decoded", e)
- }
- if (password.length != passwordLengthProp) // Sanity check
- throw new ConfigException("Password could not be decoded, sanity check of length failed")
- new Password(password)
- }
-
- private def secretKeyFactory(keyFactoryAlg: Option[String]): SecretKeyFactory = {
- keyFactoryAlg match {
- case Some(algorithm) => SecretKeyFactory.getInstance(algorithm)
- case None =>
- try {
- SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512")
- } catch {
- case _: NoSuchAlgorithmException => SecretKeyFactory.getInstance("PBKDF2WithHmacSHA1")
- }
- }
- }
-
- private def secretKeySpec(keyFactory: SecretKeyFactory,
- cipherAlg: String,
- keyLength: Int,
- salt: Array[Byte], iterations: Int): SecretKeySpec = {
- val keySpec = new PBEKeySpec(secret.value.toCharArray, salt, iterations, keyLength)
- val algorithm = if (cipherAlg.indexOf('/') > 0) cipherAlg.substring(0, cipherAlg.indexOf('/')) else cipherAlg
- new SecretKeySpec(keyFactory.generateSecret(keySpec).getEncoded, algorithm)
- }
-
- private def base64Encode(bytes: Array[Byte]): String = Base64.getEncoder.encodeToString(bytes)
-
- private def cipherParamsInstance(cipherAlgorithm: String): CipherParamsEncoder = {
- val aesPattern = "AES/(.*)/.*".r
- cipherAlgorithm match {
- case aesPattern("GCM") => new GcmParamsEncoder
- case _ => new IvParamsEncoder
- }
- }
-
- private trait CipherParamsEncoder {
- def toMap(cipher: AlgorithmParameters): Map[String, String]
- def toParameterSpec(paramMap: Map[String, String]): AlgorithmParameterSpec
- }
-
- private class IvParamsEncoder extends CipherParamsEncoder {
- def toMap(cipherParams: AlgorithmParameters): Map[String, String] = {
- if (cipherParams != null) {
- val ivSpec = cipherParams.getParameterSpec(classOf[IvParameterSpec])
- Map(InitializationVectorProp -> base64Encode(ivSpec.getIV))
- } else
- throw new IllegalStateException("Could not determine initialization vector for cipher")
- }
- def toParameterSpec(paramMap: Map[String, String]): AlgorithmParameterSpec = {
- new IvParameterSpec(base64Decode(paramMap(InitializationVectorProp)))
- }
- }
-
- private class GcmParamsEncoder extends CipherParamsEncoder {
- def toMap(cipherParams: AlgorithmParameters): Map[String, String] = {
- if (cipherParams != null) {
- val spec = cipherParams.getParameterSpec(classOf[GCMParameterSpec])
- Map(InitializationVectorProp -> base64Encode(spec.getIV),
- "authenticationTagLength" -> spec.getTLen.toString)
- } else
- throw new IllegalStateException("Could not determine initialization vector for cipher")
- }
- def toParameterSpec(paramMap: Map[String, String]): AlgorithmParameterSpec = {
- new GCMParameterSpec(paramMap("authenticationTagLength").toInt, base64Decode(paramMap(InitializationVectorProp)))
- }
- }
-}
diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
index 54490e3dd65..33ba418fc8c 100755
--- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
+++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
@@ -22,6 +22,7 @@ import java.util.Collections
import scala.collection._
import scala.jdk.CollectionConverters._
import kafka.utils.Implicits._
+import org.apache.kafka.server.util.Csv
object VerifiableProperties {
def apply(map: java.util.Map[String, AnyRef]): VerifiableProperties = {
@@ -186,7 +187,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
*/
def getMap(name: String, valid: String => Boolean = _ => true): Map[String, String] = {
try {
- val m = CoreUtils.parseCsvMap(getString(name, ""))
+ val m = Csv.parseCsvMap(getString(name, "")).asScala
m.foreach {
case(key, value) =>
if(!valid(value))
diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
index a11a84c017b..5e06cd5e918 100644
--- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -16,7 +16,7 @@
*/
package kafka.zk
-import kafka.utils.{Logging, PasswordEncoder}
+import kafka.utils.Logging
import kafka.zk.ZkMigrationClient.wrapZkException
import kafka.zk.migration.{ZkAclMigrationClient, ZkConfigMigrationClient, ZkDelegationTokenMigrationClient, ZkTopicMigrationClient}
import kafka.zookeeper._
@@ -33,6 +33,7 @@ import org.apache.kafka.metadata.PartitionRegistration
import org.apache.kafka.metadata.migration.ConfigMigrationClient.ClientQuotaVisitor
import org.apache.kafka.metadata.migration.TopicMigrationClient.{TopicVisitor, TopicVisitorInterest}
import org.apache.kafka.metadata.migration._
+import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock}
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.{AuthFailedException, NoAuthException, SessionClosedRequireAuthException}
diff --git a/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala b/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
index b0986445e30..0f37d142bcb 100644
--- a/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
@@ -18,7 +18,7 @@
package kafka.zk.migration
import kafka.server.{DynamicBrokerConfig, DynamicConfig, ZkAdminManager}
-import kafka.utils.{Logging, PasswordEncoder}
+import kafka.utils.Logging
import kafka.zk.ZkMigrationClient.{logAndRethrow, wrapZkException}
import kafka.zk._
import kafka.zookeeper.{CreateRequest, DeleteRequest, SetDataRequest}
@@ -31,10 +31,12 @@ import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
import org.apache.kafka.metadata.migration.ConfigMigrationClient.ClientQuotaVisitor
import org.apache.kafka.metadata.migration.{ConfigMigrationClient, MigrationClientException, ZkMigrationLeadershipState}
+import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.{CreateMode, KeeperException}
+
import java.{lang, util}
import java.util.Properties
import java.util.function.{BiConsumer, Consumer}
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 5735cb1cf9b..808fe9da419 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -59,6 +59,7 @@ import org.apache.kafka.common.requests.MetadataRequest
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.config.ConfigType
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.record.BrokerCompressionType
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala
index 20dd5d91a83..6efb3f4e3e9 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala
@@ -16,7 +16,7 @@
*/
package kafka.zk
-import kafka.utils.{Logging, PasswordEncoder, TestUtils}
+import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.metadata.{FeatureLevelRecord, TopicRecord}
import org.apache.kafka.common.utils.{Time, Utils}
@@ -28,6 +28,7 @@ import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.metadata.KafkaConfigSchema
import org.apache.kafka.metadata.migration._
import org.apache.kafka.raft.{LeaderAndEpoch, OffsetAndEpoch}
+import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.fault.FaultHandler
import org.apache.zookeeper.client.ZKClientConfig
@@ -144,7 +145,7 @@ class ZkMigrationFailoverTest extends Logging {
}
// Safe to reuse these since they don't keep any state
- val zkMigrationClient = ZkMigrationClient(zkClient, PasswordEncoder.noop())
+ val zkMigrationClient = ZkMigrationClient(zkClient, PasswordEncoder.NOOP)
val (driver1, faultHandler1) = buildMigrationDriver(3000, zkMigrationClient)
val (driver2, faultHandler2) = buildMigrationDriver(3001, zkMigrationClient)
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index b1437f8c368..e52bc4e567a 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -23,7 +23,7 @@ import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTemplate,
import kafka.test.junit.ClusterTestExtensions
import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
-import kafka.utils.{PasswordEncoder, TestUtils}
+import kafka.utils.TestUtils
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.{TopicPartition, Uuid}
@@ -45,6 +45,7 @@ import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.metadata.authorizer.StandardAcl
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.ControllerRequestCompletionHandler
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
import org.apache.kafka.server.config.ConfigType
@@ -139,7 +140,7 @@ class ZkMigrationIntegrationTest {
val underlying = clusterInstance.asInstanceOf[ZkClusterInstance].getUnderlying()
val zkClient = underlying.zkClient
- val migrationClient = ZkMigrationClient(zkClient, PasswordEncoder.noop())
+ val migrationClient = ZkMigrationClient(zkClient, PasswordEncoder.NOOP)
val verifier = new MetadataDeltaVerifier()
migrationClient.readAllMetadata(batch => verifier.accept(batch), _ => { })
verifier.verify { image =>
@@ -246,7 +247,7 @@ class ZkMigrationIntegrationTest {
kafkaConfig.passwordEncoderCipherAlgorithm,
kafkaConfig.passwordEncoderKeyLength,
kafkaConfig.passwordEncoderIterations)
- case None => PasswordEncoder.noop()
+ case None => PasswordEncoder.NOOP
}
val migrationClient = ZkMigrationClient(zkClient, zkConfigEncoder)
diff --git a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
index ee3401062c8..f1c0dd0371f 100755
--- a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
@@ -36,7 +36,7 @@ import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorServic
class CoreUtilsTest extends Logging {
- val clusterIdPattern = Pattern.compile("[a-zA-Z0-9_\\-]+")
+ val clusterIdPattern: Pattern = Pattern.compile("[a-zA-Z0-9_\\-]+")
@Test
def testSwallow(): Unit = {
@@ -73,41 +73,6 @@ class CoreUtilsTest extends Logging {
assertTrue(emptyStringList.equals(emptyList))
}
- @Test
- def testCsvMap(): Unit = {
- val emptyString: String = ""
- val emptyMap = CoreUtils.parseCsvMap(emptyString)
- val emptyStringMap = Map.empty[String, String]
- assertTrue(emptyMap != null)
- assertTrue(emptyStringMap.equals(emptyStringMap))
-
- val kvPairsIpV6: String = "a:b:c:v,a:b:c:v"
- val ipv6Map = CoreUtils.parseCsvMap(kvPairsIpV6)
- for (m <- ipv6Map) {
- assertTrue(m._1.equals("a:b:c"))
- assertTrue(m._2.equals("v"))
- }
-
- val singleEntry:String = "key:value"
- val singleMap = CoreUtils.parseCsvMap(singleEntry)
- val value = singleMap.getOrElse("key", 0)
- assertTrue(value.equals("value"))
-
- val kvPairsIpV4: String = "192.168.2.1/30:allow, 192.168.2.1/30:allow"
- val ipv4Map = CoreUtils.parseCsvMap(kvPairsIpV4)
- for (m <- ipv4Map) {
- assertTrue(m._1.equals("192.168.2.1/30"))
- assertTrue(m._2.equals("allow"))
- }
-
- val kvPairsSpaces: String = "key:value , key: value"
- val spaceMap = CoreUtils.parseCsvMap(kvPairsSpaces)
- for (m <- spaceMap) {
- assertTrue(m._1.equals("key"))
- assertTrue(m._2.equals("value"))
- }
- }
-
@Test
def testInLock(): Unit = {
val lock = new ReentrantLock()
diff --git a/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala b/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
deleted file mode 100755
index 1073ad0082b..00000000000
--- a/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-
-import javax.crypto.SecretKeyFactory
-import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.config.types.Password
-import org.apache.kafka.server.config.Defaults
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-
-class PasswordEncoderTest {
-
- @Test
- def testEncodeDecode(): Unit = {
- val encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
- None,
- Defaults.PASSWORD_ENCODER_CIPHER_ALGORITHM,
- Defaults.PASSWORD_ENCODER_KEY_LENGTH,
- Defaults.PASSWORD_ENCODER_ITERATIONS)
- val password = "test-password"
- val encoded = encoder.encode(new Password(password))
- val encodedMap = CoreUtils.parseCsvMap(encoded)
- assertEquals("4096", encodedMap(PasswordEncoder.IterationsProp))
- assertEquals("128", encodedMap(PasswordEncoder.KeyLengthProp))
- val defaultKeyFactoryAlgorithm = try {
- SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512")
- "PBKDF2WithHmacSHA512"
- } catch {
- case _: Exception => "PBKDF2WithHmacSHA1"
- }
- assertEquals(defaultKeyFactoryAlgorithm, encodedMap(PasswordEncoder.KeyFactoryAlgorithmProp))
- assertEquals("AES/CBC/PKCS5Padding", encodedMap(PasswordEncoder.CipherAlgorithmProp))
-
- verifyEncodedPassword(encoder, password, encoded)
- }
-
- @Test
- def testEncoderConfigChange(): Unit = {
- val encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
- Some("PBKDF2WithHmacSHA1"),
- "DES/CBC/PKCS5Padding",
- 64,
- 1024)
- val password = "test-password"
- val encoded = encoder.encode(new Password(password))
- val encodedMap = CoreUtils.parseCsvMap(encoded)
- assertEquals("1024", encodedMap(PasswordEncoder.IterationsProp))
- assertEquals("64", encodedMap(PasswordEncoder.KeyLengthProp))
- assertEquals("PBKDF2WithHmacSHA1", encodedMap(PasswordEncoder.KeyFactoryAlgorithmProp))
- assertEquals("DES/CBC/PKCS5Padding", encodedMap(PasswordEncoder.CipherAlgorithmProp))
-
- // Test that decoding works even if PasswordEncoder algorithm, iterations etc. are altered
- val decoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
- Some("PBKDF2WithHmacSHA1"),
- "AES/CBC/PKCS5Padding",
- 128,
- 2048)
- assertEquals(password, decoder.decode(encoded).value)
-
- // Test that decoding fails if secret is altered
- val decoder2 = PasswordEncoder.encrypting(new Password("secret-2"),
- Some("PBKDF2WithHmacSHA1"),
- "AES/CBC/PKCS5Padding",
- 128,
- 1024)
- try {
- decoder2.decode(encoded)
- } catch {
- case e: ConfigException => // expected exception
- }
- }
-
- @Test
- def testEncodeDecodeAlgorithms(): Unit = {
-
- def verifyEncodeDecode(keyFactoryAlg: Option[String], cipherAlg: String, keyLength: Int): Unit = {
- val encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
- keyFactoryAlg,
- cipherAlg,
- keyLength,
- Defaults.PASSWORD_ENCODER_ITERATIONS)
- val password = "test-password"
- val encoded = encoder.encode(new Password(password))
- verifyEncodedPassword(encoder, password, encoded)
- }
-
- verifyEncodeDecode(keyFactoryAlg = None, "DES/CBC/PKCS5Padding", keyLength = 64)
- verifyEncodeDecode(keyFactoryAlg = None, "DESede/CBC/PKCS5Padding", keyLength = 192)
- verifyEncodeDecode(keyFactoryAlg = None, "AES/CBC/PKCS5Padding", keyLength = 128)
- verifyEncodeDecode(keyFactoryAlg = None, "AES/CFB/PKCS5Padding", keyLength = 128)
- verifyEncodeDecode(keyFactoryAlg = None, "AES/OFB/PKCS5Padding", keyLength = 128)
- verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA1"), Defaults.PASSWORD_ENCODER_CIPHER_ALGORITHM, keyLength = 128)
- verifyEncodeDecode(keyFactoryAlg = None, "AES/GCM/NoPadding", keyLength = 128)
- verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA256"), Defaults.PASSWORD_ENCODER_CIPHER_ALGORITHM, keyLength = 128)
- verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA512"), Defaults.PASSWORD_ENCODER_CIPHER_ALGORITHM, keyLength = 128)
- }
-
- private def verifyEncodedPassword(encoder: PasswordEncoder, password: String, encoded: String): Unit = {
- val encodedMap = CoreUtils.parseCsvMap(encoded)
- assertEquals(password.length.toString, encodedMap(PasswordEncoder.PasswordLengthProp))
- assertNotNull(encoder.base64Decode(encodedMap("salt")), "Invalid salt")
- assertNotNull(encoder.base64Decode(encodedMap(PasswordEncoder.InitializationVectorProp)), "Invalid encoding parameters")
- assertNotNull(encoder.base64Decode(encodedMap(PasswordEncoder.EncryptedPasswordProp)), "Invalid encoded password")
- assertEquals(password, encoder.decode(encoded).value)
- }
-}
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala
index d04798542ea..569cb5764b4 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala
@@ -17,10 +17,10 @@
package kafka.zk.migration
import kafka.server.{KafkaConfig, QuorumTestHarness}
-import kafka.utils.PasswordEncoder
import kafka.zk.ZkMigrationClient
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
+import org.apache.kafka.security.PasswordEncoder
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import java.util.Properties
diff --git a/server-common/src/main/java/org/apache/kafka/security/CipherParamsEncoder.java b/server-common/src/main/java/org/apache/kafka/security/CipherParamsEncoder.java
new file mode 100644
index 00000000000..d829e9885fa
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/security/CipherParamsEncoder.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security;
+
+import java.security.AlgorithmParameters;
+import java.security.spec.AlgorithmParameterSpec;
+import java.security.spec.InvalidParameterSpecException;
+import java.util.Map;
+
+public interface CipherParamsEncoder {
+
+ Map<String, String> toMap(AlgorithmParameters cipher) throws InvalidParameterSpecException;
+
+ AlgorithmParameterSpec toParameterSpec(Map<String, String> paramMap);
+}
diff --git a/server-common/src/main/java/org/apache/kafka/security/EncryptingPasswordEncoder.java b/server-common/src/main/java/org/apache/kafka/security/EncryptingPasswordEncoder.java
new file mode 100644
index 00000000000..747505645f2
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/security/EncryptingPasswordEncoder.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.server.util.Csv;
+
+import javax.crypto.Cipher;
+import javax.crypto.SecretKeyFactory;
+import javax.crypto.spec.PBEKeySpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.nio.charset.StandardCharsets;
+import java.security.GeneralSecurityException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.spec.InvalidKeySpecException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Password encoder and decoder implementation. Encoded passwords are persisted as a CSV map
+ * containing the encoded password in base64 and along with the properties used for encryption.
+ */
+public class EncryptingPasswordEncoder implements PasswordEncoder {
+
+ private final SecureRandom secureRandom = new SecureRandom();
+
+ private final Password secret;
+ private final String keyFactoryAlgorithm;
+ private final String cipherAlgorithm;
+ private final int keyLength;
+ private final int iterations;
+ private final CipherParamsEncoder cipherParamsEncoder;
+
+
+ /**
+ * @param secret The secret used for encoding and decoding
+ * @param keyFactoryAlgorithm Key factory algorithm if configured. By default, PBKDF2WithHmacSHA512 is
+ * used if available, PBKDF2WithHmacSHA1 otherwise.
+ * @param cipherAlgorithm Cipher algorithm used for encoding.
+ * @param keyLength Key length used for encoding. This should be valid for the specified algorithms.
+ * @param iterations Iteration count used for encoding.
+ * The provided `keyFactoryAlgorithm`, `cipherAlgorithm`, `keyLength` and `iterations` are used for encoding passwords.
+ * The values used for encoding are stored along with the encoded password and the stored values are used for decoding.
+ */
+ public EncryptingPasswordEncoder(
+ Password secret,
+ String keyFactoryAlgorithm,
+ String cipherAlgorithm,
+ int keyLength,
+ int iterations) {
+ this.secret = secret;
+ this.keyFactoryAlgorithm = keyFactoryAlgorithm;
+ this.cipherAlgorithm = cipherAlgorithm;
+ this.keyLength = keyLength;
+ this.iterations = iterations;
+ this.cipherParamsEncoder = cipherParamsInstance(cipherAlgorithm);
+ }
+
+ @Override
+ public String encode(Password password) throws GeneralSecurityException {
+ byte[] salt = new byte[256];
+ secureRandom.nextBytes(salt);
+ Cipher cipher = Cipher.getInstance(cipherAlgorithm);
+ SecretKeyFactory keyFactory = secretKeyFactory(keyFactoryAlgorithm);
+ SecretKeySpec keySpec = secretKeySpec(keyFactory, cipherAlgorithm, keyLength, salt, iterations);
+ cipher.init(Cipher.ENCRYPT_MODE, keySpec);
+ byte[] encryptedPassword = cipher.doFinal(password.value().getBytes(StandardCharsets.UTF_8));
+ Map<String, String> encryptedMap = new HashMap<>();
+ encryptedMap.put(PasswordEncoder.KEY_FACTORY_ALGORITHM, keyFactory.getAlgorithm());
+ encryptedMap.put(PasswordEncoder.CIPHER_ALGORITHM, cipherAlgorithm);
+ encryptedMap.put(PasswordEncoder.KEY_LENGTH, String.valueOf(keyLength));
+ encryptedMap.put(PasswordEncoder.SALT, PasswordEncoder.base64Encode(salt));
+ encryptedMap.put(PasswordEncoder.ITERATIONS, String.valueOf(iterations));
+ encryptedMap.put(PasswordEncoder.ENCRYPTED_PASSWORD, PasswordEncoder.base64Encode(encryptedPassword));
+ encryptedMap.put(PasswordEncoder.PASSWORD_LENGTH, String.valueOf(password.value().length()));
+ encryptedMap.putAll(cipherParamsEncoder.toMap(cipher.getParameters()));
+
+ return encryptedMap.entrySet().stream()
+ .map(entry -> entry.getKey() + ":" + entry.getValue())
+ .collect(Collectors.joining(","));
+ }
+
+ @Override
+ public Password decode(String encodedPassword) throws GeneralSecurityException {
+ Map<String, String> params = Csv.parseCsvMap(encodedPassword);
+ String keyFactoryAlg = params.get(PasswordEncoder.KEY_FACTORY_ALGORITHM);
+ String cipherAlg = params.get(PasswordEncoder.CIPHER_ALGORITHM);
+ int keyLength = Integer.parseInt(params.get(PasswordEncoder.KEY_LENGTH));
+ byte[] salt = PasswordEncoder.base64Decode(params.get(PasswordEncoder.SALT));
+ int iterations = Integer.parseInt(params.get(PasswordEncoder.ITERATIONS));
+ byte[] encryptedPassword = PasswordEncoder.base64Decode(params.get(PasswordEncoder.ENCRYPTED_PASSWORD));
+ int passwordLengthProp = Integer.parseInt(params.get(PasswordEncoder.PASSWORD_LENGTH));
+ Cipher cipher = Cipher.getInstance(cipherAlg);
+ SecretKeyFactory keyFactory = secretKeyFactory(keyFactoryAlg);
+ SecretKeySpec keySpec = secretKeySpec(keyFactory, cipherAlg, keyLength, salt, iterations);
+ cipher.init(Cipher.DECRYPT_MODE, keySpec, cipherParamsEncoder.toParameterSpec(params));
+ try {
+ byte[] decrypted = cipher.doFinal(encryptedPassword);
+ String password = new String(decrypted, StandardCharsets.UTF_8);
+ if (password.length() != passwordLengthProp) // Sanity check
+ throw new ConfigException("Password could not be decoded, sanity check of length failed");
+ return new Password(password);
+ } catch (Exception e) {
+ throw new ConfigException("Password could not be decoded", e);
+ }
+ }
+
+ private SecretKeyFactory secretKeyFactory(String keyFactoryAlg) throws NoSuchAlgorithmException {
+ if (keyFactoryAlg != null) {
+ return SecretKeyFactory.getInstance(keyFactoryAlg);
+ } else {
+ try {
+ return SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512");
+ } catch (NoSuchAlgorithmException nsae) {
+ return SecretKeyFactory.getInstance("PBKDF2WithHmacSHA1");
+ }
+ }
+ }
+
+ private SecretKeySpec secretKeySpec(SecretKeyFactory keyFactory,
+ String cipherAlg,
+ int keyLength,
+ byte[] salt,
+ int iterations) throws InvalidKeySpecException {
+ PBEKeySpec keySpec = new PBEKeySpec(secret.value().toCharArray(), salt, iterations, keyLength);
+ String algorithm = (cipherAlg.indexOf('/') > 0) ? cipherAlg.substring(0, cipherAlg.indexOf('/')) : cipherAlg;
+ return new SecretKeySpec(keyFactory.generateSecret(keySpec).getEncoded(), algorithm);
+ }
+
+ private CipherParamsEncoder cipherParamsInstance(String cipherAlgorithm) {
+ if (cipherAlgorithm.startsWith("AES/GCM/")) {
+ return new GcmParamsEncoder();
+ } else {
+ return new IvParamsEncoder();
+ }
+ }
+}
diff --git a/server-common/src/main/java/org/apache/kafka/security/GcmParamsEncoder.java b/server-common/src/main/java/org/apache/kafka/security/GcmParamsEncoder.java
new file mode 100644
index 00000000000..afe3387b2e7
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/security/GcmParamsEncoder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security;
+
+import javax.crypto.spec.GCMParameterSpec;
+import java.security.AlgorithmParameters;
+import java.security.spec.AlgorithmParameterSpec;
+import java.security.spec.InvalidParameterSpecException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class GcmParamsEncoder implements CipherParamsEncoder {
+
+ private static final String AUTHENTICATION_TAG_LENGTH = "authenticationTagLength";
+
+ @Override
+ public Map<String, String> toMap(AlgorithmParameters cipherParams) throws InvalidParameterSpecException {
+ if (cipherParams != null) {
+ GCMParameterSpec spec = cipherParams.getParameterSpec(GCMParameterSpec.class);
+ Map<String, String> map = new HashMap<>();
+ map.put(PasswordEncoder.INITIALIZATION_VECTOR, PasswordEncoder.base64Encode(spec.getIV()));
+ map.put(AUTHENTICATION_TAG_LENGTH, String.valueOf(spec.getTLen()));
+ return map;
+ } else
+ throw new IllegalStateException("Could not determine initialization vector for cipher");
+ }
+
+ @Override
+ public AlgorithmParameterSpec toParameterSpec(Map<String, String> paramMap) {
+ return new GCMParameterSpec(Integer.parseInt(paramMap.get(AUTHENTICATION_TAG_LENGTH)),
+ PasswordEncoder.base64Decode(paramMap.get(PasswordEncoder.INITIALIZATION_VECTOR)));
+ }
+}
diff --git a/server-common/src/main/java/org/apache/kafka/security/IvParamsEncoder.java b/server-common/src/main/java/org/apache/kafka/security/IvParamsEncoder.java
new file mode 100644
index 00000000000..c156d502bd7
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/security/IvParamsEncoder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security;
+
+import javax.crypto.spec.IvParameterSpec;
+import java.security.AlgorithmParameters;
+import java.security.spec.AlgorithmParameterSpec;
+import java.security.spec.InvalidParameterSpecException;
+import java.util.Collections;
+import java.util.Map;
+
+public class IvParamsEncoder implements CipherParamsEncoder {
+
+ @Override
+ public Map<String, String> toMap(AlgorithmParameters cipherParams) throws InvalidParameterSpecException {
+ if (cipherParams != null) {
+ IvParameterSpec ivSpec = cipherParams.getParameterSpec(IvParameterSpec.class);
+ return Collections.singletonMap(PasswordEncoder.INITIALIZATION_VECTOR, PasswordEncoder.base64Encode(ivSpec.getIV()));
+ } else
+ throw new IllegalStateException("Could not determine initialization vector for cipher");
+ }
+
+ @Override
+ public AlgorithmParameterSpec toParameterSpec(Map<String, String> paramMap) {
+ return new IvParameterSpec(PasswordEncoder.base64Decode(paramMap.get(PasswordEncoder.INITIALIZATION_VECTOR)));
+ }
+}
diff --git a/server-common/src/main/java/org/apache/kafka/security/PasswordEncoder.java b/server-common/src/main/java/org/apache/kafka/security/PasswordEncoder.java
new file mode 100644
index 00000000000..7d7822823b1
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/security/PasswordEncoder.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security;
+
+import org.apache.kafka.common.config.types.Password;
+
+import java.security.GeneralSecurityException;
+import java.util.Base64;
+
+public interface PasswordEncoder {
+
+ String KEY_FACTORY_ALGORITHM = "keyFactoryAlgorithm";
+ String CIPHER_ALGORITHM = "cipherAlgorithm";
+ String INITIALIZATION_VECTOR = "initializationVector";
+ String KEY_LENGTH = "keyLength";
+ String SALT = "salt";
+ String ITERATIONS = "iterations";
+ String ENCRYPTED_PASSWORD = "encryptedPassword";
+ String PASSWORD_LENGTH = "passwordLength";
+
+ /**
+ * A password encoder that does not modify the given password. This is used in KRaft mode only.
+ */
+ PasswordEncoder NOOP = new PasswordEncoder() {
+
+ @Override
+ public String encode(Password password) {
+ return password.value();
+ }
+
+ @Override
+ public Password decode(String encodedPassword) {
+ return new Password(encodedPassword);
+ }
+ };
+
+ static byte[] base64Decode(String encoded) {
+ return Base64.getDecoder().decode(encoded);
+ }
+
+ static String base64Encode(byte[] bytes) {
+ return Base64.getEncoder().encodeToString(bytes);
+ }
+
+ static EncryptingPasswordEncoder encrypting(Password secret,
+ String keyFactoryAlgorithm,
+ String cipherAlgorithm,
+ int keyLength,
+ int iterations) {
+ return new EncryptingPasswordEncoder(secret, keyFactoryAlgorithm, cipherAlgorithm, keyLength, iterations);
+ }
+
+ String encode(Password password) throws GeneralSecurityException;
+ Password decode(String encodedPassword) throws GeneralSecurityException;
+}
diff --git a/server-common/src/main/java/org/apache/kafka/security/PasswordEncoderConfigs.java b/server-common/src/main/java/org/apache/kafka/security/PasswordEncoderConfigs.java
new file mode 100644
index 00000000000..e5c9bc13fe7
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/security/PasswordEncoderConfigs.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security;
+
+public class PasswordEncoderConfigs {
+
+ public static final String SECRET = "password.encoder.secret";
+ public static final String OLD_SECRET = "password.encoder.old.secret";
+ public static final String KEYFACTORY_ALGORITHM = "password.encoder.keyfactory.algorithm";
+ public static final String CIPHER_ALGORITHM = "password.encoder.cipher.algorithm";
+ public static final String DEFAULT_CIPHER_ALGORITHM = "AES/CBC/PKCS5Padding";
+ public static final String KEY_LENGTH = "password.encoder.key.length";
+ public static final int DEFAULT_KEY_LENGTH = 128;
+ public static final String ITERATIONS = "password.encoder.iterations";
+ public static final int DEFAULT_ITERATIONS = 4096;
+}
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/Csv.java b/server-common/src/main/java/org/apache/kafka/server/util/Csv.java
new file mode 100644
index 00000000000..d09b45263c2
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/util/Csv.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class Csv {
+
+ /**
+ * This method gets comma separated values which contains key,value pairs and returns a map of
+ * key value pairs. the format of allCSVal is key1:val1, key2:val2 ....
+ * Also supports strings with multiple ":" such as IpV6 addresses, taking the last occurrence
+ * of the ":" in the pair as the split, eg a:b:c:val1, d:e:f:val2 => a:b:c -> val1, d:e:f -> val2
+ */
+ public static Map<String, String> parseCsvMap(String str) {
+ Map<String, String> map = new HashMap<>();
+ if (str == null || "".equals(str))
+ return map;
+ String[] keyVals = str.split("\\s*,\\s*");
+ for (String s : keyVals) {
+ int lio = s.lastIndexOf(":");
+ map.put(s.substring(0, lio).trim(), s.substring(lio + 1).trim());
+ }
+ return map;
+ }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/security/PasswordEncoderTest.java b/server-common/src/test/java/org/apache/kafka/security/PasswordEncoderTest.java
new file mode 100644
index 00000000000..c61715b02cd
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/security/PasswordEncoderTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security;
+
+import javax.crypto.SecretKeyFactory;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.server.util.Csv;
+import org.junit.jupiter.api.Test;
+
+import java.security.GeneralSecurityException;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class PasswordEncoderTest {
+
+ @Test
+ public void testEncodeDecode() throws GeneralSecurityException {
+ PasswordEncoder encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
+ null,
+ PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM,
+ PasswordEncoderConfigs.DEFAULT_KEY_LENGTH,
+ PasswordEncoderConfigs.DEFAULT_ITERATIONS);
+ String password = "test-password";
+ String encoded = encoder.encode(new Password(password));
+ Map<String, String> encodedMap = Csv.parseCsvMap(encoded);
+ assertEquals("4096", encodedMap.get(PasswordEncoder.ITERATIONS));
+ assertEquals("128", encodedMap.get(PasswordEncoder.KEY_LENGTH));
+ String defaultKeyFactoryAlgorithm;
+ try {
+ SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512");
+ defaultKeyFactoryAlgorithm = "PBKDF2WithHmacSHA512";
+
+ } catch (Exception e) {
+ defaultKeyFactoryAlgorithm = "PBKDF2WithHmacSHA1";
+ }
+ assertEquals(defaultKeyFactoryAlgorithm, encodedMap.get(PasswordEncoder.KEY_FACTORY_ALGORITHM));
+ assertEquals("AES/CBC/PKCS5Padding", encodedMap.get(PasswordEncoder.CIPHER_ALGORITHM));
+ verifyEncodedPassword(encoder, password, encoded);
+ }
+
+ @Test
+ public void testEncoderConfigChange() throws GeneralSecurityException {
+ PasswordEncoder encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
+ "PBKDF2WithHmacSHA1",
+ "DES/CBC/PKCS5Padding",
+ 64,
+ 1024);
+ String password = "test-password";
+ String encoded = encoder.encode(new Password(password));
+ Map<String, String> encodedMap = Csv.parseCsvMap(encoded);
+ assertEquals("1024", encodedMap.get(PasswordEncoder.ITERATIONS));
+ assertEquals("64", encodedMap.get(PasswordEncoder.KEY_LENGTH));
+ assertEquals("PBKDF2WithHmacSHA1", encodedMap.get(PasswordEncoder.KEY_FACTORY_ALGORITHM));
+ assertEquals("DES/CBC/PKCS5Padding", encodedMap.get(PasswordEncoder.CIPHER_ALGORITHM));
+
+ // Test that decoding works even if PasswordEncoder algorithm, iterations etc. are altered
+ PasswordEncoder decoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
+ "PBKDF2WithHmacSHA1",
+ "AES/CBC/PKCS5Padding",
+ 128,
+ 2048);
+ assertEquals(password, decoder.decode(encoded).value());
+
+ // Test that decoding fails if secret is altered
+ PasswordEncoder decoder2 = PasswordEncoder.encrypting(new Password("secret-2"),
+ "PBKDF2WithHmacSHA1",
+ "AES/CBC/PKCS5Padding",
+ 128,
+ 1024);
+ assertThrows(ConfigException.class, () -> decoder2.decode(encoded));
+ }
+
+ @Test
+ public void testEncodeDecodeAlgorithms() throws GeneralSecurityException {
+ verifyEncodeDecode(null, "DES/CBC/PKCS5Padding", 64);
+ verifyEncodeDecode(null, "DESede/CBC/PKCS5Padding", 192);
+ verifyEncodeDecode(null, "AES/CBC/PKCS5Padding", 128);
+ verifyEncodeDecode(null, "AES/CFB/PKCS5Padding", 128);
+ verifyEncodeDecode(null, "AES/OFB/PKCS5Padding", 128);
+ verifyEncodeDecode("PBKDF2WithHmacSHA1", PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM, 128);
+ verifyEncodeDecode(null, "AES/GCM/NoPadding", 128);
+ verifyEncodeDecode("PBKDF2WithHmacSHA256", PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM, 128);
+ verifyEncodeDecode("PBKDF2WithHmacSHA512", PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM, 128);
+ }
+
+ private void verifyEncodeDecode(String keyFactoryAlg, String cipherAlg, int keyLength) throws GeneralSecurityException {
+ PasswordEncoder encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
+ keyFactoryAlg,
+ cipherAlg,
+ keyLength,
+ PasswordEncoderConfigs.DEFAULT_ITERATIONS);
+ String password = "test-password";
+ String encoded = encoder.encode(new Password(password));
+ verifyEncodedPassword(encoder, password, encoded);
+ }
+
+ private void verifyEncodedPassword(PasswordEncoder encoder, String password, String encoded) throws GeneralSecurityException {
+ Map<String, String> encodedMap = Csv.parseCsvMap(encoded);
+ assertEquals(String.valueOf(password.length()), encodedMap.get(PasswordEncoder.PASSWORD_LENGTH));
+ assertNotNull(PasswordEncoder.base64Decode(encodedMap.get("salt")), "Invalid salt");
+ assertNotNull(PasswordEncoder.base64Decode(encodedMap.get(PasswordEncoder.INITIALIZATION_VECTOR)), "Invalid encoding parameters");
+ assertNotNull(PasswordEncoder.base64Decode(encodedMap.get(PasswordEncoder.ENCRYPTED_PASSWORD)), "Invalid encoded password");
+ assertEquals(password, encoder.decode(encoded).value());
+ }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/server/util/CsvTest.java b/server-common/src/test/java/org/apache/kafka/server/util/CsvTest.java
new file mode 100644
index 00000000000..46d54b42c2b
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/util/CsvTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class CsvTest {
+
+ @Test
+ public void testCsvMap() {
+ String emptyString = "";
+ Map<String, String> emptyMap = Csv.parseCsvMap(emptyString);
+ Map<String, String> emptyStringMap = Collections.emptyMap();
+ assertNotNull(emptyMap);
+ assertEquals(emptyStringMap, emptyStringMap);
+
+ String kvPairsIpV6 = "a:b:c:v,a:b:c:v";
+ Map<String, String> ipv6Map = Csv.parseCsvMap(kvPairsIpV6);
+ for (Map.Entry<String, String> entry : ipv6Map.entrySet()) {
+ assertEquals("a:b:c", entry.getKey());
+ assertEquals("v", entry.getValue());
+ }
+
+ String singleEntry = "key:value";
+ Map<String, String> singleMap = Csv.parseCsvMap(singleEntry);
+ String value = singleMap.get("key");
+ assertEquals("value", value);
+
+ String kvPairsIpV4 = "192.168.2.1/30:allow, 192.168.2.1/30:allow";
+ Map<String, String> ipv4Map = Csv.parseCsvMap(kvPairsIpV4);
+ for (Map.Entry<String, String> entry : ipv4Map.entrySet()) {
+ assertEquals("192.168.2.1/30", entry.getKey());
+ assertEquals("allow", entry.getValue());
+ }
+
+ String kvPairsSpaces = "key:value , key: value";
+ Map<String, String> spaceMap = Csv.parseCsvMap(kvPairsSpaces);
+ for (Map.Entry<String, String> entry : spaceMap.entrySet()) {
+ assertEquals("key", entry.getKey());
+ assertEquals("value", entry.getValue());
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/kafka/server/config/Defaults.java b/server/src/main/java/org/apache/kafka/server/config/Defaults.java
index 8ad319b822f..1c502e1564b 100644
--- a/server/src/main/java/org/apache/kafka/server/config/Defaults.java
+++ b/server/src/main/java/org/apache/kafka/server/config/Defaults.java
@@ -33,6 +33,7 @@ import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.security.PasswordEncoderConfigs;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.Arrays;
@@ -270,9 +271,9 @@ public class Defaults {
public static final long DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS = 1 * 60 * 60 * 1000L;
/** ********* Password Encryption Configuration for Dynamic Configs *********/
- public static final String PASSWORD_ENCODER_CIPHER_ALGORITHM = "AES/CBC/PKCS5Padding";
- public static final int PASSWORD_ENCODER_KEY_LENGTH = 128;
- public static final int PASSWORD_ENCODER_ITERATIONS = 4096;
+ public static final String PASSWORD_ENCODER_CIPHER_ALGORITHM = PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM;
+ public static final int PASSWORD_ENCODER_KEY_LENGTH = PasswordEncoderConfigs.DEFAULT_KEY_LENGTH;
+ public static final int PASSWORD_ENCODER_ITERATIONS = PasswordEncoderConfigs.DEFAULT_ITERATIONS;
/** ********* Raft Quorum Configuration *********/
public static final List<String> QUORUM_VOTERS = RaftConfig.DEFAULT_QUORUM_VOTERS;