Posted to commits@spark.apache.org by td...@apache.org on 2018/02/16 22:30:26 UTC
[1/2] spark git commit: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Repository: spark
Updated Branches:
refs/heads/master c5857e496 -> 0a73aa31f
http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
deleted file mode 100644
index 02c8764..0000000
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ /dev/null
@@ -1,1122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import java.io._
-import java.nio.charset.StandardCharsets.UTF_8
-import java.nio.file.{Files, Paths}
-import java.util.{Locale, Properties}
-import java.util.concurrent.ConcurrentLinkedQueue
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.mutable
-import scala.util.Random
-
-import org.apache.kafka.clients.producer.RecordMetadata
-import org.apache.kafka.common.TopicPartition
-import org.scalatest.concurrent.PatienceConfiguration.Timeout
-import org.scalatest.time.SpanSugar._
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.{Dataset, ForeachWriter}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
-import org.apache.spark.sql.functions.{count, window}
-import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
-import org.apache.spark.sql.streaming.util.StreamManualClock
-import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
-import org.apache.spark.util.Utils
-
-abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
-
- protected var testUtils: KafkaTestUtils = _
-
- override val streamingTimeout = 30.seconds
-
- protected val brokerProps = Map[String, Object]()
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- testUtils = new KafkaTestUtils(brokerProps)
- testUtils.setup()
- }
-
- override def afterAll(): Unit = {
- if (testUtils != null) {
- testUtils.teardown()
- testUtils = null
- }
- super.afterAll()
- }
-
- protected def makeSureGetOffsetCalled = AssertOnQuery { q =>
- // Because KafkaSource's initialPartitionOffsets is set lazily, we need to make sure
- // its "getOffset" is called before pushing any data. Otherwise, because of the race condition,
- // we don't know which data should be fetched when `startingOffsets` is latest.
- q match {
- case c: ContinuousExecution => c.awaitEpoch(0)
- case m: MicroBatchExecution => m.processAllAvailable()
- }
- true
- }
-
- protected def setTopicPartitions(topic: String, newCount: Int, query: StreamExecution) : Unit = {
- testUtils.addPartitions(topic, newCount)
- }
-
- /**
- * Add data to Kafka.
- *
- * `topicAction` can be used to run actions for each topic before inserting data.
- */
- case class AddKafkaData(topics: Set[String], data: Int*)
- (implicit ensureDataInMultiplePartition: Boolean = false,
- concurrent: Boolean = false,
- message: String = "",
- topicAction: (String, Option[Int]) => Unit = (_, _) => {}) extends AddData {
-
- override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
- query match {
- // Make sure no Spark job is running when deleting a topic
- case Some(m: MicroBatchExecution) => m.processAllAvailable()
- case _ =>
- }
-
- val existingTopics = testUtils.getAllTopicsAndPartitionSize().toMap
- val newTopics = topics.diff(existingTopics.keySet)
- for (newTopic <- newTopics) {
- topicAction(newTopic, None)
- }
- for (existingTopicPartitions <- existingTopics) {
- topicAction(existingTopicPartitions._1, Some(existingTopicPartitions._2))
- }
-
- require(
- query.nonEmpty,
- "Cannot add data when there is no query for finding the active kafka source")
-
- val sources = query.get.logicalPlan.collect {
- case StreamingExecutionRelation(source: KafkaSource, _) => source
- } ++ (query.get.lastExecution match {
- case null => Seq()
- case e => e.logical.collect {
- case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
- }
- })
- if (sources.isEmpty) {
- throw new Exception(
- "Could not find Kafka source in the StreamExecution logical plan to add data to")
- } else if (sources.size > 1) {
- throw new Exception(
- "Could not select the Kafka source in the StreamExecution logical plan as there" +
- "are multiple Kafka sources:\n\t" + sources.mkString("\n\t"))
- }
- val kafkaSource = sources.head
- val topic = topics.toSeq(Random.nextInt(topics.size))
- val sentMetadata = testUtils.sendMessages(topic, data.map { _.toString }.toArray)
-
- def metadataToStr(m: (String, RecordMetadata)): String = {
- s"Sent ${m._1} to partition ${m._2.partition()}, offset ${m._2.offset()}"
- }
- // Verify that the test data gets inserted into multiple partitions
- if (ensureDataInMultiplePartition) {
- require(
- sentMetadata.groupBy(_._2.partition).size > 1,
- s"Added data does not test multiple partitions: ${sentMetadata.map(metadataToStr)}")
- }
-
- val offset = KafkaSourceOffset(testUtils.getLatestOffsets(topics))
- logInfo(s"Added data, expected offset $offset")
- (kafkaSource, offset)
- }
-
- override def toString: String =
- s"AddKafkaData(topics = $topics, data = $data, message = $message)"
- }
-
- private val topicId = new AtomicInteger(0)
- protected def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
-}
-
-class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase {
-
- import testImplicits._
-
- test("(de)serialization of initial offsets") {
- val topic = newTopic()
- testUtils.createTopic(topic, partitions = 5)
-
- val reader = spark
- .readStream
- .format("kafka")
- .option("kafka.bootstrap.servers", testUtils.brokerAddress)
- .option("subscribe", topic)
-
- testStream(reader.load)(
- makeSureGetOffsetCalled,
- StopStream,
- StartStream(),
- StopStream)
- }
-
- test("maxOffsetsPerTrigger") {
- val topic = newTopic()
- testUtils.createTopic(topic, partitions = 3)
- testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0))
- testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1))
- testUtils.sendMessages(topic, Array("1"), Some(2))
-
- val reader = spark
- .readStream
- .format("kafka")
- .option("kafka.bootstrap.servers", testUtils.brokerAddress)
- .option("kafka.metadata.max.age.ms", "1")
- .option("maxOffsetsPerTrigger", 10)
- .option("subscribe", topic)
- .option("startingOffsets", "earliest")
- val kafka = reader.load()
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .as[(String, String)]
- val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
-
- val clock = new StreamManualClock
-
- val waitUntilBatchProcessed = AssertOnQuery { q =>
- eventually(Timeout(streamingTimeout)) {
- if (!q.exception.isDefined) {
- assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
- }
- }
- if (q.exception.isDefined) {
- throw q.exception.get
- }
- true
- }
-
- testStream(mapped)(
- StartStream(ProcessingTime(100), clock),
- waitUntilBatchProcessed,
- // 1 from smallest, 1 from middle, 8 from biggest
- CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
- AdvanceManualClock(100),
- waitUntilBatchProcessed,
- // smallest now empty, 1 more from middle, 9 more from biggest
- CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
- 11, 108, 109, 110, 111, 112, 113, 114, 115, 116
- ),
- StopStream,
- StartStream(ProcessingTime(100), clock),
- waitUntilBatchProcessed,
- // smallest now empty, 1 more from middle, 9 more from biggest
- CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
- 11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
- 12, 117, 118, 119, 120, 121, 122, 123, 124, 125
- ),
- AdvanceManualClock(100),
- waitUntilBatchProcessed,
- // smallest now empty, 1 more from middle, 9 more from biggest
- CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
- 11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
- 12, 117, 118, 119, 120, 121, 122, 123, 124, 125,
- 13, 126, 127, 128, 129, 130, 131, 132, 133, 134
- )
- )
- }
-
- test("input row metrics") {
- val topic = newTopic()
- testUtils.createTopic(topic, partitions = 5)
- testUtils.sendMessages(topic, Array("-1"))
- require(testUtils.getLatestOffsets(Set(topic)).size === 5)
-
- val kafka = spark
- .readStream
- .format("kafka")
- .option("subscribe", topic)
- .option("kafka.bootstrap.servers", testUtils.brokerAddress)
- .load()
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .as[(String, String)]
-
- val mapped = kafka.map(kv => kv._2.toInt + 1)
- testStream(mapped)(
- StartStream(trigger = ProcessingTime(1)),
- makeSureGetOffsetCalled,
- AddKafkaData(Set(topic), 1, 2, 3),
- CheckAnswer(2, 3, 4),
- AssertOnQuery { query =>
- val recordsRead = query.recentProgress.map(_.numInputRows).sum
- recordsRead == 3
- }
- )
- }
-
- test("subscribing topic by pattern with topic deletions") {
- val topicPrefix = newTopic()
- val topic = topicPrefix + "-seems"
- val topic2 = topicPrefix + "-bad"
- testUtils.createTopic(topic, partitions = 5)
- testUtils.sendMessages(topic, Array("-1"))
- require(testUtils.getLatestOffsets(Set(topic)).size === 5)
-
- val reader = spark
- .readStream
- .format("kafka")
- .option("kafka.bootstrap.servers", testUtils.brokerAddress)
- .option("kafka.metadata.max.age.ms", "1")
- .option("subscribePattern", s"$topicPrefix-.*")
- .option("failOnDataLoss", "false")
-
- val kafka = reader.load()
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .as[(String, String)]
- val mapped = kafka.map(kv => kv._2.toInt + 1)
-
- testStream(mapped)(
- makeSureGetOffsetCalled,
- AddKafkaData(Set(topic), 1, 2, 3),
- CheckAnswer(2, 3, 4),
- Assert {
- testUtils.deleteTopic(topic)
- testUtils.createTopic(topic2, partitions = 5)
- true
- },
- AddKafkaData(Set(topic2), 4, 5, 6),
- CheckAnswer(2, 3, 4, 5, 6, 7)
- )
- }
-
- testWithUninterruptibleThread(
- "deserialization of initial offset with Spark 2.1.0") {
- withTempDir { metadataPath =>
- val topic = newTopic
- testUtils.createTopic(topic, partitions = 3)
-
- val provider = new KafkaSourceProvider
- val parameters = Map(
- "kafka.bootstrap.servers" -> testUtils.brokerAddress,
- "subscribe" -> topic
- )
- val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
- "", parameters)
- source.getOffset.get // Write initial offset
-
- // Make sure Spark 2.1.0 will throw an exception when reading the new log
- intercept[java.lang.IllegalArgumentException] {
- // Simulate how Spark 2.1.0 reads the log
- Utils.tryWithResource(new FileInputStream(metadataPath.getAbsolutePath + "/0")) { in =>
- val length = in.read()
- val bytes = new Array[Byte](length)
- in.read(bytes)
- KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
- }
- }
- }
- }
-
- testWithUninterruptibleThread("deserialization of initial offset written by Spark 2.1.0") {
- withTempDir { metadataPath =>
- val topic = "kafka-initial-offset-2-1-0"
- testUtils.createTopic(topic, partitions = 3)
-
- val provider = new KafkaSourceProvider
- val parameters = Map(
- "kafka.bootstrap.servers" -> testUtils.brokerAddress,
- "subscribe" -> topic
- )
-
- val from = new File(
- getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").toURI).toPath
- val to = Paths.get(s"${metadataPath.getAbsolutePath}/0")
- Files.copy(from, to)
-
- val source = provider.createSource(
- spark.sqlContext, metadataPath.toURI.toString, None, "", parameters)
- val deserializedOffset = source.getOffset.get
- val referenceOffset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
- assert(referenceOffset == deserializedOffset)
- }
- }
-
- testWithUninterruptibleThread("deserialization of initial offset written by future version") {
- withTempDir { metadataPath =>
- val futureMetadataLog =
- new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession,
- metadataPath.getAbsolutePath) {
- override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
- out.write(0)
- val writer = new BufferedWriter(new OutputStreamWriter(out, UTF_8))
- writer.write(s"v99999\n${metadata.json}")
- writer.flush
- }
- }
-
- val topic = newTopic
- testUtils.createTopic(topic, partitions = 3)
- val offset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
- futureMetadataLog.add(0, offset)
-
- val provider = new KafkaSourceProvider
- val parameters = Map(
- "kafka.bootstrap.servers" -> testUtils.brokerAddress,
- "subscribe" -> topic
- )
- val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
- "", parameters)
-
- val e = intercept[java.lang.IllegalStateException] {
- source.getOffset.get // Read initial offset
- }
-
- Seq(
- s"maximum supported log version is v${KafkaSource.VERSION}, but encountered v99999",
- "produced by a newer version of Spark and cannot be read by this version"
- ).foreach { message =>
- assert(e.getMessage.contains(message))
- }
- }
- }
-
- test("KafkaSource with watermark") {
- val now = System.currentTimeMillis()
- val topic = newTopic()
- testUtils.createTopic(newTopic(), partitions = 1)
- testUtils.sendMessages(topic, Array(1).map(_.toString))
-
- val kafka = spark
- .readStream
- .format("kafka")
- .option("kafka.bootstrap.servers", testUtils.brokerAddress)
- .option("kafka.metadata.max.age.ms", "1")
- .option("startingOffsets", s"earliest")
- .option("subscribe", topic)
- .load()
-
- val windowedAggregation = kafka
- .withWatermark("timestamp", "10 seconds")
- .groupBy(window($"timestamp", "5 seconds") as 'window)
- .agg(count("*") as 'count)
- .select($"window".getField("start") as 'window, $"count")
-
- val query = windowedAggregation
- .writeStream
- .format("memory")
- .outputMode("complete")
- .queryName("kafkaWatermark")
- .start()
- query.processAllAvailable()
- val rows = spark.table("kafkaWatermark").collect()
- assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
- val row = rows(0)
- // We cannot check the exact window start time as it depends on the time that messages were
- // inserted by the producer. So here we just use a lower bound to make sure the internal
- // conversion works.
- assert(
- row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000,
- s"Unexpected results: $row")
- assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row")
- query.stop()
- }
-
- test("delete a topic when a Spark job is running") {
- KafkaSourceSuite.collectedData.clear()
-
- val topic = newTopic()
- testUtils.createTopic(topic, partitions = 1)
- testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray)
-
- val reader = spark
- .readStream
- .format("kafka")
- .option("kafka.bootstrap.servers", testUtils.brokerAddress)
- .option("kafka.metadata.max.age.ms", "1")
- .option("subscribe", topic)
- // If a topic is deleted and we try to poll data starting from offset 0,
- // the Kafka consumer will just block until timeout and return an empty result.
- // So set the timeout to 1 second to make this test fast.
- .option("kafkaConsumer.pollTimeoutMs", "1000")
- .option("startingOffsets", "earliest")
- .option("failOnDataLoss", "false")
- val kafka = reader.load()
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .as[(String, String)]
- KafkaSourceSuite.globalTestUtils = testUtils
- // The following ForeachWriter will delete the topic before fetching data from Kafka
- // in executors.
- val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] {
- override def open(partitionId: Long, version: Long): Boolean = {
- KafkaSourceSuite.globalTestUtils.deleteTopic(topic)
- true
- }
-
- override def process(value: Int): Unit = {
- KafkaSourceSuite.collectedData.add(value)
- }
-
- override def close(errorOrNull: Throwable): Unit = {}
- }).start()
- query.processAllAvailable()
- query.stop()
- // `failOnDataLoss` is `false`, we should not fail the query
- assert(query.exception.isEmpty)
- }
-
- test("SPARK-22956: currentPartitionOffsets should be set when no new data comes in") {
- def getSpecificDF(range: Range.Inclusive): org.apache.spark.sql.Dataset[Int] = {
- val topic = newTopic()
- testUtils.createTopic(topic, partitions = 1)
- testUtils.sendMessages(topic, range.map(_.toString).toArray, Some(0))
-
- val reader = spark
- .readStream
- .format("kafka")
- .option("kafka.bootstrap.servers", testUtils.brokerAddress)
- .option("kafka.metadata.max.age.ms", "1")
- .option("maxOffsetsPerTrigger", 5)
- .option("subscribe", topic)
- .option("startingOffsets", "earliest")
-
- reader.load()
- .selectExpr("CAST(value AS STRING)")
- .as[String]
- .map(k => k.toInt)
- }
-
- val df1 = getSpecificDF(0 to 9)
- val df2 = getSpecificDF(100 to 199)
-
- val kafka = df1.union(df2)
-
- val clock = new StreamManualClock
-
- val waitUntilBatchProcessed = AssertOnQuery { q =>
- eventually(Timeout(streamingTimeout)) {
- if (!q.exception.isDefined) {
- assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
- }
- }
- if (q.exception.isDefined) {
- throw q.exception.get
- }
- true
- }
-
- testStream(kafka)(
- StartStream(ProcessingTime(100), clock),
- waitUntilBatchProcessed,
- // 5 from smaller topic, 5 from bigger one
- CheckLastBatch((0 to 4) ++ (100 to 104): _*),
- AdvanceManualClock(100),
- waitUntilBatchProcessed,
- // 5 from smaller topic, 5 from bigger one
- CheckLastBatch((5 to 9) ++ (105 to 109): _*),
- AdvanceManualClock(100),
- waitUntilBatchProcessed,
- // smaller topic empty, 5 from bigger one
- CheckLastBatch(110 to 114: _*),
- StopStream,
- StartStream(ProcessingTime(100), clock),
- waitUntilBatchProcessed,
- // smallest now empty, 5 from bigger one
- CheckLastBatch(115 to 119: _*),
- AdvanceManualClock(100),
- waitUntilBatchProcessed,
- // smallest now empty, 5 from bigger one
- CheckLastBatch(120 to 124: _*)
- )
- }
-}
-
-abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
-
- import testImplicits._
-
- test("cannot stop Kafka stream") {
- val topic = newTopic()
- testUtils.createTopic(topic, partitions = 5)
- testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray)
-
- val reader = spark
- .readStream
- .format("kafka")
- .option("kafka.bootstrap.servers", testUtils.brokerAddress)
- .option("kafka.metadata.max.age.ms", "1")
- .option("subscribePattern", s"$topic.*")
-
- val kafka = reader.load()
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .as[(String, String)]
- val mapped = kafka.map(kv => kv._2.toInt + 1)
-
- testStream(mapped)(
- makeSureGetOffsetCalled,
- StopStream
- )
- }
-
- for (failOnDataLoss <- Seq(true, false)) {
- test(s"assign from latest offsets (failOnDataLoss: $failOnDataLoss)") {
- val topic = newTopic()
- testFromLatestOffsets(
- topic,
- addPartitions = false,
- failOnDataLoss = failOnDataLoss,
- "assign" -> assignString(topic, 0 to 4))
- }
-
- test(s"assign from earliest offsets (failOnDataLoss: $failOnDataLoss)") {
- val topic = newTopic()
- testFromEarliestOffsets(
- topic,
- addPartitions = false,
- failOnDataLoss = failOnDataLoss,
- "assign" -> assignString(topic, 0 to 4))
- }
-
- test(s"assign from specific offsets (failOnDataLoss: $failOnDataLoss)") {
- val topic = newTopic()
- testFromSpecificOffsets(
- topic,
- failOnDataLoss = failOnDataLoss,
- "assign" -> assignString(topic, 0 to 4),
- "failOnDataLoss" -> failOnDataLoss.toString)
- }
-
- test(s"subscribing topic by name from latest offsets (failOnDataLoss: $failOnDataLoss)") {
- val topic = newTopic()
- testFromLatestOffsets(
- topic,
- addPartitions = true,
- failOnDataLoss = failOnDataLoss,
- "subscribe" -> topic)
- }
-
- test(s"subscribing topic by name from earliest offsets (failOnDataLoss: $failOnDataLoss)") {
- val topic = newTopic()
- testFromEarliestOffsets(
- topic,
- addPartitions = true,
- failOnDataLoss = failOnDataLoss,
- "subscribe" -> topic)
- }
-
- test(s"subscribing topic by name from specific offsets (failOnDataLoss: $failOnDataLoss)") {
- val topic = newTopic()
- testFromSpecificOffsets(topic, failOnDataLoss = failOnDataLoss, "subscribe" -> topic)
- }
-
- test(s"subscribing topic by pattern from latest offsets (failOnDataLoss: $failOnDataLoss)") {
- val topicPrefix = newTopic()
- val topic = topicPrefix + "-suffix"
- testFromLatestOffsets(
- topic,
- addPartitions = true,
- failOnDataLoss = failOnDataLoss,
- "subscribePattern" -> s"$topicPrefix-.*")
- }
-
- test(s"subscribing topic by pattern from earliest offsets (failOnDataLoss: $failOnDataLoss)") {
- val topicPrefix = newTopic()
- val topic = topicPrefix + "-suffix"
- testFromEarliestOffsets(
- topic,
- addPartitions = true,
- failOnDataLoss = failOnDataLoss,
- "subscribePattern" -> s"$topicPrefix-.*")
- }
-
- test(s"subscribing topic by pattern from specific offsets (failOnDataLoss: $failOnDataLoss)") {
- val topicPrefix = newTopic()
- val topic = topicPrefix + "-suffix"
- testFromSpecificOffsets(
- topic,
- failOnDataLoss = failOnDataLoss,
- "subscribePattern" -> s"$topicPrefix-.*")
- }
- }
-
- test("bad source options") {
- def testBadOptions(options: (String, String)*)(expectedMsgs: String*): Unit = {
- val ex = intercept[IllegalArgumentException] {
- val reader = spark
- .readStream
- .format("kafka")
- options.foreach { case (k, v) => reader.option(k, v) }
- reader.load()
- }
- expectedMsgs.foreach { m =>
- assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(m.toLowerCase(Locale.ROOT)))
- }
- }
-
- // Specifying an ending offset
- testBadOptions("endingOffsets" -> "latest")("Ending offset not valid in streaming queries")
-
- // No strategy specified
- testBadOptions()("options must be specified", "subscribe", "subscribePattern")
-
- // Multiple strategies specified
- testBadOptions("subscribe" -> "t", "subscribePattern" -> "t.*")(
- "only one", "options can be specified")
-
- testBadOptions("subscribe" -> "t", "assign" -> """{"a":[0]}""")(
- "only one", "options can be specified")
-
- testBadOptions("assign" -> "")("no topicpartitions to assign")
- testBadOptions("subscribe" -> "")("no topics to subscribe")
- testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty")
- }
-
- test("unsupported kafka configs") {
- def testUnsupportedConfig(key: String, value: String = "someValue"): Unit = {
- val ex = intercept[IllegalArgumentException] {
- val reader = spark
- .readStream
- .format("kafka")
- .option("subscribe", "topic")
- .option("kafka.bootstrap.servers", "somehost")
- .option(s"$key", value)
- reader.load()
- }
- assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("not supported"))
- }
-
- testUnsupportedConfig("kafka.group.id")
- testUnsupportedConfig("kafka.auto.offset.reset")
- testUnsupportedConfig("kafka.enable.auto.commit")
- testUnsupportedConfig("kafka.interceptor.classes")
- testUnsupportedConfig("kafka.key.deserializer")
- testUnsupportedConfig("kafka.value.deserializer")
-
- testUnsupportedConfig("kafka.auto.offset.reset", "none")
- testUnsupportedConfig("kafka.auto.offset.reset", "someValue")
- testUnsupportedConfig("kafka.auto.offset.reset", "earliest")
- testUnsupportedConfig("kafka.auto.offset.reset", "latest")
- }
-
- test("get offsets from case insensitive parameters") {
- for ((optionKey, optionValue, answer) <- Seq(
- (STARTING_OFFSETS_OPTION_KEY, "earLiEst", EarliestOffsetRangeLimit),
- (ENDING_OFFSETS_OPTION_KEY, "laTest", LatestOffsetRangeLimit),
- (STARTING_OFFSETS_OPTION_KEY, """{"topic-A":{"0":23}}""",
- SpecificOffsetRangeLimit(Map(new TopicPartition("topic-A", 0) -> 23))))) {
- val offset = getKafkaOffsetRangeLimit(Map(optionKey -> optionValue), optionKey, answer)
- assert(offset === answer)
- }
-
- for ((optionKey, answer) <- Seq(
- (STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit),
- (ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit))) {
- val offset = getKafkaOffsetRangeLimit(Map.empty, optionKey, answer)
- assert(offset === answer)
- }
- }
-
- private def assignString(topic: String, partitions: Iterable[Int]): String = {
- JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p)))
- }
-
- private def testFromSpecificOffsets(
- topic: String,
- failOnDataLoss: Boolean,
- options: (String, String)*): Unit = {
- val partitionOffsets = Map(
- new TopicPartition(topic, 0) -> -2L,
- new TopicPartition(topic, 1) -> -1L,
- new TopicPartition(topic, 2) -> 0L,
- new TopicPartition(topic, 3) -> 1L,
- new TopicPartition(topic, 4) -> 2L
- )
- val startingOffsets = JsonUtils.partitionOffsets(partitionOffsets)
-
- testUtils.createTopic(topic, partitions = 5)
- // part 0 starts at earliest, these should all be seen
- testUtils.sendMessages(topic, Array(-20, -21, -22).map(_.toString), Some(0))
- // part 1 starts at latest, these should all be skipped
- testUtils.sendMessages(topic, Array(-10, -11, -12).map(_.toString), Some(1))
- // part 2 starts at 0, these should all be seen
- testUtils.sendMessages(topic, Array(0, 1, 2).map(_.toString), Some(2))
- // part 3 starts at 1, first should be skipped
- testUtils.sendMessages(topic, Array(10, 11, 12).map(_.toString), Some(3))
- // part 4 starts at 2, first and second should be skipped
- testUtils.sendMessages(topic, Array(20, 21, 22).map(_.toString), Some(4))
- require(testUtils.getLatestOffsets(Set(topic)).size === 5)
-
- val reader = spark
- .readStream
- .format("kafka")
- .option("startingOffsets", startingOffsets)
- .option("kafka.bootstrap.servers", testUtils.brokerAddress)
- .option("kafka.metadata.max.age.ms", "1")
- .option("failOnDataLoss", failOnDataLoss.toString)
- options.foreach { case (k, v) => reader.option(k, v) }
- val kafka = reader.load()
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .as[(String, String)]
- val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
-
- testStream(mapped)(
- makeSureGetOffsetCalled,
- Execute { q =>
- // wait to reach the last offset in every partition
- q.awaitOffset(0, KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L)))
- },
- CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22),
- StopStream,
- StartStream(),
- CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22), // Should get the data back on recovery
- AddKafkaData(Set(topic), 30, 31, 32, 33, 34)(ensureDataInMultiplePartition = true),
- CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22, 30, 31, 32, 33, 34),
- StopStream
- )
- }
-
- test("Kafka column types") {
- val now = System.currentTimeMillis()
- val topic = newTopic()
- testUtils.createTopic(newTopic(), partitions = 1)
- testUtils.sendMessages(topic, Array(1).map(_.toString))
-
- val kafka = spark
- .readStream
- .format("kafka")
- .option("kafka.bootstrap.servers", testUtils.brokerAddress)
- .option("kafka.metadata.max.age.ms", "1")
- .option("startingOffsets", s"earliest")
- .option("subscribe", topic)
- .load()
-
- val query = kafka
- .writeStream
- .format("memory")
- .queryName("kafkaColumnTypes")
- .trigger(defaultTrigger)
- .start()
- eventually(timeout(streamingTimeout)) {
- assert(spark.table("kafkaColumnTypes").count == 1,
- s"Unexpected results: ${spark.table("kafkaColumnTypes").collectAsList()}")
- }
- val row = spark.table("kafkaColumnTypes").head()
- assert(row.getAs[Array[Byte]]("key") === null, s"Unexpected results: $row")
- assert(row.getAs[Array[Byte]]("value") === "1".getBytes(UTF_8), s"Unexpected results: $row")
- assert(row.getAs[String]("topic") === topic, s"Unexpected results: $row")
- assert(row.getAs[Int]("partition") === 0, s"Unexpected results: $row")
- assert(row.getAs[Long]("offset") === 0L, s"Unexpected results: $row")
- // We cannot check the exact timestamp as it's the time that messages were inserted by the
- // producer. So here we just use a lower bound to make sure the internal conversion works.
- assert(row.getAs[java.sql.Timestamp]("timestamp").getTime >= now, s"Unexpected results: $row")
- assert(row.getAs[Int]("timestampType") === 0, s"Unexpected results: $row")
- query.stop()
- }
-
- private def testFromLatestOffsets(
- topic: String,
- addPartitions: Boolean,
- failOnDataLoss: Boolean,
- options: (String, String)*): Unit = {
- testUtils.createTopic(topic, partitions = 5)
- testUtils.sendMessages(topic, Array("-1"))
- require(testUtils.getLatestOffsets(Set(topic)).size === 5)
-
- val reader = spark
- .readStream
- .format("kafka")
- .option("startingOffsets", s"latest")
- .option("kafka.bootstrap.servers", testUtils.brokerAddress)
- .option("kafka.metadata.max.age.ms", "1")
- .option("failOnDataLoss", failOnDataLoss.toString)
- options.foreach { case (k, v) => reader.option(k, v) }
- val kafka = reader.load()
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .as[(String, String)]
- val mapped = kafka.map(kv => kv._2.toInt + 1)
-
- testStream(mapped)(
- makeSureGetOffsetCalled,
- AddKafkaData(Set(topic), 1, 2, 3),
- CheckAnswer(2, 3, 4),
- StopStream,
- StartStream(),
- CheckAnswer(2, 3, 4), // Should get the data back on recovery
- StopStream,
- AddKafkaData(Set(topic), 4, 5, 6), // Add data when stream is stopped
- StartStream(),
- CheckAnswer(2, 3, 4, 5, 6, 7), // Should get the added data
- AddKafkaData(Set(topic), 7, 8),
- CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
- AssertOnQuery("Add partitions") { query: StreamExecution =>
- if (addPartitions) setTopicPartitions(topic, 10, query)
- true
- },
- AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16),
- CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
- )
- }
-
- private def testFromEarliestOffsets(
- topic: String,
- addPartitions: Boolean,
- failOnDataLoss: Boolean,
- options: (String, String)*): Unit = {
- testUtils.createTopic(topic, partitions = 5)
- testUtils.sendMessages(topic, (1 to 3).map { _.toString }.toArray)
- require(testUtils.getLatestOffsets(Set(topic)).size === 5)
-
- val reader = spark.readStream
- reader
- .format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$"))
- .option("startingOffsets", s"earliest")
- .option("kafka.bootstrap.servers", testUtils.brokerAddress)
- .option("kafka.metadata.max.age.ms", "1")
- .option("failOnDataLoss", failOnDataLoss.toString)
- options.foreach { case (k, v) => reader.option(k, v) }
- val kafka = reader.load()
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .as[(String, String)]
- val mapped = kafka.map(kv => kv._2.toInt + 1)
-
- testStream(mapped)(
- AddKafkaData(Set(topic), 4, 5, 6), // Add data when stream is stopped
- CheckAnswer(2, 3, 4, 5, 6, 7),
- StopStream,
- StartStream(),
- CheckAnswer(2, 3, 4, 5, 6, 7),
- StopStream,
- AddKafkaData(Set(topic), 7, 8),
- StartStream(),
- CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
- AssertOnQuery("Add partitions") { query: StreamExecution =>
- if (addPartitions) setTopicPartitions(topic, 10, query)
- true
- },
- AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16),
- CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
- )
- }
-}
-
-object KafkaSourceSuite {
- @volatile var globalTestUtils: KafkaTestUtils = _
- val collectedData = new ConcurrentLinkedQueue[Any]()
-}
-
-
-class KafkaSourceStressSuite extends KafkaSourceTest {
-
- import testImplicits._
-
- val topicId = new AtomicInteger(1)
-
- @volatile var topics: Seq[String] = (1 to 5).map(_ => newStressTopic)
-
- def newStressTopic: String = s"stress${topicId.getAndIncrement()}"
-
- private def nextInt(start: Int, end: Int): Int = {
- start + Random.nextInt(start + end - 1)
- }
-
- test("stress test with multiple topics and partitions") {
- topics.foreach { topic =>
- testUtils.createTopic(topic, partitions = nextInt(1, 6))
- testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray)
- }
-
- // Create Kafka source that reads from latest offset
- val kafka =
- spark.readStream
- .format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$"))
- .option("kafka.bootstrap.servers", testUtils.brokerAddress)
- .option("kafka.metadata.max.age.ms", "1")
- .option("subscribePattern", "stress.*")
- .option("failOnDataLoss", "false")
- .load()
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .as[(String, String)]
-
- val mapped = kafka.map(kv => kv._2.toInt + 1)
-
- runStressTest(
- mapped,
- Seq(makeSureGetOffsetCalled),
- (d, running) => {
- Random.nextInt(5) match {
- case 0 => // Add a new topic
- topics = topics ++ Seq(newStressTopic)
- AddKafkaData(topics.toSet, d: _*)(message = s"Add topic $newStressTopic",
- topicAction = (topic, partition) => {
- if (partition.isEmpty) {
- testUtils.createTopic(topic, partitions = nextInt(1, 6))
- }
- })
- case 1 if running =>
- // Only delete a topic when the query is running. Otherwise, we may lose data and
- // cannot check the correctness.
- val deletedTopic = topics(Random.nextInt(topics.size))
- if (deletedTopic != topics.head) {
- topics = topics.filterNot(_ == deletedTopic)
- }
- AddKafkaData(topics.toSet, d: _*)(message = s"Delete topic $deletedTopic",
- topicAction = (topic, partition) => {
- // Never remove the first topic to make sure we have at least one topic
- if (topic == deletedTopic && deletedTopic != topics.head) {
- testUtils.deleteTopic(deletedTopic)
- }
- })
- case 2 => // Add new partitions
- AddKafkaData(topics.toSet, d: _*)(message = "Add partition",
- topicAction = (topic, partition) => {
- testUtils.addPartitions(topic, partition.get + nextInt(1, 6))
- })
- case _ => // Just add new data
- AddKafkaData(topics.toSet, d: _*)
- }
- },
- iterations = 50)
- }
-}
-
-class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with SharedSQLContext {
-
- import testImplicits._
-
- private var testUtils: KafkaTestUtils = _
-
- private val topicId = new AtomicInteger(0)
-
- private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
-
- override def createSparkSession(): TestSparkSession = {
- // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
- new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf))
- }
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- testUtils = new KafkaTestUtils {
- override def brokerConfiguration: Properties = {
- val props = super.brokerConfiguration
- // Try to make Kafka clean up messages as fast as possible. However, there is a hard-coded
- // 30-second delay (kafka.log.LogManager.InitialTaskDelayMs), so this test should run for at
- // least 30 seconds.
- props.put("log.cleaner.backoff.ms", "100")
- props.put("log.segment.bytes", "40")
- props.put("log.retention.bytes", "40")
- props.put("log.retention.check.interval.ms", "100")
- props.put("delete.retention.ms", "10")
- props.put("log.flush.scheduler.interval.ms", "10")
- props
- }
- }
- testUtils.setup()
- }
-
- override def afterAll(): Unit = {
- if (testUtils != null) {
- testUtils.teardown()
- testUtils = null
- super.afterAll()
- }
- }
-
- protected def startStream(ds: Dataset[Int]) = {
- ds.writeStream.foreach(new ForeachWriter[Int] {
-
- override def open(partitionId: Long, version: Long): Boolean = {
- true
- }
-
- override def process(value: Int): Unit = {
- // Slow down the processing speed so that messages may be aged out.
- Thread.sleep(Random.nextInt(500))
- }
-
- override def close(errorOrNull: Throwable): Unit = {
- }
- }).start()
- }
-
- test("stress test for failOnDataLoss=false") {
- val reader = spark
- .readStream
- .format("kafka")
- .option("kafka.bootstrap.servers", testUtils.brokerAddress)
- .option("kafka.metadata.max.age.ms", "1")
- .option("subscribePattern", "failOnDataLoss.*")
- .option("startingOffsets", "earliest")
- .option("failOnDataLoss", "false")
- .option("fetchOffset.retryIntervalMs", "3000")
- val kafka = reader.load()
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .as[(String, String)]
- val query = startStream(kafka.map(kv => kv._2.toInt))
-
- val testTime = 1.minutes
- val startTime = System.currentTimeMillis()
- // Track the current existing topics
- val topics = mutable.ArrayBuffer[String]()
- // Track topics that have been deleted
- val deletedTopics = mutable.Set[String]()
- while (System.currentTimeMillis() - testTime.toMillis < startTime) {
- Random.nextInt(10) match {
- case 0 => // Create a new topic
- val topic = newTopic()
- topics += topic
- // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
- // chance that a topic will be recreated after deletion due to the asynchronous update.
- // Hence, always overwrite to handle this race condition.
- testUtils.createTopic(topic, partitions = 1, overwrite = true)
- logInfo(s"Create topic $topic")
- case 1 if topics.nonEmpty => // Delete an existing topic
- val topic = topics.remove(Random.nextInt(topics.size))
- testUtils.deleteTopic(topic)
- logInfo(s"Delete topic $topic")
- deletedTopics += topic
- case 2 if deletedTopics.nonEmpty => // Recreate a topic that was deleted.
- val topic = deletedTopics.toSeq(Random.nextInt(deletedTopics.size))
- deletedTopics -= topic
- topics += topic
- // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
- // chance that a topic will be recreated after deletion due to the asynchronous update.
- // Hence, always overwrite to handle this race condition.
- testUtils.createTopic(topic, partitions = 1, overwrite = true)
- logInfo(s"Create topic $topic")
- case 3 =>
- Thread.sleep(1000)
- case _ => // Push random messages
- for (topic <- topics) {
- val size = Random.nextInt(10)
- for (_ <- 0 until size) {
- testUtils.sendMessages(topic, Array(Random.nextInt(10).toString))
- }
- }
- }
- // `failOnDataLoss` is `false`, we should not fail the query
- if (query.exception.nonEmpty) {
- throw query.exception.get
- }
- }
-
- query.stop()
- // `failOnDataLoss` is `false`, we should not fail the query
- if (query.exception.nonEmpty) {
- throw query.exception.get
- }
- }
-}
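For orientation, the deleted suite (and its replacement, KafkaMicroBatchSourceSuite, listed in the commit summary in part 2/2) exercises the same user-facing read path. Below is a minimal sketch of that pattern, assembled from the test code above; the broker address and topic name are placeholders, not values from this commit.

  val df = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker address
    .option("subscribe", "topic-0")                        // placeholder topic
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 10)                    // cap records read per micro-batch
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")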
http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f24fd7f..e75e1d6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1146,10 +1146,20 @@ object SQLConf {
val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
.internal()
.doc("A comma-separated list of fully qualified data source register class names for which" +
- " StreamWriteSupport is disabled. Writes to these sources will fail back to the V1 Sink.")
+ " StreamWriteSupport is disabled. Writes to these sources will fall back to the V1 Sinks.")
.stringConf
.createWithDefault("")
+ val DISABLED_V2_STREAMING_MICROBATCH_READERS =
+ buildConf("spark.sql.streaming.disabledV2MicroBatchReaders")
+ .internal()
+ .doc(
+ "A comma-separated list of fully qualified data source register class names for which " +
+ "MicroBatchReadSupport is disabled. Reads from these sources will fall back to the " +
+ "V1 Sources.")
+ .stringConf
+ .createWithDefault("")
+
object PartitionOverwriteMode extends Enumeration {
val STATIC, DYNAMIC = Value
}
@@ -1525,6 +1535,9 @@ class SQLConf extends Serializable with Logging {
def disabledV2StreamingWriters: String = getConf(DISABLED_V2_STREAMING_WRITERS)
+ def disabledV2StreamingMicroBatchReaders: String =
+ getConf(DISABLED_V2_STREAMING_MICROBATCH_READERS)
+
def concatBinaryAsString: Boolean = getConf(CONCAT_BINARY_AS_STRING)
def eltOutputAsString: Boolean = getConf(ELT_OUTPUT_AS_STRING)
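As a usage note, the new conf takes a comma-separated list of provider class names. A hedged sketch of how a session could fall back to the V1 Kafka source, assuming the provider class referenced elsewhere in this commit (org.apache.spark.sql.kafka010.KafkaSourceProvider):

  // Sketch only: disable the V2 micro-batch reader for the Kafka provider so that
  // MicroBatchExecution plans the V1 Source instead (see the planner change below).
  spark.conf.set(
    "spark.sql.streaming.disabledV2MicroBatchReaders",
    classOf[org.apache.spark.sql.kafka010.KafkaSourceProvider].getCanonicalName)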
http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index ac73ba3..8465501 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -72,27 +72,36 @@ class MicroBatchExecution(
// Note that we have to use the previous `output` as attributes in StreamingExecutionRelation,
// since the existing logical plan has already used those attributes. The per-microbatch
// transformation is responsible for replacing attributes with their final values.
+
+ val disabledSources =
+ sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders.split(",")
+
val _logicalPlan = analyzedPlan.transform {
- case streamingRelation@StreamingRelation(dataSource, _, output) =>
+ case streamingRelation@StreamingRelation(dataSourceV1, sourceName, output) =>
toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
- val source = dataSource.createSource(metadataPath)
+ val source = dataSourceV1.createSource(metadataPath)
nextSourceId += 1
+ logInfo(s"Using Source [$source] from DataSourceV1 named '$sourceName' [$dataSourceV1]")
StreamingExecutionRelation(source, output)(sparkSession)
})
- case s @ StreamingRelationV2(source: MicroBatchReadSupport, _, options, output, _) =>
+ case s @ StreamingRelationV2(
+ dataSourceV2: MicroBatchReadSupport, sourceName, options, output, _) if
+ !disabledSources.contains(dataSourceV2.getClass.getCanonicalName) =>
v2ToExecutionRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
- val reader = source.createMicroBatchReader(
+ val reader = dataSourceV2.createMicroBatchReader(
Optional.empty(), // user specified schema
metadataPath,
new DataSourceOptions(options.asJava))
nextSourceId += 1
+ logInfo(s"Using MicroBatchReader [$reader] from " +
+ s"DataSourceV2 named '$sourceName' [$dataSourceV2]")
StreamingExecutionRelation(reader, output)(sparkSession)
})
- case s @ StreamingRelationV2(_, sourceName, _, output, v1Relation) =>
+ case s @ StreamingRelationV2(dataSourceV2, sourceName, _, output, v1Relation) =>
v2ToExecutionRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
@@ -102,6 +111,7 @@ class MicroBatchExecution(
}
val source = v1Relation.get.dataSource.createSource(metadataPath)
nextSourceId += 1
+ logInfo(s"Using Source [$source] from DataSourceV2 named '$sourceName' [$dataSourceV2]")
StreamingExecutionRelation(source, output)(sparkSession)
})
}
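In other words, the planner chooses the V2 path only when the provider supports MicroBatchReadSupport and is not listed in the new conf; a condensed, illustrative restatement of that decision (not the literal planner code):

  // Illustration of the fallback decision above (simplified):
  val disabledSources =
    sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders.split(",")
  def usesV2MicroBatchReader(provider: AnyRef): Boolean =
    provider.isInstanceOf[MicroBatchReadSupport] &&
      !disabledSources.contains(provider.getClass.getCanonicalName)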
[2/2] spark git commit: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by td...@apache.org.
[SPARK-23362][SS] Migrate Kafka Microbatch source to v2
## What changes were proposed in this pull request?
Migrating KafkaSource (with data source v1) to KafkaMicroBatchReader (with data source v2).
Performance comparison:
In a unit test with an in-process Kafka broker, I tested the read throughput of V1 and V2 using 20M records in a single partition. They were comparable.
## How was this patch tested?
Existing tests, a few of which were modified to be better tests than the original ones.
Author: Tathagata Das <ta...@gmail.com>
Closes #20554 from tdas/SPARK-23362.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a73aa31
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a73aa31
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a73aa31
Branch: refs/heads/master
Commit: 0a73aa31f41c83503d5d99eff3c9d7b406014ab3
Parents: c5857e4
Author: Tathagata Das <ta...@gmail.com>
Authored: Fri Feb 16 14:30:19 2018 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Feb 16 14:30:19 2018 -0800
----------------------------------------------------------------------
dev/.rat-excludes | 1 +
.../sql/kafka010/CachedKafkaConsumer.scala | 2 +-
.../sql/kafka010/KafkaContinuousReader.scala | 29 +-
.../sql/kafka010/KafkaMicroBatchReader.scala | 403 ++++++
.../KafkaRecordToUnsafeRowConverter.scala | 52 +
.../apache/spark/sql/kafka010/KafkaSource.scala | 19 +-
.../sql/kafka010/KafkaSourceProvider.scala | 70 +-
...fka-source-initial-offset-future-version.bin | 2 +
...afka-source-initial-offset-version-2.1.0.bin | 2 +-
.../kafka010/KafkaMicroBatchSourceSuite.scala | 1222 ++++++++++++++++++
.../spark/sql/kafka010/KafkaSourceSuite.scala | 1122 ----------------
.../org/apache/spark/sql/internal/SQLConf.scala | 15 +-
.../streaming/MicroBatchExecution.scala | 20 +-
13 files changed, 1786 insertions(+), 1173 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/dev/.rat-excludes
----------------------------------------------------------------------
diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index 243fbe3..9552d00 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -105,3 +105,4 @@ META-INF/*
spark-warehouse
structured-streaming/*
kafka-source-initial-offset-version-2.1.0.bin
+kafka-source-initial-offset-future-version.bin
http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 90ed7b1..e97881c 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaSource._
+import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.util.UninterruptibleThread
http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index b049a05..97a0f66 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.types.StructType
@@ -187,13 +187,9 @@ class KafkaContinuousDataReader(
kafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean) extends ContinuousDataReader[UnsafeRow] {
- private val topic = topicPartition.topic
- private val kafkaPartition = topicPartition.partition
- private val consumer = CachedKafkaConsumer.createUncached(topic, kafkaPartition, kafkaParams)
-
- private val sharedRow = new UnsafeRow(7)
- private val bufferHolder = new BufferHolder(sharedRow)
- private val rowWriter = new UnsafeRowWriter(bufferHolder, 7)
+ private val consumer =
+ CachedKafkaConsumer.createUncached(topicPartition.topic, topicPartition.partition, kafkaParams)
+ private val converter = new KafkaRecordToUnsafeRowConverter
private var nextKafkaOffset = startOffset
private var currentRecord: ConsumerRecord[Array[Byte], Array[Byte]] = _
@@ -232,22 +228,7 @@ class KafkaContinuousDataReader(
}
override def get(): UnsafeRow = {
- bufferHolder.reset()
-
- if (currentRecord.key == null) {
- rowWriter.setNullAt(0)
- } else {
- rowWriter.write(0, currentRecord.key)
- }
- rowWriter.write(1, currentRecord.value)
- rowWriter.write(2, UTF8String.fromString(currentRecord.topic))
- rowWriter.write(3, currentRecord.partition)
- rowWriter.write(4, currentRecord.offset)
- rowWriter.write(5,
- DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(currentRecord.timestamp)))
- rowWriter.write(6, currentRecord.timestampType.id)
- sharedRow.setTotalSize(bufferHolder.totalSize)
- sharedRow
+ converter.toUnsafeRow(currentRecord)
}
override def getOffset(): KafkaSourcePartitionOffset = {
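The field-by-field UnsafeRow construction removed above is now shared through the new KafkaRecordToUnsafeRowConverter (listed in the commit summary above). A hedged sketch of what that helper plausibly looks like, reconstructed from the deleted lines rather than from the committed file:

  private[kafka010] class KafkaRecordToUnsafeRowConverter {
    private val sharedRow = new UnsafeRow(7)
    private val bufferHolder = new BufferHolder(sharedRow)
    private val rowWriter = new UnsafeRowWriter(bufferHolder, 7)

    // Writes one Kafka record into the reused UnsafeRow, mirroring the removed inline code.
    def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
      bufferHolder.reset()
      if (record.key == null) rowWriter.setNullAt(0) else rowWriter.write(0, record.key)
      rowWriter.write(1, record.value)
      rowWriter.write(2, UTF8String.fromString(record.topic))
      rowWriter.write(3, record.partition)
      rowWriter.write(4, record.offset)
      rowWriter.write(5,
        DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(record.timestamp)))
      rowWriter.write(6, record.timestampType.id)
      sharedRow.setTotalSize(bufferHolder.totalSize)
      sharedRow
    }
  }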
http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
new file mode 100644
index 0000000..fb647ca
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
+ * example, if the last record in a Kafka topic "t", partition 2 is at offset 5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the
+ * user must make sure all messages in a topic have been processed before deleting the topic.
+ *
+ * There is a known issue caused by KAFKA-1894: a query using Kafka may fail to stop.
+ * To avoid this issue, make sure to stop the query before stopping the Kafka brokers,
+ * and do not use incorrect broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+ kafkaOffsetReader: KafkaOffsetReader,
+ executorKafkaParams: ju.Map[String, Object],
+ options: DataSourceOptions,
+ metadataPath: String,
+ startingOffsets: KafkaOffsetRangeLimit,
+ failOnDataLoss: Boolean)
+ extends MicroBatchReader with SupportsScanUnsafeRow with Logging {
+
+ type PartitionOffsetMap = Map[TopicPartition, Long]
+
+ private var startPartitionOffsets: PartitionOffsetMap = _
+ private var endPartitionOffsets: PartitionOffsetMap = _
+
+ private val pollTimeoutMs = options.getLong(
+ "kafkaConsumer.pollTimeoutMs",
+ SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+ private val maxOffsetsPerTrigger =
+ Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+ /**
+ * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
+ * called in StreamExecutionThread. Otherwise, interrupting a thread while running
+ * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+ */
+ private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
+
+ override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
+ // Make sure initialPartitionOffsets is initialized
+ initialPartitionOffsets
+
+ startPartitionOffsets = Option(start.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse(initialPartitionOffsets)
+
+ endPartitionOffsets = Option(end.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse {
+ val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+ maxOffsetsPerTrigger.map { maxOffsets =>
+ rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
+ }.getOrElse {
+ latestPartitionOffsets
+ }
+ }
+ }
+
+ override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
+ // Find the new partitions, and get their earliest offsets
+ val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
+ val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+ if (newPartitionOffsets.keySet != newPartitions) {
+ // We cannot get the starting ("from") offsets for some partitions, which means they were deleted.
+ val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
+ reportDataLoss(
+ s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+ }
+ logInfo(s"Partitions added: $newPartitionOffsets")
+ newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+ reportDataLoss(
+ s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+ }
+
+ // Find deleted partitions, and report data loss if required
+ val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
+ if (deletedPartitions.nonEmpty) {
+ reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
+ }
+
+ // Use the end-offset partitions to calculate offset ranges, so that partitions that have
+ // been deleted are ignored
+ val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
+ // Ignore partitions whose starting offsets we don't know.
+ newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
+ }.toSeq
+ logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+
+ val sortedExecutors = getSortedExecutorList()
+ val numExecutors = sortedExecutors.length
+ logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
+
+ // Calculate offset ranges
+ val factories = topicPartitions.flatMap { tp =>
+ val fromOffset = startPartitionOffsets.get(tp).getOrElse {
+ newPartitionOffsets.getOrElse(
+ tp, {
+ // This should not happen since newPartitionOffsets contains all partitions not in
+ // startPartitionOffsets
+ throw new IllegalStateException(s"$tp doesn't have a from offset")
+ })
+ }
+ val untilOffset = endPartitionOffsets(tp)
+
+ if (untilOffset >= fromOffset) {
+ // This allows cached KafkaConsumers in the executors to be re-used to read the same
+ // partition in every batch.
+ val preferredLoc = if (numExecutors > 0) {
+ Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
+ } else None
+ val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
+ Some(
+ new KafkaMicroBatchDataReaderFactory(
+ range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
+ } else {
+ reportDataLoss(
+ s"Partition $tp's offset was changed from " +
+ s"$fromOffset to $untilOffset, some data may have been missed")
+ None
+ }
+ }
+ factories.map(_.asInstanceOf[DataReaderFactory[UnsafeRow]]).asJava
+ }
+
+ override def getStartOffset: Offset = {
+ KafkaSourceOffset(startPartitionOffsets)
+ }
+
+ override def getEndOffset: Offset = {
+ KafkaSourceOffset(endPartitionOffsets)
+ }
+
+ override def deserializeOffset(json: String): Offset = {
+ KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+ }
+
+ override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
+
+ override def commit(end: Offset): Unit = {}
+
+ override def stop(): Unit = {
+ kafkaOffsetReader.close()
+ }
+
+ override def toString(): String = s"Kafka[$kafkaOffsetReader]"
+
+ /**
+ * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
+ * the checkpoint.
+ */
+ private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
+ // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
+ // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
+ // (KAFKA-1894).
+ assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+
+ // SparkSession is required for getting Hadoop configuration for writing to checkpoints
+ assert(SparkSession.getActiveSession.nonEmpty)
+
+ val metadataLog =
+ new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
+ metadataLog.get(0).getOrElse {
+ val offsets = startingOffsets match {
+ case EarliestOffsetRangeLimit =>
+ KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
+ case LatestOffsetRangeLimit =>
+ KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+ case SpecificOffsetRangeLimit(p) =>
+ kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
+ }
+ metadataLog.add(0, offsets)
+ logInfo(s"Initial offsets: $offsets")
+ offsets
+ }.partitionToOffsets
+ }
+
+ /** Proportionally distribute the `limit` number of offsets among topic-partitions. */
+ private def rateLimit(
+ limit: Long,
+ from: PartitionOffsetMap,
+ until: PartitionOffsetMap): PartitionOffsetMap = {
+ val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+ val sizes = until.flatMap {
+ case (tp, end) =>
+ // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
+ from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
+ val size = end - begin
+ logDebug(s"rateLimit $tp size is $size")
+ if (size > 0) Some(tp -> size) else None
+ }
+ }
+ val total = sizes.values.sum.toDouble
+ if (total < 1) {
+ until
+ } else {
+ until.map {
+ case (tp, end) =>
+ tp -> sizes.get(tp).map { size =>
+ val begin = from.get(tp).getOrElse(fromNew(tp))
+ val prorate = limit * (size / total)
+ // Don't completely starve small topicpartitions
+ val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+ // Paranoia, make sure not to return an offset that's past end
+ Math.min(end, off)
+ }.getOrElse(end)
+ }
+ }
+ }
+
+ private def getSortedExecutorList(): Array[String] = {
+
+ def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
+ if (a.host == b.host) {
+ a.executorId > b.executorId
+ } else {
+ a.host > b.host
+ }
+ }
+
+ val bm = SparkEnv.get.blockManager
+ bm.master.getPeers(bm.blockManagerId).toArray
+ .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
+ .sortWith(compare)
+ .map(_.toString)
+ }
+
+ /**
+ * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+ * Otherwise, just log a warning.
+ */
+ private def reportDataLoss(message: String): Unit = {
+ if (failOnDataLoss) {
+ throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+ } else {
+ logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+ }
+ }
+
+ /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. */
+ class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String)
+ extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
+
+ val VERSION = 1
+
+ override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+ out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+ val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+ writer.write("v" + VERSION + "\n")
+ writer.write(metadata.json)
+ writer.flush
+ }
+
+ override def deserialize(in: InputStream): KafkaSourceOffset = {
+ in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
+ val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+ // HDFSMetadataLog guarantees that it never creates a partial file.
+ assert(content.length != 0)
+ if (content(0) == 'v') {
+ val indexOfNewLine = content.indexOf("\n")
+ if (indexOfNewLine > 0) {
+ val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
+ KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
+ } else {
+ throw new IllegalStateException(
+ s"Log file was malformed: failed to detect the log file version line.")
+ }
+ } else {
+ // The log was generated by Spark 2.1.0
+ KafkaSourceOffset(SerializedOffset(content))
+ }
+ }
+ }
+}
+
+/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] class KafkaMicroBatchDataReaderFactory(
+ range: KafkaOffsetRange,
+ preferredLoc: Option[String],
+ executorKafkaParams: ju.Map[String, Object],
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {
+
+ override def preferredLocations(): Array[String] = preferredLoc.toArray
+
+ override def createDataReader(): DataReader[UnsafeRow] = new KafkaMicroBatchDataReader(
+ range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
+}
+
+/** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] class KafkaMicroBatchDataReader(
+ offsetRange: KafkaOffsetRange,
+ executorKafkaParams: ju.Map[String, Object],
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean) extends DataReader[UnsafeRow] with Logging {
+
+ private val consumer = CachedKafkaConsumer.getOrCreate(
+ offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+ private val rangeToRead = resolveRange(offsetRange)
+ private val converter = new KafkaRecordToUnsafeRowConverter
+
+ private var nextOffset = rangeToRead.fromOffset
+ private var nextRow: UnsafeRow = _
+
+ override def next(): Boolean = {
+ if (nextOffset < rangeToRead.untilOffset) {
+ val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
+ if (record != null) {
+ nextRow = converter.toUnsafeRow(record)
+ true
+ } else {
+ false
+ }
+ } else {
+ false
+ }
+ }
+
+ override def get(): UnsafeRow = {
+ assert(nextRow != null)
+ nextOffset += 1
+ nextRow
+ }
+
+ override def close(): Unit = {
+ // Indicate that we're no longer using this consumer
+ CachedKafkaConsumer.releaseKafkaConsumer(
+ offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+ }
+
+ private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
+ if (range.fromOffset < 0 || range.untilOffset < 0) {
+ // Late bind the offset range
+ val availableOffsetRange = consumer.getAvailableOffsetRange()
+ val fromOffset = if (range.fromOffset < 0) {
+ assert(range.fromOffset == KafkaOffsetRangeLimit.EARLIEST,
+ s"earliest offset ${range.fromOffset} does not equal ${KafkaOffsetRangeLimit.EARLIEST}")
+ availableOffsetRange.earliest
+ } else {
+ range.fromOffset
+ }
+ val untilOffset = if (range.untilOffset < 0) {
+ assert(range.untilOffset == KafkaOffsetRangeLimit.LATEST,
+ s"latest offset ${range.untilOffset} does not equal ${KafkaOffsetRangeLimit.LATEST}")
+ availableOffsetRange.latest
+ } else {
+ range.untilOffset
+ }
+ KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset)
+ } else {
+ range
+ }
+ }
+}
+
+private[kafka010] case class KafkaOffsetRange(
+ topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long)
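As an aside (not part of the patch), the proration used by rateLimit() above can be sketched in
isolation. The sketch below mirrors that logic: each topic-partition advances in proportion to its
backlog, small partitions are never starved (ceil), and the result is capped at the partition's end
offset. The object and method names are illustrative only.

    object RateLimitSketch {
      // Backlog-proportional advance, mirroring KafkaMicroBatchReader.rateLimit above.
      def prorate(
          limit: Long,
          from: Map[String, Long],
          until: Map[String, Long]): Map[String, Long] = {
        val sizes = until.flatMap { case (tp, end) =>
          from.get(tp).map(begin => tp -> (end - begin)).filter(_._2 > 0)
        }
        val total = sizes.values.sum.toDouble
        if (total < 1) until
        else until.map { case (tp, end) =>
          tp -> sizes.get(tp).map { size =>
            val share = limit * (size / total)
            // Small partitions still make progress (ceil); large ones are floored.
            val off = from(tp) + (if (share < 1) math.ceil(share) else math.floor(share)).toLong
            math.min(end, off)  // never advance past the end offset
          }.getOrElse(end)
        }
      }

      def main(args: Array[String]): Unit = {
        // Backlogs of 101, 11 and 1 with maxOffsetsPerTrigger = 10 advance to 8, 1 and 1,
        // matching the "1 from smallest, 1 from middle, 8 from biggest" case in the tests below.
        println(prorate(10,
          Map("t-0" -> 0L, "t-1" -> 0L, "t-2" -> 0L),
          Map("t-0" -> 101L, "t-1" -> 11L, "t-2" -> 1L)))
      }
    }
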
http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
new file mode 100644
index 0000000..1acdd56
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.unsafe.types.UTF8String
+
+/** A simple class for converting Kafka ConsumerRecord to UnsafeRow */
+private[kafka010] class KafkaRecordToUnsafeRowConverter {
+ private val sharedRow = new UnsafeRow(7)
+ private val bufferHolder = new BufferHolder(sharedRow)
+ private val rowWriter = new UnsafeRowWriter(bufferHolder, 7)
+
+ def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
+ bufferHolder.reset()
+
+ if (record.key == null) {
+ rowWriter.setNullAt(0)
+ } else {
+ rowWriter.write(0, record.key)
+ }
+ rowWriter.write(1, record.value)
+ rowWriter.write(2, UTF8String.fromString(record.topic))
+ rowWriter.write(3, record.partition)
+ rowWriter.write(4, record.offset)
+ rowWriter.write(
+ 5,
+ DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(record.timestamp)))
+ rowWriter.write(6, record.timestampType.id)
+ sharedRow.setTotalSize(bufferHolder.totalSize)
+ sharedRow
+ }
+}
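As an aside (not part of the patch), a minimal usage sketch of the converter above, assuming it is
called from within the org.apache.spark.sql.kafka010 package (the class is private[kafka010]).
Because the converter reuses a single shared UnsafeRow, a caller that buffers rows beyond the next
toUnsafeRow() call should copy() them first; the data reader above hands each row downstream before
asking for the next one, so it does not need to.

    import org.apache.kafka.clients.consumer.ConsumerRecord

    val converter = new KafkaRecordToUnsafeRowConverter
    // Hypothetical record; this ConsumerRecord constructor leaves the timestamp fields at
    // their sentinel defaults, which the converter still handles.
    val record = new ConsumerRecord[Array[Byte], Array[Byte]](
      "topic-A", 0, 0L, "key".getBytes, "value".getBytes)
    val row = converter.toUnsafeRow(record).copy()  // copy before buffering the row
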
http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 169a5d0..1c7b3a2 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.kafka010.KafkaSource._
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -306,7 +307,7 @@ private[kafka010] class KafkaSource(
kafkaReader.close()
}
- override def toString(): String = s"KafkaSource[$kafkaReader]"
+ override def toString(): String = s"KafkaSourceV1[$kafkaReader]"
/**
* If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
@@ -323,22 +324,6 @@ private[kafka010] class KafkaSource(
/** Companion object for the [[KafkaSource]]. */
private[kafka010] object KafkaSource {
- val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
- """
- |Some data may have been lost because they are not available in Kafka any more; either the
- | data was aged out by Kafka or the topic may have been deleted before all the data in the
- | topic was processed. If you want your streaming query to fail on such cases, set the source
- | option "failOnDataLoss" to "true".
- """.stripMargin
-
- val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE =
- """
- |Some data may have been lost because they are not available in Kafka any more; either the
- | data was aged out by Kafka or the topic may have been deleted before all the data in the
- | topic was processed. If you don't want your streaming query to fail on such cases, set the
- | source option "failOnDataLoss" to "false".
- """.stripMargin
-
private[kafka010] val VERSION = 1
def getSortedExecutorList(sc: SparkContext): Array[String] = {
http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index d4fa035..0aa64a6 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -30,13 +30,13 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
/**
- * The provider class for the [[KafkaSource]]. This provider is designed such that it throws
+ * The provider class for all Kafka readers and writers. It is designed such that it throws
* IllegalArgumentException when the Kafka Dataset is created, so that it can catch
* missing options even before the query is started.
*/
@@ -47,6 +47,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
with CreatableRelationProvider
with StreamWriteSupport
with ContinuousReadSupport
+ with MicroBatchReadSupport
with Logging {
import KafkaSourceProvider._
@@ -105,6 +106,52 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
failOnDataLoss(caseInsensitiveParams))
}
+ /**
+ * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader]] to read batches
+ * of Kafka data in a micro-batch streaming query.
+ */
+ override def createMicroBatchReader(
+ schema: Optional[StructType],
+ metadataPath: String,
+ options: DataSourceOptions): KafkaMicroBatchReader = {
+
+ val parameters = options.asMap().asScala.toMap
+ validateStreamOptions(parameters)
+ // Each running query should use its own group id. Otherwise, the query may be assigned only
+ // partial data since Kafka will assign partitions to multiple consumers having the same group
+ // id. Hence, we should generate a unique id for each query.
+ val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
+
+ val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
+ val specifiedKafkaParams =
+ parameters
+ .keySet
+ .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
+ .map { k => k.drop(6).toString -> parameters(k) }
+ .toMap
+
+ val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
+ STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
+
+ val kafkaOffsetReader = new KafkaOffsetReader(
+ strategy(caseInsensitiveParams),
+ kafkaParamsForDriver(specifiedKafkaParams),
+ parameters,
+ driverGroupIdPrefix = s"$uniqueGroupId-driver")
+
+ new KafkaMicroBatchReader(
+ kafkaOffsetReader,
+ kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
+ options,
+ metadataPath,
+ startingStreamOffsets,
+ failOnDataLoss(caseInsensitiveParams))
+ }
+
+ /**
+ * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader]] to read
+ * Kafka data in a continuous streaming query.
+ */
override def createContinuousReader(
schema: Optional[StructType],
metadataPath: String,
@@ -408,8 +455,27 @@ private[kafka010] object KafkaSourceProvider extends Logging {
private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
+
val TOPIC_OPTION_KEY = "topic"
+ val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
+ """
+ |Some data may have been lost because they are not available in Kafka any more; either the
+ | data was aged out by Kafka or the topic may have been deleted before all the data in the
+ | topic was processed. If you want your streaming query to fail on such cases, set the source
+ | option "failOnDataLoss" to "true".
+ """.stripMargin
+
+ val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE =
+ """
+ |Some data may have been lost because they are not available in Kafka any more; either the
+ | data was aged out by Kafka or the topic may have been deleted before all the data in the
+ | topic was processed. If you don't want your streaming query to fail on such cases, set the
+ | source option "failOnDataLoss" to "false".
+ """.stripMargin
+
private val deserClassName = classOf[ByteArrayDeserializer].getName
def getKafkaOffsetRangeLimit(
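As an aside (not part of the patch), the option handling in createMicroBatchReader above works as
follows: options whose keys start with "kafka." are forwarded to the underlying consumers with that
prefix stripped (the k.drop(6) above), everything else is interpreted by the source itself, and
each query builds a unique consumer group id from a random UUID plus its checkpoint path. A small
illustration, with made-up option values:

    val options = Map(
      "kafka.bootstrap.servers" -> "host:9092",  // forwarded to Kafka as "bootstrap.servers"
      "subscribe"               -> "topic-A",    // consumed by the source itself
      "maxOffsetsPerTrigger"    -> "10")         // consumed by the source itself
    val specifiedKafkaParams = options.collect {
      case (k, v) if k.toLowerCase(java.util.Locale.ROOT).startsWith("kafka.") => k.drop(6) -> v
    }
    // specifiedKafkaParams == Map("bootstrap.servers" -> "host:9092")
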
http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-future-version.bin
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-future-version.bin b/external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-future-version.bin
new file mode 100644
index 0000000..d530773
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-future-version.bin
@@ -0,0 +1,2 @@
+0v99999
+{"kafka-initial-offset-future-version":{"2":2,"1":1,"0":0}}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin b/external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin
index ae928e7..8c78d9e 100644
--- a/external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin
+++ b/external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin
@@ -1 +1 @@
-2{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}
\ No newline at end of file
+2{"kafka-initial-offset-2-1-0":{"2":2,"1":1,"0":0}}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
new file mode 100644
index 0000000..ed4ecfe
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -0,0 +1,1222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+import java.util.{Locale, Properties}
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.io.Source
+import scala.util.Random
+
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.kafka.common.TopicPartition
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Dataset, ForeachWriter}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.functions.{count, window}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider._
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
+
+ protected var testUtils: KafkaTestUtils = _
+
+ override val streamingTimeout = 30.seconds
+
+ protected val brokerProps = Map[String, Object]()
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ testUtils = new KafkaTestUtils(brokerProps)
+ testUtils.setup()
+ }
+
+ override def afterAll(): Unit = {
+ if (testUtils != null) {
+ testUtils.teardown()
+ testUtils = null
+ }
+ super.afterAll()
+ }
+
+ protected def makeSureGetOffsetCalled = AssertOnQuery { q =>
+ // Because KafkaSource's initialPartitionOffsets is set lazily, we need to make sure
+ // its "getOffset" is called before pushing any data. Otherwise, because of the race condition,
+ // we don't know which data should be fetched when `startingOffsets` is latest.
+ q match {
+ case c: ContinuousExecution => c.awaitEpoch(0)
+ case m: MicroBatchExecution => m.processAllAvailable()
+ }
+ true
+ }
+
+ protected def setTopicPartitions(topic: String, newCount: Int, query: StreamExecution) : Unit = {
+ testUtils.addPartitions(topic, newCount)
+ }
+
+ /**
+ * Add data to Kafka.
+ *
+ * `topicAction` can be used to run actions for each topic before inserting data.
+ */
+ case class AddKafkaData(topics: Set[String], data: Int*)
+ (implicit ensureDataInMultiplePartition: Boolean = false,
+ concurrent: Boolean = false,
+ message: String = "",
+ topicAction: (String, Option[Int]) => Unit = (_, _) => {}) extends AddData {
+
+ override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
+ query match {
+ // Make sure no Spark job is running when deleting a topic
+ case Some(m: MicroBatchExecution) => m.processAllAvailable()
+ case _ =>
+ }
+
+ val existingTopics = testUtils.getAllTopicsAndPartitionSize().toMap
+ val newTopics = topics.diff(existingTopics.keySet)
+ for (newTopic <- newTopics) {
+ topicAction(newTopic, None)
+ }
+ for (existingTopicPartitions <- existingTopics) {
+ topicAction(existingTopicPartitions._1, Some(existingTopicPartitions._2))
+ }
+
+ require(
+ query.nonEmpty,
+ "Cannot add data when there is no query for finding the active kafka source")
+
+ val sources = {
+ query.get.logicalPlan.collect {
+ case StreamingExecutionRelation(source: KafkaSource, _) => source
+ case StreamingExecutionRelation(source: KafkaMicroBatchReader, _) => source
+ } ++ (query.get.lastExecution match {
+ case null => Seq()
+ case e => e.logical.collect {
+ case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
+ }
+ })
+ }.distinct
+
+ if (sources.isEmpty) {
+ throw new Exception(
+ "Could not find Kafka source in the StreamExecution logical plan to add data to")
+ } else if (sources.size > 1) {
+ throw new Exception(
+ "Could not select the Kafka source in the StreamExecution logical plan as there" +
+ "are multiple Kafka sources:\n\t" + sources.mkString("\n\t"))
+ }
+ val kafkaSource = sources.head
+ val topic = topics.toSeq(Random.nextInt(topics.size))
+ val sentMetadata = testUtils.sendMessages(topic, data.map { _.toString }.toArray)
+
+ def metadataToStr(m: (String, RecordMetadata)): String = {
+ s"Sent ${m._1} to partition ${m._2.partition()}, offset ${m._2.offset()}"
+ }
+ // Verify that the test data gets inserted into multiple partitions
+ if (ensureDataInMultiplePartition) {
+ require(
+ sentMetadata.groupBy(_._2.partition).size > 1,
+ s"Added data does not test multiple partitions: ${sentMetadata.map(metadataToStr)}")
+ }
+
+ val offset = KafkaSourceOffset(testUtils.getLatestOffsets(topics))
+ logInfo(s"Added data, expected offset $offset")
+ (kafkaSource, offset)
+ }
+
+ override def toString: String =
+ s"AddKafkaData(topics = $topics, data = $data, message = $message)"
+ }
+
+ private val topicId = new AtomicInteger(0)
+ protected def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
+}
+
+abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
+
+ import testImplicits._
+
+ test("(de)serialization of initial offsets") {
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 5)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+
+ testStream(reader.load)(
+ makeSureGetOffsetCalled,
+ StopStream,
+ StartStream(),
+ StopStream)
+ }
+
+ test("maxOffsetsPerTrigger") {
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 3)
+ testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0))
+ testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1))
+ testUtils.sendMessages(topic, Array("1"), Some(2))
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("maxOffsetsPerTrigger", 10)
+ .option("subscribe", topic)
+ .option("startingOffsets", "earliest")
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+ val clock = new StreamManualClock
+
+ val waitUntilBatchProcessed = AssertOnQuery { q =>
+ eventually(Timeout(streamingTimeout)) {
+ if (!q.exception.isDefined) {
+ assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+ }
+ }
+ if (q.exception.isDefined) {
+ throw q.exception.get
+ }
+ true
+ }
+
+ testStream(mapped)(
+ StartStream(ProcessingTime(100), clock),
+ waitUntilBatchProcessed,
+ // 1 from smallest, 1 from middle, 8 from biggest
+ CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ // smallest now empty, 1 more from middle, 9 more from biggest
+ CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+ 11, 108, 109, 110, 111, 112, 113, 114, 115, 116
+ ),
+ StopStream,
+ StartStream(ProcessingTime(100), clock),
+ waitUntilBatchProcessed,
+ // smallest now empty, 1 more from middle, 9 more from biggest
+ CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+ 11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
+ 12, 117, 118, 119, 120, 121, 122, 123, 124, 125
+ ),
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ // smallest now empty, 1 more from middle, 9 more from biggest
+ CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+ 11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
+ 12, 117, 118, 119, 120, 121, 122, 123, 124, 125,
+ 13, 126, 127, 128, 129, 130, 131, 132, 133, 134
+ )
+ )
+ }
+
+ test("input row metrics") {
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 5)
+ testUtils.sendMessages(topic, Array("-1"))
+ require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("subscribe", topic)
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+
+ val mapped = kafka.map(kv => kv._2.toInt + 1)
+ testStream(mapped)(
+ StartStream(trigger = ProcessingTime(1)),
+ makeSureGetOffsetCalled,
+ AddKafkaData(Set(topic), 1, 2, 3),
+ CheckAnswer(2, 3, 4),
+ AssertOnQuery { query =>
+ val recordsRead = query.recentProgress.map(_.numInputRows).sum
+ recordsRead == 3
+ }
+ )
+ }
+
+ test("subscribing topic by pattern with topic deletions") {
+ val topicPrefix = newTopic()
+ val topic = topicPrefix + "-seems"
+ val topic2 = topicPrefix + "-bad"
+ testUtils.createTopic(topic, partitions = 5)
+ testUtils.sendMessages(topic, Array("-1"))
+ require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("subscribePattern", s"$topicPrefix-.*")
+ .option("failOnDataLoss", "false")
+
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+ testStream(mapped)(
+ makeSureGetOffsetCalled,
+ AddKafkaData(Set(topic), 1, 2, 3),
+ CheckAnswer(2, 3, 4),
+ Assert {
+ testUtils.deleteTopic(topic)
+ testUtils.createTopic(topic2, partitions = 5)
+ true
+ },
+ AddKafkaData(Set(topic2), 4, 5, 6),
+ CheckAnswer(2, 3, 4, 5, 6, 7)
+ )
+ }
+
+ test("ensure that initial offset are written with an extra byte in the beginning (SPARK-19517)") {
+ withTempDir { metadataPath =>
+ val topic = "kafka-initial-offset-current"
+ testUtils.createTopic(topic, partitions = 1)
+
+ val initialOffsetFile = Paths.get(s"${metadataPath.getAbsolutePath}/sources/0/0").toFile
+
+ val df = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+ .option("startingOffsets", s"earliest")
+ .load()
+
+ // Test that the written initial offset file has a zero byte at the beginning, so that
+ // Spark 2.1.0 can read the offsets (see SPARK-19517)
+ testStream(df)(
+ StartStream(checkpointLocation = metadataPath.getAbsolutePath),
+ makeSureGetOffsetCalled)
+
+ val binarySource = Source.fromFile(initialOffsetFile)
+ try {
+ assert(binarySource.next().toInt == 0) // first byte is binary 0
+ } finally {
+ binarySource.close()
+ }
+ }
+ }
+
+ test("deserialization of initial offset written by Spark 2.1.0 (SPARK-19517)") {
+ withTempDir { metadataPath =>
+ val topic = "kafka-initial-offset-2-1-0"
+ testUtils.createTopic(topic, partitions = 3)
+ testUtils.sendMessages(topic, Array("0", "1", "2"), Some(0))
+ testUtils.sendMessages(topic, Array("0", "10", "20"), Some(1))
+ testUtils.sendMessages(topic, Array("0", "100", "200"), Some(2))
+
+ // Copy the initial offset file into the right location inside the checkpoint root directory
+ // such that the Kafka source can read it for initial offsets.
+ val from = new File(
+ getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").toURI).toPath
+ val to = Paths.get(s"${metadataPath.getAbsolutePath}/sources/0/0")
+ Files.createDirectories(to.getParent)
+ Files.copy(from, to)
+
+ val df = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+ .option("startingOffsets", s"earliest")
+ .load()
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ .map(_.toInt)
+
+ // Test that the query starts from the initial offsets recorded in the copied checkpoint
+ // file, rather than from the offsets implied by the startingOffsets option.
+ testStream(df)(
+ StartStream(checkpointLocation = metadataPath.getAbsolutePath),
+ AddKafkaData(Set(topic), 1000),
+ CheckAnswer(0, 1, 2, 10, 20, 200, 1000))
+ }
+ }
+
+ test("deserialization of initial offset written by future version") {
+ withTempDir { metadataPath =>
+ val topic = "kafka-initial-offset-future-version"
+ testUtils.createTopic(topic, partitions = 3)
+
+ // Copy the initial offset file into the right location inside the checkpoint root directory
+ // such that the Kafka source can read it for initial offsets.
+ val from = new File(
+ getClass.getResource("/kafka-source-initial-offset-future-version.bin").toURI).toPath
+ val to = Paths.get(s"${metadataPath.getAbsolutePath}/sources/0/0")
+ Files.createDirectories(to.getParent)
+ Files.copy(from, to)
+
+ val df = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+ .load()
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ .map(_.toInt)
+
+ testStream(df)(
+ StartStream(checkpointLocation = metadataPath.getAbsolutePath),
+ ExpectFailure[IllegalStateException](e => {
+ Seq(
+ s"maximum supported log version is v1, but encountered v99999",
+ "produced by a newer version of Spark and cannot be read by this version"
+ ).foreach { message =>
+ assert(e.toString.contains(message))
+ }
+ }))
+ }
+ }
+
+ test("KafkaSource with watermark") {
+ val now = System.currentTimeMillis()
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1)
+ testUtils.sendMessages(topic, Array(1).map(_.toString))
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("startingOffsets", s"earliest")
+ .option("subscribe", topic)
+ .load()
+
+ val windowedAggregation = kafka
+ .withWatermark("timestamp", "10 seconds")
+ .groupBy(window($"timestamp", "5 seconds") as 'window)
+ .agg(count("*") as 'count)
+ .select($"window".getField("start") as 'window, $"count")
+
+ val query = windowedAggregation
+ .writeStream
+ .format("memory")
+ .outputMode("complete")
+ .queryName("kafkaWatermark")
+ .start()
+ query.processAllAvailable()
+ val rows = spark.table("kafkaWatermark").collect()
+ assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
+ val row = rows(0)
+ // We cannot check the exact window start time as it depends on the time that messages were
+ // inserted by the producer. So here we just use a lower bound to make sure the internal
+ // conversion works.
+ assert(
+ row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000,
+ s"Unexpected results: $row")
+ assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row")
+ query.stop()
+ }
+
+ test("delete a topic when a Spark job is running") {
+ KafkaSourceSuite.collectedData.clear()
+
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1)
+ testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("subscribe", topic)
+ // If a topic is deleted and we try to poll data starting from offset 0,
+ // the Kafka consumer will just block until timeout and return an empty result.
+ // So set the timeout to 1 second to make this test fast.
+ .option("kafkaConsumer.pollTimeoutMs", "1000")
+ .option("startingOffsets", "earliest")
+ .option("failOnDataLoss", "false")
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ KafkaSourceSuite.globalTestUtils = testUtils
+ // The following ForeachWriter will delete the topic before fetching data from Kafka
+ // in executors.
+ val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] {
+ override def open(partitionId: Long, version: Long): Boolean = {
+ KafkaSourceSuite.globalTestUtils.deleteTopic(topic)
+ true
+ }
+
+ override def process(value: Int): Unit = {
+ KafkaSourceSuite.collectedData.add(value)
+ }
+
+ override def close(errorOrNull: Throwable): Unit = {}
+ }).start()
+ query.processAllAvailable()
+ query.stop()
+ // `failOnDataLoss` is `false`, we should not fail the query
+ assert(query.exception.isEmpty)
+ }
+
+ test("SPARK-22956: currentPartitionOffsets should be set when no new data comes in") {
+ def getSpecificDF(range: Range.Inclusive): org.apache.spark.sql.Dataset[Int] = {
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1)
+ testUtils.sendMessages(topic, range.map(_.toString).toArray, Some(0))
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("maxOffsetsPerTrigger", 5)
+ .option("subscribe", topic)
+ .option("startingOffsets", "earliest")
+
+ reader.load()
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ .map(k => k.toInt)
+ }
+
+ val df1 = getSpecificDF(0 to 9)
+ val df2 = getSpecificDF(100 to 199)
+
+ val kafka = df1.union(df2)
+
+ val clock = new StreamManualClock
+
+ val waitUntilBatchProcessed = AssertOnQuery { q =>
+ eventually(Timeout(streamingTimeout)) {
+ if (!q.exception.isDefined) {
+ assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+ }
+ }
+ if (q.exception.isDefined) {
+ throw q.exception.get
+ }
+ true
+ }
+
+ testStream(kafka)(
+ StartStream(ProcessingTime(100), clock),
+ waitUntilBatchProcessed,
+ // 5 from smaller topic, 5 from bigger one
+ CheckLastBatch((0 to 4) ++ (100 to 104): _*),
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ // 5 from smaller topic, 5 from bigger one
+ CheckLastBatch((5 to 9) ++ (105 to 109): _*),
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ // smaller topic empty, 5 from bigger one
+ CheckLastBatch(110 to 114: _*),
+ StopStream,
+ StartStream(ProcessingTime(100), clock),
+ waitUntilBatchProcessed,
+ // smallest now empty, 5 from bigger one
+ CheckLastBatch(115 to 119: _*),
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ // smallest now empty, 5 from bigger one
+ CheckLastBatch(120 to 124: _*)
+ )
+ }
+
+ test("ensure stream-stream self-join generates only one offset in offset log") {
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 2)
+ require(testUtils.getLatestOffsets(Set(topic)).size === 2)
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("subscribe", topic)
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .load()
+
+ val values = kafka
+ .selectExpr("CAST(CAST(value AS STRING) AS INT) AS value",
+ "CAST(CAST(value AS STRING) AS INT) % 5 AS key")
+
+ val join = values.join(values, "key")
+
+ testStream(join)(
+ makeSureGetOffsetCalled,
+ AddKafkaData(Set(topic), 1, 2),
+ CheckAnswer((1, 1, 1), (2, 2, 2)),
+ AddKafkaData(Set(topic), 6, 3),
+ CheckAnswer((1, 1, 1), (2, 2, 2), (3, 3, 3), (1, 6, 1), (1, 1, 6), (1, 6, 6))
+ )
+ }
+}
+
+
+class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase {
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ spark.conf.set(
+ "spark.sql.streaming.disabledV2MicroBatchReaders",
+ classOf[KafkaSourceProvider].getCanonicalName)
+ }
+
+ test("V1 Source is used when disabled through SQLConf") {
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 5)
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("subscribePattern", s"$topic.*")
+ .load()
+
+ testStream(kafka)(
+ makeSureGetOffsetCalled,
+ AssertOnQuery { query =>
+ query.logicalPlan.collect {
+ case StreamingExecutionRelation(_: KafkaSource, _) => true
+ }.nonEmpty
+ }
+ )
+ }
+}
+
+class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
+
+ test("V2 Source is used by default") {
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 5)
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("subscribePattern", s"$topic.*")
+ .load()
+
+ testStream(kafka)(
+ makeSureGetOffsetCalled,
+ AssertOnQuery { query =>
+ query.logicalPlan.collect {
+ case StreamingExecutionRelation(_: KafkaMicroBatchReader, _) => true
+ }.nonEmpty
+ }
+ )
+ }
+}
+
+abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
+
+ import testImplicits._
+
+ test("cannot stop Kafka stream") {
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 5)
+ testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("subscribePattern", s"$topic.*")
+
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+ testStream(mapped)(
+ makeSureGetOffsetCalled,
+ StopStream
+ )
+ }
+
+ for (failOnDataLoss <- Seq(true, false)) {
+ test(s"assign from latest offsets (failOnDataLoss: $failOnDataLoss)") {
+ val topic = newTopic()
+ testFromLatestOffsets(
+ topic,
+ addPartitions = false,
+ failOnDataLoss = failOnDataLoss,
+ "assign" -> assignString(topic, 0 to 4))
+ }
+
+ test(s"assign from earliest offsets (failOnDataLoss: $failOnDataLoss)") {
+ val topic = newTopic()
+ testFromEarliestOffsets(
+ topic,
+ addPartitions = false,
+ failOnDataLoss = failOnDataLoss,
+ "assign" -> assignString(topic, 0 to 4))
+ }
+
+ test(s"assign from specific offsets (failOnDataLoss: $failOnDataLoss)") {
+ val topic = newTopic()
+ testFromSpecificOffsets(
+ topic,
+ failOnDataLoss = failOnDataLoss,
+ "assign" -> assignString(topic, 0 to 4),
+ "failOnDataLoss" -> failOnDataLoss.toString)
+ }
+
+ test(s"subscribing topic by name from latest offsets (failOnDataLoss: $failOnDataLoss)") {
+ val topic = newTopic()
+ testFromLatestOffsets(
+ topic,
+ addPartitions = true,
+ failOnDataLoss = failOnDataLoss,
+ "subscribe" -> topic)
+ }
+
+ test(s"subscribing topic by name from earliest offsets (failOnDataLoss: $failOnDataLoss)") {
+ val topic = newTopic()
+ testFromEarliestOffsets(
+ topic,
+ addPartitions = true,
+ failOnDataLoss = failOnDataLoss,
+ "subscribe" -> topic)
+ }
+
+ test(s"subscribing topic by name from specific offsets (failOnDataLoss: $failOnDataLoss)") {
+ val topic = newTopic()
+ testFromSpecificOffsets(topic, failOnDataLoss = failOnDataLoss, "subscribe" -> topic)
+ }
+
+ test(s"subscribing topic by pattern from latest offsets (failOnDataLoss: $failOnDataLoss)") {
+ val topicPrefix = newTopic()
+ val topic = topicPrefix + "-suffix"
+ testFromLatestOffsets(
+ topic,
+ addPartitions = true,
+ failOnDataLoss = failOnDataLoss,
+ "subscribePattern" -> s"$topicPrefix-.*")
+ }
+
+ test(s"subscribing topic by pattern from earliest offsets (failOnDataLoss: $failOnDataLoss)") {
+ val topicPrefix = newTopic()
+ val topic = topicPrefix + "-suffix"
+ testFromEarliestOffsets(
+ topic,
+ addPartitions = true,
+ failOnDataLoss = failOnDataLoss,
+ "subscribePattern" -> s"$topicPrefix-.*")
+ }
+
+ test(s"subscribing topic by pattern from specific offsets (failOnDataLoss: $failOnDataLoss)") {
+ val topicPrefix = newTopic()
+ val topic = topicPrefix + "-suffix"
+ testFromSpecificOffsets(
+ topic,
+ failOnDataLoss = failOnDataLoss,
+ "subscribePattern" -> s"$topicPrefix-.*")
+ }
+ }
+
+ test("bad source options") {
+ def testBadOptions(options: (String, String)*)(expectedMsgs: String*): Unit = {
+ val ex = intercept[IllegalArgumentException] {
+ val reader = spark
+ .readStream
+ .format("kafka")
+ options.foreach { case (k, v) => reader.option(k, v) }
+ reader.load()
+ }
+ expectedMsgs.foreach { m =>
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(m.toLowerCase(Locale.ROOT)))
+ }
+ }
+
+ // Specifying an ending offset
+ testBadOptions("endingOffsets" -> "latest")("Ending offset not valid in streaming queries")
+
+ // No strategy specified
+ testBadOptions()("options must be specified", "subscribe", "subscribePattern")
+
+ // Multiple strategies specified
+ testBadOptions("subscribe" -> "t", "subscribePattern" -> "t.*")(
+ "only one", "options can be specified")
+
+ testBadOptions("subscribe" -> "t", "assign" -> """{"a":[0]}""")(
+ "only one", "options can be specified")
+
+ testBadOptions("assign" -> "")("no topicpartitions to assign")
+ testBadOptions("subscribe" -> "")("no topics to subscribe")
+ testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty")
+ }
+
+ test("unsupported kafka configs") {
+ def testUnsupportedConfig(key: String, value: String = "someValue"): Unit = {
+ val ex = intercept[IllegalArgumentException] {
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("subscribe", "topic")
+ .option("kafka.bootstrap.servers", "somehost")
+ .option(s"$key", value)
+ reader.load()
+ }
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("not supported"))
+ }
+
+ testUnsupportedConfig("kafka.group.id")
+ testUnsupportedConfig("kafka.auto.offset.reset")
+ testUnsupportedConfig("kafka.enable.auto.commit")
+ testUnsupportedConfig("kafka.interceptor.classes")
+ testUnsupportedConfig("kafka.key.deserializer")
+ testUnsupportedConfig("kafka.value.deserializer")
+
+ testUnsupportedConfig("kafka.auto.offset.reset", "none")
+ testUnsupportedConfig("kafka.auto.offset.reset", "someValue")
+ testUnsupportedConfig("kafka.auto.offset.reset", "earliest")
+ testUnsupportedConfig("kafka.auto.offset.reset", "latest")
+ }
+
+ test("get offsets from case insensitive parameters") {
+ for ((optionKey, optionValue, answer) <- Seq(
+ (STARTING_OFFSETS_OPTION_KEY, "earLiEst", EarliestOffsetRangeLimit),
+ (ENDING_OFFSETS_OPTION_KEY, "laTest", LatestOffsetRangeLimit),
+ (STARTING_OFFSETS_OPTION_KEY, """{"topic-A":{"0":23}}""",
+ SpecificOffsetRangeLimit(Map(new TopicPartition("topic-A", 0) -> 23))))) {
+ val offset = getKafkaOffsetRangeLimit(Map(optionKey -> optionValue), optionKey, answer)
+ assert(offset === answer)
+ }
+
+ for ((optionKey, answer) <- Seq(
+ (STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit),
+ (ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit))) {
+ val offset = getKafkaOffsetRangeLimit(Map.empty, optionKey, answer)
+ assert(offset === answer)
+ }
+ }
+
+ private def assignString(topic: String, partitions: Iterable[Int]): String = {
+ JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p)))
+ }
+
+ private def testFromSpecificOffsets(
+ topic: String,
+ failOnDataLoss: Boolean,
+ options: (String, String)*): Unit = {
+ val partitionOffsets = Map(
+ new TopicPartition(topic, 0) -> -2L,
+ new TopicPartition(topic, 1) -> -1L,
+ new TopicPartition(topic, 2) -> 0L,
+ new TopicPartition(topic, 3) -> 1L,
+ new TopicPartition(topic, 4) -> 2L
+ )
+ val startingOffsets = JsonUtils.partitionOffsets(partitionOffsets)
+
+ testUtils.createTopic(topic, partitions = 5)
+ // part 0 starts at earliest, these should all be seen
+ testUtils.sendMessages(topic, Array(-20, -21, -22).map(_.toString), Some(0))
+ // part 1 starts at latest, these should all be skipped
+ testUtils.sendMessages(topic, Array(-10, -11, -12).map(_.toString), Some(1))
+ // part 2 starts at 0, these should all be seen
+ testUtils.sendMessages(topic, Array(0, 1, 2).map(_.toString), Some(2))
+ // part 3 starts at 1, first should be skipped
+ testUtils.sendMessages(topic, Array(10, 11, 12).map(_.toString), Some(3))
+ // part 4 starts at 2, first and second should be skipped
+ testUtils.sendMessages(topic, Array(20, 21, 22).map(_.toString), Some(4))
+ require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("startingOffsets", startingOffsets)
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("failOnDataLoss", failOnDataLoss.toString)
+ options.foreach { case (k, v) => reader.option(k, v) }
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+ testStream(mapped)(
+ makeSureGetOffsetCalled,
+ Execute { q =>
+ // wait to reach the last offset in every partition
+ q.awaitOffset(0, KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L)))
+ },
+ CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22),
+ StopStream,
+ StartStream(),
+ CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22), // Should get the data back on recovery
+ AddKafkaData(Set(topic), 30, 31, 32, 33, 34)(ensureDataInMultiplePartition = true),
+ CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22, 30, 31, 32, 33, 34),
+ StopStream
+ )
+ }
+
+ test("Kafka column types") {
+ val now = System.currentTimeMillis()
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1)
+ testUtils.sendMessages(topic, Array(1).map(_.toString))
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("startingOffsets", s"earliest")
+ .option("subscribe", topic)
+ .load()
+
+ val query = kafka
+ .writeStream
+ .format("memory")
+ .queryName("kafkaColumnTypes")
+ .trigger(defaultTrigger)
+ .start()
+ eventually(timeout(streamingTimeout)) {
+ assert(spark.table("kafkaColumnTypes").count == 1,
+ s"Unexpected results: ${spark.table("kafkaColumnTypes").collectAsList()}")
+ }
+ val row = spark.table("kafkaColumnTypes").head()
+ assert(row.getAs[Array[Byte]]("key") === null, s"Unexpected results: $row")
+ assert(row.getAs[Array[Byte]]("value") === "1".getBytes(UTF_8), s"Unexpected results: $row")
+ assert(row.getAs[String]("topic") === topic, s"Unexpected results: $row")
+ assert(row.getAs[Int]("partition") === 0, s"Unexpected results: $row")
+ assert(row.getAs[Long]("offset") === 0L, s"Unexpected results: $row")
+ // We cannot check the exact timestamp as it's the time that messages were inserted by the
+ // producer. So here we just use a lower bound to make sure the internal conversion works.
+ assert(row.getAs[java.sql.Timestamp]("timestamp").getTime >= now, s"Unexpected results: $row")
+ assert(row.getAs[Int]("timestampType") === 0, s"Unexpected results: $row")
+ query.stop()
+ }
+
+ private def testFromLatestOffsets(
+ topic: String,
+ addPartitions: Boolean,
+ failOnDataLoss: Boolean,
+ options: (String, String)*): Unit = {
+ testUtils.createTopic(topic, partitions = 5)
+ testUtils.sendMessages(topic, Array("-1"))
+ require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("startingOffsets", s"latest")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("failOnDataLoss", failOnDataLoss.toString)
+ options.foreach { case (k, v) => reader.option(k, v) }
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+ testStream(mapped)(
+ makeSureGetOffsetCalled,
+ AddKafkaData(Set(topic), 1, 2, 3),
+ CheckAnswer(2, 3, 4),
+ StopStream,
+ StartStream(),
+ CheckAnswer(2, 3, 4), // Should get the data back on recovery
+ StopStream,
+ AddKafkaData(Set(topic), 4, 5, 6), // Add data when stream is stopped
+ StartStream(),
+ CheckAnswer(2, 3, 4, 5, 6, 7), // Should get the added data
+ AddKafkaData(Set(topic), 7, 8),
+ CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
+ AssertOnQuery("Add partitions") { query: StreamExecution =>
+ if (addPartitions) setTopicPartitions(topic, 10, query)
+ true
+ },
+ AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16),
+ CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
+ )
+ }
+
+ private def testFromEarliestOffsets(
+ topic: String,
+ addPartitions: Boolean,
+ failOnDataLoss: Boolean,
+ options: (String, String)*): Unit = {
+ testUtils.createTopic(topic, partitions = 5)
+ testUtils.sendMessages(topic, (1 to 3).map { _.toString }.toArray)
+ require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+ val reader = spark.readStream
+ reader
+ .format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$"))
+ .option("startingOffsets", s"earliest")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("failOnDataLoss", failOnDataLoss.toString)
+ options.foreach { case (k, v) => reader.option(k, v) }
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+ testStream(mapped)(
+ AddKafkaData(Set(topic), 4, 5, 6), // Add data when stream is stopped
+ CheckAnswer(2, 3, 4, 5, 6, 7),
+ StopStream,
+ StartStream(),
+ CheckAnswer(2, 3, 4, 5, 6, 7),
+ StopStream,
+ AddKafkaData(Set(topic), 7, 8),
+ StartStream(),
+ CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
+ AssertOnQuery("Add partitions") { query: StreamExecution =>
+ if (addPartitions) setTopicPartitions(topic, 10, query)
+ true
+ },
+ AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16),
+ CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
+ )
+ }
+}
+
+object KafkaSourceSuite {
+ @volatile var globalTestUtils: KafkaTestUtils = _
+ val collectedData = new ConcurrentLinkedQueue[Any]()
+}
+
+
+class KafkaSourceStressSuite extends KafkaSourceTest {
+
+ import testImplicits._
+
+ val topicId = new AtomicInteger(1)
+
+ @volatile var topics: Seq[String] = (1 to 5).map(_ => newStressTopic)
+
+ def newStressTopic: String = s"stress${topicId.getAndIncrement()}"
+
+ private def nextInt(start: Int, end: Int): Int = {
+ start + Random.nextInt(start + end - 1)
+ }
+
+ test("stress test with multiple topics and partitions") {
+ topics.foreach { topic =>
+ testUtils.createTopic(topic, partitions = nextInt(1, 6))
+ testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray)
+ }
+
+ // Create Kafka source that reads from latest offset
+ val kafka =
+ spark.readStream
+ .format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$"))
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("subscribePattern", "stress.*")
+ .option("failOnDataLoss", "false")
+ .load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+
+ val mapped = kafka.map(kv => kv._2.toInt + 1)
+
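+ // On each iteration, randomly add a new topic, delete an existing topic, add partitions,
+ // or simply append data, and check that the query keeps producing correct answers.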
+ runStressTest(
+ mapped,
+ Seq(makeSureGetOffsetCalled),
+ (d, running) => {
+ Random.nextInt(5) match {
+ case 0 => // Add a new topic
+ // Evaluate newStressTopic once so the log message names the topic that was actually added
+ val addedTopic = newStressTopic
+ topics = topics ++ Seq(addedTopic)
+ AddKafkaData(topics.toSet, d: _*)(message = s"Add topic $addedTopic",
+ topicAction = (topic, partition) => {
+ if (partition.isEmpty) {
+ testUtils.createTopic(topic, partitions = nextInt(1, 6))
+ }
+ })
+ case 1 if running =>
+ // Only delete a topic while the query is running. Otherwise, we may lose data and
+ // be unable to check correctness.
+ val deletedTopic = topics(Random.nextInt(topics.size))
+ if (deletedTopic != topics.head) {
+ topics = topics.filterNot(_ == deletedTopic)
+ }
+ AddKafkaData(topics.toSet, d: _*)(message = s"Delete topic $deletedTopic",
+ topicAction = (topic, partition) => {
+ // Never remove the first topic to make sure we have at least one topic
+ if (topic == deletedTopic && deletedTopic != topics.head) {
+ testUtils.deleteTopic(deletedTopic)
+ }
+ })
+ case 2 => // Add new partitions
+ AddKafkaData(topics.toSet, d: _*)(message = "Add partition",
+ topicAction = (topic, partition) => {
+ testUtils.addPartitions(topic, partition.get + nextInt(1, 6))
+ })
+ case _ => // Just add new data
+ AddKafkaData(topics.toSet, d: _*)
+ }
+ },
+ iterations = 50)
+ }
+}
+
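+ // Stress test for `failOnDataLoss = false`: topics are created, deleted, and recreated while a
+ // deliberately slow query reads them under aggressive broker retention settings; the query is
+ // expected to keep running even though records are aged out underneath it.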
+class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with SharedSQLContext {
+
+ import testImplicits._
+
+ private var testUtils: KafkaTestUtils = _
+
+ private val topicId = new AtomicInteger(0)
+
+ private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
+
+ override def createSparkSession(): TestSparkSession = {
+ // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
+ new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf))
+ }
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ testUtils = new KafkaTestUtils {
+ override def brokerConfiguration: Properties = {
+ val props = super.brokerConfiguration
+ // Try to make Kafka clean up messages as fast as possible. However, there is a hard-coded
+ // 30-second delay (kafka.log.LogManager.InitialTaskDelayMs), so this test should run for
+ // at least 30 seconds.
+ props.put("log.cleaner.backoff.ms", "100")
+ props.put("log.segment.bytes", "40")
+ props.put("log.retention.bytes", "40")
+ props.put("log.retention.check.interval.ms", "100")
+ props.put("delete.retention.ms", "10")
+ props.put("log.flush.scheduler.interval.ms", "10")
+ props
+ }
+ }
+ testUtils.setup()
+ }
+
+ override def afterAll(): Unit = {
+ if (testUtils != null) {
+ testUtils.teardown()
+ testUtils = null
+ }
+ // Always call super.afterAll(), even if testUtils was never initialized
+ super.afterAll()
+ }
+
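+ // Starts a query whose ForeachWriter sink sleeps on every record, giving the broker time to
+ // age out messages while the query is still reading.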
+ protected def startStream(ds: Dataset[Int]) = {
+ ds.writeStream.foreach(new ForeachWriter[Int] {
+
+ override def open(partitionId: Long, version: Long): Boolean = {
+ true
+ }
+
+ override def process(value: Int): Unit = {
+ // Slow down processing so that messages may be aged out before they are read.
+ Thread.sleep(Random.nextInt(500))
+ }
+
+ override def close(errorOrNull: Throwable): Unit = {
+ }
+ }).start()
+ }
+
+ test("stress test for failOnDataLoss=false") {
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("subscribePattern", "failOnDataLoss.*")
+ .option("startingOffsets", "earliest")
+ .option("failOnDataLoss", "false")
+ .option("fetchOffset.retryIntervalMs", "3000")
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val query = startStream(kafka.map(kv => kv._2.toInt))
+
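+ // For about one minute, randomly create, delete, and recreate topics, push data, or pause,
+ // verifying after every action that the query has not failed.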
+ val testTime = 1.minutes
+ val startTime = System.currentTimeMillis()
+ // Track the current existing topics
+ val topics = mutable.ArrayBuffer[String]()
+ // Track topics that have been deleted
+ val deletedTopics = mutable.Set[String]()
+ while (System.currentTimeMillis() - testTime.toMillis < startTime) {
+ Random.nextInt(10) match {
+ case 0 => // Create a new topic
+ val topic = newTopic()
+ topics += topic
+ // Pushing messages into Kafka updates Zookeeper asynchronously, so there is a small chance
+ // that a deleted topic gets recreated by a lagging update. Hence, always overwrite to handle
+ // this race condition.
+ testUtils.createTopic(topic, partitions = 1, overwrite = true)
+ logInfo(s"Create topic $topic")
+ case 1 if topics.nonEmpty => // Delete an existing topic
+ val topic = topics.remove(Random.nextInt(topics.size))
+ testUtils.deleteTopic(topic)
+ logInfo(s"Delete topic $topic")
+ deletedTopics += topic
+ case 2 if deletedTopics.nonEmpty => // Recreate a topic that was deleted.
+ val topic = deletedTopics.toSeq(Random.nextInt(deletedTopics.size))
+ deletedTopics -= topic
+ topics += topic
+ // Pushing messages into Kafka updates Zookeeper asynchronously, so there is a small chance
+ // that a deleted topic gets recreated by a lagging update. Hence, always overwrite to handle
+ // this race condition.
+ testUtils.createTopic(topic, partitions = 1, overwrite = true)
+ logInfo(s"Create topic $topic")
+ case 3 =>
+ Thread.sleep(1000)
+ case _ => // Push random messages
+ for (topic <- topics) {
+ val size = Random.nextInt(10)
+ for (_ <- 0 until size) {
+ testUtils.sendMessages(topic, Array(Random.nextInt(10).toString))
+ }
+ }
+ }
+ // Since `failOnDataLoss` is `false`, the query should not fail
+ if (query.exception.nonEmpty) {
+ throw query.exception.get
+ }
+ }
+
+ query.stop()
+ // Since `failOnDataLoss` is `false`, the query should not fail
+ if (query.exception.nonEmpty) {
+ throw query.exception.get
+ }
+ }
+}