You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2024/01/30 18:08:57 UTC

(kafka) branch trunk updated: KAFKA-15853: Move PasswordEncoder to server-common (#15246)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3e9ef708536 KAFKA-15853: Move PasswordEncoder to server-common (#15246)
3e9ef708536 is described below

commit 3e9ef708536612fdfd581623a74e6366c5aea0f6
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Tue Jan 30 19:08:50 2024 +0100

    KAFKA-15853: Move PasswordEncoder to server-common (#15246)
    
    
    Reviewers: Luke Chen <sh...@gmail.com>, Omnia Ibrahim <o....@gmail.com>
---
 checkstyle/import-control-server-common.xml        |   8 +
 checkstyle/import-control-server.xml               |   1 +
 .../src/main/scala/kafka/admin/ConfigCommand.scala |  25 +--
 .../main/scala/kafka/server/ControllerServer.scala |   5 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   |   5 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  22 ++-
 core/src/main/scala/kafka/utils/CoreUtils.scala    |  19 +-
 .../main/scala/kafka/utils/PasswordEncoder.scala   | 202 ---------------------
 .../scala/kafka/utils/VerifiableProperties.scala   |   3 +-
 .../main/scala/kafka/zk/ZkMigrationClient.scala    |   3 +-
 .../zk/migration/ZkConfigMigrationClient.scala     |   4 +-
 .../server/DynamicBrokerReconfigurationTest.scala  |   1 +
 .../kafka/zk/ZkMigrationFailoverTest.scala         |   5 +-
 .../kafka/zk/ZkMigrationIntegrationTest.scala      |   7 +-
 .../scala/unit/kafka/utils/CoreUtilsTest.scala     |  37 +---
 .../unit/kafka/utils/PasswordEncoderTest.scala     | 123 -------------
 .../zk/migration/ZkMigrationTestHarness.scala      |   2 +-
 .../apache/kafka/security/CipherParamsEncoder.java |  29 +++
 .../kafka/security/EncryptingPasswordEncoder.java  | 154 ++++++++++++++++
 .../apache/kafka/security/GcmParamsEncoder.java    |  47 +++++
 .../org/apache/kafka/security/IvParamsEncoder.java |  41 +++++
 .../org/apache/kafka/security/PasswordEncoder.java |  69 +++++++
 .../kafka/security/PasswordEncoderConfigs.java     |  30 +++
 .../java/org/apache/kafka/server/util/Csv.java     |  41 +++++
 .../apache/kafka/security/PasswordEncoderTest.java | 124 +++++++++++++
 .../java/org/apache/kafka/server/util/CsvTest.java |  63 +++++++
 .../org/apache/kafka/server/config/Defaults.java   |   7 +-
 27 files changed, 660 insertions(+), 417 deletions(-)

diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml
index 43406958711..2c5c652e979 100644
--- a/checkstyle/import-control-server-common.xml
+++ b/checkstyle/import-control-server-common.xml
@@ -58,6 +58,14 @@
         <allow pkg="org.apache.kafka.common.protocol" />
     </subpackage>
 
+    <subpackage name="security">
+        <allow pkg="org.apache.kafka.common.config" />
+        <allow pkg="org.apache.kafka.common.config.types" />
+        <allow pkg="org.apache.kafka.server.util" />
+        <allow pkg="javax.crypto" />
+        <allow pkg="javax.crypto.spec" />
+    </subpackage>
+
     <subpackage name="server">
         <allow pkg="org.apache.kafka.common" />
         <allow pkg="joptsimple" />
diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml
index 2f6a7c4c058..c395a5f2214 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -57,6 +57,7 @@
 
   <!-- utilities and reusable classes from server-common -->
   <allow pkg="org.apache.kafka.queue" />
+  <allow pkg="org.apache.kafka.security" />
   <allow pkg="org.apache.kafka.server.common" />
   <allow pkg="org.apache.kafka.server.metrics" />
   <allow pkg="org.apache.kafka.server.util" />
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 3f75b7d7ceb..0d82d23d7ed 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -21,10 +21,10 @@ import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
 import java.util.{Collections, Properties}
 import joptsimple._
-import kafka.server.DynamicConfig.QuotaConfigs
 import kafka.server.{DynamicBrokerConfig, DynamicConfig, KafkaConfig}
-import kafka.utils.{Exit, Logging, PasswordEncoder}
+import kafka.server.DynamicConfig.QuotaConfigs
 import kafka.utils.Implicits._
+import kafka.utils.{Exit, Logging}
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig, ScramMechanism => PublicScramMechanism}
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
@@ -35,7 +35,8 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism}
 import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
-import org.apache.kafka.server.config.{ConfigEntityName, ConfigType, Defaults}
+import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
+import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs}
 import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 import org.apache.kafka.storage.internals.log.LogConfig
 import org.apache.zookeeper.client.ZKClientConfig
