You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/03/31 14:57:31 UTC
[kafka] branch trunk updated: KAFKA-4914: Partition reassignment
tool should check types before persisting state in ZooKeeper (#2708)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4106cb1 KAFKA-4914: Partition reassignment tool should check types before persisting state in ZooKeeper (#2708)
4106cb1 is described below
commit 4106cb1db18cc45cbbeb54546a7468a4727bd88f
Author: Nick Travers <n....@gmail.com>
AuthorDate: Sat Mar 31 07:57:19 2018 -0700
KAFKA-4914: Partition reassignment tool should check types before persisting state in ZooKeeper (#2708)
Prior to this, there have been instances where invalid data was allowed to be persisted in
ZooKeeper, which causes ClassCastExceptions when a broker is restarted and reads this
type-unsafe data.
Adds basic structural and type validation for the reassignment JSON via
introduction of Scala case classes that map to the expected JSON
structure. Also uses the Scala case classes to deserialize the JSON
to avoid duplication.
Reviewers: Manikumar Reddy <ma...@gmail.com>, Viktor Somogyi <vi...@cloudera.com>, Ismael Juma <is...@juma.me.uk>
---
core/src/main/scala/kafka/utils/Json.scala | 22 ++++++++-
core/src/main/scala/kafka/utils/ZkUtils.scala | 20 +++-----
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 10 +++-
core/src/main/scala/kafka/zk/ZkData.scala | 54 ++++++++++++++-------
.../src/test/scala/unit/kafka/utils/JsonTest.scala | 31 ++++++++++--
.../kafka/zk/ReassignPartitionsZNodeTest.scala | 56 ++++++++++++++++++++++
6 files changed, 155 insertions(+), 38 deletions(-)
diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala
index cbb8dac..c61e149 100644
--- a/core/src/main/scala/kafka/utils/Json.scala
+++ b/core/src/main/scala/kafka/utils/Json.scala
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import kafka.utils.json.JsonValue
import scala.collection._
+import scala.reflect.ClassTag
/**
* Provides methods for parsing JSON with Jackson and encoding to JSON with a simple and naive custom implementation.
@@ -46,6 +47,15 @@ object Json {
}
/**
+ * Parse a JSON string into either a generic type T, or a JsonProcessingException in the case of
+ * exception.
+ */
+ def parseStringAs[T](input: String)(implicit tag: ClassTag[T]): Either[JsonProcessingException, T] = {
+ try Right(mapper.readValue(input, tag.runtimeClass).asInstanceOf[T])
+ catch { case e: JsonProcessingException => Left(e) }
+ }
+
+ /**
* Parse a JSON byte array into a JsonValue if possible. `None` is returned if `input` is not valid JSON.
*/
def parseBytes(input: Array[Byte]): Option[JsonValue] =
@@ -57,6 +67,14 @@ object Json {
catch { case e: JsonProcessingException => Left(e) }
/**
+ * Parse a JSON byte array into either a generic type T, or a JsonProcessingException in the case of exception.
+ */
+ def parseBytesAs[T](input: Array[Byte])(implicit tag: ClassTag[T]): Either[JsonProcessingException, T] = {
+ try Right(mapper.readValue(input, tag.runtimeClass).asInstanceOf[T])
+ catch { case e: JsonProcessingException => Left(e) }
+ }
+
+ /**
* Encode an object into a JSON string. This method accepts any type T where
* T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T]
* Any other type will result in an exception.
@@ -83,10 +101,10 @@ object Json {
}
/**
- * Encode an object into a JSON string. This method accepts any type supported by Jackson's ObjectMapper in
+ * Encode an object into a JSON string. This method accepts any type supported by Jackson's ObjectMapper in
* the default configuration. That is, Java collections are supported, but Scala collections are not (to avoid
* a jackson-scala dependency).
- */
+ */
def encodeAsString(obj: Any): String = mapper.writeValueAsString(obj)
/**
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index d5fde4d..0c16243 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -26,7 +26,7 @@ import kafka.cluster._
import kafka.common.{KafkaException, NoEpochForPartitionException, TopicAndPartition}
import kafka.consumer.{ConsumerThreadId, TopicCount}
import kafka.controller.{LeaderIsrAndControllerEpoch, ReassignedPartitionsContext}
-import kafka.zk.{BrokerIdZNode, ZkData}
+import kafka.zk.{BrokerIdZNode, ReassignPartitionsZNode, ZkData}
import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException}
import org.I0Itec.zkclient.serialize.ZkSerializer
import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection}
@@ -142,20 +142,14 @@ object ZkUtils {
def getDeleteTopicPath(topic: String): String =
DeleteTopicsPath + "/" + topic
- // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = {
- val seq = for {
- js <- Json.parseFull(jsonData).toSeq
- partitionsSeq <- js.asJsonObject.get("partitions").toSeq
- p <- partitionsSeq.asJsonArray.iterator
- } yield {
- val partitionFields = p.asJsonObject
- val topic = partitionFields("topic").to[String]
- val partition = partitionFields("partition").to[Int]
- val newReplicas = partitionFields("replicas").to[Seq[Int]]
- TopicAndPartition(topic, partition) -> newReplicas
+ val utf8Bytes = jsonData.getBytes(StandardCharsets.UTF_8)
+ val assignments = ReassignPartitionsZNode.decode(utf8Bytes) match {
+ case Left(e) => throw e
+ case Right(result) => result
}
- seq.toMap
+
+ assignments.map { case (tp, p) => (new TopicAndPartition(tp), p) }
}
def parseTopicsData(jsonData: String): Seq[String] = {
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 0a2d96a..9b58fc7 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -666,11 +666,17 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
* Returns all reassignments.
* @return the reassignments for each partition.
*/
- def getPartitionReassignment: Map[TopicPartition, Seq[Int]] = {
+ def getPartitionReassignment: collection.Map[TopicPartition, Seq[Int]] = {
val getDataRequest = GetDataRequest(ReassignPartitionsZNode.path)
val getDataResponse = retryRequestUntilConnected(getDataRequest)
getDataResponse.resultCode match {
- case Code.OK => ReassignPartitionsZNode.decode(getDataResponse.data)
+ case Code.OK =>
+ ReassignPartitionsZNode.decode(getDataResponse.data) match {
+ case Left(e) =>
+ logger.warn(s"Ignoring partition reassignment due to invalid json: ${e.getMessage}", e)
+ Map.empty[TopicPartition, Seq[Int]]
+ case Right(assignments) => assignments
+ }
case Code.NONODE => Map.empty
case _ => throw getDataResponse.resultException.get
}
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index b6352fa..fbed11d 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -19,6 +19,8 @@ package kafka.zk
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Properties
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.core.JsonProcessingException
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
import kafka.cluster.{Broker, EndPoint}
import kafka.common.KafkaException
@@ -35,9 +37,10 @@ import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.ZooDefs
import org.apache.zookeeper.data.{ACL, Stat}
+import scala.beans.BeanProperty
import scala.collection.JavaConverters._
-import scala.collection.Seq
import scala.collection.mutable.ArrayBuffer
+import scala.collection.{Seq, breakOut}
// This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes).
@@ -367,26 +370,41 @@ object DeleteTopicsTopicZNode {
}
object ReassignPartitionsZNode {
+
+ /**
+ * The assignment of brokers for a `TopicPartition`.
+ *
+ * A replica assignment consists of a `topic`, `partition` and a list of `replicas`, which
+ * represent the broker ids that the `TopicPartition` is assigned to.
+ */
+ case class ReplicaAssignment(@BeanProperty @JsonProperty("topic") topic: String,
+ @BeanProperty @JsonProperty("partition") partition: Int,
+ @BeanProperty @JsonProperty("replicas") replicas: java.util.List[Int])
+
+ /**
+ * An assignment consists of a `version` and a list of `partitions`, which represent the
+ * assignment of topic-partitions to brokers.
+ */
+ case class PartitionAssignment(@BeanProperty @JsonProperty("version") version: Int,
+ @BeanProperty @JsonProperty("partitions") partitions: java.util.List[ReplicaAssignment])
+
def path = s"${AdminZNode.path}/reassign_partitions"
- def encode(reassignment: collection.Map[TopicPartition, Seq[Int]]): Array[Byte] = {
- val reassignmentJson = reassignment.map { case (tp, replicas) =>
- Map("topic" -> tp.topic, "partition" -> tp.partition, "replicas" -> replicas.asJava).asJava
- }.asJava
- Json.encodeAsBytes(Map("version" -> 1, "partitions" -> reassignmentJson).asJava)
+
+ def encode(reassignmentMap: collection.Map[TopicPartition, Seq[Int]]): Array[Byte] = {
+ val reassignment = PartitionAssignment(1,
+ reassignmentMap.toSeq.map { case (tp, replicas) =>
+ ReplicaAssignment(tp.topic, tp.partition, replicas.asJava)
+ }.asJava
+ )
+ Json.encodeAsBytes(reassignment)
}
- def decode(bytes: Array[Byte]): Map[TopicPartition, Seq[Int]] = Json.parseBytes(bytes).flatMap { js =>
- val reassignmentJson = js.asJsonObject
- val partitionsJsonOpt = reassignmentJson.get("partitions")
- partitionsJsonOpt.map { partitionsJson =>
- partitionsJson.asJsonArray.iterator.map { partitionFieldsJs =>
- val partitionFields = partitionFieldsJs.asJsonObject
- val topic = partitionFields("topic").to[String]
- val partition = partitionFields("partition").to[Int]
- val replicas = partitionFields("replicas").to[Seq[Int]]
- new TopicPartition(topic, partition) -> replicas
- }
+
+ def decode(bytes: Array[Byte]): Either[JsonProcessingException, collection.Map[TopicPartition, Seq[Int]]] =
+ Json.parseBytesAs[PartitionAssignment](bytes).right.map { partitionAssignment =>
+ partitionAssignment.partitions.asScala.map { replicaAssignment =>
+ new TopicPartition(replicaAssignment.topic, replicaAssignment.partition) -> replicaAssignment.replicas.asScala
+ }(breakOut)
}
- }.map(_.toMap).getOrElse(Map.empty)
}
object PreferredReplicaElectionZNode {
diff --git a/core/src/test/scala/unit/kafka/utils/JsonTest.scala b/core/src/test/scala/unit/kafka/utils/JsonTest.scala
index fa2a030..209fdee 100644
--- a/core/src/test/scala/unit/kafka/utils/JsonTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/JsonTest.scala
@@ -18,15 +18,22 @@ package kafka.utils
import java.nio.charset.StandardCharsets
-import org.junit.Assert._
-import org.junit.Test
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node._
+import kafka.utils.JsonTest.TestObject
import kafka.utils.json.JsonValue
+import org.junit.Assert._
+import org.junit.Test
import scala.collection.JavaConverters._
import scala.collection.Map
+object JsonTest {
+ case class TestObject(@JsonProperty("foo") foo: String, @JsonProperty("bar") bar: Int)
+}
+
class JsonTest {
@Test
@@ -125,5 +132,23 @@ class JsonTest {
assertEquals(""""str1\\,str2"""", new String(Json.encodeAsBytes("""str1\,str2"""), StandardCharsets.UTF_8))
assertEquals(""""\"quoted\""""", new String(Json.encodeAsBytes(""""quoted""""), StandardCharsets.UTF_8))
}
-
+
+ @Test
+ def testParseTo() = {
+ val foo = "baz"
+ val bar = 1
+
+ val result = Json.parseStringAs[TestObject](s"""{"foo": "$foo", "bar": $bar}""")
+
+ assertTrue(result.isRight)
+ assertEquals(TestObject(foo, bar), result.right.get)
+ }
+
+ @Test
+ def testParseToWithInvalidJson() = {
+ val result = Json.parseStringAs[TestObject]("{invalid json}")
+
+ assertTrue(result.isLeft)
+ assertEquals(classOf[JsonParseException], result.left.get.getClass)
+ }
}
diff --git a/core/src/test/scala/unit/kafka/zk/ReassignPartitionsZNodeTest.scala b/core/src/test/scala/unit/kafka/zk/ReassignPartitionsZNodeTest.scala
new file mode 100644
index 0000000..2f3456f
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zk/ReassignPartitionsZNodeTest.scala
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk
+
+import java.nio.charset.StandardCharsets
+
+import com.fasterxml.jackson.core.JsonProcessingException
+import org.apache.kafka.common.TopicPartition
+import org.junit.Assert._
+import org.junit.Test
+
+class ReassignPartitionsZNodeTest {
+
+ private val topic = "foo"
+ private val partition1 = 0
+ private val replica1 = 1
+ private val replica2 = 2
+
+ private val reassignPartitionData = Map(new TopicPartition(topic, partition1) -> Seq(replica1, replica2))
+ private val reassignmentJson = """{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[1,2]}]}"""
+
+ @Test
+ def testEncode() {
+ val encodedJsonString = new String(ReassignPartitionsZNode.encode(reassignPartitionData), StandardCharsets.UTF_8)
+ assertEquals(reassignmentJson, encodedJsonString)
+ }
+
+ @Test
+ def testDecodeInvalidJson() {
+ val result = ReassignPartitionsZNode.decode("invalid json".getBytes)
+ assertTrue(result.isLeft)
+ assertTrue(result.left.get.isInstanceOf[JsonProcessingException])
+ }
+
+ @Test
+ def testDecodeValidJson() {
+ val result = ReassignPartitionsZNode.decode(reassignmentJson.getBytes)
+ assertTrue(result.isRight)
+ val assignmentMap = result.right.get
+ assertEquals(Seq(replica1, replica2), assignmentMap(new TopicPartition(topic, partition1)))
+ }
+}
--
To stop receiving notification emails like this one, please contact
ijuma@apache.org.