Posted to commits@kafka.apache.org by ij...@apache.org on 2017/07/27 13:13:11 UTC

kafka git commit: KAFKA-1595; Remove deprecated and slower Scala JSON parser

Repository: kafka
Updated Branches:
  refs/heads/trunk 3620035c4 -> 8b14e1174


KAFKA-1595; Remove deprecated and slower Scala JSON parser

In a test by onurkaraman involving 3066 topics and 95895 partitions,
switching from the current JSON parser to Jackson reduced the Controller
initialisation time spent on JSON parsing from 37.1 seconds to 0.7 seconds.
See the following JIRA comment for more details:

https://issues.apache.org/jira/browse/KAFKA-5328?focusedCommentId=16027086

I verified that the main codebase only uses Jackson methods introduced in
2.0 by compiling it against that older version locally. One test uses a
constructor introduced in 2.4, but I didn't remove it as it seemed
harmless. The reasoning for this is explained in the mailing list
thread:

http://search-hadoop.com/m/uyzND1FWbWw1qUbWe

Finally, this PR only handles the parsing side. It would be good to use Jackson
for serialising to JSON as well. I filed KAFKA-5631 for that.
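
To illustrate the change in parsing style, a before/after sketch based on
the patch below (the input `str` is assumed to hold a leaderAndIsr-style
JSON object):

  // Before: untyped results from the Scala parser, accessed via casts
  val leader = Json.parseFull(str).map(_.asInstanceOf[Map[String, Any]]("leader").asInstanceOf[Int])

  // After: typed access via the new JsonValue/JsonObject/DecodeJson wrappers
  val leader = Json.parseFull(str).map(_.asJsonObject("leader").to[Int])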

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Onur Karaman <ok...@linkedin.com>, Rajini Sivaram <ra...@googlemail.com>

Closes #83 from ijuma/kafka-1595-remove-deprecated-json-parser-jackson


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8b14e117
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8b14e117
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8b14e117

Branch: refs/heads/trunk
Commit: 8b14e11743360a711b2bb670cf503acc0e604602
Parents: 3620035
Author: Ismael Juma <is...@juma.me.uk>
Authored: Thu Jul 27 14:12:57 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Jul 27 14:12:57 2017 +0100

----------------------------------------------------------------------
 build.gradle                                    |   2 +-
 .../src/main/scala/kafka/admin/AdminUtils.scala |  26 +--
 .../kafka/admin/DeleteRecordsCommand.scala      |  24 +--
 .../PreferredReplicaLeaderElectionCommand.scala |  15 +-
 core/src/main/scala/kafka/cluster/Broker.scala  |  29 +--
 .../main/scala/kafka/consumer/TopicCount.scala  |   8 +-
 .../kafka/controller/KafkaController.scala      |  44 ++--
 .../transaction/ProducerIdManager.scala         |   9 +-
 .../main/scala/kafka/security/auth/Acl.scala    |  21 +-
 .../kafka/server/DynamicConfigManager.scala     |  86 ++++----
 core/src/main/scala/kafka/utils/Json.scala      |  51 ++---
 .../main/scala/kafka/utils/LogDirUtils.scala    |  30 ++-
 .../scala/kafka/utils/ReplicationUtils.scala    |  15 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 124 +++++------
 .../scala/kafka/utils/json/DecodeJson.scala     | 122 +++++++++++
 .../main/scala/kafka/utils/json/JsonArray.scala |  27 +++
 .../scala/kafka/utils/json/JsonObject.scala     |  42 ++++
 .../main/scala/kafka/utils/json/JsonValue.scala | 118 +++++++++++
 .../kafka/api/LogDirFailureTest.scala           |   1 -
 .../unit/kafka/cluster/BrokerEndPointTest.scala | 167 ++++++++++-----
 .../transaction/ProducerIdManagerTest.scala     |  60 ++----
 .../test/scala/unit/kafka/utils/JsonTest.scala  |  27 ++-
 .../unit/kafka/utils/json/JsonValueTest.scala   | 210 +++++++++++++++++++
 gradle/dependencies.gradle                      |   2 -
 24 files changed, 879 insertions(+), 381 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 0c0b382..a1cb545 100644
--- a/build.gradle
+++ b/build.gradle
@@ -523,13 +523,13 @@ project(':core') {
 
   dependencies {
     compile project(':clients')
+    compile libs.jacksonDatabind
     compile libs.joptSimple
     compile libs.metrics
     compile libs.scala
     compile libs.slf4jlog4j
     compile libs.zkclient
     compile libs.zookeeper
-    compile libs.scalaParserCombinators
 
     testCompile project(':clients').sourceSets.test.output
     testCompile libs.bcpkix

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 7de85e4..d077296 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -616,23 +616,15 @@ object AdminUtils extends Logging with AdminUtilities {
     val str: String = zkUtils.readDataMaybeNull(entityConfigPath)._1.orNull
     val props = new Properties()
     if (str != null) {
-      Json.parseFull(str) match {
-        case None => // there are no config overrides
-        case Some(mapAnon: Map[_, _]) =>
-          val map = mapAnon collect { case (k: String, v: Any) => k -> v }
-          require(map("version") == 1)
-          map.get("config") match {
-            case Some(config: Map[_, _]) =>
-              for(configTup <- config)
-                configTup match {
-                  case (k: String, v: String) =>
-                    props.setProperty(k, v)
-                  case _ => throw new IllegalArgumentException(s"Invalid ${entityConfigPath} config: ${str}")
-                }
-            case _ => throw new IllegalArgumentException(s"Invalid ${entityConfigPath} config: ${str}")
-          }
-
-        case _ => throw new IllegalArgumentException(s"Unexpected value in config:(${str}), entity_config_path: ${entityConfigPath}")
+      Json.parseFull(str).foreach { jsValue =>
+        val jsObject = jsValue.asJsonObjectOption.getOrElse {
+          throw new IllegalArgumentException(s"Unexpected value in config: $str, entity_config_path: $entityConfigPath")
+        }
+        require(jsObject("version").to[Int] == 1)
+        val config = jsObject.get("config").flatMap(_.asJsonObjectOption).getOrElse {
+          throw new IllegalArgumentException(s"Invalid $entityConfigPath config: $str")
+        }
+        config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) }
       }
     }
     props
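
A worked sketch of the override parsing above (the JSON shape follows the
code; the values are made up):

  import java.util.Properties
  import kafka.utils.Json

  val str = """{"version":1,"config":{"retention.ms":"3600000"}}"""
  val props = new Properties()
  Json.parseFull(str).foreach { js =>
    val jsObject = js.asJsonObject
    require(jsObject("version").to[Int] == 1)
    jsObject("config").asJsonObject.iterator.foreach { case (k, v) =>
      props.setProperty(k, v.to[String])
    }
  }
  // props now contains retention.ms -> 3600000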

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
index 1a3b116..2715490 100644
--- a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
+++ b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
@@ -38,21 +38,15 @@ object DeleteRecordsCommand {
   }
 
   def parseOffsetJsonStringWithoutDedup(jsonData: String): Seq[(TopicPartition, Long)] = {
-    Json.parseFull(jsonData) match {
-      case Some(m) =>
-        m.asInstanceOf[Map[String, Any]].get("partitions") match {
-          case Some(partitionsSeq) =>
-            partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].map(p => {
-              val topic = p.get("topic").get.asInstanceOf[String]
-              val partition = p.get("partition").get.asInstanceOf[Int]
-              val offset = p.get("offset").get.asInstanceOf[Int].toLong
-              new TopicPartition(topic, partition) -> offset
-            })
-          case None =>
-            Seq.empty
-        }
-      case None =>
-        Seq.empty
+    Json.parseFull(jsonData).toSeq.flatMap { js =>
+      js.asJsonObject.get("partitions").toSeq.flatMap { partitionsJs =>
+        partitionsJs.asJsonArray.iterator.map(_.asJsonObject).map { partitionJs =>
+          val topic = partitionJs("topic").to[String]
+          val partition = partitionJs("partition").to[Int]
+          val offset = partitionJs("offset").to[Long]
+          new TopicPartition(topic, partition) -> offset
+        }.toBuffer
+      }
     }
   }
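
For reference, the offset JSON accepted above now decodes as follows
(example input with made-up values):

  val jsonData = """{"partitions":[{"topic":"t1","partition":0,"offset":10}]}"""
  // parseOffsetJsonStringWithoutDedup(jsonData)
  //   == Seq(new TopicPartition("t1", 0) -> 10L)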
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index f45c81a..c292fe6 100755
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -79,20 +79,19 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
 
   def parsePreferredReplicaElectionData(jsonString: String): immutable.Set[TopicAndPartition] = {
     Json.parseFull(jsonString) match {
-      case Some(m) =>
-        m.asInstanceOf[Map[String, Any]].get("partitions") match {
+      case Some(js) =>
+        js.asJsonObject.get("partitions") match {
           case Some(partitionsList) =>
-            val partitionsRaw = partitionsList.asInstanceOf[List[Map[String, Any]]]
+            val partitionsRaw = partitionsList.asJsonArray.iterator.map(_.asJsonObject)
             val partitions = partitionsRaw.map { p =>
-              val topic = p.get("topic").get.asInstanceOf[String]
-              val partition = p.get("partition").get.asInstanceOf[Int]
+              val topic = p("topic").to[String]
+              val partition = p("partition").to[Int]
               TopicAndPartition(topic, partition)
-            }
+            }.toBuffer
             val duplicatePartitions = CoreUtils.duplicates(partitions)
-            val partitionsSet = partitions.toSet
             if (duplicatePartitions.nonEmpty)
               throw new AdminOperationException("Preferred replica election data contains duplicate partitions: %s".format(duplicatePartitions.mkString(",")))
-            partitionsSet
+            partitions.toSet
           case None => throw new AdminOperationException("Preferred replica election data is empty")
         }
       case None => throw new AdminOperationException("Preferred replica election data is empty")

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 184a750..974e973 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -59,7 +59,7 @@ object Broker {
     * {
     *   "version":2,
     *   "host":"localhost",
-    *   "port":9092
+    *   "port":9092,
     *   "jmx_port":9999,
     *   "timestamp":"2233345666",
     *   "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"]
@@ -69,7 +69,7 @@ object Broker {
     * {
     *   "version":3,
     *   "host":"localhost",
-    *   "port":9092
+    *   "port":9092,
     *   "jmx_port":9999,
     *   "timestamp":"2233345666",
     *   "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"],
@@ -80,11 +80,11 @@ object Broker {
     * {
     *   "version":4,
     *   "host":"localhost",
-    *   "port":9092
+    *   "port":9092,
     *   "jmx_port":9999,
     *   "timestamp":"2233345666",
     *   "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
-    *   "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"}
+    *   "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"},
     *   "rack":"dc1"
     * }
     */
@@ -93,29 +93,30 @@ object Broker {
       throw new BrokerNotAvailableException(s"Broker id $id does not exist")
     try {
       Json.parseFull(brokerInfoString) match {
-        case Some(m) =>
-          val brokerInfo = m.asInstanceOf[Map[String, Any]]
-          val version = brokerInfo(VersionKey).asInstanceOf[Int]
+        case Some(js) =>
+          val brokerInfo = js.asJsonObject
+          val version = brokerInfo(VersionKey).to[Int]
+
           val endpoints =
             if (version < 1)
               throw new KafkaException(s"Unsupported version of broker registration: $brokerInfoString")
             else if (version == 1) {
-              val host = brokerInfo(HostKey).asInstanceOf[String]
-              val port = brokerInfo(PortKey).asInstanceOf[Int]
+              val host = brokerInfo(HostKey).to[String]
+              val port = brokerInfo(PortKey).to[Int]
               val securityProtocol = SecurityProtocol.PLAINTEXT
               val endPoint = new EndPoint(host, port, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
               Seq(endPoint)
             }
             else {
               val securityProtocolMap = brokerInfo.get(ListenerSecurityProtocolMapKey).map(
-                _.asInstanceOf[Map[String, String]].map { case (listenerName, securityProtocol) =>
-                new ListenerName(listenerName) -> SecurityProtocol.forName(securityProtocol)
-              })
-              val listeners = brokerInfo(EndpointsKey).asInstanceOf[List[String]]
+                _.to[Map[String, String]].map { case (listenerName, securityProtocol) =>
+                  new ListenerName(listenerName) -> SecurityProtocol.forName(securityProtocol)
+                })
+              val listeners = brokerInfo(EndpointsKey).to[Seq[String]]
               listeners.map(EndPoint.createEndPoint(_, securityProtocolMap))
             }
-          val rack = brokerInfo.get(RackKey).filter(_ != null).map(_.asInstanceOf[String])
 
+          val rack = brokerInfo.get(RackKey).flatMap(_.to[Option[String]])
           Broker(id, endpoints, rack)
         case None =>
           throw new BrokerNotAvailableException(s"Broker id $id does not exist")
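
Note how the new rack decoding composes `get` with `to[Option[String]]`:
`get` maps a missing "rack" key to `None`, and the `Option` decoder (see
`DecodeJson.decodeOption` below) maps an explicit JSON null to `None` as
well. Schematically:

  // {"version":4, ...}                 -> rack == None (key absent)
  // {"version":4, ..., "rack":null}    -> rack == None (JSON null)
  // {"version":4, ..., "rack":"dc1"}   -> rack == Some("dc1")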

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index 68beaed..2cabcae 100755
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -65,14 +65,14 @@ private[kafka] object TopicCount extends Logging {
     var topMap: Map[String, Int] = null
     try {
       Json.parseFull(topicCountString) match {
-        case Some(m) =>
-          val consumerRegistrationMap = m.asInstanceOf[Map[String, Any]]
+        case Some(js) =>
+          val consumerRegistrationMap = js.asJsonObject
           consumerRegistrationMap.get("pattern") match {
-            case Some(pattern) => subscriptionPattern = pattern.asInstanceOf[String]
+            case Some(pattern) => subscriptionPattern = pattern.to[String]
             case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
           }
           consumerRegistrationMap.get("subscription") match {
-            case Some(sub) => topMap = sub.asInstanceOf[Map[String, Int]]
+            case Some(sub) => topMap = sub.to[Map[String, Int]]
             case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
           }
         case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 3a61a59..0a6aac0 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -138,9 +138,7 @@ object KafkaController extends Logging {
   def parseControllerId(controllerInfoString: String): Int = {
     try {
       Json.parseFull(controllerInfoString) match {
-        case Some(m) =>
-          val controllerInfo = m.asInstanceOf[Map[String, Any]]
-          controllerInfo("brokerid").asInstanceOf[Int]
+        case Some(js) => js.asJsonObject("brokerid").to[Int]
         case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
       }
     } catch {
@@ -148,9 +146,8 @@ object KafkaController extends Logging {
         // It may be due to an incompatible controller register version
         warn("Failed to parse the controller info as json. "
           + "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
-        try {
-          controllerInfoString.toInt
-        } catch {
+        try controllerInfoString.toInt
+        catch {
          case t: Throwable => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new nor the old format.", t)
         }
     }
@@ -1370,31 +1367,22 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     }
 
     private def getTopicAndPartition(child: String): Set[TopicAndPartition] = {
-      val changeZnode: String = ZkUtils.IsrChangeNotificationPath + "/" + child
+      val changeZnode = ZkUtils.IsrChangeNotificationPath + "/" + child
       val (jsonOpt, _) = controllerContext.zkUtils.readDataMaybeNull(changeZnode)
-      if (jsonOpt.isDefined) {
-        val json = Json.parseFull(jsonOpt.get)
-
-        json match {
-          case Some(m) =>
-            val topicAndPartitions: mutable.Set[TopicAndPartition] = new mutable.HashSet[TopicAndPartition]()
-            val isrChanges = m.asInstanceOf[Map[String, Any]]
-            val topicAndPartitionList = isrChanges("partitions").asInstanceOf[List[Any]]
-            topicAndPartitionList.foreach {
-              case tp =>
-                val topicAndPartition = tp.asInstanceOf[Map[String, Any]]
-                val topic = topicAndPartition("topic").asInstanceOf[String]
-                val partition = topicAndPartition("partition").asInstanceOf[Int]
-                topicAndPartitions += TopicAndPartition(topic, partition)
-            }
-            topicAndPartitions
+      jsonOpt.map { json =>
+        Json.parseFull(json) match {
+          case Some(js) =>
+            val isrChanges = js.asJsonObject
+            isrChanges("partitions").asJsonArray.iterator.map(_.asJsonObject).map { tpJs =>
+              val topic = tpJs("topic").to[String]
+              val partition = tpJs("partition").to[Int]
+              TopicAndPartition(topic, partition)
+            }.toSet
           case None =>
-            error("Invalid topic and partition JSON: " + jsonOpt.get + " in ZK: " + changeZnode)
-            Set.empty
+            error(s"Invalid topic and partition JSON in ZK. ZK notification node: $changeZnode, JSON: $json")
+            Set.empty[TopicAndPartition]
         }
-      } else {
-        Set.empty
-      }
+      }.getOrElse(Set.empty[TopicAndPartition])
     }
 
   }
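
For reference, a sketch of the ISR change notification payload read above
(field names from the code; any extra fields, e.g. a version, are ignored):

  // {"version":1,"partitions":[{"topic":"t1","partition":0},{"topic":"t1","partition":7}]}
  // getTopicAndPartition then yields Set(TopicAndPartition("t1", 0), TopicAndPartition("t1", 7))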

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index 916ffa9..f7bde96 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -40,11 +40,10 @@ object ProducerIdManager extends Logging {
 
   def parseProducerIdBlockData(jsonData: String): ProducerIdBlock = {
     try {
-      Json.parseFull(jsonData).flatMap { m =>
-        val producerIdBlockInfo = m.asInstanceOf[Map[String, Any]]
-        val brokerId = producerIdBlockInfo("broker").asInstanceOf[Int]
-        val blockStart = producerIdBlockInfo("block_start").asInstanceOf[String].toLong
-        val blockEnd = producerIdBlockInfo("block_end").asInstanceOf[String].toLong
+      Json.parseFull(jsonData).map(_.asJsonObject).flatMap { js =>
+        val brokerId = js("broker").to[Int]
+        val blockStart = js("block_start").to[String].toLong
+        val blockEnd = js("block_end").to[String].toLong
         Some(ProducerIdBlock(brokerId, blockStart, blockEnd))
       }.getOrElse(throw new KafkaException(s"Failed to parse the producerId block json $jsonData"))
     } catch {
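
A sketch of the producerId block data this parses (only the fields read
above; values made up):

  // {"broker":0,"block_start":"0","block_end":"4999"}
  // parseProducerIdBlockData yields ProducerIdBlock(0, 0L, 4999L)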

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/main/scala/kafka/security/auth/Acl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala
index b84d75c..1fbcfb1 100644
--- a/core/src/main/scala/kafka/security/auth/Acl.scala
+++ b/core/src/main/scala/kafka/security/auth/Acl.scala
@@ -56,20 +56,17 @@ object Acl {
     if (aclJson == null || aclJson.isEmpty)
       return collection.immutable.Set.empty[Acl]
 
-    Json.parseFull(aclJson).toSet[Any].flatMap { m =>
-      val aclMap = m.asInstanceOf[Map[String, Any]]
+    Json.parseFull(aclJson).map(_.asJsonObject).map { js =>
       //the acl json version.
-      require(aclMap(VersionKey) == CurrentVersion)
-      val aclSet = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]]
-      aclSet.map { item =>
-        val principal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String])
-        val permissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String])
-        val operation = Operation.fromString(item(OperationKey).asInstanceOf[String])
-        val host = item(HostsKey).asInstanceOf[String]
+      require(js(VersionKey).to[Int] == CurrentVersion)
+      js(AclsKey).asJsonArray.iterator.map(_.asJsonObject).map { itemJs =>
+        val principal = KafkaPrincipal.fromString(itemJs(PrincipalKey).to[String])
+        val permissionType = PermissionType.fromString(itemJs(PermissionTypeKey).to[String])
+        val host = itemJs(HostsKey).to[String]
+        val operation = Operation.fromString(itemJs(OperationKey).to[String])
         new Acl(principal, permissionType, host, operation)
-      }
-    }
-
+      }.toSet
+    }.getOrElse(Set.empty)
   }
 
   def toJsonCompatibleMap(acls: Set[Acl]): Map[String, Any] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index 97760ba..634b0c2 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -25,6 +25,7 @@ import kafka.utils.ZkUtils
 import scala.collection._
 import scala.collection.JavaConverters._
 import kafka.admin.AdminUtils
+import kafka.utils.json.JsonObject
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.security.scram.ScramMechanism
 import org.apache.kafka.common.utils.Time
@@ -89,39 +90,32 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
 
   object ConfigChangedNotificationHandler extends NotificationHandler {
     override def processNotification(json: String) = {
-      Json.parseFull(json) match {
-        case None => // There are no config overrides.
-        // Ignore non-json notifications because they can be from the deprecated TopicConfigManager
-        case Some(mapAnon: Map[_, _]) =>
-          val map = mapAnon collect
-            { case (k: String, v: Any) => k -> v }
-
-          map("version") match {
-            case 1 => processEntityConfigChangeVersion1(json, map)
-            case 2 => processEntityConfigChangeVersion2(json, map)
-            case _ => throw new IllegalArgumentException("Config change notification has an unsupported version " + map("version") +
-                "Supported versions are 1 and 2.")
-          }
-
-        case _ => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
-          "{\"version\" : 1, \"entity_type\":\"topics/clients\", \"entity_name\" : \"topic_name/client_id\"}." + " or " +
-          "{\"version\" : 2, \"entity_path\":\"entity_type/entity_name\"}." +
-          " Received: " + json)
+      // Ignore non-json notifications because they can be from the deprecated TopicConfigManager
+      Json.parseFull(json).foreach { js =>
+        val jsObject = js.asJsonObjectOption.getOrElse {
+          throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
+            """{"version" : 1, "entity_type":"topics/clients", "entity_name" : "topic_name/client_id"} or """ +
+            """{"version" : 2, "entity_path":"entity_type/entity_name"}. """ +
+            s"Received: $json")
+        }
+        jsObject("version").to[Int] match {
+          case 1 => processEntityConfigChangeVersion1(json, jsObject)
+          case 2 => processEntityConfigChangeVersion2(json, jsObject)
+          case version => throw new IllegalArgumentException("Config change notification has unsupported version " +
+            s"'$version', supported versions are 1 and 2.")
+        }
       }
     }
 
-    private def processEntityConfigChangeVersion1(json: String, map: Map[String, Any]) {
-
-      val entityType = map.get("entity_type") match {
-        case Some(ConfigType.Topic) => ConfigType.Topic
-        case Some(ConfigType.Client) => ConfigType.Client
-        case _ => throw new IllegalArgumentException("Version 1 config change notification must have 'entity_type' set to 'clients' or 'topics'." +
-              " Received: " + json)
+    private def processEntityConfigChangeVersion1(json: String, js: JsonObject) {
+      val validConfigTypes = Set(ConfigType.Topic, ConfigType.Client)
+      val entityType = js.get("entity_type").flatMap(_.to[Option[String]]).filter(validConfigTypes).getOrElse {
+        throw new IllegalArgumentException("Version 1 config change notification must have 'entity_type' set to " +
+          s"'clients' or 'topics'. Received: $json")
       }
 
-      val entity = map.get("entity_name") match {
-        case Some(value: String) => value
-        case _ => throw new IllegalArgumentException("Version 1 config change notification does not specify 'entity_name'. Received: " + json)
+      val entity = js.get("entity_name").flatMap(_.to[Option[String]]).getOrElse {
+        throw new IllegalArgumentException("Version 1 config change notification does not specify 'entity_name'. Received: " + json)
       }
 
       val entityConfig = AdminUtils.fetchEntityConfig(zkUtils, entityType, entity)
@@ -130,18 +124,19 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
 
     }
 
-    private def processEntityConfigChangeVersion2(json: String, map: Map[String, Any]) {
+    private def processEntityConfigChangeVersion2(json: String, js: JsonObject) {
 
-      val entityPath = map.get("entity_path") match {
-        case Some(value: String) => value
-        case _ => throw new IllegalArgumentException("Version 2 config change notification does not specify 'entity_path'. Received: " + json)
+      val entityPath = js.get("entity_path").flatMap(_.to[Option[String]]).getOrElse {
+        throw new IllegalArgumentException(s"Version 2 config change notification must specify 'entity_path'. Received: $json")
       }
 
       val index = entityPath.indexOf('/')
       val rootEntityType = entityPath.substring(0, index)
-      if (index < 0 || !configHandlers.contains(rootEntityType))
-        throw new IllegalArgumentException("Version 2 config change notification must have 'entity_path' starting with 'clients/', 'topics/' or 'users/'." +
-              " Received: " + json)
+      if (index < 0 || !configHandlers.contains(rootEntityType)) {
+        val entityTypes = configHandlers.keys.map(entityType => s"'$entityType'/").mkString(", ")
+        throw new IllegalArgumentException("Version 2 config change notification must have 'entity_path' starting with " +
+          s"one of $entityTypes. Received: $json")
+      }
       val fullSanitizedEntityName = entityPath.substring(index + 1)
 
       val entityConfig = AdminUtils.fetchEntityConfig(zkUtils, rootEntityType, fullSanitizedEntityName)
@@ -154,7 +149,8 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
     }
   }
 
-  private val configChangeListener = new ZkNodeChangeNotificationListener(zkUtils, ZkUtils.ConfigChangesPath, AdminUtils.EntityConfigChangeZnodePrefix, ConfigChangedNotificationHandler)
+  private val configChangeListener = new ZkNodeChangeNotificationListener(zkUtils, ZkUtils.ConfigChangesPath,
+    AdminUtils.EntityConfigChangeZnodePrefix, ConfigChangedNotificationHandler)
 
   /**
    * Begin watching for config changes
@@ -165,16 +161,16 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
     // Apply all existing client/user configs to the ClientIdConfigHandler/UserConfigHandler to bootstrap the overrides
     configHandlers.foreach {
       case (ConfigType.User, handler) =>
-          AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.User).foreach {
-            case (sanitizedUser, properties) => handler.processConfigChanges(sanitizedUser, properties)
-          }
-          AdminUtils.fetchAllChildEntityConfigs(zkUtils, ConfigType.User, ConfigType.Client).foreach {
-            case (sanitizedUserClientId, properties) => handler.processConfigChanges(sanitizedUserClientId, properties)
-          }
+        AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.User).foreach {
+          case (sanitizedUser, properties) => handler.processConfigChanges(sanitizedUser, properties)
+        }
+        AdminUtils.fetchAllChildEntityConfigs(zkUtils, ConfigType.User, ConfigType.Client).foreach {
+          case (sanitizedUserClientId, properties) => handler.processConfigChanges(sanitizedUserClientId, properties)
+        }
       case (configType, handler) =>
-          AdminUtils.fetchAllEntityConfigs(zkUtils, configType).foreach {
-            case (entityName, properties) => handler.processConfigChanges(entityName, properties)
-          }
+        AdminUtils.fetchAllEntityConfigs(zkUtils, configType).foreach {
+          case (entityName, properties) => handler.processConfigChanges(entityName, properties)
+        }
     }
   }
 }
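
The two notification formats handled above, for reference (entity names
made up):

  // version 1: {"version":1,"entity_type":"topics","entity_name":"my-topic"}
  // version 2: {"version":2,"entity_path":"topics/my-topic"}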

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/main/scala/kafka/utils/Json.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala
index d110284..a916875 100644
--- a/core/src/main/scala/kafka/utils/Json.scala
+++ b/core/src/main/scala/kafka/utils/Json.scala
@@ -14,34 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- package kafka.utils
+package kafka.utils
 
-import kafka.common._
+import com.fasterxml.jackson.core.JsonProcessingException
+import com.fasterxml.jackson.databind.ObjectMapper
+import kafka.utils.json.JsonValue
 import scala.collection._
-import util.parsing.json.JSON
 
 /**
- *  A wrapper that synchronizes JSON in scala, which is not threadsafe.
+ * Provides methods for parsing JSON with Jackson and encoding to JSON with a simple and naive custom implementation.
  */
-object Json extends Logging {
-  val myConversionFunc = {input : String => input.toInt}
-  JSON.globalNumberParser = myConversionFunc
-  val lock = new Object
+object Json {
+
+  private val mapper = new ObjectMapper()
 
   /**
-   * Parse a JSON string into an object
+   * Parse a JSON string into a JsonValue if possible. `None` is returned if `input` is not valid JSON.
    */
-  def parseFull(input: String): Option[Any] = {
-    lock synchronized {
-      try {
-        JSON.parseFull(input)
-      } catch {
-        case t: Throwable =>
-          throw new KafkaException("Can't parse json string: %s".format(input), t)
-      }
-    }
-  }
-  
+  def parseFull(input: String): Option[JsonValue] =
+    try Option(mapper.readTree(input)).map(JsonValue(_))
+    catch { case _: JsonProcessingException => None }
+
   /**
    * Encode an object into a JSON string. This method accepts any type T where
    *   T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T]
@@ -55,17 +48,15 @@ object Json extends Logging {
       case b: Boolean => b.toString
       case s: String => "\"" + s + "\""
       case n: Number => n.toString
-      case m: Map[_, _] => 
-        "{" + 
-          m.map(elem => 
-            elem match {
-            case t: Tuple2[_,_] => encode(t._1) + ":" + encode(t._2)
-            case _ => throw new IllegalArgumentException("Invalid map element (" + elem + ") in " + obj)
-          }).mkString(",") + 
-      "}"
+      case m: Map[_, _] => "{" +
+        m.map {
+          case (k, v) => encode(k) + ":" + encode(v)
+          case elem => throw new IllegalArgumentException(s"Invalid map element '$elem' in $obj")
+        }.mkString(",") + "}"
       case a: Array[_] => encode(a.toSeq)
       case i: Iterable[_] => "[" + i.map(encode).mkString(",") + "]"
-      case other: AnyRef => throw new IllegalArgumentException("Unknown arguement of type " + other.getClass + ": " + other)
+      case other: AnyRef => throw new IllegalArgumentException(s"Unknown argument of type ${other.getClass}: $other")
     }
   }
-}
\ No newline at end of file
+
+}
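
A minimal sketch of the new parse entry point (return values follow the
code above):

  import kafka.utils.Json

  Json.parseFull("""{"a":1}""").map(_.asJsonObject("a").to[Int])  // Some(1)
  Json.parseFull("not json")                                      // None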

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/main/scala/kafka/utils/LogDirUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/LogDirUtils.scala b/core/src/main/scala/kafka/utils/LogDirUtils.scala
index 0bbc47d..8457ce5 100644
--- a/core/src/main/scala/kafka/utils/LogDirUtils.scala
+++ b/core/src/main/scala/kafka/utils/LogDirUtils.scala
@@ -28,7 +28,7 @@ object LogDirUtils extends Logging {
   def propagateLogDirEvent(zkUtils: ZkUtils, brokerId: Int) {
     val logDirEventNotificationPath: String = zkUtils.createSequentialPersistentPath(
       ZkUtils.LogDirEventNotificationPath + "/" + LogDirEventNotificationPrefix, logDirFailureEventZkData(brokerId))
-    debug("Added " + logDirEventNotificationPath + " for broker " + brokerId)
+    debug(s"Added $logDirEventNotificationPath for broker $brokerId")
   }
 
   private def logDirFailureEventZkData(brokerId: Int): String = {
@@ -42,24 +42,18 @@ object LogDirUtils extends Logging {
 
   def getBrokerIdFromLogDirEvent(zkUtils: ZkUtils, child: String): Option[Int] = {
     val changeZnode = ZkUtils.LogDirEventNotificationPath + "/" + child
-    val (jsonOpt, stat) = zkUtils.readDataMaybeNull(changeZnode)
-    if (jsonOpt.isDefined) {
-      val json = Json.parseFull(jsonOpt.get)
-
-      json match {
-        case Some(m) =>
-          val brokerAndEventType = m.asInstanceOf[Map[String, Any]]
-          val brokerId = brokerAndEventType.get("broker").get.asInstanceOf[Int]
-          val eventType = brokerAndEventType.get("event").get.asInstanceOf[Int]
-          if (eventType != LogDirFailureEvent)
-            throw new IllegalArgumentException(s"The event type $eventType in znode $changeZnode is not recognized")
-          Some(brokerId)
-        case None =>
-          error("Invalid LogDirEvent JSON: " + jsonOpt.get + " in ZK: " + changeZnode)
-          None
+    val (jsonOpt, _) = zkUtils.readDataMaybeNull(changeZnode)
+    jsonOpt.flatMap { json =>
+      val result = Json.parseFull(json).map(_.asJsonObject).map { jsObject =>
+        val brokerId = jsObject("broker").to[Int]
+        val eventType = jsObject("event").to[Int]
+        if (eventType != LogDirFailureEvent)
+          throw new IllegalArgumentException(s"The event type $eventType in znode $changeZnode is not recognized")
+        brokerId
       }
-    } else {
-      None
+      if (result.isEmpty)
+        error(s"Invalid LogDirEvent in ZK node '$changeZnode', JSON: $json")
+      result
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index fe31d7f..9533ce9 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -75,14 +75,13 @@ object ReplicationUtils extends Logging {
     leaderAndIsrOpt.flatMap(leaderAndIsrStr => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat))
   }
 
-  private def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: Stat)
-      : Option[LeaderIsrAndControllerEpoch] = {
-    Json.parseFull(leaderAndIsrStr).flatMap {m =>
-      val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, Any]]
-      val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int]
-      val epoch = leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int]
-      val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]]
-      val controllerEpoch = leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int]
+  private def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: Stat): Option[LeaderIsrAndControllerEpoch] = {
+    Json.parseFull(leaderAndIsrStr).flatMap { js =>
+      val leaderIsrAndEpochInfo = js.asJsonObject
+      val leader = leaderIsrAndEpochInfo("leader").to[Int]
+      val epoch = leaderIsrAndEpochInfo("leader_epoch").to[Int]
+      val isr = leaderIsrAndEpochInfo("isr").to[List[Int]]
+      val controllerEpoch = leaderIsrAndEpochInfo("controller_epoch").to[Int]
       val zkPathVersion = stat.getVersion
       trace(s"Leader $leader, Epoch $epoch, Isr $isr, Zk path version $zkPathVersion for leaderAndIsrPath $path")
       Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 7d3529f..6953507 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -35,6 +35,7 @@ import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback}
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.{ACL, Stat}
 import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper}
+import kafka.utils.Json._
 
 import scala.collection._
 import scala.collection.JavaConverters._
@@ -166,21 +167,16 @@ object ZkUtils {
 
   // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
   def parsePartitionReassignmentDataWithoutDedup(jsonData: String): Seq[(TopicAndPartition, Seq[Int])] = {
-    Json.parseFull(jsonData) match {
-      case Some(m) =>
-        m.asInstanceOf[Map[String, Any]].get("partitions") match {
-          case Some(partitionsSeq) =>
-            partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].map(p => {
-              val topic = p.get("topic").get.asInstanceOf[String]
-              val partition = p.get("partition").get.asInstanceOf[Int]
-              val newReplicas = p.get("replicas").get.asInstanceOf[Seq[Int]]
-              TopicAndPartition(topic, partition) -> newReplicas
-            })
-          case None =>
-            Seq.empty
-        }
-      case None =>
-        Seq.empty
+    for {
+      js <- Json.parseFull(jsonData).toSeq
+      partitionsSeq <- js.asJsonObject.get("partitions").toSeq
+      p <- partitionsSeq.asJsonArray.iterator
+    } yield {
+      val partitionFields = p.asJsonObject
+      val topic = partitionFields("topic").to[String]
+      val partition = partitionFields("partition").to[Int]
+      val newReplicas = partitionFields("replicas").to[Seq[Int]]
+      TopicAndPartition(topic, partition) -> newReplicas
     }
   }
 
@@ -188,17 +184,11 @@ object ZkUtils {
     parsePartitionReassignmentDataWithoutDedup(jsonData).toMap
 
   def parseTopicsData(jsonData: String): Seq[String] = {
-    var topics = List.empty[String]
-    Json.parseFull(jsonData).foreach { m =>
-      m.asInstanceOf[Map[String, Any]].get("topics").foreach { partitionsSeq =>
-          val mapPartitionSeq = partitionsSeq.asInstanceOf[Seq[Map[String, Any]]]
-          mapPartitionSeq.foreach(p => {
-            val topic = p.get("topic").get.asInstanceOf[String]
-            topics ++= List(topic)
-          })
-      }
-    }
-    topics
+    for {
+      js <- Json.parseFull(jsonData).toSeq
+      partitionsSeq <- js.asJsonObject.get("topics").toSeq
+      p <- partitionsSeq.asJsonArray.iterator
+    } yield p.asJsonObject("topic").to[String]
   }
 
   def controllerZkData(brokerId: Int, timestamp: Long): String = {
@@ -266,10 +256,9 @@ class ZkUtils(val zkClient: ZkClient,
     }
 
     def fromJson(clusterIdJson: String): String = {
-      Json.parseFull(clusterIdJson).map { m =>
-        val clusterIdMap = m.asInstanceOf[Map[String, Any]]
-        clusterIdMap.get("id").get.asInstanceOf[String]
-      }.getOrElse(throw new KafkaException(s"Failed to parse the cluster id json $clusterIdJson"))
+      Json.parseFull(clusterIdJson).map(_.asJsonObject("id").to[String]).getOrElse {
+        throw new KafkaException(s"Failed to parse the cluster id json $clusterIdJson")
+      }
     }
   }
 
@@ -305,7 +294,7 @@ class ZkUtils(val zkClient: ZkClient,
 
   def getLeaderForPartition(topic: String, partition: Int): Option[Int] = {
     readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1.flatMap { leaderAndIsr =>
-      Json.parseFull(leaderAndIsr).map(_.asInstanceOf[Map[String, Any]]("leader").asInstanceOf[Int])
+      Json.parseFull(leaderAndIsr).map(_.asJsonObject("leader").to[Int])
     }
   }
 
@@ -315,12 +304,11 @@ class ZkUtils(val zkClient: ZkClient,
    * other broker will retry becoming leader with the same new epoch value.
    */
   def getEpochForPartition(topic: String, partition: Int): Int = {
-    val leaderAndIsrOpt = readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1
-    leaderAndIsrOpt match {
+    readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1 match {
       case Some(leaderAndIsr) =>
         Json.parseFull(leaderAndIsr) match {
           case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for partition [%s,%d] is invalid".format(topic, partition))
-          case Some(m) => m.asInstanceOf[Map[String, Any]].get("leader_epoch").get.asInstanceOf[Int]
+          case Some(js) => js.asJsonObject("leader_epoch").to[Int]
         }
       case None => throw new NoEpochForPartitionException("No epoch, ISR path for partition [%s,%d] is empty"
         .format(topic, partition))
@@ -343,7 +331,7 @@ class ZkUtils(val zkClient: ZkClient,
     leaderAndIsrOpt match {
       case Some(leaderAndIsr) =>
         Json.parseFull(leaderAndIsr) match {
-          case Some(m) => m.asInstanceOf[Map[String, Any]].get("isr").get.asInstanceOf[Seq[Int]]
+          case Some(js) => js.asJsonObject("isr").to[Seq[Int]]
           case None => Seq.empty[Int]
         }
       case None => Seq.empty[Int]
@@ -354,21 +342,13 @@ class ZkUtils(val zkClient: ZkClient,
    * Gets the assigned replicas (AR) for a specific topic and partition
    */
   def getReplicasForPartition(topic: String, partition: Int): Seq[Int] = {
-    val jsonPartitionMapOpt = readDataMaybeNull(getTopicPath(topic))._1
-    jsonPartitionMapOpt match {
-      case Some(jsonPartitionMap) =>
-        Json.parseFull(jsonPartitionMap) match {
-          case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match {
-            case Some(replicaMap) => replicaMap.asInstanceOf[Map[String, Seq[Int]]].get(partition.toString) match {
-              case Some(seq) => seq
-              case None => Seq.empty[Int]
-            }
-            case None => Seq.empty[Int]
-          }
-          case None => Seq.empty[Int]
-        }
-      case None => Seq.empty[Int]
-    }
+    val seqOpt = for {
+      jsonPartitionMap <- readDataMaybeNull(getTopicPath(topic))._1
+      js <- Json.parseFull(jsonPartitionMap)
+      replicaMap <- js.asJsonObject.get("partitions")
+      seq <- replicaMap.asJsonObject.get(partition.toString)
+    } yield seq.to[Seq[Int]]
+    seqOpt.getOrElse(Seq.empty)
   }
 
   /**
@@ -705,15 +685,13 @@ class ZkUtils(val zkClient: ZkClient,
   def getReplicaAssignmentForTopics(topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = {
     val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]]
     topics.foreach { topic =>
-      val jsonPartitionMapOpt = readDataMaybeNull(getTopicPath(topic))._1
-      jsonPartitionMapOpt.foreach { jsonPartitionMap =>
-        Json.parseFull(jsonPartitionMap).foreach { m =>
-          m.asInstanceOf[Map[String, Any]].get("partitions").foreach { repl =>
-              val replicaMap = repl.asInstanceOf[Map[String, Seq[Int]]]
-              for((partition, replicas) <- replicaMap){
-                ret.put(TopicAndPartition(topic, partition.toInt), replicas)
-                debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
-              }
+      readDataMaybeNull(getTopicPath(topic))._1.foreach { jsonPartitionMap =>
+        Json.parseFull(jsonPartitionMap).foreach { js =>
+          js.asJsonObject.get("partitions").foreach { partitionsJs =>
+            partitionsJs.asJsonObject.iterator.foreach { case (partition, replicas) =>
+              ret.put(TopicAndPartition(topic, partition.toInt), replicas.to[Seq[Int]])
+              debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
+            }
           }
         }
       }
@@ -723,21 +701,13 @@ class ZkUtils(val zkClient: ZkClient,
 
   def getPartitionAssignmentForTopics(topics: Seq[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = {
     val ret = new mutable.HashMap[String, Map[Int, Seq[Int]]]()
-    topics.foreach{ topic =>
-      val jsonPartitionMapOpt = readDataMaybeNull(getTopicPath(topic))._1
-      val partitionMap = jsonPartitionMapOpt match {
-        case Some(jsonPartitionMap) =>
-          Json.parseFull(jsonPartitionMap) match {
-            case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match {
-              case Some(replicaMap) =>
-                val m1 = replicaMap.asInstanceOf[Map[String, Seq[Int]]]
-                m1.map(p => (p._1.toInt, p._2))
-              case None => Map[Int, Seq[Int]]()
-            }
-            case None => Map[Int, Seq[Int]]()
-          }
-        case None => Map[Int, Seq[Int]]()
-      }
+    topics.foreach { topic =>
+      val partitionMapOpt = for {
+        jsonPartitionMap <- readDataMaybeNull(getTopicPath(topic))._1
+        js <- Json.parseFull(jsonPartitionMap)
+        replicaMap <- js.asJsonObject.get("partitions")
+      } yield replicaMap.asJsonObject.iterator.map { case (k, v) => (k.toInt, v.to[Seq[Int]]) }.toMap
+      val partitionMap = partitionMapOpt.getOrElse(Map.empty)
       debug("Partition map for /brokers/topics/%s is %s".format(topic, partitionMap))
       ret += (topic -> partitionMap)
     }
@@ -773,7 +743,7 @@ class ZkUtils(val zkClient: ZkClient,
   }
 
   def updatePartitionReassignmentData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) {
-    val zkPath = ReassignPartitionsPath
+    val zkPath = ZkUtils.ReassignPartitionsPath
     partitionsToBeReassigned.size match {
       case 0 => // need to delete the /admin/reassign_partitions path
         deletePath(zkPath)
@@ -829,8 +799,8 @@ class ZkUtils(val zkClient: ZkClient,
           }
       }
     }
-    for ( (topic, consumerList) <- consumersPerTopicMap )
-      consumersPerTopicMap.put(topic, consumerList.sortWith((s,t) => s < t))
+    for ((topic, consumerList) <- consumersPerTopicMap)
+      consumersPerTopicMap.put(topic, consumerList.sortWith((s, t) => s < t))
     consumersPerTopicMap
   }
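
The for-comprehensions above lean on `Option.toSeq`, so a missing znode,
invalid JSON or an absent key all collapse to an empty result instead of
nested pattern matches. A minimal sketch of the pattern (made-up input):

  val partitions: Seq[Int] = for {
    js <- Json.parseFull("""{"partitions":[{"partition":0},{"partition":1}]}""").toSeq
    partitionsJs <- js.asJsonObject.get("partitions").toSeq
    p <- partitionsJs.asJsonArray.iterator
  } yield p.asJsonObject("partition").to[Int]
  // partitions == Seq(0, 1); invalid input would yield Seq.empty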
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/main/scala/kafka/utils/json/DecodeJson.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/json/DecodeJson.scala b/core/src/main/scala/kafka/utils/json/DecodeJson.scala
new file mode 100644
index 0000000..eab1f2a
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/json/DecodeJson.scala
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils.json
+
+import scala.collection._
+import scala.language.higherKinds
+import JavaConverters._
+import generic.CanBuildFrom
+
+import com.fasterxml.jackson.databind.{JsonMappingException, JsonNode}
+
+/**
+ * A type class for parsing JSON. This should typically be used via `JsonValue.apply`.
+ */
+trait DecodeJson[T] {
+
+  /**
+   * Decode the JSON node provided into an instance of `Right[T]`, if possible. Otherwise, return an error message
+   * wrapped by an instance of `Left`.
+   */
+  def decodeEither(node: JsonNode): Either[String, T]
+
+  /**
+   * Decode the JSON node provided into an instance of `T`.
+   *
+   * @throws JsonMappingException if `node` cannot be decoded into `T`.
+   */
+  def decode(node: JsonNode): T =
+    decodeEither(node) match {
+      case Right(x) => x
+      case Left(x) =>
+        // Non-deprecated constructors were only introduced in Jackson 2.7, so stick with the deprecated one in case
+        // people have older versions of Jackson in their classpath. Once the Scala clients are removed, we can loosen
+        // this restriction.
+        throw new JsonMappingException(x)
+    }
+
+}
+
+/**
+ * Contains `DecodeJson` type class instances. That is, we need one instance for each type that we want to be able to
+ * parse into. It is a compiler error to try to parse into a type for which there is no instance.
+ */
+object DecodeJson {
+
+  implicit object DecodeBoolean extends DecodeJson[Boolean] {
+    def decodeEither(node: JsonNode): Either[String, Boolean] =
+      if (node.isBoolean) Right(node.booleanValue) else Left(s"Expected `Boolean` value, received $node")
+  }
+
+  implicit object DecodeDouble extends DecodeJson[Double] {
+    def decodeEither(node: JsonNode): Either[String, Double] =
+      if (node.isDouble || node.isLong || node.isInt)
+        Right(node.doubleValue)
+      else Left(s"Expected `Double` value, received $node")
+  }
+
+  implicit object DecodeInt extends DecodeJson[Int] {
+    def decodeEither(node: JsonNode): Either[String, Int] =
+      if (node.isInt) Right(node.intValue) else Left(s"Expected `Int` value, received $node")
+  }
+
+  implicit object DecodeLong extends DecodeJson[Long] {
+    def decodeEither(node: JsonNode): Either[String, Long] =
+      if (node.isLong || node.isInt) Right(node.longValue) else Left(s"Expected `Long` value, received $node")
+  }
+
+  implicit object DecodeString extends DecodeJson[String] {
+    def decodeEither(node: JsonNode): Either[String, String] =
+      if (node.isTextual) Right(node.textValue) else Left(s"Expected `String` value, received $node")
+  }
+
+  implicit def decodeOption[E](implicit decodeJson: DecodeJson[E]): DecodeJson[Option[E]] = new DecodeJson[Option[E]] {
+    def decodeEither(node: JsonNode): Either[String, Option[E]] = {
+      if (node.isNull) Right(None)
+      else decodeJson.decodeEither(node).right.map(Some(_))
+    }
+  }
+
+  implicit def decodeSeq[E, S[+T] <: Seq[E]](implicit decodeJson: DecodeJson[E], cbf: CanBuildFrom[Nothing, E, S[E]]): DecodeJson[S[E]] = new DecodeJson[S[E]] {
+    def decodeEither(node: JsonNode): Either[String, S[E]] = {
+      if (node.isArray)
+        decodeIterator(node.elements.asScala)(decodeJson.decodeEither)
+      else Left(s"Expected JSON array, received $node")
+    }
+  }
+
+  implicit def decodeMap[V, M[K, +V] <: Map[K, V]](implicit decodeJson: DecodeJson[V], cbf: CanBuildFrom[Nothing, (String, V), M[String, V]]): DecodeJson[M[String, V]] = new DecodeJson[M[String, V]] {
+    def decodeEither(node: JsonNode): Either[String, M[String, V]] = {
+      if (node.isObject)
+        decodeIterator(node.fields.asScala)(e => decodeJson.decodeEither(e.getValue).right.map(v => (e.getKey, v)))(cbf)
+      else Left(s"Expected JSON object, received $node")
+    }
+  }
+
+  private def decodeIterator[S, T, C](it: Iterator[S])(f: S => Either[String, T])(implicit cbf: CanBuildFrom[Nothing, T, C]): Either[String, C] = {
+    val result = cbf()
+    while (it.hasNext) {
+      f(it.next) match {
+        case Right(x) => result += x
+        case Left(x) => return Left(x)
+      }
+    }
+    Right(result.result())
+  }
+
+}
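
Adding support for decoding another type means supplying another implicit
instance. A hypothetical sketch (this UUID instance is not part of the
patch):

  import com.fasterxml.jackson.databind.JsonNode
  import kafka.utils.json.DecodeJson

  // Hypothetical: decode a JSON string into a java.util.UUID
  implicit object DecodeUuid extends DecodeJson[java.util.UUID] {
    def decodeEither(node: JsonNode): Either[String, java.util.UUID] =
      if (node.isTextual)
        try Right(java.util.UUID.fromString(node.textValue))
        catch { case _: IllegalArgumentException => Left(s"Expected UUID string, received $node") }
      else Left(s"Expected UUID string, received $node")
  }

  // With the instance in scope, jsonValue.to[java.util.UUID] resolves via
  // the usual implicit search.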

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/main/scala/kafka/utils/json/JsonArray.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/json/JsonArray.scala b/core/src/main/scala/kafka/utils/json/JsonArray.scala
new file mode 100644
index 0000000..0b81a4a
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/json/JsonArray.scala
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils.json
+
+import scala.collection.{Iterator, JavaConverters}
+import JavaConverters._
+
+import com.fasterxml.jackson.databind.node.ArrayNode
+
+class JsonArray private[json] (protected val node: ArrayNode) extends JsonValue {
+  def iterator: Iterator[JsonValue] = node.elements.asScala.map(JsonValue(_))
+}
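
Usage goes through the iterator, e.g. (assuming `js` is a parsed
`JsonValue` holding `[1,2,3]`):

  val total = js.asJsonArray.iterator.map(_.to[Int]).sum  // 6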

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/main/scala/kafka/utils/json/JsonObject.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/json/JsonObject.scala b/core/src/main/scala/kafka/utils/json/JsonObject.scala
new file mode 100644
index 0000000..8feb08b
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/json/JsonObject.scala
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils.json
+
+import com.fasterxml.jackson.databind.JsonMappingException
+
+import scala.collection.JavaConverters._
+
+import com.fasterxml.jackson.databind.node.ObjectNode
+
+import scala.collection.Iterator
+
+/**
+ * A thin wrapper over Jackson's `ObjectNode` for a more idiomatic API. See `JsonValue` for more details.
+ */
+class JsonObject private[json] (protected val node: ObjectNode) extends JsonValue {
+
+  def apply(name: String): JsonValue =
+    get(name).getOrElse(throw new JsonMappingException(s"No such field exists: `$name`"))
+
+  def get(name: String): Option[JsonValue] = Option(node.get(name)).map(JsonValue(_))
+
+  def iterator: Iterator[(String, JsonValue)] = node.fields.asScala.map { entry =>
+    (entry.getKey, JsonValue(entry.getValue))
+  }
+
+}

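The `apply`/`get` pair mirrors Scala's `Map`: `apply` throws a `JsonMappingException` for a missing field, while `get` returns an `Option`. A short sketch (payload and field names are illustrative):

    import kafka.utils.Json

    val obj = Json.parseFull("""{"host":"localhost","port":9092}""").get.asJsonObject

    val host = obj("host").to[String]             // "localhost"; apply throws if absent
    val rack = obj.get("rack").map(_.to[String])  // None: "rack" is not present
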
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/main/scala/kafka/utils/json/JsonValue.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/json/JsonValue.scala b/core/src/main/scala/kafka/utils/json/JsonValue.scala
new file mode 100644
index 0000000..2be1880
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/json/JsonValue.scala
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils.json
+
+import scala.collection._
+
+import com.fasterxml.jackson.databind.{JsonMappingException, JsonNode}
+import com.fasterxml.jackson.databind.node.{ArrayNode, ObjectNode}
+
+/**
+ * A simple wrapper over Jackson's `JsonNode` that enables type-safe parsing via the `DecodeJson` type
+ * class.
+ *
+ * Typical usage would be something like:
+ *
+ * {{{
+ * val jsonNode: JsonNode = ???
+ * val jsonObject = JsonValue(jsonNode).asJsonObject
+ * val intValue = jsonObject("int_field").to[Int]
+ * val optionLongValue = jsonObject("option_long_field").to[Option[Long]]
+ * val mapStringIntField = jsonObject("map_string_int_field").to[Map[String, Int]]
+ * val seqStringField = jsonObject("seq_string_field").to[Seq[String]]
+ * }}}
+ *
+ * The `to` method throws an exception if the value cannot be converted to the requested type. An alternative is the
+ * `toEither` method that returns an `Either` instead.
+ */
+trait JsonValue {
+
+  protected def node: JsonNode
+
+  /**
+   * Decode this JSON value into an instance of `T`.
+   *
+   * @throws JsonMappingException if this value cannot be decoded into `T`.
+   */
+  def to[T](implicit decodeJson: DecodeJson[T]): T = decodeJson.decode(node)
+
+  /**
+   * Decode this JSON value into an instance of `Right[T]`, if possible. Otherwise, return an error message
+   * wrapped by an instance of `Left`.
+   */
+  def toEither[T](implicit decodeJson: DecodeJson[T]): Either[String, T] = decodeJson.decodeEither(node)
+
+  /**
+   * If this is a JSON object, return an instance of JsonObject. Otherwise, throw a JsonMappingException.
+   */
+  def asJsonObject: JsonObject =
+    asJsonObjectOption.getOrElse(throw new JsonMappingException(s"Expected JSON object, received $node"))
+
+  /**
+   * If this is a JSON object, return a JsonObject wrapped by a `Some`. Otherwise, return None.
+   */
+  def asJsonObjectOption: Option[JsonObject] = this match {
+    case j: JsonObject => Some(j)
+    case _ => node match {
+      case n: ObjectNode => Some(new JsonObject(n))
+      case _ => None
+    }
+  }
+
+  /**
+   * If this is a JSON array, return an instance of JsonArray. Otherwise, throw a JsonMappingException.
+   */
+  def asJsonArray: JsonArray =
+    asJsonArrayOption.getOrElse(throw new JsonMappingException(s"Expected JSON array, received $node"))
+
+  /**
+   * If this is a JSON array, return a JsonArray wrapped by a `Some`. Otherwise, return None.
+   */
+  def asJsonArrayOption: Option[JsonArray] = this match {
+    case j: JsonArray => Some(j)
+    case _ => node match {
+      case n: ArrayNode => Some(new JsonArray(n))
+      case _ => None
+    }
+  }
+
+  override def hashCode: Int = node.hashCode
+
+  override def equals(a: Any): Boolean = a match {
+    case a: JsonValue => node == a.node
+    case _ => false
+  }
+
+  override def toString: String = node.toString
+
+}
+
+object JsonValue {
+
+  /**
+   * Create an instance of `JsonValue` from Jackson's `JsonNode`.
+   */
+  def apply(node: JsonNode): JsonValue = node match {
+    case n: ObjectNode => new JsonObject(n)
+    case n: ArrayNode => new JsonArray(n)
+    case _ => new BasicJsonValue(node)
+  }
+
+  private class BasicJsonValue private[json] (protected val node: JsonNode) extends JsonValue
+
+}

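Since `to` throws on a mismatch, call sites that prefer explicit error handling can go through `toEither` instead. A sketch of the failure path, with an illustrative payload:

    import kafka.utils.Json

    val result: Option[Either[String, Int]] =
      Json.parseFull("""{"port": "not-a-number"}""")
        .map(_.asJsonObject("port").toEither[Int])

    result match {
      case Some(Right(port)) => println(s"port: $port")
      case Some(Left(error)) => println(s"decode failed: $error") // taken here
      case None              => println("not valid JSON")
    }
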
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
index 6942df0..04be8fd 100644
--- a/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
@@ -29,7 +29,6 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderForPartitionException}
 import org.junit.{Before, Test}
 import org.junit.Assert.assertTrue
-import org.junit.Assert.assertEquals
 
 /**
  * Test whether clients can produce and consume when there is a log directory failure

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
index 2578243..536957c 100644
--- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
@@ -17,38 +17,33 @@
 
 package kafka.cluster
 
-import java.nio.ByteBuffer
-
-import kafka.utils.{Logging, TestUtils}
+import kafka.utils.TestUtils
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.SecurityProtocol
+import org.junit.Assert.{assertEquals, assertNotEquals, assertNull}
 import org.junit.Test
 
-import scala.collection.mutable
-
-class BrokerEndPointTest extends Logging {
+class BrokerEndPointTest {
 
   @Test
-  def testHashAndEquals() {
+  def testHashAndEquals(): Unit = {
     val broker1 = TestUtils.createBroker(1, "myhost", 9092)
     val broker2 = TestUtils.createBroker(1, "myhost", 9092)
     val broker3 = TestUtils.createBroker(2, "myhost", 1111)
     val broker4 = TestUtils.createBroker(1, "other", 1111)
 
-    assert(broker1 == broker2)
-    assert(broker1 != broker3)
-    assert(broker1 != broker4)
-    assert(broker1.hashCode() == broker2.hashCode())
-    assert(broker1.hashCode() != broker3.hashCode())
-    assert(broker1.hashCode() != broker4.hashCode())
+    assertEquals(broker1, broker2)
+    assertNotEquals(broker1, broker3)
+    assertNotEquals(broker1, broker4)
+    assertEquals(broker1.hashCode, broker2.hashCode)
+    assertNotEquals(broker1.hashCode, broker3.hashCode)
+    assertNotEquals(broker1.hashCode, broker4.hashCode)
 
-    val hashmap = new mutable.HashMap[Broker, Int]()
-    hashmap.put(broker1, 1)
-    assert(hashmap.getOrElse(broker1, -1) == 1)
+    assertEquals(Some(1), Map(broker1 -> 1).get(broker1))
   }
 
   @Test
-  def testFromJsonFutureVersion() {
+  def testFromJsonFutureVersion(): Unit = {
     // `createBroker` should support future-compatible versions; we use a hypothetical future version here
     val brokerInfoStr = """{
       "foo":"bar",
@@ -60,10 +55,10 @@ class BrokerEndPointTest extends Logging {
       "endpoints":["SSL://localhost:9093"]
     }"""
     val broker = Broker.createBroker(1, brokerInfoStr)
-    assert(broker.id == 1)
+    assertEquals(1, broker.id)
     val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
-    assert(brokerEndPoint.host == "localhost")
-    assert(brokerEndPoint.port == 9093)
+    assertEquals("localhost", brokerEndPoint.host)
+    assertEquals(9093, brokerEndPoint.port)
   }
 
   @Test
@@ -77,86 +72,144 @@ class BrokerEndPointTest extends Logging {
       "endpoints":["PLAINTEXT://localhost:9092"]
     }"""
     val broker = Broker.createBroker(1, brokerInfoStr)
-    assert(broker.id == 1)
+    assertEquals(1, broker.id)
     val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
-    assert(brokerEndPoint.host == "localhost")
-    assert(brokerEndPoint.port == 9092)
+    assertEquals("localhost", brokerEndPoint.host)
+    assertEquals(9092, brokerEndPoint.port)
   }
 
   @Test
-  def testFromJsonV1() = {
+  def testFromJsonV1(): Unit = {
     val brokerInfoStr = """{"jmx_port":-1,"timestamp":"1420485325400","host":"172.16.8.243","version":1,"port":9091}"""
     val broker = Broker.createBroker(1, brokerInfoStr)
-    assert(broker.id == 1)
+    assertEquals(1, broker.id)
     val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
-    assert(brokerEndPoint.host == "172.16.8.243")
-    assert(brokerEndPoint.port == 9091)
+    assertEquals("172.16.8.243", brokerEndPoint.host)
+    assertEquals(9091, brokerEndPoint.port)
+  }
+
+  @Test
+  def testFromJsonV3(): Unit = {
+    val json = """{
+      "version":3,
+      "host":"localhost",
+      "port":9092,
+      "jmx_port":9999,
+      "timestamp":"2233345666",
+      "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"],
+      "rack":"dc1"
+    }"""
+    val broker = Broker.createBroker(1, json)
+    assertEquals(1, broker.id)
+    val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
+    assertEquals("host1", brokerEndPoint.host)
+    assertEquals(9093, brokerEndPoint.port)
+    assertEquals(Some("dc1"), broker.rack)
+  }
+
+  @Test
+  def testFromJsonV4WithNullRack(): Unit = {
+    val json = """{
+      "version":4,
+      "host":"localhost",
+      "port":9092,
+      "jmx_port":9999,
+      "timestamp":"2233345666",
+      "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
+      "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"},
+      "rack":null
+    }"""
+    val broker = Broker.createBroker(1, json)
+    assertEquals(1, broker.id)
+    val brokerEndPoint = broker.getBrokerEndPoint(new ListenerName("CLIENT"))
+    assertEquals("host1", brokerEndPoint.host)
+    assertEquals(9092, brokerEndPoint.port)
+    assertEquals(None, broker.rack)
+  }
+
+  @Test
+  def testFromJsonV4WithNoRack(): Unit = {
+    val json = """{
+      "version":4,
+      "host":"localhost",
+      "port":9092,
+      "jmx_port":9999,
+      "timestamp":"2233345666",
+      "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
+      "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"}
+    }"""
+    val broker = Broker.createBroker(1, json)
+    assertEquals(1, broker.id)
+    val brokerEndPoint = broker.getBrokerEndPoint(new ListenerName("CLIENT"))
+    assertEquals("host1", brokerEndPoint.host)
+    assertEquals(9092, brokerEndPoint.port)
+    assertEquals(None, broker.rack)
   }
 
   @Test
-  def testBrokerEndpointFromUri() {
+  def testBrokerEndpointFromUri(): Unit = {
     var connectionString = "localhost:9092"
     var endpoint = BrokerEndPoint.createBrokerEndPoint(1, connectionString)
-    assert(endpoint.host == "localhost")
-    assert(endpoint.port == 9092)
+    assertEquals("localhost", endpoint.host)
+    assertEquals(9092, endpoint.port)
     //KAFKA-3719
     connectionString = "local_host:9092"
     endpoint = BrokerEndPoint.createBrokerEndPoint(1, connectionString)
-    assert(endpoint.host == "local_host")
-    assert(endpoint.port == 9092)
+    assertEquals("local_host", endpoint.host)
+    assertEquals(9092, endpoint.port)
     // also test for ipv6
     connectionString = "[::1]:9092"
     endpoint = BrokerEndPoint.createBrokerEndPoint(1, connectionString)
-    assert(endpoint.host == "::1")
-    assert(endpoint.port == 9092)
+    assertEquals("::1", endpoint.host)
+    assertEquals(9092, endpoint.port)
     // test for ipv6 with % character
     connectionString = "[fe80::b1da:69ca:57f7:63d8%3]:9092"
     endpoint = BrokerEndPoint.createBrokerEndPoint(1, connectionString)
-    assert(endpoint.host == "fe80::b1da:69ca:57f7:63d8%3")
-    assert(endpoint.port == 9092)
+    assertEquals("fe80::b1da:69ca:57f7:63d8%3", endpoint.host)
+    assertEquals(9092, endpoint.port)
     // add test for uppercase in hostname
     connectionString = "MyHostname:9092"
     endpoint = BrokerEndPoint.createBrokerEndPoint(1, connectionString)
-    assert(endpoint.host == "MyHostname")
-    assert(endpoint.port == 9092)
+    assertEquals("MyHostname", endpoint.host)
+    assertEquals(9092, endpoint.port)
   }
 
   @Test
-  def testEndpointFromUri() {
+  def testEndpointFromUri(): Unit = {
     var connectionString = "PLAINTEXT://localhost:9092"
     var endpoint = EndPoint.createEndPoint(connectionString, None)
-    assert(endpoint.host == "localhost")
-    assert(endpoint.port == 9092)
-    assert(endpoint.connectionString == "PLAINTEXT://localhost:9092")
+    assertEquals("localhost", endpoint.host)
+    assertEquals(9092, endpoint.port)
+    assertEquals("PLAINTEXT://localhost:9092", endpoint.connectionString)
     // KAFKA-3719
     connectionString = "PLAINTEXT://local_host:9092"
     endpoint = EndPoint.createEndPoint(connectionString, None)
-    assert(endpoint.host == "local_host")
-    assert(endpoint.port == 9092)
-    assert(endpoint.connectionString == "PLAINTEXT://local_host:9092")
+    assertEquals("local_host", endpoint.host)
+    assertEquals(9092, endpoint.port)
+    assertEquals("PLAINTEXT://local_host:9092", endpoint.connectionString)
     // also test for default bind
     connectionString = "PLAINTEXT://:9092"
     endpoint = EndPoint.createEndPoint(connectionString, None)
-    assert(endpoint.host == null)
-    assert(endpoint.port == 9092)
-    assert(endpoint.connectionString == "PLAINTEXT://:9092")
+    assertNull(endpoint.host)
+    assertEquals(9092, endpoint.port)
+    assertEquals( "PLAINTEXT://:9092", endpoint.connectionString)
     // also test for ipv6
     connectionString = "PLAINTEXT://[::1]:9092"
     endpoint = EndPoint.createEndPoint(connectionString, None)
-    assert(endpoint.host == "::1")
-    assert(endpoint.port == 9092)
-    assert(endpoint.connectionString ==  "PLAINTEXT://[::1]:9092")
+    assertEquals("::1", endpoint.host)
+    assertEquals(9092, endpoint.port)
+    assertEquals("PLAINTEXT://[::1]:9092", endpoint.connectionString)
     // test for ipv6 with % character
     connectionString = "PLAINTEXT://[fe80::b1da:69ca:57f7:63d8%3]:9092"
     endpoint = EndPoint.createEndPoint(connectionString, None)
-    assert(endpoint.host == "fe80::b1da:69ca:57f7:63d8%3")
-    assert(endpoint.port == 9092)
-    assert(endpoint.connectionString ==  "PLAINTEXT://[fe80::b1da:69ca:57f7:63d8%3]:9092")
+    assertEquals("fe80::b1da:69ca:57f7:63d8%3", endpoint.host)
+    assertEquals(9092, endpoint.port)
+    assertEquals("PLAINTEXT://[fe80::b1da:69ca:57f7:63d8%3]:9092", endpoint.connectionString)
     // test hostname
     connectionString = "PLAINTEXT://MyHostname:9092"
     endpoint = EndPoint.createEndPoint(connectionString, None)
-    assert(endpoint.host == "MyHostname")
-    assert(endpoint.port == 9092)
-    assert(endpoint.connectionString ==  "PLAINTEXT://MyHostname:9092")
+    assertEquals("MyHostname", endpoint.host)
+    assertEquals(9092, endpoint.port)
+    assertEquals("PLAINTEXT://MyHostname:9092", endpoint.connectionString)
   }
 }

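The tests above exercise several generations of the broker registration format. As a sketch of how the v4 fields could be pulled out with the `JsonValue` API introduced in this commit (illustrative only, not the actual `Broker.createBroker` implementation; note that `Option` decoding maps JSON `null` to `None`):

    import kafka.utils.Json

    val registration =
      """{"version":4,"host":"localhost","port":9092,
        |"endpoints":["CLIENT://host1:9092"],
        |"listener_security_protocol_map":{"CLIENT":"SSL"},
        |"rack":null}""".stripMargin

    val obj = Json.parseFull(registration).get.asJsonObject
    val endpoints   = obj("endpoints").to[Seq[String]]                // Seq("CLIENT://host1:9092")
    val listenerMap = obj("listener_security_protocol_map").to[Map[String, String]]
    val rack        = obj("rack").to[Option[String]]                  // None: JSON null
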
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index b032f8d..39353b8 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -24,7 +24,7 @@ import org.junit.Assert._
 
 class ProducerIdManagerTest {
 
-  val zkUtils: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+  private val zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
 
   @After
   def tearDown(): Unit = {
@@ -32,41 +32,31 @@ class ProducerIdManagerTest {
   }
 
   @Test
-  def testGetPID() {
-    var zkVersion: Int = -1
+  def testGetProducerId(): Unit = {
+    var zkVersion: Option[Int] = None
     var data: String = null
-    EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString()))
-      .andAnswer(new IAnswer[(Option[String], Int)] {
-        override def answer(): (Option[String], Int) = {
-          if (zkVersion == -1) {
-            (None.asInstanceOf[Option[String]], 0)
-          } else {
-            (Some(data), zkVersion)
-          }
-        }
-      })
-      .anyTimes()
+    EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString)).andAnswer(new IAnswer[(Option[String], Int)] {
+      override def answer(): (Option[String], Int) = zkVersion.map(Some(data) -> _).getOrElse((None, 0))
+    }).anyTimes()
 
     val capturedVersion: Capture[Int] = EasyMock.newCapture()
     val capturedData: Capture[String] = EasyMock.newCapture()
     EasyMock.expect(zkUtils.conditionalUpdatePersistentPath(EasyMock.anyString(),
       EasyMock.capture(capturedData),
       EasyMock.capture(capturedVersion),
-      EasyMock.anyObject().asInstanceOf[Option[(ZkUtils, String, String) => (Boolean,Int)]]))
-      .andAnswer(new IAnswer[(Boolean, Int)] {
+      EasyMock.anyObject[Option[(ZkUtils, String, String) => (Boolean, Int)]])).andAnswer(new IAnswer[(Boolean, Int)] {
         override def answer(): (Boolean, Int) = {
-          zkVersion = capturedVersion.getValue + 1
+          val newZkVersion = capturedVersion.getValue + 1
+          zkVersion = Some(newZkVersion)
           data = capturedData.getValue
-
-          (true, zkVersion)
+          (true, newZkVersion)
         }
-      })
-      .anyTimes()
+      }).anyTimes()
 
     EasyMock.replay(zkUtils)
 
-    val manager1: ProducerIdManager = new ProducerIdManager(0, zkUtils)
-    val manager2: ProducerIdManager = new ProducerIdManager(1, zkUtils)
+    val manager1 = new ProducerIdManager(0, zkUtils)
+    val manager2 = new ProducerIdManager(1, zkUtils)
 
     val pid1 = manager1.generateProducerId()
     val pid2 = manager2.generateProducerId()
@@ -74,29 +64,25 @@ class ProducerIdManagerTest {
     assertEquals(0, pid1)
     assertEquals(ProducerIdManager.PidBlockSize, pid2)
 
-    for (i <- 1 until ProducerIdManager.PidBlockSize.asInstanceOf[Int]) {
+    for (i <- 1L until ProducerIdManager.PidBlockSize)
       assertEquals(pid1 + i, manager1.generateProducerId())
-    }
 
-    for (i <- 1 until ProducerIdManager.PidBlockSize.asInstanceOf[Int]) {
+    for (i <- 1L until ProducerIdManager.PidBlockSize)
       assertEquals(pid2 + i, manager2.generateProducerId())
-    }
 
     assertEquals(pid2 + ProducerIdManager.PidBlockSize, manager1.generateProducerId())
     assertEquals(pid2 + ProducerIdManager.PidBlockSize * 2, manager2.generateProducerId())
   }
 
   @Test(expected = classOf[KafkaException])
-  def testExceedPIDLimit() {
-    EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString()))
-      .andAnswer(new IAnswer[(Option[String], Int)] {
-        override def answer(): (Option[String], Int) = {
-          (Some(ProducerIdManager.generateProducerIdBlockJson(ProducerIdBlock(0,
-            Long.MaxValue - ProducerIdManager.PidBlockSize,
-            Long.MaxValue))), 0)
-        }
-      })
-      .anyTimes()
+  def testExceedProducerIdLimit(): Unit = {
+    EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString)).andAnswer(new IAnswer[(Option[String], Int)] {
+      override def answer(): (Option[String], Int) = {
+        val json = ProducerIdManager.generateProducerIdBlockJson(
+          ProducerIdBlock(0, Long.MaxValue - ProducerIdManager.PidBlockSize, Long.MaxValue))
+        (Some(json), 0)
+      }
+    }).anyTimes()
     EasyMock.replay(zkUtils)
     new ProducerIdManager(0, zkUtils)
   }

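The assertions encode the block-allocation scheme: each `ProducerIdManager` claims a contiguous block of `PidBlockSize` ids through a conditional update in ZooKeeper, serves ids locally from that block, and goes back for the next free block only on exhaustion. The arithmetic the test relies on, as a standalone sketch (the block size value here is illustrative; the real constant lives in `ProducerIdManager`):

    // The n-th block claimed cluster-wide covers [n * blockSize, (n + 1) * blockSize).
    val blockSize = 1000L
    def blockRange(n: Long): (Long, Long) = (n * blockSize, (n + 1) * blockSize - 1)

    blockRange(0) // manager1's first block: (0, 999)
    blockRange(1) // manager2's first block: (1000, 1999)
    // Once manager1 exhausts block 0 it claims block 2 (ids from 2000),
    // i.e. pid2 + blockSize, matching the final assertions above.
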
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/test/scala/unit/kafka/utils/JsonTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/JsonTest.scala b/core/src/test/scala/unit/kafka/utils/JsonTest.scala
index 6c8ed97..8bba50b 100644
--- a/core/src/test/scala/unit/kafka/utils/JsonTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/JsonTest.scala
@@ -17,11 +17,34 @@
 package kafka.utils
 
 import org.junit.Assert._
-import org.junit.{Test, After, Before}
+import org.junit.Test
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node._
+import kafka.utils.json.JsonValue
+import scala.collection.JavaConverters._
 
 class JsonTest {
 
   @Test
+  def testJsonParse(): Unit = {
+    val jnf = JsonNodeFactory.instance
+
+    assertEquals(Json.parseFull("{}"), Some(JsonValue(new ObjectNode(jnf))))
+
+    assertEquals(Json.parseFull("""{"foo":"bar"s}"""), None)
+
+    val objectNode = new ObjectNode(
+      jnf,
+      Map[String, JsonNode]("foo" -> new TextNode("bar"), "is_enabled" -> BooleanNode.TRUE).asJava
+    )
+    assertEquals(Json.parseFull("""{"foo":"bar", "is_enabled":true}"""), Some(JsonValue(objectNode)))
+
+    val arrayNode = new ArrayNode(jnf)
+    Vector(1, 2, 3).map(new IntNode(_)).foreach(arrayNode.add)
+    assertEquals(Json.parseFull("[1, 2, 3]"), Some(JsonValue(arrayNode)))
+  }
+
+  @Test
   def testJsonEncoding() {
     assertEquals("null", Json.encode(null))
     assertEquals("1", Json.encode(1))
@@ -40,4 +63,4 @@ class JsonTest {
     assertEquals("{\"a\":[1,2],\"c\":[3,4]}", Json.encode(Map("a" -> Seq(1,2), "c" -> Seq(3,4))))
   }
   
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/core/src/test/scala/unit/kafka/utils/json/JsonValueTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/json/JsonValueTest.scala b/core/src/test/scala/unit/kafka/utils/json/JsonValueTest.scala
new file mode 100644
index 0000000..b12d0f3
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/json/JsonValueTest.scala
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils.json
+
+import com.fasterxml.jackson.databind.{ObjectMapper, JsonMappingException}
+import org.junit.Test
+import org.junit.Assert._
+
+import kafka.utils.Json
+
+class JsonValueTest {
+
+  val json = """
+    |{
+    |  "boolean": false,
+    |  "int": 1234,
+    |  "long": 3000000000,
+    |  "double": 16.244355,
+    |  "string": "string",
+    |  "number_as_string": "123",
+    |  "array": [4.0, 11.1, 44.5],
+    |  "object": {
+    |    "a": true,
+    |    "b": false
+    |  },
+    |  "null": null
+    |}
+   """.stripMargin
+
+  private def parse(s: String): JsonValue =
+    Json.parseFull(s).getOrElse(sys.error("Failed to parse json: " + s))
+
+  private def assertTo[T: DecodeJson](expected: T, jsonValue: JsonObject => JsonValue): Unit = {
+    val parsed = jsonValue(parse(json).asJsonObject)
+    assertEquals(Right(expected), parsed.toEither[T])
+    assertEquals(expected, parsed.to[T])
+  }
+
+  private def assertToFails[T: DecodeJson](jsonValue: JsonObject => JsonValue): Unit = {
+    val parsed = jsonValue(parse(json).asJsonObject)
+    assertTrue(parsed.toEither[T].isLeft)
+    assertThrow[JsonMappingException](parsed.to[T])
+  }
+
+  def assertThrow[E <: Throwable : Manifest](body: => Unit): Unit = {
+    import scala.util.control.Exception._
+    val klass = manifest[E].runtimeClass
+    catchingPromiscuously(klass).opt(body).foreach { _ =>
+      fail("Expected `" + klass + "` to be thrown, but no exception was thrown")
+    }
+  }
+
+  @Test
+  def testAsJsonObject: Unit = {
+    val parsed = parse(json).asJsonObject
+    val obj = parsed("object")
+    assertEquals(obj, obj.asJsonObject)
+    assertThrow[JsonMappingException](parsed("array").asJsonObject)
+  }
+
+  @Test
+  def testAsJsonObjectOption: Unit = {
+    val parsed = parse(json).asJsonObject
+    assertTrue(parsed("object").asJsonObjectOption.isDefined)
+    assertEquals(None, parsed("array").asJsonObjectOption)
+  }
+
+  @Test
+  def testAsJsonArray: Unit = {
+    val parsed = parse(json).asJsonObject
+    val array = parsed("array")
+    assertEquals(array, array.asJsonArray)
+    assertThrow[JsonMappingException](parsed("object").asJsonArray)
+  }
+
+  @Test
+  def testAsJsonArrayOption: Unit = {
+    val parsed = parse(json).asJsonObject
+    assertTrue(parsed("array").asJsonArrayOption.isDefined)
+    assertEquals(None, parsed("object").asJsonArrayOption)
+  }
+
+  @Test
+  def testJsonObjectGet: Unit = {
+    val parsed = parse(json).asJsonObject
+    assertEquals(Some(parse("""{"a":true,"b":false}""")), parsed.get("object"))
+    assertEquals(None, parsed.get("aaaaa"))
+  }
+
+  @Test
+  def testJsonObjectApply: Unit = {
+    val parsed = parse(json).asJsonObject
+    assertEquals(parse("""{"a":true,"b":false}"""), parsed("object"))
+    assertThrow[JsonMappingException](parsed("aaaaaaaa"))
+  }
+
+  @Test
+  def testJsonObjectIterator: Unit = {
+    assertEquals(
+      Vector("a" -> parse("true"), "b" -> parse("false")),
+      parse(json).asJsonObject("object").asJsonObject.iterator.toVector
+    )
+  }
+
+  @Test
+  def testJsonArrayIterator: Unit = {
+    assertEquals(Vector("4.0", "11.1", "44.5").map(parse), parse(json).asJsonObject("array").asJsonArray.iterator.toVector)
+  }
+
+  @Test
+  def testJsonValueEquals: Unit = {
+
+    assertEquals(parse(json), parse(json))
+
+    assertEquals(parse("""{"blue": true, "red": false}"""), parse("""{"red": false, "blue": true}"""))
+    assertNotEquals(parse("""{"blue": true, "red": true}"""), parse("""{"red": false, "blue": true}"""))
+
+    assertEquals(parse("""[1, 2, 3]"""), parse("""[1, 2, 3]"""))
+    assertNotEquals(parse("""[1, 2, 3]"""), parse("""[2, 1, 3]"""))
+
+    assertEquals(parse("1344"), parse("1344"))
+    assertNotEquals(parse("1344"), parse("144"))
+
+  }
+
+  @Test
+  def testJsonValueHashCode: Unit = {
+    assertEquals(new ObjectMapper().readTree(json).hashCode, parse(json).hashCode)
+  }
+
+  @Test
+  def testJsonValueToString: Unit = {
+    val js = """{"boolean":false,"int":1234,"array":[4.0,11.1,44.5],"object":{"a":true,"b":false}}"""
+    assertEquals(js, parse(js).toString)
+  }
+
+  @Test
+  def testDecodeBoolean: Unit = {
+    assertTo[Boolean](false, _("boolean"))
+    assertToFails[Boolean](_("int"))
+  }
+
+  @Test
+  def testDecodeString: Unit = {
+    assertTo[String]("string", _("string"))
+    assertTo[String]("123", _("number_as_string"))
+    assertToFails[String](_("int"))
+    assertToFails[String](_("array"))
+  }
+
+  @Test
+  def testDecodeInt: Unit = {
+    assertTo[Int](1234, _("int"))
+    assertToFails[Int](_("long"))
+  }
+
+  @Test
+  def testDecodeLong: Unit = {
+    assertTo[Long](3000000000L, _("long"))
+    assertTo[Long](1234, _("int"))
+    assertToFails[Long](_("string"))
+  }
+
+  @Test
+  def testDecodeDouble: Unit = {
+    assertTo[Double](16.244355, _("double"))
+    assertTo[Double](1234.0, _("int"))
+    assertTo[Double](3000000000L, _("long"))
+    assertToFails[Double](_("string"))
+  }
+
+  @Test
+  def testDecodeSeq: Unit = {
+    assertTo[Seq[Double]](Seq(4.0, 11.1, 44.5), _("array"))
+    assertToFails[Seq[Double]](_("string"))
+    assertToFails[Seq[Double]](_("object"))
+    assertToFails[Seq[String]](_("array"))
+  }
+
+  @Test
+  def testDecodeMap: Unit = {
+    assertTo[Map[String, Boolean]](Map("a" -> true, "b" -> false), _("object"))
+    assertToFails[Map[String, Int]](_("object"))
+    assertToFails[Map[String, String]](_("object"))
+    assertToFails[Map[String, Double]](_("array"))
+  }
+
+  @Test
+  def testDecodeOption: Unit = {
+    assertTo[Option[Int]](None, _("null"))
+    assertTo[Option[Int]](Some(1234), _("int"))
+    assertToFails[Option[String]](_("int"))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b14e117/gradle/dependencies.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index e134880..e771dfa 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -64,7 +64,6 @@ versions += [
   reflections: "0.9.11",
   rocksDB: "5.3.6",
   scalaTest: "3.0.2",
-  scalaParserCombinators: "1.0.4",
   scoverage: "1.3.0",
   slf4j: "1.7.25",
   snappy: "1.1.2.6",
@@ -105,7 +104,6 @@ libs += [
   scala: "org.scala-lang:scala-library:$versions.scala",
   scalaCompiler: "org.scala-lang:scala-compiler:$versions.scala",
   scalaTest: "org.scalatest:scalatest_$versions.baseScala:$versions.scalaTest",
-  scalaParserCombinators: "org.scala-lang.modules:scala-parser-combinators_$versions.baseScala:$versions.scalaParserCombinators",
   scoveragePlugin: "org.scoverage:scalac-scoverage-plugin_$versions.baseScala:$versions.scoverage",
   scoverageRuntime: "org.scoverage:scalac-scoverage-runtime_$versions.baseScala:$versions.scoverage",
   slf4jApi: "org.slf4j:slf4j-api:$versions.slf4j",