You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/03/31 14:57:31 UTC

[kafka] branch trunk updated: KAFKA-4914: Partition reassignment tool should check types before persisting state in ZooKeeper (#2708)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4106cb1  KAFKA-4914: Partition reassignment tool should check types before persisting state in ZooKeeper (#2708)
4106cb1 is described below

commit 4106cb1db18cc45cbbeb54546a7468a4727bd88f
Author: Nick Travers <n....@gmail.com>
AuthorDate: Sat Mar 31 07:57:19 2018 -0700

    KAFKA-4914: Partition reassignment tool should check types before persisting state in ZooKeeper (#2708)
    
    Prior to this change, there were instances where invalid data was persisted in
    ZooKeeper, which caused ClassCastExceptions when a broker was restarted and read this
    type-unsafe data.
    
    Adds basic structural and type validation for the reassignment JSON by
    introducing Scala case classes that map to the expected JSON
    structure. Also uses these case classes to deserialize the JSON,
    avoiding duplicated parsing logic.
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>, Viktor Somogyi <vi...@cloudera.com>, Ismael Juma <is...@juma.me.uk>
---
 core/src/main/scala/kafka/utils/Json.scala         | 22 ++++++++-
 core/src/main/scala/kafka/utils/ZkUtils.scala      | 20 +++-----
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   | 10 +++-
 core/src/main/scala/kafka/zk/ZkData.scala          | 54 ++++++++++++++-------
 .../src/test/scala/unit/kafka/utils/JsonTest.scala | 31 ++++++++++--
 .../kafka/zk/ReassignPartitionsZNodeTest.scala     | 56 ++++++++++++++++++++++
 6 files changed, 155 insertions(+), 38 deletions(-)

diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala
index cbb8dac..c61e149 100644
--- a/core/src/main/scala/kafka/utils/Json.scala
+++ b/core/src/main/scala/kafka/utils/Json.scala
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import kafka.utils.json.JsonValue
 
 import scala.collection._
+import scala.reflect.ClassTag
 
 /**
  * Provides methods for parsing JSON with Jackson and encoding to JSON with a simple and naive custom implementation.
@@ -46,6 +47,15 @@ object Json {
     }
 
   /**
+   * Parse a JSON string into an instance of type T. Returns a `Left` containing the
+   * JsonProcessingException if the input cannot be parsed or mapped to T.
+   */
+  def parseStringAs[T](input: String)(implicit tag: ClassTag[T]): Either[JsonProcessingException, T] = {
+    try Right(mapper.readValue(input, tag.runtimeClass).asInstanceOf[T])
+    catch { case e: JsonProcessingException => Left(e) }
+  }
+
+  /**
    * Parse a JSON byte array into a JsonValue if possible. `None` is returned if `input` is not valid JSON.
    */
   def parseBytes(input: Array[Byte]): Option[JsonValue] =
@@ -57,6 +67,14 @@ object Json {
     catch { case e: JsonProcessingException => Left(e) }
 
   /**
+   * Parse a JSON byte array into an instance of type T, or a `Left` with the JsonProcessingException on failure.
+   */
+  def parseBytesAs[T](input: Array[Byte])(implicit tag: ClassTag[T]): Either[JsonProcessingException, T] = {
+    try Right(mapper.readValue(input, tag.runtimeClass).asInstanceOf[T])
+    catch { case e: JsonProcessingException => Left(e) }
+  }
+
+  /**
    * Encode an object into a JSON string. This method accepts any type T where
    *   T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T]
    * Any other type will result in an exception.
@@ -83,10 +101,10 @@ object Json {
   }
 
   /**
-    * Encode an object into a JSON string. This method accepts any type supported by Jackson's ObjectMapper in
+   * Encode an object into a JSON string. This method accepts any type supported by Jackson's ObjectMapper in
    * the default configuration. That is, Java collections are supported, but Scala collections are not (to avoid
    * a jackson-scala dependency).
-    */
+   */
   def encodeAsString(obj: Any): String = mapper.writeValueAsString(obj)
 
   /**
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index d5fde4d..0c16243 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -26,7 +26,7 @@ import kafka.cluster._
 import kafka.common.{KafkaException, NoEpochForPartitionException, TopicAndPartition}
 import kafka.consumer.{ConsumerThreadId, TopicCount}
 import kafka.controller.{LeaderIsrAndControllerEpoch, ReassignedPartitionsContext}
-import kafka.zk.{BrokerIdZNode, ZkData}
+import kafka.zk.{BrokerIdZNode, ReassignPartitionsZNode, ZkData}
 import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException}
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection}
@@ -142,20 +142,14 @@ object ZkUtils {
   def getDeleteTopicPath(topic: String): String =
     DeleteTopicsPath + "/" + topic
 
-  // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
   def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = {
-    val seq = for {
-      js <- Json.parseFull(jsonData).toSeq
-      partitionsSeq <- js.asJsonObject.get("partitions").toSeq
-      p <- partitionsSeq.asJsonArray.iterator
-    } yield {
-      val partitionFields = p.asJsonObject
-      val topic = partitionFields("topic").to[String]
-      val partition = partitionFields("partition").to[Int]
-      val newReplicas = partitionFields("replicas").to[Seq[Int]]
-      TopicAndPartition(topic, partition) -> newReplicas
+    val utf8Bytes = jsonData.getBytes(StandardCharsets.UTF_8)
+    val assignments = ReassignPartitionsZNode.decode(utf8Bytes) match {
+      case Left(e) => throw e
+      case Right(result) => result
     }
-    seq.toMap
+
+    assignments.map { case (tp, p) => (new TopicAndPartition(tp), p) }
   }
 
   def parseTopicsData(jsonData: String): Seq[String] = {
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 0a2d96a..9b58fc7 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -666,11 +666,17 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
    * Returns all reassignments.
    * @return the reassignments for each partition.
    */
-  def getPartitionReassignment: Map[TopicPartition, Seq[Int]] = {
+  def getPartitionReassignment: collection.Map[TopicPartition, Seq[Int]] = {
     val getDataRequest = GetDataRequest(ReassignPartitionsZNode.path)
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
     getDataResponse.resultCode match {
-      case Code.OK => ReassignPartitionsZNode.decode(getDataResponse.data)
+      case Code.OK =>
+        ReassignPartitionsZNode.decode(getDataResponse.data) match {
+          case Left(e) =>
+            logger.warn(s"Ignoring partition reassignment due to invalid json: ${e.getMessage}", e)
+            Map.empty[TopicPartition, Seq[Int]]
+          case Right(assignments) => assignments
+        }
       case Code.NONODE => Map.empty
       case _ => throw getDataResponse.resultException.get
     }
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index b6352fa..fbed11d 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -19,6 +19,8 @@ package kafka.zk
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Properties
 
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.core.JsonProcessingException
 import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
 import kafka.common.KafkaException
@@ -35,9 +37,10 @@ import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.ZooDefs
 import org.apache.zookeeper.data.{ACL, Stat}
 
+import scala.beans.BeanProperty
 import scala.collection.JavaConverters._
-import scala.collection.Seq
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.{Seq, breakOut}
 
 // This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes).
 
@@ -367,26 +370,41 @@ object DeleteTopicsTopicZNode {
 }
 
 object ReassignPartitionsZNode {
+
+  /**
+    * The assignment of brokers for a `TopicPartition`.
+    *
+    * A replica assignment consists of a `topic`, `partition` and a list of `replicas`, which
+    * represent the broker ids that the `TopicPartition` is assigned to.
+    */
+  case class ReplicaAssignment(@BeanProperty @JsonProperty("topic") topic: String,
+                               @BeanProperty @JsonProperty("partition") partition: Int,
+                               @BeanProperty @JsonProperty("replicas") replicas: java.util.List[Int])
+
+  /**
+    * An assignment consists of a `version` and a list of `partitions`, which represent the
+    * assignment of topic-partitions to brokers.
+    */
+  case class PartitionAssignment(@BeanProperty @JsonProperty("version") version: Int,
+                                 @BeanProperty @JsonProperty("partitions") partitions: java.util.List[ReplicaAssignment])
+
   def path = s"${AdminZNode.path}/reassign_partitions"
-  def encode(reassignment: collection.Map[TopicPartition, Seq[Int]]): Array[Byte] = {
-    val reassignmentJson = reassignment.map { case (tp, replicas) =>
-      Map("topic" -> tp.topic, "partition" -> tp.partition, "replicas" -> replicas.asJava).asJava
-    }.asJava
-    Json.encodeAsBytes(Map("version" -> 1, "partitions" -> reassignmentJson).asJava)
+
+  def encode(reassignmentMap: collection.Map[TopicPartition, Seq[Int]]): Array[Byte] = {
+    val reassignment = PartitionAssignment(1,
+      reassignmentMap.toSeq.map { case (tp, replicas) =>
+        ReplicaAssignment(tp.topic, tp.partition, replicas.asJava)
+      }.asJava
+    )
+    Json.encodeAsBytes(reassignment)
   }
-  def decode(bytes: Array[Byte]): Map[TopicPartition, Seq[Int]] = Json.parseBytes(bytes).flatMap { js =>
-    val reassignmentJson = js.asJsonObject
-    val partitionsJsonOpt = reassignmentJson.get("partitions")
-    partitionsJsonOpt.map { partitionsJson =>
-      partitionsJson.asJsonArray.iterator.map { partitionFieldsJs =>
-        val partitionFields = partitionFieldsJs.asJsonObject
-        val topic = partitionFields("topic").to[String]
-        val partition = partitionFields("partition").to[Int]
-        val replicas = partitionFields("replicas").to[Seq[Int]]
-        new TopicPartition(topic, partition) -> replicas
-      }
+
+  def decode(bytes: Array[Byte]): Either[JsonProcessingException, collection.Map[TopicPartition, Seq[Int]]] =
+    Json.parseBytesAs[PartitionAssignment](bytes).right.map { partitionAssignment =>
+      partitionAssignment.partitions.asScala.map { replicaAssignment =>
+        new TopicPartition(replicaAssignment.topic, replicaAssignment.partition) -> replicaAssignment.replicas.asScala
+      }(breakOut)
     }
-  }.map(_.toMap).getOrElse(Map.empty)
 }
 
 object PreferredReplicaElectionZNode {
diff --git a/core/src/test/scala/unit/kafka/utils/JsonTest.scala b/core/src/test/scala/unit/kafka/utils/JsonTest.scala
index fa2a030..209fdee 100644
--- a/core/src/test/scala/unit/kafka/utils/JsonTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/JsonTest.scala
@@ -18,15 +18,22 @@ package kafka.utils
 
 import java.nio.charset.StandardCharsets
 
-import org.junit.Assert._
-import org.junit.Test
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.core.JsonParseException
 import com.fasterxml.jackson.databind.JsonNode
 import com.fasterxml.jackson.databind.node._
+import kafka.utils.JsonTest.TestObject
 import kafka.utils.json.JsonValue
+import org.junit.Assert._
+import org.junit.Test
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
 
+object JsonTest {
+  case class TestObject(@JsonProperty("foo") foo: String, @JsonProperty("bar") bar: Int)
+}
+
 class JsonTest {
 
   @Test
@@ -125,5 +132,23 @@ class JsonTest {
     assertEquals(""""str1\\,str2"""", new String(Json.encodeAsBytes("""str1\,str2"""), StandardCharsets.UTF_8))
     assertEquals(""""\"quoted\""""", new String(Json.encodeAsBytes(""""quoted""""), StandardCharsets.UTF_8))
   }
-  
+
+  @Test
+  def testParseTo() = {
+    val foo = "baz"
+    val bar = 1
+
+    val result = Json.parseStringAs[TestObject](s"""{"foo": "$foo", "bar": $bar}""")
+
+    assertTrue(result.isRight)
+    assertEquals(TestObject(foo, bar), result.right.get)
+  }
+
+  @Test
+  def testParseToWithInvalidJson() = {
+    val result = Json.parseStringAs[TestObject]("{invalid json}")
+
+    assertTrue(result.isLeft)
+    assertEquals(classOf[JsonParseException], result.left.get.getClass)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/zk/ReassignPartitionsZNodeTest.scala b/core/src/test/scala/unit/kafka/zk/ReassignPartitionsZNodeTest.scala
new file mode 100644
index 0000000..2f3456f
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zk/ReassignPartitionsZNodeTest.scala
@@ -0,0 +1,56 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.zk
+
+import java.nio.charset.StandardCharsets
+
+import com.fasterxml.jackson.core.JsonProcessingException
+import org.apache.kafka.common.TopicPartition
+import org.junit.Assert._
+import org.junit.Test
+
+class ReassignPartitionsZNodeTest {
+
+  private val topic = "foo"
+  private val partition1 = 0
+  private val replica1 = 1
+  private val replica2 = 2
+
+  private val reassignPartitionData = Map(new TopicPartition(topic, partition1) -> Seq(replica1, replica2))
+  private val reassignmentJson = """{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[1,2]}]}"""
+
+  @Test
+  def testEncode() {
+    val encodedJsonString = new String(ReassignPartitionsZNode.encode(reassignPartitionData), StandardCharsets.UTF_8)
+    assertEquals(reassignmentJson, encodedJsonString)
+  }
+
+  @Test
+  def testDecodeInvalidJson() {
+    val result = ReassignPartitionsZNode.decode("invalid json".getBytes)
+    assertTrue(result.isLeft)
+    assertTrue(result.left.get.isInstanceOf[JsonProcessingException])
+  }
+
+  @Test
+  def testDecodeValidJson() {
+    val result = ReassignPartitionsZNode.decode(reassignmentJson.getBytes)
+    assertTrue(result.isRight)
+    val assignmentMap = result.right.get
+    assertEquals(Seq(replica1, replica2), assignmentMap(new TopicPartition(topic, partition1)))
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
ijuma@apache.org.