@@ -211,19 +212,19 @@ object ConfigCommand extends Logging {
   }
 
   private[admin] def createPasswordEncoder(encoderConfigs: Map[String, String]): PasswordEncoder = {
-    encoderConfigs.get(KafkaConfig.PasswordEncoderSecretProp)
-    val encoderSecret = encoderConfigs.getOrElse(KafkaConfig.PasswordEncoderSecretProp,
+    encoderConfigs.get(PasswordEncoderConfigs.SECRET)
+    val encoderSecret = encoderConfigs.getOrElse(PasswordEncoderConfigs.SECRET,
       throw new IllegalArgumentException("Password encoder secret not specified"))
     PasswordEncoder.encrypting(new Password(encoderSecret),
-      None,
-      encoderConfigs.getOrElse(KafkaConfig.PasswordEncoderCipherAlgorithmProp, Defaults.PASSWORD_ENCODER_CIPHER_ALGORITHM),
-      encoderConfigs.get(KafkaConfig.PasswordEncoderKeyLengthProp).map(_.toInt).getOrElse(Defaults.PASSWORD_ENCODER_KEY_LENGTH),
-      encoderConfigs.get(KafkaConfig.PasswordEncoderIterationsProp).map(_.toInt).getOrElse(Defaults.PASSWORD_ENCODER_ITERATIONS))
+      null,
+      encoderConfigs.getOrElse(PasswordEncoderConfigs.CIPHER_ALGORITHM, PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM),
+      encoderConfigs.get(PasswordEncoderConfigs.KEY_LENGTH).map(_.toInt).getOrElse(PasswordEncoderConfigs.DEFAULT_KEY_LENGTH),
+      encoderConfigs.get(PasswordEncoderConfigs.ITERATIONS).map(_.toInt).getOrElse(PasswordEncoderConfigs.DEFAULT_ITERATIONS))
   }
 
   /**
    * Pre-process broker configs provided to convert them to persistent format.
-   * Password configs are encrypted using the secret `KafkaConfig.PasswordEncoderSecretProp`.
+   * Password configs are encrypted using the secret `PasswordEncoderConfigs.SECRET`.
    * The secret is removed from `configsToBeAdded` and will not be persisted in ZooKeeper.
    */
   private def preProcessBrokerConfigs(configsToBeAdded: Properties, perBrokerConfig: Boolean): Unit = {
@@ -238,8 +239,8 @@ object ConfigCommand extends Logging {
     DynamicBrokerConfig.validateConfigs(configsToBeAdded, perBrokerConfig)
     val passwordConfigs = configsToBeAdded.asScala.keySet.filter(DynamicBrokerConfig.isPasswordConfig)
     if (passwordConfigs.nonEmpty) {
-      require(passwordEncoderConfigs.containsKey(KafkaConfig.PasswordEncoderSecretProp),
-        s"${KafkaConfig.PasswordEncoderSecretProp} must be specified to update $passwordConfigs." +
+      require(passwordEncoderConfigs.containsKey(PasswordEncoderConfigs.SECRET),
+        s"${PasswordEncoderConfigs.SECRET} must be specified to update $passwordConfigs." +
           " Other password encoder configs like cipher algorithm and iterations may also be specified" +
           " to override the default encoding parameters. Password encoder configs will not be persisted" +
           " in ZooKeeper."
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index f74bb3de7a4..8f14d814b63 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -27,7 +27,7 @@ import kafka.server.QuotaFactory.QuotaManagers
 
 import scala.collection.immutable
 import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
-import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
+import kafka.utils.{CoreUtils, Logging}
 import kafka.zk.{KafkaZkClient, ZkMigrationClient}
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.network.ListenerName
@@ -44,6 +44,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
 import org.apache.kafka.metadata.migration.{KRaftMigrationDriver, LegacyPropagator}
 import org.apache.kafka.metadata.publisher.FeaturesPublisher
 import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.security.PasswordEncoder
 import org.apache.kafka.server.NodeToControllerChannelManager
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.ApiMessageAndVersion
@@ -285,7 +286,7 @@ class ControllerServer(
             config.passwordEncoderCipherAlgorithm,
             config.passwordEncoderKeyLength,
             config.passwordEncoderIterations)
-          case None => PasswordEncoder.noop()
+          case None => PasswordEncoder.NOOP
         }
         val migrationClient = ZkMigrationClient(zkClient, zkConfigEncoder)
         val propagator: LegacyPropagator = new MigrationPropagator(config.nodeId, config)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 7f607d49a4f..74880971bc3 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -25,7 +25,7 @@ import kafka.cluster.EndPoint
 import kafka.log.{LogCleaner, LogManager}
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.server.DynamicBrokerConfig._
-import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
+import kafka.utils.{CoreUtils, Logging}
 import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.Reconfigurable
@@ -35,6 +35,7 @@ import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
 import org.apache.kafka.common.security.authenticator.LoginManager
 import org.apache.kafka.common.utils.{ConfigUtils, Utils}
+import org.apache.kafka.security.PasswordEncoder
 import org.apache.kafka.server.ProcessRole
 import org.apache.kafka.server.config.{ConfigEntityName, ConfigType, ServerTopicConfigSynonyms}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
@@ -223,7 +224,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
   private val dynamicConfigPasswordEncoder = if (kafkaConfig.processRoles.isEmpty) {
     maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
   } else {
-    Some(PasswordEncoder.noop())
+    Some(PasswordEncoder.NOOP)
   }
 
   private[server] def initialize(zkClientOpt: Option[KafkaZkClient], clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c8910a5ccf8..a9cda78dd0f 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -40,15 +40,17 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.coordinator.group.Group.GroupType
 import org.apache.kafka.coordinator.group.assignor.PartitionAssignor
 import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.security.PasswordEncoderConfigs
 import org.apache.kafka.server.ProcessRole
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.{MetadataVersion, MetadataVersionValidator}
 import org.apache.kafka.server.common.MetadataVersion._
 import org.apache.kafka.server.config.{Defaults, ServerTopicConfigSynonyms}
-import org.apache.kafka.storage.internals.log.{LogConfig, ProducerStateManagerConfig}
-import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.record.BrokerCompressionType
+import org.apache.kafka.server.util.Csv
+import org.apache.kafka.storage.internals.log.{LogConfig, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
 import org.apache.zookeeper.client.ZKClientConfig
 
 import scala.annotation.nowarn
@@ -435,12 +437,12 @@ object KafkaConfig {
   val DelegationTokenExpiryCheckIntervalMsProp = "delegation.token.expiry.check.interval.ms"
 
   /** ********* Password encryption configuration for dynamic configs *********/
-  val PasswordEncoderSecretProp = "password.encoder.secret"
-  val PasswordEncoderOldSecretProp = "password.encoder.old.secret"
-  val PasswordEncoderKeyFactoryAlgorithmProp = "password.encoder.keyfactory.algorithm"
-  val PasswordEncoderCipherAlgorithmProp = "password.encoder.cipher.algorithm"
-  val PasswordEncoderKeyLengthProp =  "password.encoder.key.length"
-  val PasswordEncoderIterationsProp =  "password.encoder.iterations"
+  val PasswordEncoderSecretProp = PasswordEncoderConfigs.SECRET
+  val PasswordEncoderOldSecretProp = PasswordEncoderConfigs.OLD_SECRET
+  val PasswordEncoderKeyFactoryAlgorithmProp = PasswordEncoderConfigs.KEYFACTORY_ALGORITHM
+  val PasswordEncoderCipherAlgorithmProp = PasswordEncoderConfigs.CIPHER_ALGORITHM
+  val PasswordEncoderKeyLengthProp = PasswordEncoderConfigs.KEY_LENGTH
+  val PasswordEncoderIterationsProp = PasswordEncoderConfigs.ITERATIONS
 
   /** Internal Configurations **/
   val UnstableApiVersionsEnableProp = "unstable.api.versions.enable"
@@ -1873,7 +1875,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   def passwordEncoderSecret = Option(getPassword(KafkaConfig.PasswordEncoderSecretProp))
   def passwordEncoderOldSecret = Option(getPassword(KafkaConfig.PasswordEncoderOldSecretProp))
   def passwordEncoderCipherAlgorithm = getString(KafkaConfig.PasswordEncoderCipherAlgorithmProp)
-  def passwordEncoderKeyFactoryAlgorithm = Option(getString(KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp))
+  def passwordEncoderKeyFactoryAlgorithm = getString(KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp)
   def passwordEncoderKeyLength = getInt(KafkaConfig.PasswordEncoderKeyLengthProp)
   def passwordEncoderIterations = getInt(KafkaConfig.PasswordEncoderIterationsProp)
 
@@ -1935,7 +1937,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
 
   private def getMap(propName: String, propValue: String): Map[String, String] = {
     try {
-      CoreUtils.parseCsvMap(propValue)
+      Csv.parseCsvMap(propValue).asScala
     } catch {
       case e: Exception => throw new IllegalArgumentException("Error parsing configuration property '%s': %s".format(propName, e.getMessage))
     }
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 88e9f8aa2a8..6af1f3594e2 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -26,7 +26,7 @@ import com.typesafe.scalalogging.Logger
 
 import javax.management._
 import scala.collection._
-import scala.collection.{Seq, mutable}
+import scala.collection.Seq
 import kafka.cluster.EndPoint
 import org.apache.commons.validator.routines.InetAddressValidator
 import org.apache.kafka.common.network.ListenerName
@@ -109,23 +109,6 @@ object CoreUtils {
     }
   }
 
-  /**
-   * This method gets comma separated values which contains key,value pairs and returns a map of
-   * key value pairs. the format of allCSVal is key1:val1, key2:val2 ....
-   * Also supports strings with multiple ":" such as IpV6 addresses, taking the last occurrence
-   * of the ":" in the pair as the split, eg a:b:c:val1, d:e:f:val2 => a:b:c -> val1, d:e:f -> val2
-   */
-  def parseCsvMap(str: String): Map[String, String] = {
-    val map = new mutable.HashMap[String, String]
-    if ("".equals(str))
-      return map
-    val keyVals = str.split("\\s*,\\s*").map(s => {
-      val lio = s.lastIndexOf(":")
-      (s.substring(0,lio).trim, s.substring(lio + 1).trim)
-    })
-    keyVals.toMap
-  }
-
   /**
    * Parse a comma separated string into a sequence of strings.
    * Whitespace surrounding the comma will be removed.
diff --git a/core/src/main/scala/kafka/utils/PasswordEncoder.scala b/core/src/main/scala/kafka/utils/PasswordEncoder.scala
deleted file mode 100644
index d4737be08ce..00000000000
--- a/core/src/main/scala/kafka/utils/PasswordEncoder.scala
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package kafka.utils
-
-import java.nio.charset.StandardCharsets
-import java.security.{AlgorithmParameters, NoSuchAlgorithmException, SecureRandom}
-import java.security.spec.AlgorithmParameterSpec
-import java.util.Base64
-
-import javax.crypto.{Cipher, SecretKeyFactory}
-import javax.crypto.spec._
-import kafka.utils.PasswordEncoder._
-import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.config.types.Password
-
-import scala.collection.Map
-
-object PasswordEncoder {
-  val KeyFactoryAlgorithmProp = "keyFactoryAlgorithm"
-  val CipherAlgorithmProp = "cipherAlgorithm"
-  val InitializationVectorProp = "initializationVector"
-  val KeyLengthProp = "keyLength"
-  val SaltProp = "salt"
-  val IterationsProp = "iterations"
-  val EncryptedPasswordProp = "encryptedPassword"
-  val PasswordLengthProp = "passwordLength"
-
-  def encrypting(secret: Password,
-                 keyFactoryAlgorithm: Option[String],
-                 cipherAlgorithm: String,
-                 keyLength: Int,
-                 iterations: Int): EncryptingPasswordEncoder = {
-    new EncryptingPasswordEncoder(secret, keyFactoryAlgorithm, cipherAlgorithm, keyLength, iterations)
-  }
-
-  def noop(): NoOpPasswordEncoder = {
-    new NoOpPasswordEncoder()
-  }
-}
-
-trait PasswordEncoder {
-  def encode(password: Password): String
-  def decode(encodedPassword: String): Password
-
-  private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.getDecoder.decode(encoded)
-}
-
-/**
- * A password encoder that does not modify the given password. This is used in KRaft mode only.
- */
-class NoOpPasswordEncoder extends PasswordEncoder {
-  override def encode(password: Password): String = password.value()
-  override def decode(encodedPassword: String): Password = new Password(encodedPassword)
-}
-
-/**
-  * Password encoder and decoder implementation. Encoded passwords are persisted as a CSV map
-  * containing the encoded password in base64 and along with the properties used for encryption.
-  *
-  * @param secret The secret used for encoding and decoding
-  * @param keyFactoryAlgorithm  Key factory algorithm if configured. By default, PBKDF2WithHmacSHA512 is
-  *                             used if available, PBKDF2WithHmacSHA1 otherwise.
-  * @param cipherAlgorithm Cipher algorithm used for encoding.
-  * @param keyLength Key length used for encoding. This should be valid for the specified algorithms.
-  * @param iterations Iteration count used for encoding.
-  *
-  * The provided `keyFactoryAlgorithm`, `cipherAlgorithm`, `keyLength` and `iterations` are used for encoding passwords.
-  * The values used for encoding are stored along with the encoded password and the stored values are used for decoding.
-  *
-  */
-class EncryptingPasswordEncoder(
-  secret: Password,
-  keyFactoryAlgorithm: Option[String],
-  cipherAlgorithm: String,
-  keyLength: Int,
-  iterations: Int
-) extends PasswordEncoder with Logging {
-
-  private val secureRandom = new SecureRandom
-  private val cipherParamsEncoder = cipherParamsInstance(cipherAlgorithm)
-
-  override def encode(password: Password): String = {
-    val salt = new Array[Byte](256)
-    secureRandom.nextBytes(salt)
-    val cipher = Cipher.getInstance(cipherAlgorithm)
-    val keyFactory = secretKeyFactory(keyFactoryAlgorithm)
-    val keySpec = secretKeySpec(keyFactory, cipherAlgorithm, keyLength, salt, iterations)
-    cipher.init(Cipher.ENCRYPT_MODE, keySpec)
-    val encryptedPassword = cipher.doFinal(password.value.getBytes(StandardCharsets.UTF_8))
-    val encryptedMap = Map(
-      KeyFactoryAlgorithmProp -> keyFactory.getAlgorithm,
-      CipherAlgorithmProp -> cipherAlgorithm,
-      KeyLengthProp -> keyLength,
-      SaltProp -> base64Encode(salt),
-      IterationsProp -> iterations.toString,
-      EncryptedPasswordProp -> base64Encode(encryptedPassword),
-      PasswordLengthProp -> password.value.length
-    ) ++ cipherParamsEncoder.toMap(cipher.getParameters)
-    encryptedMap.map { case (k, v) => s"$k:$v" }.mkString(",")
-  }
-
-  override def decode(encodedPassword: String): Password = {
-    val params = CoreUtils.parseCsvMap(encodedPassword)
-    val keyFactoryAlg = params(KeyFactoryAlgorithmProp)
-    val cipherAlg = params(CipherAlgorithmProp)
-    val keyLength = params(KeyLengthProp).toInt
-    val salt = base64Decode(params(SaltProp))
-    val iterations = params(IterationsProp).toInt
-    val encryptedPassword = base64Decode(params(EncryptedPasswordProp))
-    val passwordLengthProp = params(PasswordLengthProp).toInt
-    val cipher = Cipher.getInstance(cipherAlg)
-    val keyFactory = secretKeyFactory(Some(keyFactoryAlg))
-    val keySpec = secretKeySpec(keyFactory, cipherAlg, keyLength, salt, iterations)
-    cipher.init(Cipher.DECRYPT_MODE, keySpec, cipherParamsEncoder.toParameterSpec(params))
-    val password = try {
-      val decrypted = cipher.doFinal(encryptedPassword)
-      new String(decrypted, StandardCharsets.UTF_8)
-    } catch {
-      case e: Exception => throw new ConfigException("Password could not be decoded", e)
-    }
-    if (password.length != passwordLengthProp) // Sanity check
-      throw new ConfigException("Password could not be decoded, sanity check of length failed")
-    new Password(password)
-  }
-
-  private def secretKeyFactory(keyFactoryAlg: Option[String]): SecretKeyFactory = {
-    keyFactoryAlg match {
-      case Some(algorithm) => SecretKeyFactory.getInstance(algorithm)
-      case None =>
-        try {
-          SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512")
-        } catch {
-          case _: NoSuchAlgorithmException => SecretKeyFactory.getInstance("PBKDF2WithHmacSHA1")
-        }
-    }
-  }
-
-  private def secretKeySpec(keyFactory: SecretKeyFactory,
-                            cipherAlg: String,
-                            keyLength: Int,
-                            salt: Array[Byte], iterations: Int): SecretKeySpec = {
-    val keySpec = new PBEKeySpec(secret.value.toCharArray, salt, iterations, keyLength)
-    val algorithm = if (cipherAlg.indexOf('/') > 0) cipherAlg.substring(0, cipherAlg.indexOf('/')) else cipherAlg
-    new SecretKeySpec(keyFactory.generateSecret(keySpec).getEncoded, algorithm)
-  }
-
-  private def base64Encode(bytes: Array[Byte]): String = Base64.getEncoder.encodeToString(bytes)
-
-  private def cipherParamsInstance(cipherAlgorithm: String): CipherParamsEncoder = {
-    val aesPattern = "AES/(.*)/.*".r
-    cipherAlgorithm match {
-      case aesPattern("GCM") => new GcmParamsEncoder
-      case _ => new IvParamsEncoder
-    }
-  }
-
-  private trait CipherParamsEncoder {
-    def toMap(cipher: AlgorithmParameters): Map[String, String]
-    def toParameterSpec(paramMap: Map[String, String]): AlgorithmParameterSpec
-  }
-
-  private class IvParamsEncoder extends CipherParamsEncoder {
-    def toMap(cipherParams: AlgorithmParameters): Map[String, String] = {
-      if (cipherParams != null) {
-        val ivSpec = cipherParams.getParameterSpec(classOf[IvParameterSpec])
-        Map(InitializationVectorProp -> base64Encode(ivSpec.getIV))
-      } else
-        throw new IllegalStateException("Could not determine initialization vector for cipher")
-    }
-    def toParameterSpec(paramMap: Map[String, String]): AlgorithmParameterSpec = {
-      new IvParameterSpec(base64Decode(paramMap(InitializationVectorProp)))
-    }
-  }
-
-  private class GcmParamsEncoder extends CipherParamsEncoder {
-    def toMap(cipherParams: AlgorithmParameters): Map[String, String] = {
-      if (cipherParams != null) {
-        val spec = cipherParams.getParameterSpec(classOf[GCMParameterSpec])
-        Map(InitializationVectorProp -> base64Encode(spec.getIV),
-            "authenticationTagLength" -> spec.getTLen.toString)
-      } else
-        throw new IllegalStateException("Could not determine initialization vector for cipher")
-    }
-    def toParameterSpec(paramMap: Map[String, String]): AlgorithmParameterSpec = {
-      new GCMParameterSpec(paramMap("authenticationTagLength").toInt, base64Decode(paramMap(InitializationVectorProp)))
-    }
-  }
-}
diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
index 54490e3dd65..33ba418fc8c 100755
--- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
+++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
@@ -22,6 +22,7 @@ import java.util.Collections
 import scala.collection._
 import scala.jdk.CollectionConverters._
 import kafka.utils.Implicits._
+import org.apache.kafka.server.util.Csv
 
 object VerifiableProperties {
   def apply(map: java.util.Map[String, AnyRef]): VerifiableProperties = {
@@ -186,7 +187,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
    */
   def getMap(name: String, valid: String => Boolean = _ => true): Map[String, String] = {
     try {
-      val m = CoreUtils.parseCsvMap(getString(name, ""))
+      val m = Csv.parseCsvMap(getString(name, "")).asScala
       m.foreach {
         case(key, value) => 
           if(!valid(value))
diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
index a11a84c017b..5e06cd5e918 100644
--- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -16,7 +16,7 @@
  */
 package kafka.zk
 
-import kafka.utils.{Logging, PasswordEncoder}
+import kafka.utils.Logging
 import kafka.zk.ZkMigrationClient.wrapZkException
 import kafka.zk.migration.{ZkAclMigrationClient, ZkConfigMigrationClient, ZkDelegationTokenMigrationClient, ZkTopicMigrationClient}
 import kafka.zookeeper._
@@ -33,6 +33,7 @@ import org.apache.kafka.metadata.PartitionRegistration
 import org.apache.kafka.metadata.migration.ConfigMigrationClient.ClientQuotaVisitor
 import org.apache.kafka.metadata.migration.TopicMigrationClient.{TopicVisitor, TopicVisitorInterest}
 import org.apache.kafka.metadata.migration._
+import org.apache.kafka.security.PasswordEncoder
 import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock}
 import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.{AuthFailedException, NoAuthException, SessionClosedRequireAuthException}
diff --git a/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala b/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
index b0986445e30..0f37d142bcb 100644
--- a/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
@@ -18,7 +18,7 @@
 package kafka.zk.migration
 
 import kafka.server.{DynamicBrokerConfig, DynamicConfig, ZkAdminManager}
-import kafka.utils.{Logging, PasswordEncoder}
+import kafka.utils.Logging
 import kafka.zk.ZkMigrationClient.{logAndRethrow, wrapZkException}
 import kafka.zk._
 import kafka.zookeeper.{CreateRequest, DeleteRequest, SetDataRequest}
@@ -31,10 +31,12 @@ import org.apache.kafka.common.quota.ClientQuotaEntity
 import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
 import org.apache.kafka.metadata.migration.ConfigMigrationClient.ClientQuotaVisitor
 import org.apache.kafka.metadata.migration.{ConfigMigrationClient, MigrationClientException, ZkMigrationLeadershipState}
+import org.apache.kafka.security.PasswordEncoder
 import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.{CreateMode, KeeperException}
 
+
 import java.{lang, util}
 import java.util.Properties
 import java.util.function.{BiConsumer, Consumer}
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 5735cb1cf9b..808fe9da419 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -59,6 +59,7 @@ import org.apache.kafka.common.requests.MetadataRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.ScramCredential
 import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.apache.kafka.security.PasswordEncoder
 import org.apache.kafka.server.config.ConfigType
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.record.BrokerCompressionType
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala
index 20dd5d91a83..6efb3f4e3e9 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala
@@ -16,7 +16,7 @@
  */
 package kafka.zk
 
-import kafka.utils.{Logging, PasswordEncoder, TestUtils}
+import kafka.utils.{Logging, TestUtils}
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.metadata.{FeatureLevelRecord, TopicRecord}
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -28,6 +28,7 @@ import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
 import org.apache.kafka.metadata.KafkaConfigSchema
 import org.apache.kafka.metadata.migration._
 import org.apache.kafka.raft.{LeaderAndEpoch, OffsetAndEpoch}
+import org.apache.kafka.security.PasswordEncoder
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
 import org.apache.kafka.server.fault.FaultHandler
 import org.apache.zookeeper.client.ZKClientConfig
@@ -144,7 +145,7 @@ class ZkMigrationFailoverTest extends Logging {
     }
 
     // Safe to reuse these since they don't keep any state
-    val zkMigrationClient = ZkMigrationClient(zkClient, PasswordEncoder.noop())
+    val zkMigrationClient = ZkMigrationClient(zkClient, PasswordEncoder.NOOP)
 
     val (driver1, faultHandler1) = buildMigrationDriver(3000, zkMigrationClient)
     val (driver2, faultHandler2) = buildMigrationDriver(3001, zkMigrationClient)
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index b1437f8c368..e52bc4e567a 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -23,7 +23,7 @@ import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTemplate,
 import kafka.test.junit.ClusterTestExtensions
 import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
 import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
-import kafka.utils.{PasswordEncoder, TestUtils}
+import kafka.utils.TestUtils
 import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.common.{TopicPartition, Uuid}
@@ -45,6 +45,7 @@ import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
 import org.apache.kafka.metadata.authorizer.StandardAcl
 import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
 import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.security.PasswordEncoder
 import org.apache.kafka.server.ControllerRequestCompletionHandler
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
 import org.apache.kafka.server.config.ConfigType
@@ -139,7 +140,7 @@ class ZkMigrationIntegrationTest {
 
     val underlying = clusterInstance.asInstanceOf[ZkClusterInstance].getUnderlying()
     val zkClient = underlying.zkClient
-    val migrationClient = ZkMigrationClient(zkClient, PasswordEncoder.noop())
+    val migrationClient = ZkMigrationClient(zkClient, PasswordEncoder.NOOP)
     val verifier = new MetadataDeltaVerifier()
     migrationClient.readAllMetadata(batch => verifier.accept(batch), _ => { })
     verifier.verify { image =>
@@ -246,7 +247,7 @@ class ZkMigrationIntegrationTest {
           kafkaConfig.passwordEncoderCipherAlgorithm,
           kafkaConfig.passwordEncoderKeyLength,
           kafkaConfig.passwordEncoderIterations)
-      case None => PasswordEncoder.noop()
+      case None => PasswordEncoder.NOOP
     }
 
     val migrationClient = ZkMigrationClient(zkClient, zkConfigEncoder)
diff --git a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
index ee3401062c8..f1c0dd0371f 100755
--- a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
@@ -36,7 +36,7 @@ import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorServic
 
 class CoreUtilsTest extends Logging {
 
-  val clusterIdPattern = Pattern.compile("[a-zA-Z0-9_\\-]+")
+  val clusterIdPattern: Pattern = Pattern.compile("[a-zA-Z0-9_\\-]+")
 
   @Test
   def testSwallow(): Unit = {
@@ -73,41 +73,6 @@ class CoreUtilsTest extends Logging {
     assertTrue(emptyStringList.equals(emptyList))
   }
 
-  @Test
-  def testCsvMap(): Unit = {
-    val emptyString: String = ""
-    val emptyMap = CoreUtils.parseCsvMap(emptyString)
-    val emptyStringMap = Map.empty[String, String]
-    assertTrue(emptyMap != null)
-    assertTrue(emptyStringMap.equals(emptyStringMap))
-
-    val kvPairsIpV6: String = "a:b:c:v,a:b:c:v"
-    val ipv6Map = CoreUtils.parseCsvMap(kvPairsIpV6)
-    for (m <- ipv6Map) {
-      assertTrue(m._1.equals("a:b:c"))
-      assertTrue(m._2.equals("v"))
-    }
-
-    val singleEntry:String = "key:value"
-    val singleMap = CoreUtils.parseCsvMap(singleEntry)
-    val value = singleMap.getOrElse("key", 0)
-    assertTrue(value.equals("value"))
-
-    val kvPairsIpV4: String = "192.168.2.1/30:allow, 192.168.2.1/30:allow"
-    val ipv4Map = CoreUtils.parseCsvMap(kvPairsIpV4)
-    for (m <- ipv4Map) {
-      assertTrue(m._1.equals("192.168.2.1/30"))
-      assertTrue(m._2.equals("allow"))
-    }
-
-    val kvPairsSpaces: String = "key:value      , key:   value"
-    val spaceMap = CoreUtils.parseCsvMap(kvPairsSpaces)
-    for (m <- spaceMap) {
-      assertTrue(m._1.equals("key"))
-      assertTrue(m._2.equals("value"))
-    }
-  }
-
   @Test
   def testInLock(): Unit = {
     val lock = new ReentrantLock()
diff --git a/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala b/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
deleted file mode 100755
index 1073ad0082b..00000000000
--- a/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-
-import javax.crypto.SecretKeyFactory
-import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.config.types.Password
-import org.apache.kafka.server.config.Defaults
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-
-class PasswordEncoderTest {
-
-  @Test
-  def testEncodeDecode(): Unit = {
-    val encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
-      None,
-      Defaults.PASSWORD_ENCODER_CIPHER_ALGORITHM,
-      Defaults.PASSWORD_ENCODER_KEY_LENGTH,
-      Defaults.PASSWORD_ENCODER_ITERATIONS)
-    val password = "test-password"
-    val encoded = encoder.encode(new Password(password))
-    val encodedMap = CoreUtils.parseCsvMap(encoded)
-    assertEquals("4096", encodedMap(PasswordEncoder.IterationsProp))
-    assertEquals("128", encodedMap(PasswordEncoder.KeyLengthProp))
-    val defaultKeyFactoryAlgorithm = try {
-      SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512")
-      "PBKDF2WithHmacSHA512"
-    } catch {
-      case _: Exception => "PBKDF2WithHmacSHA1"
-    }
-    assertEquals(defaultKeyFactoryAlgorithm, encodedMap(PasswordEncoder.KeyFactoryAlgorithmProp))
-    assertEquals("AES/CBC/PKCS5Padding", encodedMap(PasswordEncoder.CipherAlgorithmProp))
-
-    verifyEncodedPassword(encoder, password, encoded)
-  }
-
-  @Test
-  def testEncoderConfigChange(): Unit = {
-    val encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
-      Some("PBKDF2WithHmacSHA1"),
-      "DES/CBC/PKCS5Padding",
-      64,
-      1024)
-    val password = "test-password"
-    val encoded = encoder.encode(new Password(password))
-    val encodedMap = CoreUtils.parseCsvMap(encoded)
-    assertEquals("1024", encodedMap(PasswordEncoder.IterationsProp))
-    assertEquals("64", encodedMap(PasswordEncoder.KeyLengthProp))
-    assertEquals("PBKDF2WithHmacSHA1", encodedMap(PasswordEncoder.KeyFactoryAlgorithmProp))
-    assertEquals("DES/CBC/PKCS5Padding", encodedMap(PasswordEncoder.CipherAlgorithmProp))
-
-    // Test that decoding works even if PasswordEncoder algorithm, iterations etc. are altered
-    val decoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
-      Some("PBKDF2WithHmacSHA1"),
-      "AES/CBC/PKCS5Padding",
-      128,
-      2048)
-    assertEquals(password, decoder.decode(encoded).value)
-
-    // Test that decoding fails if secret is altered
-    val decoder2 = PasswordEncoder.encrypting(new Password("secret-2"),
-      Some("PBKDF2WithHmacSHA1"),
-      "AES/CBC/PKCS5Padding",
-      128,
-      1024)
-    try {
-      decoder2.decode(encoded)
-    } catch {
-      case e: ConfigException => // expected exception
-    }
-  }
-
-  @Test
-  def testEncodeDecodeAlgorithms(): Unit = {
-
-    def verifyEncodeDecode(keyFactoryAlg: Option[String], cipherAlg: String, keyLength: Int): Unit = {
-      val encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
-        keyFactoryAlg,
-        cipherAlg,
-        keyLength,
-        Defaults.PASSWORD_ENCODER_ITERATIONS)
-      val password = "test-password"
-      val encoded = encoder.encode(new Password(password))
-      verifyEncodedPassword(encoder, password, encoded)
-    }
-
-    verifyEncodeDecode(keyFactoryAlg = None, "DES/CBC/PKCS5Padding", keyLength = 64)
-    verifyEncodeDecode(keyFactoryAlg = None, "DESede/CBC/PKCS5Padding", keyLength = 192)
-    verifyEncodeDecode(keyFactoryAlg = None, "AES/CBC/PKCS5Padding", keyLength = 128)
-    verifyEncodeDecode(keyFactoryAlg = None, "AES/CFB/PKCS5Padding", keyLength = 128)
-    verifyEncodeDecode(keyFactoryAlg = None, "AES/OFB/PKCS5Padding", keyLength = 128)
-    verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA1"), Defaults.PASSWORD_ENCODER_CIPHER_ALGORITHM, keyLength = 128)
-    verifyEncodeDecode(keyFactoryAlg = None, "AES/GCM/NoPadding", keyLength = 128)
-    verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA256"), Defaults.PASSWORD_ENCODER_CIPHER_ALGORITHM, keyLength = 128)
-    verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA512"), Defaults.PASSWORD_ENCODER_CIPHER_ALGORITHM, keyLength = 128)
-  }
-
-  private def verifyEncodedPassword(encoder: PasswordEncoder, password: String, encoded: String): Unit = {
-    val encodedMap = CoreUtils.parseCsvMap(encoded)
-    assertEquals(password.length.toString, encodedMap(PasswordEncoder.PasswordLengthProp))
-    assertNotNull(encoder.base64Decode(encodedMap("salt")), "Invalid salt")
-    assertNotNull(encoder.base64Decode(encodedMap(PasswordEncoder.InitializationVectorProp)), "Invalid encoding parameters")
-    assertNotNull(encoder.base64Decode(encodedMap(PasswordEncoder.EncryptedPasswordProp)), "Invalid encoded password")
-    assertEquals(password, encoder.decode(encoded).value)
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala
index d04798542ea..569cb5764b4 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala
@@ -17,10 +17,10 @@
 package kafka.zk.migration
 
 import kafka.server.{KafkaConfig, QuorumTestHarness}
-import kafka.utils.PasswordEncoder
 import kafka.zk.ZkMigrationClient
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
+import org.apache.kafka.security.PasswordEncoder
 import org.junit.jupiter.api.{BeforeEach, TestInfo}
 
 import java.util.Properties
diff --git a/server-common/src/main/java/org/apache/kafka/security/CipherParamsEncoder.java b/server-common/src/main/java/org/apache/kafka/security/CipherParamsEncoder.java
new file mode 100644
index 00000000000..d829e9885fa
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/security/CipherParamsEncoder.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security;
+
+import java.security.AlgorithmParameters;
+import java.security.spec.AlgorithmParameterSpec;
+import java.security.spec.InvalidParameterSpecException;
+import java.util.Map;
+
+public interface CipherParamsEncoder {
+
+    Map<String, String> toMap(AlgorithmParameters cipher) throws InvalidParameterSpecException;
+
+    AlgorithmParameterSpec toParameterSpec(Map<String, String> paramMap);
+}
diff --git a/server-common/src/main/java/org/apache/kafka/security/EncryptingPasswordEncoder.java b/server-common/src/main/java/org/apache/kafka/security/EncryptingPasswordEncoder.java
new file mode 100644
index 00000000000..747505645f2
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/security/EncryptingPasswordEncoder.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.server.util.Csv;
+
+import javax.crypto.Cipher;
+import javax.crypto.SecretKeyFactory;
+import javax.crypto.spec.PBEKeySpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.nio.charset.StandardCharsets;
+import java.security.GeneralSecurityException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.spec.InvalidKeySpecException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Password encoder and decoder implementation. Encoded passwords are persisted as a CSV map
+ * containing the encoded password in base64 and along with the properties used for encryption.
+ */
+public class EncryptingPasswordEncoder implements PasswordEncoder {
+
+    private final SecureRandom secureRandom = new SecureRandom();
+
+    private final Password secret;
+    private final String keyFactoryAlgorithm;
+    private final String cipherAlgorithm;
+    private final int keyLength;
+    private final int iterations;
+    private final CipherParamsEncoder cipherParamsEncoder;
+
+
+    /**
+     * @param secret The secret used for encoding and decoding
+     * @param keyFactoryAlgorithm  Key factory algorithm if configured. By default, PBKDF2WithHmacSHA512 is
+     *                             used if available, PBKDF2WithHmacSHA1 otherwise.
+     * @param cipherAlgorithm Cipher algorithm used for encoding.
+     * @param keyLength Key length used for encoding. This should be valid for the specified algorithms.
+     * @param iterations Iteration count used for encoding.
+     * The provided `keyFactoryAlgorithm`, `cipherAlgorithm`, `keyLength` and `iterations` are used for encoding passwords.
+     * The values used for encoding are stored along with the encoded password and the stored values are used for decoding.
+     */
+    public EncryptingPasswordEncoder(
+            Password secret,
+            String keyFactoryAlgorithm,
+            String cipherAlgorithm,
+            int keyLength,
+            int iterations) {
+        this.secret = secret;
+        this.keyFactoryAlgorithm = keyFactoryAlgorithm;
+        this.cipherAlgorithm = cipherAlgorithm;
+        this.keyLength = keyLength;
+        this.iterations = iterations;
+        this.cipherParamsEncoder = cipherParamsInstance(cipherAlgorithm);
+    }
+
+    @Override
+    public String encode(Password password) throws GeneralSecurityException {
+        byte[] salt = new byte[256];
+        secureRandom.nextBytes(salt);
+        Cipher cipher = Cipher.getInstance(cipherAlgorithm);
+        SecretKeyFactory keyFactory = secretKeyFactory(keyFactoryAlgorithm);
+        SecretKeySpec keySpec = secretKeySpec(keyFactory, cipherAlgorithm, keyLength, salt, iterations);
+        cipher.init(Cipher.ENCRYPT_MODE, keySpec);
+        byte[] encryptedPassword = cipher.doFinal(password.value().getBytes(StandardCharsets.UTF_8));
+        Map<String, String> encryptedMap = new HashMap<>();
+        encryptedMap.put(PasswordEncoder.KEY_FACTORY_ALGORITHM, keyFactory.getAlgorithm());
+        encryptedMap.put(PasswordEncoder.CIPHER_ALGORITHM, cipherAlgorithm);
+        encryptedMap.put(PasswordEncoder.KEY_LENGTH, String.valueOf(keyLength));
+        encryptedMap.put(PasswordEncoder.SALT, PasswordEncoder.base64Encode(salt));
+        encryptedMap.put(PasswordEncoder.ITERATIONS, String.valueOf(iterations));
+        encryptedMap.put(PasswordEncoder.ENCRYPTED_PASSWORD, PasswordEncoder.base64Encode(encryptedPassword));
+        encryptedMap.put(PasswordEncoder.PASSWORD_LENGTH, String.valueOf(password.value().length()));
+        encryptedMap.putAll(cipherParamsEncoder.toMap(cipher.getParameters()));
+
+        return encryptedMap.entrySet().stream()
+                .map(entry -> entry.getKey() + ":" + entry.getValue())
+                .collect(Collectors.joining(","));
+    }
+
+    @Override
+    public Password decode(String encodedPassword) throws GeneralSecurityException {
+        Map<String, String> params = Csv.parseCsvMap(encodedPassword);
+        String keyFactoryAlg = params.get(PasswordEncoder.KEY_FACTORY_ALGORITHM);
+        String cipherAlg = params.get(PasswordEncoder.CIPHER_ALGORITHM);
+        int keyLength = Integer.parseInt(params.get(PasswordEncoder.KEY_LENGTH));
+        byte[] salt = PasswordEncoder.base64Decode(params.get(PasswordEncoder.SALT));
+        int iterations = Integer.parseInt(params.get(PasswordEncoder.ITERATIONS));
+        byte[] encryptedPassword = PasswordEncoder.base64Decode(params.get(PasswordEncoder.ENCRYPTED_PASSWORD));
+        int passwordLengthProp = Integer.parseInt(params.get(PasswordEncoder.PASSWORD_LENGTH));
+        Cipher cipher = Cipher.getInstance(cipherAlg);
+        SecretKeyFactory keyFactory = secretKeyFactory(keyFactoryAlg);
+        SecretKeySpec keySpec = secretKeySpec(keyFactory, cipherAlg, keyLength, salt, iterations);
+        cipher.init(Cipher.DECRYPT_MODE, keySpec, cipherParamsEncoder.toParameterSpec(params));
+        try {
+            byte[] decrypted = cipher.doFinal(encryptedPassword);
+            String password = new String(decrypted, StandardCharsets.UTF_8);
+            if (password.length() != passwordLengthProp) // Sanity check
+                throw new ConfigException("Password could not be decoded, sanity check of length failed");
+            return new Password(password);
+        } catch (Exception e) {
+            throw new ConfigException("Password could not be decoded", e);
+        }
+    }
+
+    private SecretKeyFactory secretKeyFactory(String keyFactoryAlg) throws NoSuchAlgorithmException {
+        if (keyFactoryAlg != null) {
+            return SecretKeyFactory.getInstance(keyFactoryAlg);
+        } else {
+            try {
+                return SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512");
+            } catch (NoSuchAlgorithmException nsae) {
+                return SecretKeyFactory.getInstance("PBKDF2WithHmacSHA1");
+            }
+        }
+    }
+
+    private SecretKeySpec secretKeySpec(SecretKeyFactory keyFactory,
+                                        String cipherAlg,
+                                        int keyLength,
+                                        byte[] salt,
+                                        int iterations) throws InvalidKeySpecException {
+        PBEKeySpec keySpec = new PBEKeySpec(secret.value().toCharArray(), salt, iterations, keyLength);
+        String algorithm = (cipherAlg.indexOf('/') > 0) ? cipherAlg.substring(0, cipherAlg.indexOf('/')) : cipherAlg;
+        return new SecretKeySpec(keyFactory.generateSecret(keySpec).getEncoded(), algorithm);
+    }
+
+    private CipherParamsEncoder cipherParamsInstance(String cipherAlgorithm) {
+        if (cipherAlgorithm.startsWith("AES/GCM/")) {
+            return new GcmParamsEncoder();
+        } else {
+            return new IvParamsEncoder();
+        }
+    }
+}
diff --git a/server-common/src/main/java/org/apache/kafka/security/GcmParamsEncoder.java b/server-common/src/main/java/org/apache/kafka/security/GcmParamsEncoder.java
new file mode 100644
index 00000000000..afe3387b2e7
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/security/GcmParamsEncoder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security;
+
+import javax.crypto.spec.GCMParameterSpec;
+import java.security.AlgorithmParameters;
+import java.security.spec.AlgorithmParameterSpec;
+import java.security.spec.InvalidParameterSpecException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class GcmParamsEncoder implements CipherParamsEncoder {
+
+    private static final String AUTHENTICATION_TAG_LENGTH = "authenticationTagLength";
+
+    @Override
+    public Map<String, String> toMap(AlgorithmParameters cipherParams) throws InvalidParameterSpecException {
+        if (cipherParams != null) {
+            GCMParameterSpec spec = cipherParams.getParameterSpec(GCMParameterSpec.class);
+            Map<String, String> map = new HashMap<>();
+            map.put(PasswordEncoder.INITIALIZATION_VECTOR, PasswordEncoder.base64Encode(spec.getIV()));
+            map.put(AUTHENTICATION_TAG_LENGTH, String.valueOf(spec.getTLen()));
+            return map;
+        } else
+            throw new IllegalStateException("Could not determine initialization vector for cipher");
+    }
+
+    @Override
+    public AlgorithmParameterSpec toParameterSpec(Map<String, String> paramMap) {
+        return new GCMParameterSpec(Integer.parseInt(paramMap.get(AUTHENTICATION_TAG_LENGTH)),
+                                    PasswordEncoder.base64Decode(paramMap.get(PasswordEncoder.INITIALIZATION_VECTOR)));
+    }
+}
diff --git a/server-common/src/main/java/org/apache/kafka/security/IvParamsEncoder.java b/server-common/src/main/java/org/apache/kafka/security/IvParamsEncoder.java
new file mode 100644
index 00000000000..c156d502bd7
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/security/IvParamsEncoder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security;
+
+import javax.crypto.spec.IvParameterSpec;
+import java.security.AlgorithmParameters;
+import java.security.spec.AlgorithmParameterSpec;
+import java.security.spec.InvalidParameterSpecException;
+import java.util.Collections;
+import java.util.Map;
+
+public class IvParamsEncoder implements CipherParamsEncoder {
+
+    @Override
+    public Map<String, String> toMap(AlgorithmParameters cipherParams) throws InvalidParameterSpecException {
+        if (cipherParams != null) {
+            IvParameterSpec ivSpec = cipherParams.getParameterSpec(IvParameterSpec.class);
+            return Collections.singletonMap(PasswordEncoder.INITIALIZATION_VECTOR, PasswordEncoder.base64Encode(ivSpec.getIV()));
+        } else
+            throw new IllegalStateException("Could not determine initialization vector for cipher");
+    }
+
+    @Override
+    public AlgorithmParameterSpec toParameterSpec(Map<String, String> paramMap) {
+        return new IvParameterSpec(PasswordEncoder.base64Decode(paramMap.get(PasswordEncoder.INITIALIZATION_VECTOR)));
+    }
+}
diff --git a/server-common/src/main/java/org/apache/kafka/security/PasswordEncoder.java b/server-common/src/main/java/org/apache/kafka/security/PasswordEncoder.java
new file mode 100644
index 00000000000..7d7822823b1
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/security/PasswordEncoder.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security;
+
+import org.apache.kafka.common.config.types.Password;
+
+import java.security.GeneralSecurityException;
+import java.util.Base64;
+
+public interface PasswordEncoder {
+
+    String KEY_FACTORY_ALGORITHM = "keyFactoryAlgorithm";
+    String CIPHER_ALGORITHM = "cipherAlgorithm";
+    String INITIALIZATION_VECTOR = "initializationVector";
+    String KEY_LENGTH = "keyLength";
+    String SALT = "salt";
+    String ITERATIONS = "iterations";
+    String ENCRYPTED_PASSWORD = "encryptedPassword";
+    String PASSWORD_LENGTH = "passwordLength";
+
+    /**
+     * A password encoder that does not modify the given password. This is used in KRaft mode only.
+     */
+    PasswordEncoder NOOP = new PasswordEncoder() {
+
+        @Override
+        public String encode(Password password) {
+            return password.value();
+        }
+
+        @Override
+        public Password decode(String encodedPassword) {
+            return new Password(encodedPassword);
+        }
+    };
+
+    static byte[] base64Decode(String encoded) {
+        return Base64.getDecoder().decode(encoded);
+    }
+
+    static String base64Encode(byte[] bytes) {
+        return Base64.getEncoder().encodeToString(bytes);
+    }
+
+    static EncryptingPasswordEncoder encrypting(Password secret,
+                   String keyFactoryAlgorithm,
+                   String cipherAlgorithm,
+                   int keyLength,
+                   int iterations) {
+        return new EncryptingPasswordEncoder(secret, keyFactoryAlgorithm, cipherAlgorithm, keyLength, iterations);
+    }
+
+    String encode(Password password) throws GeneralSecurityException;
+    Password decode(String encodedPassword) throws GeneralSecurityException;
+}
diff --git a/server-common/src/main/java/org/apache/kafka/security/PasswordEncoderConfigs.java b/server-common/src/main/java/org/apache/kafka/security/PasswordEncoderConfigs.java
new file mode 100644
index 00000000000..e5c9bc13fe7
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/security/PasswordEncoderConfigs.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security;
+
+public class PasswordEncoderConfigs {
+    
+    public static final String SECRET = "password.encoder.secret";
+    public static final String OLD_SECRET = "password.encoder.old.secret";
+    public static final String KEYFACTORY_ALGORITHM = "password.encoder.keyfactory.algorithm";
+    public static final String CIPHER_ALGORITHM = "password.encoder.cipher.algorithm";
+    public static final String DEFAULT_CIPHER_ALGORITHM = "AES/CBC/PKCS5Padding";
+    public static final String KEY_LENGTH =  "password.encoder.key.length";
+    public static final int DEFAULT_KEY_LENGTH = 128;
+    public static final String ITERATIONS =  "password.encoder.iterations";
+    public static final int DEFAULT_ITERATIONS = 4096;
+}
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/Csv.java b/server-common/src/main/java/org/apache/kafka/server/util/Csv.java
new file mode 100644
index 00000000000..d09b45263c2
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/util/Csv.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class Csv {
+
+    /**
+     * This method gets comma separated values which contains key,value pairs and returns a map of
+     * key value pairs. the format of allCSVal is key1:val1, key2:val2 ....
+     * Also supports strings with multiple ":" such as IpV6 addresses, taking the last occurrence
+     * of the ":" in the pair as the split, eg a:b:c:val1, d:e:f:val2 => a:b:c -> val1, d:e:f -> val2
+     */
+    public static Map<String, String> parseCsvMap(String str) {
+        Map<String, String> map = new HashMap<>();
+        if (str == null || "".equals(str))
+            return map;
+        String[] keyVals = str.split("\\s*,\\s*");
+        for (String s : keyVals) {
+            int lio = s.lastIndexOf(":");
+            map.put(s.substring(0, lio).trim(), s.substring(lio + 1).trim());
+        }
+        return map;
+    }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/security/PasswordEncoderTest.java b/server-common/src/test/java/org/apache/kafka/security/PasswordEncoderTest.java
new file mode 100644
index 00000000000..c61715b02cd
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/security/PasswordEncoderTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security;
+
+import javax.crypto.SecretKeyFactory;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.server.util.Csv;
+import org.junit.jupiter.api.Test;
+
+import java.security.GeneralSecurityException;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class PasswordEncoderTest {
+
+    @Test
+    public void testEncodeDecode() throws GeneralSecurityException {
+        PasswordEncoder encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
+                null,
+                PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM,
+                PasswordEncoderConfigs.DEFAULT_KEY_LENGTH,
+                PasswordEncoderConfigs.DEFAULT_ITERATIONS);
+        String password = "test-password";
+        String encoded = encoder.encode(new Password(password));
+        Map<String, String> encodedMap = Csv.parseCsvMap(encoded);
+        assertEquals("4096", encodedMap.get(PasswordEncoder.ITERATIONS));
+        assertEquals("128", encodedMap.get(PasswordEncoder.KEY_LENGTH));
+        String defaultKeyFactoryAlgorithm;
+        try {
+            SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512");
+            defaultKeyFactoryAlgorithm = "PBKDF2WithHmacSHA512";
+
+        } catch (Exception e) {
+            defaultKeyFactoryAlgorithm = "PBKDF2WithHmacSHA1";
+        }
+        assertEquals(defaultKeyFactoryAlgorithm, encodedMap.get(PasswordEncoder.KEY_FACTORY_ALGORITHM));
+        assertEquals("AES/CBC/PKCS5Padding", encodedMap.get(PasswordEncoder.CIPHER_ALGORITHM));
+        verifyEncodedPassword(encoder, password, encoded);
+    }
+
+    @Test
+    public void testEncoderConfigChange() throws GeneralSecurityException {
+        PasswordEncoder encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
+                "PBKDF2WithHmacSHA1",
+                "DES/CBC/PKCS5Padding",
+                64,
+                1024);
+        String password = "test-password";
+        String encoded = encoder.encode(new Password(password));
+        Map<String, String> encodedMap = Csv.parseCsvMap(encoded);
+        assertEquals("1024", encodedMap.get(PasswordEncoder.ITERATIONS));
+        assertEquals("64", encodedMap.get(PasswordEncoder.KEY_LENGTH));
+        assertEquals("PBKDF2WithHmacSHA1", encodedMap.get(PasswordEncoder.KEY_FACTORY_ALGORITHM));
+        assertEquals("DES/CBC/PKCS5Padding", encodedMap.get(PasswordEncoder.CIPHER_ALGORITHM));
+
+        // Test that decoding works even if PasswordEncoder algorithm, iterations etc. are altered
+        PasswordEncoder decoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
+                "PBKDF2WithHmacSHA1",
+                "AES/CBC/PKCS5Padding",
+                128,
+                2048);
+        assertEquals(password, decoder.decode(encoded).value());
+
+        // Test that decoding fails if secret is altered
+        PasswordEncoder decoder2 = PasswordEncoder.encrypting(new Password("secret-2"),
+                "PBKDF2WithHmacSHA1",
+                "AES/CBC/PKCS5Padding",
+                128,
+                1024);
+        assertThrows(ConfigException.class, () -> decoder2.decode(encoded));
+    }
+
+    @Test
+    public void  testEncodeDecodeAlgorithms() throws GeneralSecurityException {
+        verifyEncodeDecode(null, "DES/CBC/PKCS5Padding", 64);
+        verifyEncodeDecode(null, "DESede/CBC/PKCS5Padding", 192);
+        verifyEncodeDecode(null, "AES/CBC/PKCS5Padding", 128);
+        verifyEncodeDecode(null, "AES/CFB/PKCS5Padding", 128);
+        verifyEncodeDecode(null, "AES/OFB/PKCS5Padding", 128);
+        verifyEncodeDecode("PBKDF2WithHmacSHA1", PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM, 128);
+        verifyEncodeDecode(null, "AES/GCM/NoPadding", 128);
+        verifyEncodeDecode("PBKDF2WithHmacSHA256", PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM, 128);
+        verifyEncodeDecode("PBKDF2WithHmacSHA512", PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM, 128);
+    }
+
+    private void verifyEncodeDecode(String keyFactoryAlg, String cipherAlg, int keyLength) throws GeneralSecurityException {
+        PasswordEncoder encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
+                keyFactoryAlg,
+                cipherAlg,
+                keyLength,
+                PasswordEncoderConfigs.DEFAULT_ITERATIONS);
+        String password = "test-password";
+        String encoded = encoder.encode(new Password(password));
+        verifyEncodedPassword(encoder, password, encoded);
+    }
+
+    private void verifyEncodedPassword(PasswordEncoder encoder, String password, String encoded) throws GeneralSecurityException {
+        Map<String, String> encodedMap = Csv.parseCsvMap(encoded);
+        assertEquals(String.valueOf(password.length()), encodedMap.get(PasswordEncoder.PASSWORD_LENGTH));
+        assertNotNull(PasswordEncoder.base64Decode(encodedMap.get("salt")), "Invalid salt");
+        assertNotNull(PasswordEncoder.base64Decode(encodedMap.get(PasswordEncoder.INITIALIZATION_VECTOR)), "Invalid encoding parameters");
+        assertNotNull(PasswordEncoder.base64Decode(encodedMap.get(PasswordEncoder.ENCRYPTED_PASSWORD)), "Invalid encoded password");
+        assertEquals(password, encoder.decode(encoded).value());
+    }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/server/util/CsvTest.java b/server-common/src/test/java/org/apache/kafka/server/util/CsvTest.java
new file mode 100644
index 00000000000..46d54b42c2b
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/util/CsvTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class CsvTest {
+
+    @Test
+    public void testCsvMap() {
+        String emptyString = "";
+        Map<String, String> emptyMap = Csv.parseCsvMap(emptyString);
+        Map<String, String> emptyStringMap = Collections.emptyMap();
+        assertNotNull(emptyMap);
+        assertEquals(emptyStringMap, emptyStringMap);
+
+        String kvPairsIpV6 = "a:b:c:v,a:b:c:v";
+        Map<String, String> ipv6Map = Csv.parseCsvMap(kvPairsIpV6);
+        for (Map.Entry<String, String> entry : ipv6Map.entrySet()) {
+            assertEquals("a:b:c", entry.getKey());
+            assertEquals("v", entry.getValue());
+        }
+
+        String singleEntry = "key:value";
+        Map<String, String> singleMap = Csv.parseCsvMap(singleEntry);
+        String value = singleMap.get("key");
+        assertEquals("value", value);
+
+        String kvPairsIpV4 = "192.168.2.1/30:allow, 192.168.2.1/30:allow";
+        Map<String, String> ipv4Map = Csv.parseCsvMap(kvPairsIpV4);
+        for (Map.Entry<String, String> entry : ipv4Map.entrySet()) {
+            assertEquals("192.168.2.1/30", entry.getKey());
+            assertEquals("allow", entry.getValue());
+        }
+
+        String kvPairsSpaces = "key:value      , key:   value";
+        Map<String, String> spaceMap = Csv.parseCsvMap(kvPairsSpaces);
+        for (Map.Entry<String, String> entry : spaceMap.entrySet()) {
+            assertEquals("key", entry.getKey());
+            assertEquals("value", entry.getValue());
+        }
+    }
+}
diff --git a/server/src/main/java/org/apache/kafka/server/config/Defaults.java b/server/src/main/java/org/apache/kafka/server/config/Defaults.java
index 8ad319b822f..1c502e1564b 100644
--- a/server/src/main/java/org/apache/kafka/server/config/Defaults.java
+++ b/server/src/main/java/org/apache/kafka/server/config/Defaults.java
@@ -33,6 +33,7 @@ import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
 import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
 import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.security.PasswordEncoderConfigs;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.Arrays;
@@ -270,9 +271,9 @@ public class Defaults {
     public static final long DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS = 1 * 60 * 60 * 1000L;
 
     /**  ********* Password Encryption Configuration for Dynamic Configs *********/
-    public static final String PASSWORD_ENCODER_CIPHER_ALGORITHM = "AES/CBC/PKCS5Padding";
-    public static final int PASSWORD_ENCODER_KEY_LENGTH = 128;
-    public static final int PASSWORD_ENCODER_ITERATIONS = 4096;
+    public static final String PASSWORD_ENCODER_CIPHER_ALGORITHM = PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM;
+    public static final int PASSWORD_ENCODER_KEY_LENGTH = PasswordEncoderConfigs.DEFAULT_KEY_LENGTH;
+    public static final int PASSWORD_ENCODER_ITERATIONS = PasswordEncoderConfigs.DEFAULT_ITERATIONS;
 
     /**  ********* Raft Quorum Configuration *********/
     public static final List<String> QUORUM_VOTERS = RaftConfig.DEFAULT_QUORUM_VOTERS;