Posted to commits@spark.apache.org by zs...@apache.org on 2018/07/31 20:14:37 UTC

spark git commit: [SPARK-18057][SS] Update Kafka client version from 0.10.0.1 to 2.0.0

Repository: spark
Updated Branches:
  refs/heads/master 1223a201f -> e82784d13


[SPARK-18057][SS] Update Kafka client version from 0.10.0.1 to 2.0.0

## What changes were proposed in this pull request?

This PR upgrades the Kafka client from 0.10.0.1 to the 2.0.0 release, which integrates KIP-266 (bounded blocking in the consumer via the new `default.api.timeout.ms` setting).
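
Because KIP-266 bounds blocking consumer calls with `default.api.timeout.ms`, the test suites in this PR lower it through the source options (prefixed with `kafka.`, which forwards the setting to the underlying consumer) so that failure cases time out quickly. A minimal sketch of a Structured Streaming read that sets the same option; the broker address and topic name are placeholders and `spark` is an existing `SparkSession`:

```scala
val df = spark.readStream
  .format("kafka")
  // Placeholder broker address.
  .option("kafka.bootstrap.servers", "localhost:9092")
  // Forwarded to the Kafka consumer; bounds blocking calls per KIP-266.
  .option("kafka.default.api.timeout.ms", "3000")
  // Placeholder topic.
  .option("subscribe", "demo-topic")
  .option("failOnDataLoss", "false")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```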

## How was this patch tested?

This change is covered by the existing Kafka-related unit tests.


Author: tedyu <yu...@gmail.com>

Closes #21488 from tedyu/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e82784d1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e82784d1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e82784d1

Branch: refs/heads/master
Commit: e82784d13fac7d45164dfadb00d3fa43e64e0bde
Parents: 1223a20
Author: tedyu <yu...@gmail.com>
Authored: Tue Jul 31 13:14:14 2018 -0700
Committer: zsxwing <zs...@gmail.com>
Committed: Tue Jul 31 13:14:14 2018 -0700

----------------------------------------------------------------------
 external/kafka-0-10-sql/pom.xml                 | 24 +++++++++++--
 .../kafka010/KafkaContinuousSourceSuite.scala   |  1 +
 .../kafka010/KafkaMicroBatchSourceSuite.scala   |  7 +++-
 .../spark/sql/kafka010/KafkaTestUtils.scala     | 36 +++++++++++++-------
 4 files changed, 53 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e82784d1/external/kafka-0-10-sql/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml
index 16bbc6d..9550003 100644
--- a/external/kafka-0-10-sql/pom.xml
+++ b/external/kafka-0-10-sql/pom.xml
@@ -29,10 +29,10 @@
   <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
   <properties>
     <sbt.project.name>sql-kafka-0-10</sbt.project.name>
-    <kafka.version>0.10.0.1</kafka.version>
+    <kafka.version>2.0.0</kafka.version>
   </properties>
   <packaging>jar</packaging>
-  <name>Kafka 0.10 Source for Structured Streaming</name>
+  <name>Kafka 0.10+ Source for Structured Streaming</name>
   <url>http://spark.apache.org/</url>
 
   <dependencies>
@@ -73,6 +73,20 @@
       <artifactId>kafka_${scala.binary.version}</artifactId>
       <version>${kafka.version}</version>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>net.sf.jopt-simple</groupId>
@@ -80,6 +94,12 @@
       <version>3.2</version>
       <scope>test</scope>
     </dependency>
+     <dependency>
+        <groupId>org.eclipse.jetty</groupId>
+        <artifactId>jetty-servlet</artifactId>
+        <version>${jetty.version}</version>
+        <scope>test</scope>
+      </dependency>
     <dependency>
       <groupId>org.scalacheck</groupId>
       <artifactId>scalacheck_${scala.binary.version}</artifactId>

http://git-wip-us.apache.org/repos/asf/spark/blob/e82784d1/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index aab8ec4..ea2a2a8 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -42,6 +42,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
       .format("kafka")
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.default.api.timeout.ms", "3000")
       .option("subscribePattern", s"$topicPrefix-.*")
       .option("failOnDataLoss", "false")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e82784d1/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 5d5e573..aa89868 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -290,6 +290,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       .format("kafka")
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.default.api.timeout.ms", "3000")
       .option("subscribePattern", s"$topicPrefix-.*")
       .option("failOnDataLoss", "false")
 
@@ -467,6 +468,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       .format("kafka")
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.default.api.timeout.ms", "3000")
       .option("subscribe", topic)
       // If a topic is deleted and we try to poll data starting from offset 0,
       // the Kafka consumer will just block until timeout and return an empty result.
@@ -1103,6 +1105,7 @@ class KafkaSourceStressSuite extends KafkaSourceTest {
         .option("kafka.metadata.max.age.ms", "1")
         .option("subscribePattern", "stress.*")
         .option("failOnDataLoss", "false")
+        .option("kafka.default.api.timeout.ms", "3000")
         .load()
         .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
         .as[(String, String)]
@@ -1173,7 +1176,8 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
         // 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so this test should run at
         // least 30 seconds.
         props.put("log.cleaner.backoff.ms", "100")
-        props.put("log.segment.bytes", "40")
+        // The size of RecordBatch V2 increases to support transactional write.
+        props.put("log.segment.bytes", "70")
         props.put("log.retention.bytes", "40")
         props.put("log.retention.check.interval.ms", "100")
         props.put("delete.retention.ms", "10")
@@ -1215,6 +1219,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
       .format("kafka")
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.default.api.timeout.ms", "3000")
       .option("subscribePattern", "failOnDataLoss.*")
       .option("startingOffsets", "earliest")
       .option("failOnDataLoss", "false")

http://git-wip-us.apache.org/repos/asf/spark/blob/e82784d1/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 7524594..8229490 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -29,12 +29,15 @@ import scala.util.Random
 
 import kafka.admin.AdminUtils
 import kafka.api.Request
-import kafka.common.TopicAndPartition
-import kafka.server.{KafkaConfig, KafkaServer, OffsetCheckpoint}
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils.ZkUtils
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, NewPartitions}
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
 import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
 import org.scalatest.concurrent.Eventually._
@@ -61,6 +64,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
   private var zookeeper: EmbeddedZookeeper = _
 
   private var zkUtils: ZkUtils = _
+  private var adminClient: AdminClient = null
 
   // Kafka broker related configurations
   private val brokerHost = "localhost"
@@ -113,17 +117,23 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
       brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
       server = new KafkaServer(brokerConf)
       server.startup()
-      brokerPort = server.boundPort()
+      brokerPort = server.boundPort(new ListenerName("PLAINTEXT"))
       (server, brokerPort)
     }, new SparkConf(), "KafkaBroker")
 
     brokerReady = true
+    val props = new Properties()
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, s"$brokerHost:$brokerPort")
+    adminClient = AdminClient.create(props)
   }
 
   /** setup the whole embedded servers, including Zookeeper and Kafka brokers */
   def setup(): Unit = {
     setupEmbeddedZookeeper()
     setupEmbeddedKafkaServer()
+    eventually(timeout(60.seconds)) {
+      assert(zkUtils.getAllBrokersInCluster().nonEmpty, "Broker was not up in 60 seconds")
+    }
   }
 
   /** Teardown the whole servers, including Kafka broker and Zookeeper */
@@ -203,7 +213,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
 
   /** Add new partitions to a Kafka topic */
   def addPartitions(topic: String, partitions: Int): Unit = {
-    AdminUtils.addPartitions(zkUtils, topic, partitions)
+    adminClient.createPartitions(
+      Map(topic -> NewPartitions.increaseTo(partitions)).asJava,
+      new CreatePartitionsOptions)
     // wait until metadata is propagated
     (0 until partitions).foreach { p =>
       waitUntilMetadataIsPropagated(topic, p)
@@ -296,6 +308,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
     props.put("replica.socket.timeout.ms", "1500")
     props.put("delete.topic.enable", "true")
     props.put("offsets.topic.num.partitions", "1")
+    props.put("offsets.topic.replication.factor", "1")
+    props.put("group.initial.rebalance.delay.ms", "10")
     // Can not use properties.putAll(propsMap.asJava) in scala-2.12
     // See https://github.com/scala/bug/issues/10418
     withBrokerProps.foreach { case (k, v) => props.put(k, v) }
@@ -327,7 +341,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
       topic: String,
       numPartitions: Int,
       servers: Seq[KafkaServer]): Unit = {
-    val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
+    val topicAndPartitions = (0 until numPartitions).map(new TopicPartition(topic, _))
 
     import ZkUtils._
     // wait until admin path for delete topic is deleted, signaling completion of topic deletion
@@ -337,7 +351,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
     assert(!zkUtils.pathExists(getTopicPath(topic)), s"${getTopicPath(topic)} still exists")
     // ensure that the topic-partition has been deleted from all brokers' replica managers
     assert(servers.forall(server => topicAndPartitions.forall(tp =>
-      server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
+      server.replicaManager.getPartition(tp) == None)),
       s"topic $topic still exists in the replica manager")
     // ensure that logs from all replicas are deleted if delete topic is marked successful
     assert(servers.forall(server => topicAndPartitions.forall(tp =>
@@ -345,8 +359,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
       s"topic $topic still exists in log mananger")
     // ensure that topic is removed from all cleaner offsets
     assert(servers.forall(server => topicAndPartitions.forall { tp =>
-      val checkpoints = server.getLogManager().logDirs.map { logDir =>
-        new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
+      val checkpoints = server.getLogManager().liveLogDirs.map { logDir =>
+        new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint")).read()
       }
       checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
     }), s"checkpoint for topic $topic still exists")
@@ -379,11 +393,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
   private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
     def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
       case Some(partitionState) =>
-        val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
-
         zkUtils.getLeaderForPartition(topic, partition).isDefined &&
-          Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
-          leaderAndInSyncReplicas.isr.nonEmpty
+          Request.isValidBrokerId(partitionState.basePartitionState.leader) &&
+          !partitionState.basePartitionState.replicas.isEmpty
 
       case _ =>
         false
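
For reference, the AdminClient-based partition management that KafkaTestUtils now uses can be exercised on its own. A minimal standalone sketch mirroring the setup and addPartitions changes above; the bootstrap address, topic name, and partition count are placeholders:

```scala
import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, NewPartitions}

object AddPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Placeholder bootstrap address; KafkaTestUtils builds it from the embedded broker's host and port.
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val adminClient = AdminClient.create(props)
    try {
      // Grow the placeholder topic "demo-topic" to 4 partitions,
      // the same call KafkaTestUtils.addPartitions now makes.
      adminClient.createPartitions(
        Map("demo-topic" -> NewPartitions.increaseTo(4)).asJava,
        new CreatePartitionsOptions).all().get()
    } finally {
      adminClient.close()
    }
  }
}
```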

