Posted to commits@spark.apache.org by td...@apache.org on 2018/02/16 22:30:26 UTC

[1/2] spark git commit: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Repository: spark
Updated Branches:
  refs/heads/master c5857e496 -> 0a73aa31f


http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
deleted file mode 100644
index 02c8764..0000000
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ /dev/null
@@ -1,1122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import java.io._
-import java.nio.charset.StandardCharsets.UTF_8
-import java.nio.file.{Files, Paths}
-import java.util.{Locale, Properties}
-import java.util.concurrent.ConcurrentLinkedQueue
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.mutable
-import scala.util.Random
-
-import org.apache.kafka.clients.producer.RecordMetadata
-import org.apache.kafka.common.TopicPartition
-import org.scalatest.concurrent.PatienceConfiguration.Timeout
-import org.scalatest.time.SpanSugar._
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.{Dataset, ForeachWriter}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
-import org.apache.spark.sql.functions.{count, window}
-import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
-import org.apache.spark.sql.streaming.util.StreamManualClock
-import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
-import org.apache.spark.util.Utils
-
-abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
-
-  protected var testUtils: KafkaTestUtils = _
-
-  override val streamingTimeout = 30.seconds
-
-  protected val brokerProps = Map[String, Object]()
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    testUtils = new KafkaTestUtils(brokerProps)
-    testUtils.setup()
-  }
-
-  override def afterAll(): Unit = {
-    if (testUtils != null) {
-      testUtils.teardown()
-      testUtils = null
-    }
-    super.afterAll()
-  }
-
-  protected def makeSureGetOffsetCalled = AssertOnQuery { q =>
-    // Because KafkaSource's initialPartitionOffsets is set lazily, we need to make sure
-    // its "getOffset" is called before pushing any data. Otherwise, because of the race condition,
-    // we don't know which data should be fetched when `startingOffsets` is latest.
-    q match {
-      case c: ContinuousExecution => c.awaitEpoch(0)
-      case m: MicroBatchExecution => m.processAllAvailable()
-    }
-    true
-  }
-
-  protected def setTopicPartitions(topic: String, newCount: Int, query: StreamExecution) : Unit = {
-    testUtils.addPartitions(topic, newCount)
-  }
-
-  /**
-   * Add data to Kafka.
-   *
-   * `topicAction` can be used to run actions for each topic before inserting data.
-   */
-  case class AddKafkaData(topics: Set[String], data: Int*)
-    (implicit ensureDataInMultiplePartition: Boolean = false,
-      concurrent: Boolean = false,
-      message: String = "",
-      topicAction: (String, Option[Int]) => Unit = (_, _) => {}) extends AddData {
-
-    override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
-      query match {
-        // Make sure no Spark job is running when deleting a topic
-        case Some(m: MicroBatchExecution) => m.processAllAvailable()
-        case _ =>
-      }
-
-      val existingTopics = testUtils.getAllTopicsAndPartitionSize().toMap
-      val newTopics = topics.diff(existingTopics.keySet)
-      for (newTopic <- newTopics) {
-        topicAction(newTopic, None)
-      }
-      for (existingTopicPartitions <- existingTopics) {
-        topicAction(existingTopicPartitions._1, Some(existingTopicPartitions._2))
-      }
-
-      require(
-        query.nonEmpty,
-        "Cannot add data when there is no query for finding the active kafka source")
-
-      val sources = query.get.logicalPlan.collect {
-        case StreamingExecutionRelation(source: KafkaSource, _) => source
-      } ++ (query.get.lastExecution match {
-        case null => Seq()
-        case e => e.logical.collect {
-          case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
-        }
-      })
-      if (sources.isEmpty) {
-        throw new Exception(
-          "Could not find Kafka source in the StreamExecution logical plan to add data to")
-      } else if (sources.size > 1) {
-        throw new Exception(
-          "Could not select the Kafka source in the StreamExecution logical plan as there" +
-            "are multiple Kafka sources:\n\t" + sources.mkString("\n\t"))
-      }
-      val kafkaSource = sources.head
-      val topic = topics.toSeq(Random.nextInt(topics.size))
-      val sentMetadata = testUtils.sendMessages(topic, data.map { _.toString }.toArray)
-
-      def metadataToStr(m: (String, RecordMetadata)): String = {
-        s"Sent ${m._1} to partition ${m._2.partition()}, offset ${m._2.offset()}"
-      }
-      // Verify that the test data gets inserted into multiple partitions
-      if (ensureDataInMultiplePartition) {
-        require(
-          sentMetadata.groupBy(_._2.partition).size > 1,
-          s"Added data does not test multiple partitions: ${sentMetadata.map(metadataToStr)}")
-      }
-
-      val offset = KafkaSourceOffset(testUtils.getLatestOffsets(topics))
-      logInfo(s"Added data, expected offset $offset")
-      (kafkaSource, offset)
-    }
-
-    override def toString: String =
-      s"AddKafkaData(topics = $topics, data = $data, message = $message)"
-  }
-
-  private val topicId = new AtomicInteger(0)
-  protected def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
-}
-
-class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase {
-
-  import testImplicits._
-
-  test("(de)serialization of initial offsets") {
-    val topic = newTopic()
-    testUtils.createTopic(topic, partitions = 5)
-
-    val reader = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("subscribe", topic)
-
-    testStream(reader.load)(
-      makeSureGetOffsetCalled,
-      StopStream,
-      StartStream(),
-      StopStream)
-  }
-
-  test("maxOffsetsPerTrigger") {
-    val topic = newTopic()
-    testUtils.createTopic(topic, partitions = 3)
-    testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0))
-    testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1))
-    testUtils.sendMessages(topic, Array("1"), Some(2))
-
-    val reader = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("maxOffsetsPerTrigger", 10)
-      .option("subscribe", topic)
-      .option("startingOffsets", "earliest")
-    val kafka = reader.load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
-
-    val clock = new StreamManualClock
-
-    val waitUntilBatchProcessed = AssertOnQuery { q =>
-      eventually(Timeout(streamingTimeout)) {
-        if (!q.exception.isDefined) {
-          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
-        }
-      }
-      if (q.exception.isDefined) {
-        throw q.exception.get
-      }
-      true
-    }
-
-    testStream(mapped)(
-      StartStream(ProcessingTime(100), clock),
-      waitUntilBatchProcessed,
-      // 1 from smallest, 1 from middle, 8 from biggest
-      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
-      AdvanceManualClock(100),
-      waitUntilBatchProcessed,
-      // smallest now empty, 1 more from middle, 9 more from biggest
-      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
-        11, 108, 109, 110, 111, 112, 113, 114, 115, 116
-      ),
-      StopStream,
-      StartStream(ProcessingTime(100), clock),
-      waitUntilBatchProcessed,
-      // smallest now empty, 1 more from middle, 9 more from biggest
-      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
-        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
-        12, 117, 118, 119, 120, 121, 122, 123, 124, 125
-      ),
-      AdvanceManualClock(100),
-      waitUntilBatchProcessed,
-      // smallest now empty, 1 more from middle, 9 more from biggest
-      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
-        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
-        12, 117, 118, 119, 120, 121, 122, 123, 124, 125,
-        13, 126, 127, 128, 129, 130, 131, 132, 133, 134
-      )
-    )
-  }
-
-  test("input row metrics") {
-    val topic = newTopic()
-    testUtils.createTopic(topic, partitions = 5)
-    testUtils.sendMessages(topic, Array("-1"))
-    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
-
-    val kafka = spark
-      .readStream
-      .format("kafka")
-      .option("subscribe", topic)
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-
-    val mapped = kafka.map(kv => kv._2.toInt + 1)
-    testStream(mapped)(
-      StartStream(trigger = ProcessingTime(1)),
-      makeSureGetOffsetCalled,
-      AddKafkaData(Set(topic), 1, 2, 3),
-      CheckAnswer(2, 3, 4),
-      AssertOnQuery { query =>
-        val recordsRead = query.recentProgress.map(_.numInputRows).sum
-        recordsRead == 3
-      }
-    )
-  }
-
-  test("subscribing topic by pattern with topic deletions") {
-    val topicPrefix = newTopic()
-    val topic = topicPrefix + "-seems"
-    val topic2 = topicPrefix + "-bad"
-    testUtils.createTopic(topic, partitions = 5)
-    testUtils.sendMessages(topic, Array("-1"))
-    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
-
-    val reader = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("subscribePattern", s"$topicPrefix-.*")
-      .option("failOnDataLoss", "false")
-
-    val kafka = reader.load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-    val mapped = kafka.map(kv => kv._2.toInt + 1)
-
-    testStream(mapped)(
-      makeSureGetOffsetCalled,
-      AddKafkaData(Set(topic), 1, 2, 3),
-      CheckAnswer(2, 3, 4),
-      Assert {
-        testUtils.deleteTopic(topic)
-        testUtils.createTopic(topic2, partitions = 5)
-        true
-      },
-      AddKafkaData(Set(topic2), 4, 5, 6),
-      CheckAnswer(2, 3, 4, 5, 6, 7)
-    )
-  }
-
-  testWithUninterruptibleThread(
-    "deserialization of initial offset with Spark 2.1.0") {
-    withTempDir { metadataPath =>
-      val topic = newTopic
-      testUtils.createTopic(topic, partitions = 3)
-
-      val provider = new KafkaSourceProvider
-      val parameters = Map(
-        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
-        "subscribe" -> topic
-      )
-      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
-        "", parameters)
-      source.getOffset.get // Write initial offset
-
-      // Make sure Spark 2.1.0 will throw an exception when reading the new log
-      intercept[java.lang.IllegalArgumentException] {
-        // Simulate how Spark 2.1.0 reads the log
-        Utils.tryWithResource(new FileInputStream(metadataPath.getAbsolutePath + "/0")) { in =>
-          val length = in.read()
-          val bytes = new Array[Byte](length)
-          in.read(bytes)
-          KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
-        }
-      }
-    }
-  }
-
-  testWithUninterruptibleThread("deserialization of initial offset written by Spark 2.1.0") {
-    withTempDir { metadataPath =>
-      val topic = "kafka-initial-offset-2-1-0"
-      testUtils.createTopic(topic, partitions = 3)
-
-      val provider = new KafkaSourceProvider
-      val parameters = Map(
-        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
-        "subscribe" -> topic
-      )
-
-      val from = new File(
-        getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").toURI).toPath
-      val to = Paths.get(s"${metadataPath.getAbsolutePath}/0")
-      Files.copy(from, to)
-
-      val source = provider.createSource(
-        spark.sqlContext, metadataPath.toURI.toString, None, "", parameters)
-      val deserializedOffset = source.getOffset.get
-      val referenceOffset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
-      assert(referenceOffset == deserializedOffset)
-    }
-  }
-
-  testWithUninterruptibleThread("deserialization of initial offset written by future version") {
-    withTempDir { metadataPath =>
-      val futureMetadataLog =
-        new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession,
-          metadataPath.getAbsolutePath) {
-          override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
-            out.write(0)
-            val writer = new BufferedWriter(new OutputStreamWriter(out, UTF_8))
-            writer.write(s"v99999\n${metadata.json}")
-            writer.flush
-          }
-        }
-
-      val topic = newTopic
-      testUtils.createTopic(topic, partitions = 3)
-      val offset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
-      futureMetadataLog.add(0, offset)
-
-      val provider = new KafkaSourceProvider
-      val parameters = Map(
-        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
-        "subscribe" -> topic
-      )
-      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
-        "", parameters)
-
-      val e = intercept[java.lang.IllegalStateException] {
-        source.getOffset.get // Read initial offset
-      }
-
-      Seq(
-        s"maximum supported log version is v${KafkaSource.VERSION}, but encountered v99999",
-        "produced by a newer version of Spark and cannot be read by this version"
-      ).foreach { message =>
-        assert(e.getMessage.contains(message))
-      }
-    }
-  }
-
-  test("KafkaSource with watermark") {
-    val now = System.currentTimeMillis()
-    val topic = newTopic()
-    testUtils.createTopic(topic, partitions = 1)
-    testUtils.sendMessages(topic, Array(1).map(_.toString))
-
-    val kafka = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("startingOffsets", s"earliest")
-      .option("subscribe", topic)
-      .load()
-
-    val windowedAggregation = kafka
-      .withWatermark("timestamp", "10 seconds")
-      .groupBy(window($"timestamp", "5 seconds") as 'window)
-      .agg(count("*") as 'count)
-      .select($"window".getField("start") as 'window, $"count")
-
-    val query = windowedAggregation
-      .writeStream
-      .format("memory")
-      .outputMode("complete")
-      .queryName("kafkaWatermark")
-      .start()
-    query.processAllAvailable()
-    val rows = spark.table("kafkaWatermark").collect()
-    assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
-    val row = rows(0)
-    // We cannot check the exact window start time as it depends on the time that messages were
-    // inserted by the producer. So here we just use a lower bound to make sure the internal
-    // conversion works.
-    assert(
-      row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000,
-      s"Unexpected results: $row")
-    assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row")
-    query.stop()
-  }
-
-  test("delete a topic when a Spark job is running") {
-    KafkaSourceSuite.collectedData.clear()
-
-    val topic = newTopic()
-    testUtils.createTopic(topic, partitions = 1)
-    testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray)
-
-    val reader = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("subscribe", topic)
-      // If a topic is deleted and we try to poll data starting from offset 0,
-      // the Kafka consumer will just block until timeout and return an empty result.
-      // So set the timeout to 1 second to make this test fast.
-      .option("kafkaConsumer.pollTimeoutMs", "1000")
-      .option("startingOffsets", "earliest")
-      .option("failOnDataLoss", "false")
-    val kafka = reader.load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-    KafkaSourceSuite.globalTestUtils = testUtils
-    // The following ForeachWriter will delete the topic before fetching data from Kafka
-    // in executors.
-    val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] {
-      override def open(partitionId: Long, version: Long): Boolean = {
-        KafkaSourceSuite.globalTestUtils.deleteTopic(topic)
-        true
-      }
-
-      override def process(value: Int): Unit = {
-        KafkaSourceSuite.collectedData.add(value)
-      }
-
-      override def close(errorOrNull: Throwable): Unit = {}
-    }).start()
-    query.processAllAvailable()
-    query.stop()
-    // `failOnDataLoss` is `false`, we should not fail the query
-    assert(query.exception.isEmpty)
-  }
-
-  test("SPARK-22956: currentPartitionOffsets should be set when no new data comes in") {
-    def getSpecificDF(range: Range.Inclusive): org.apache.spark.sql.Dataset[Int] = {
-      val topic = newTopic()
-      testUtils.createTopic(topic, partitions = 1)
-      testUtils.sendMessages(topic, range.map(_.toString).toArray, Some(0))
-
-      val reader = spark
-        .readStream
-        .format("kafka")
-        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-        .option("kafka.metadata.max.age.ms", "1")
-        .option("maxOffsetsPerTrigger", 5)
-        .option("subscribe", topic)
-        .option("startingOffsets", "earliest")
-
-      reader.load()
-        .selectExpr("CAST(value AS STRING)")
-        .as[String]
-        .map(k => k.toInt)
-    }
-
-    val df1 = getSpecificDF(0 to 9)
-    val df2 = getSpecificDF(100 to 199)
-
-    val kafka = df1.union(df2)
-
-    val clock = new StreamManualClock
-
-    val waitUntilBatchProcessed = AssertOnQuery { q =>
-      eventually(Timeout(streamingTimeout)) {
-        if (!q.exception.isDefined) {
-          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
-        }
-      }
-      if (q.exception.isDefined) {
-        throw q.exception.get
-      }
-      true
-    }
-
-    testStream(kafka)(
-      StartStream(ProcessingTime(100), clock),
-      waitUntilBatchProcessed,
-      // 5 from smaller topic, 5 from bigger one
-      CheckLastBatch((0 to 4) ++ (100 to 104): _*),
-      AdvanceManualClock(100),
-      waitUntilBatchProcessed,
-      // 5 from smaller topic, 5 from bigger one
-      CheckLastBatch((5 to 9) ++ (105 to 109): _*),
-      AdvanceManualClock(100),
-      waitUntilBatchProcessed,
-      // smaller topic empty, 5 from bigger one
-      CheckLastBatch(110 to 114: _*),
-      StopStream,
-      StartStream(ProcessingTime(100), clock),
-      waitUntilBatchProcessed,
-      // smallest now empty, 5 from bigger one
-      CheckLastBatch(115 to 119: _*),
-      AdvanceManualClock(100),
-      waitUntilBatchProcessed,
-      // smallest now empty, 5 from bigger one
-      CheckLastBatch(120 to 124: _*)
-    )
-  }
-}
-
-abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
-
-  import testImplicits._
-
-  test("cannot stop Kafka stream") {
-    val topic = newTopic()
-    testUtils.createTopic(topic, partitions = 5)
-    testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray)
-
-    val reader = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("subscribePattern", s"$topic.*")
-
-    val kafka = reader.load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-    val mapped = kafka.map(kv => kv._2.toInt + 1)
-
-    testStream(mapped)(
-      makeSureGetOffsetCalled,
-      StopStream
-    )
-  }
-
-  for (failOnDataLoss <- Seq(true, false)) {
-    test(s"assign from latest offsets (failOnDataLoss: $failOnDataLoss)") {
-      val topic = newTopic()
-      testFromLatestOffsets(
-        topic,
-        addPartitions = false,
-        failOnDataLoss = failOnDataLoss,
-        "assign" -> assignString(topic, 0 to 4))
-    }
-
-    test(s"assign from earliest offsets (failOnDataLoss: $failOnDataLoss)") {
-      val topic = newTopic()
-      testFromEarliestOffsets(
-        topic,
-        addPartitions = false,
-        failOnDataLoss = failOnDataLoss,
-        "assign" -> assignString(topic, 0 to 4))
-    }
-
-    test(s"assign from specific offsets (failOnDataLoss: $failOnDataLoss)") {
-      val topic = newTopic()
-      testFromSpecificOffsets(
-        topic,
-        failOnDataLoss = failOnDataLoss,
-        "assign" -> assignString(topic, 0 to 4),
-        "failOnDataLoss" -> failOnDataLoss.toString)
-    }
-
-    test(s"subscribing topic by name from latest offsets (failOnDataLoss: $failOnDataLoss)") {
-      val topic = newTopic()
-      testFromLatestOffsets(
-        topic,
-        addPartitions = true,
-        failOnDataLoss = failOnDataLoss,
-        "subscribe" -> topic)
-    }
-
-    test(s"subscribing topic by name from earliest offsets (failOnDataLoss: $failOnDataLoss)") {
-      val topic = newTopic()
-      testFromEarliestOffsets(
-        topic,
-        addPartitions = true,
-        failOnDataLoss = failOnDataLoss,
-        "subscribe" -> topic)
-    }
-
-    test(s"subscribing topic by name from specific offsets (failOnDataLoss: $failOnDataLoss)") {
-      val topic = newTopic()
-      testFromSpecificOffsets(topic, failOnDataLoss = failOnDataLoss, "subscribe" -> topic)
-    }
-
-    test(s"subscribing topic by pattern from latest offsets (failOnDataLoss: $failOnDataLoss)") {
-      val topicPrefix = newTopic()
-      val topic = topicPrefix + "-suffix"
-      testFromLatestOffsets(
-        topic,
-        addPartitions = true,
-        failOnDataLoss = failOnDataLoss,
-        "subscribePattern" -> s"$topicPrefix-.*")
-    }
-
-    test(s"subscribing topic by pattern from earliest offsets (failOnDataLoss: $failOnDataLoss)") {
-      val topicPrefix = newTopic()
-      val topic = topicPrefix + "-suffix"
-      testFromEarliestOffsets(
-        topic,
-        addPartitions = true,
-        failOnDataLoss = failOnDataLoss,
-        "subscribePattern" -> s"$topicPrefix-.*")
-    }
-
-    test(s"subscribing topic by pattern from specific offsets (failOnDataLoss: $failOnDataLoss)") {
-      val topicPrefix = newTopic()
-      val topic = topicPrefix + "-suffix"
-      testFromSpecificOffsets(
-        topic,
-        failOnDataLoss = failOnDataLoss,
-        "subscribePattern" -> s"$topicPrefix-.*")
-    }
-  }
-
-  test("bad source options") {
-    def testBadOptions(options: (String, String)*)(expectedMsgs: String*): Unit = {
-      val ex = intercept[IllegalArgumentException] {
-        val reader = spark
-          .readStream
-          .format("kafka")
-        options.foreach { case (k, v) => reader.option(k, v) }
-        reader.load()
-      }
-      expectedMsgs.foreach { m =>
-        assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(m.toLowerCase(Locale.ROOT)))
-      }
-    }
-
-    // Specifying an ending offset
-    testBadOptions("endingOffsets" -> "latest")("Ending offset not valid in streaming queries")
-
-    // No strategy specified
-    testBadOptions()("options must be specified", "subscribe", "subscribePattern")
-
-    // Multiple strategies specified
-    testBadOptions("subscribe" -> "t", "subscribePattern" -> "t.*")(
-      "only one", "options can be specified")
-
-    testBadOptions("subscribe" -> "t", "assign" -> """{"a":[0]}""")(
-      "only one", "options can be specified")
-
-    testBadOptions("assign" -> "")("no topicpartitions to assign")
-    testBadOptions("subscribe" -> "")("no topics to subscribe")
-    testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty")
-  }
-
-  test("unsupported kafka configs") {
-    def testUnsupportedConfig(key: String, value: String = "someValue"): Unit = {
-      val ex = intercept[IllegalArgumentException] {
-        val reader = spark
-          .readStream
-          .format("kafka")
-          .option("subscribe", "topic")
-          .option("kafka.bootstrap.servers", "somehost")
-          .option(s"$key", value)
-        reader.load()
-      }
-      assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("not supported"))
-    }
-
-    testUnsupportedConfig("kafka.group.id")
-    testUnsupportedConfig("kafka.auto.offset.reset")
-    testUnsupportedConfig("kafka.enable.auto.commit")
-    testUnsupportedConfig("kafka.interceptor.classes")
-    testUnsupportedConfig("kafka.key.deserializer")
-    testUnsupportedConfig("kafka.value.deserializer")
-
-    testUnsupportedConfig("kafka.auto.offset.reset", "none")
-    testUnsupportedConfig("kafka.auto.offset.reset", "someValue")
-    testUnsupportedConfig("kafka.auto.offset.reset", "earliest")
-    testUnsupportedConfig("kafka.auto.offset.reset", "latest")
-  }
-
-  test("get offsets from case insensitive parameters") {
-    for ((optionKey, optionValue, answer) <- Seq(
-      (STARTING_OFFSETS_OPTION_KEY, "earLiEst", EarliestOffsetRangeLimit),
-      (ENDING_OFFSETS_OPTION_KEY, "laTest", LatestOffsetRangeLimit),
-      (STARTING_OFFSETS_OPTION_KEY, """{"topic-A":{"0":23}}""",
-        SpecificOffsetRangeLimit(Map(new TopicPartition("topic-A", 0) -> 23))))) {
-      val offset = getKafkaOffsetRangeLimit(Map(optionKey -> optionValue), optionKey, answer)
-      assert(offset === answer)
-    }
-
-    for ((optionKey, answer) <- Seq(
-      (STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit),
-      (ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit))) {
-      val offset = getKafkaOffsetRangeLimit(Map.empty, optionKey, answer)
-      assert(offset === answer)
-    }
-  }
-
-  private def assignString(topic: String, partitions: Iterable[Int]): String = {
-    JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p)))
-  }
-
-  private def testFromSpecificOffsets(
-      topic: String,
-      failOnDataLoss: Boolean,
-      options: (String, String)*): Unit = {
-    val partitionOffsets = Map(
-      new TopicPartition(topic, 0) -> -2L,
-      new TopicPartition(topic, 1) -> -1L,
-      new TopicPartition(topic, 2) -> 0L,
-      new TopicPartition(topic, 3) -> 1L,
-      new TopicPartition(topic, 4) -> 2L
-    )
-    val startingOffsets = JsonUtils.partitionOffsets(partitionOffsets)
-
-    testUtils.createTopic(topic, partitions = 5)
-    // part 0 starts at earliest, these should all be seen
-    testUtils.sendMessages(topic, Array(-20, -21, -22).map(_.toString), Some(0))
-    // part 1 starts at latest, these should all be skipped
-    testUtils.sendMessages(topic, Array(-10, -11, -12).map(_.toString), Some(1))
-    // part 2 starts at 0, these should all be seen
-    testUtils.sendMessages(topic, Array(0, 1, 2).map(_.toString), Some(2))
-    // part 3 starts at 1, first should be skipped
-    testUtils.sendMessages(topic, Array(10, 11, 12).map(_.toString), Some(3))
-    // part 4 starts at 2, first and second should be skipped
-    testUtils.sendMessages(topic, Array(20, 21, 22).map(_.toString), Some(4))
-    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
-
-    val reader = spark
-      .readStream
-      .format("kafka")
-      .option("startingOffsets", startingOffsets)
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("failOnDataLoss", failOnDataLoss.toString)
-    options.foreach { case (k, v) => reader.option(k, v) }
-    val kafka = reader.load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
-
-    testStream(mapped)(
-      makeSureGetOffsetCalled,
-      Execute { q =>
-        // wait to reach the last offset in every partition
-        q.awaitOffset(0, KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L)))
-      },
-      CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22),
-      StopStream,
-      StartStream(),
-      CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22), // Should get the data back on recovery
-      AddKafkaData(Set(topic), 30, 31, 32, 33, 34)(ensureDataInMultiplePartition = true),
-      CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22, 30, 31, 32, 33, 34),
-      StopStream
-    )
-  }
-
-  test("Kafka column types") {
-    val now = System.currentTimeMillis()
-    val topic = newTopic()
-    testUtils.createTopic(topic, partitions = 1)
-    testUtils.sendMessages(topic, Array(1).map(_.toString))
-
-    val kafka = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("startingOffsets", s"earliest")
-      .option("subscribe", topic)
-      .load()
-
-    val query = kafka
-      .writeStream
-      .format("memory")
-      .queryName("kafkaColumnTypes")
-      .trigger(defaultTrigger)
-      .start()
-    eventually(timeout(streamingTimeout)) {
-      assert(spark.table("kafkaColumnTypes").count == 1,
-        s"Unexpected results: ${spark.table("kafkaColumnTypes").collectAsList()}")
-    }
-    val row = spark.table("kafkaColumnTypes").head()
-    assert(row.getAs[Array[Byte]]("key") === null, s"Unexpected results: $row")
-    assert(row.getAs[Array[Byte]]("value") === "1".getBytes(UTF_8), s"Unexpected results: $row")
-    assert(row.getAs[String]("topic") === topic, s"Unexpected results: $row")
-    assert(row.getAs[Int]("partition") === 0, s"Unexpected results: $row")
-    assert(row.getAs[Long]("offset") === 0L, s"Unexpected results: $row")
-    // We cannot check the exact timestamp as it's the time that messages were inserted by the
-    // producer. So here we just use a lower bound to make sure the internal conversion works.
-    assert(row.getAs[java.sql.Timestamp]("timestamp").getTime >= now, s"Unexpected results: $row")
-    assert(row.getAs[Int]("timestampType") === 0, s"Unexpected results: $row")
-    query.stop()
-  }
-
-  private def testFromLatestOffsets(
-      topic: String,
-      addPartitions: Boolean,
-      failOnDataLoss: Boolean,
-      options: (String, String)*): Unit = {
-    testUtils.createTopic(topic, partitions = 5)
-    testUtils.sendMessages(topic, Array("-1"))
-    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
-
-    val reader = spark
-      .readStream
-      .format("kafka")
-      .option("startingOffsets", s"latest")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("failOnDataLoss", failOnDataLoss.toString)
-    options.foreach { case (k, v) => reader.option(k, v) }
-    val kafka = reader.load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-    val mapped = kafka.map(kv => kv._2.toInt + 1)
-
-    testStream(mapped)(
-      makeSureGetOffsetCalled,
-      AddKafkaData(Set(topic), 1, 2, 3),
-      CheckAnswer(2, 3, 4),
-      StopStream,
-      StartStream(),
-      CheckAnswer(2, 3, 4), // Should get the data back on recovery
-      StopStream,
-      AddKafkaData(Set(topic), 4, 5, 6), // Add data when stream is stopped
-      StartStream(),
-      CheckAnswer(2, 3, 4, 5, 6, 7), // Should get the added data
-      AddKafkaData(Set(topic), 7, 8),
-      CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
-      AssertOnQuery("Add partitions") { query: StreamExecution =>
-        if (addPartitions) setTopicPartitions(topic, 10, query)
-        true
-      },
-      AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16),
-      CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
-    )
-  }
-
-  private def testFromEarliestOffsets(
-      topic: String,
-      addPartitions: Boolean,
-      failOnDataLoss: Boolean,
-      options: (String, String)*): Unit = {
-    testUtils.createTopic(topic, partitions = 5)
-    testUtils.sendMessages(topic, (1 to 3).map { _.toString }.toArray)
-    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
-
-    val reader = spark.readStream
-    reader
-      .format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$"))
-      .option("startingOffsets", s"earliest")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("failOnDataLoss", failOnDataLoss.toString)
-    options.foreach { case (k, v) => reader.option(k, v) }
-    val kafka = reader.load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-    val mapped = kafka.map(kv => kv._2.toInt + 1)
-
-    testStream(mapped)(
-      AddKafkaData(Set(topic), 4, 5, 6), // Add data when stream is stopped
-      CheckAnswer(2, 3, 4, 5, 6, 7),
-      StopStream,
-      StartStream(),
-      CheckAnswer(2, 3, 4, 5, 6, 7),
-      StopStream,
-      AddKafkaData(Set(topic), 7, 8),
-      StartStream(),
-      CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
-      AssertOnQuery("Add partitions") { query: StreamExecution =>
-        if (addPartitions) setTopicPartitions(topic, 10, query)
-        true
-      },
-      AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16),
-      CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
-    )
-  }
-}
-
-object KafkaSourceSuite {
-  @volatile var globalTestUtils: KafkaTestUtils = _
-  val collectedData = new ConcurrentLinkedQueue[Any]()
-}
-
-
-class KafkaSourceStressSuite extends KafkaSourceTest {
-
-  import testImplicits._
-
-  val topicId = new AtomicInteger(1)
-
-  @volatile var topics: Seq[String] = (1 to 5).map(_ => newStressTopic)
-
-  def newStressTopic: String = s"stress${topicId.getAndIncrement()}"
-
-  private def nextInt(start: Int, end: Int): Int = {
-    start + Random.nextInt(start + end - 1)
-  }
-
-  test("stress test with multiple topics and partitions")  {
-    topics.foreach { topic =>
-      testUtils.createTopic(topic, partitions = nextInt(1, 6))
-      testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray)
-    }
-
-    // Create Kafka source that reads from latest offset
-    val kafka =
-      spark.readStream
-        .format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$"))
-        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-        .option("kafka.metadata.max.age.ms", "1")
-        .option("subscribePattern", "stress.*")
-        .option("failOnDataLoss", "false")
-        .load()
-        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-        .as[(String, String)]
-
-    val mapped = kafka.map(kv => kv._2.toInt + 1)
-
-    runStressTest(
-      mapped,
-      Seq(makeSureGetOffsetCalled),
-      (d, running) => {
-        Random.nextInt(5) match {
-          case 0 => // Add a new topic
-            topics = topics ++ Seq(newStressTopic)
-            AddKafkaData(topics.toSet, d: _*)(message = s"Add topic $newStressTopic",
-              topicAction = (topic, partition) => {
-                if (partition.isEmpty) {
-                  testUtils.createTopic(topic, partitions = nextInt(1, 6))
-                }
-              })
-          case 1 if running =>
-            // Only delete a topic when the query is running. Otherwise, we may lose data and
-            // be unable to check the correctness of the results.
-            val deletedTopic = topics(Random.nextInt(topics.size))
-            if (deletedTopic != topics.head) {
-              topics = topics.filterNot(_ == deletedTopic)
-            }
-            AddKafkaData(topics.toSet, d: _*)(message = s"Delete topic $deletedTopic",
-              topicAction = (topic, partition) => {
-                // Never remove the first topic to make sure we have at least one topic
-                if (topic == deletedTopic && deletedTopic != topics.head) {
-                  testUtils.deleteTopic(deletedTopic)
-                }
-              })
-          case 2 => // Add new partitions
-            AddKafkaData(topics.toSet, d: _*)(message = "Add partition",
-              topicAction = (topic, partition) => {
-                testUtils.addPartitions(topic, partition.get + nextInt(1, 6))
-              })
-          case _ => // Just add new data
-            AddKafkaData(topics.toSet, d: _*)
-        }
-      },
-      iterations = 50)
-  }
-}
-
-class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with SharedSQLContext {
-
-  import testImplicits._
-
-  private var testUtils: KafkaTestUtils = _
-
-  private val topicId = new AtomicInteger(0)
-
-  private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
-
-  override def createSparkSession(): TestSparkSession = {
-    // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
-    new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf))
-  }
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    testUtils = new KafkaTestUtils {
-      override def brokerConfiguration: Properties = {
-        val props = super.brokerConfiguration
-        // Try to make Kafka clean up messages as fast as possible. However, there is a hard-coded
-        // 30-second delay (kafka.log.LogManager.InitialTaskDelayMs), so this test should run for
-        // at least 30 seconds.
-        props.put("log.cleaner.backoff.ms", "100")
-        props.put("log.segment.bytes", "40")
-        props.put("log.retention.bytes", "40")
-        props.put("log.retention.check.interval.ms", "100")
-        props.put("delete.retention.ms", "10")
-        props.put("log.flush.scheduler.interval.ms", "10")
-        props
-      }
-    }
-    testUtils.setup()
-  }
-
-  override def afterAll(): Unit = {
-    if (testUtils != null) {
-      testUtils.teardown()
-      testUtils = null
-      super.afterAll()
-    }
-  }
-
-  protected def startStream(ds: Dataset[Int]) = {
-    ds.writeStream.foreach(new ForeachWriter[Int] {
-
-      override def open(partitionId: Long, version: Long): Boolean = {
-        true
-      }
-
-      override def process(value: Int): Unit = {
-        // Slow down the processing speed so that messages may be aged out.
-        Thread.sleep(Random.nextInt(500))
-      }
-
-      override def close(errorOrNull: Throwable): Unit = {
-      }
-    }).start()
-  }
-
-  test("stress test for failOnDataLoss=false") {
-    val reader = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("subscribePattern", "failOnDataLoss.*")
-      .option("startingOffsets", "earliest")
-      .option("failOnDataLoss", "false")
-      .option("fetchOffset.retryIntervalMs", "3000")
-    val kafka = reader.load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-    val query = startStream(kafka.map(kv => kv._2.toInt))
-
-    val testTime = 1.minutes
-    val startTime = System.currentTimeMillis()
-    // Track the current existing topics
-    val topics = mutable.ArrayBuffer[String]()
-    // Track topics that have been deleted
-    val deletedTopics = mutable.Set[String]()
-    while (System.currentTimeMillis() - startTime < testTime.toMillis) {
-      Random.nextInt(10) match {
-        case 0 => // Create a new topic
-          val topic = newTopic()
-          topics += topic
-          // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
-          // chance that a topic will be recreated after deletion due to the asynchronous update.
-          // Hence, always overwrite to handle this race condition.
-          testUtils.createTopic(topic, partitions = 1, overwrite = true)
-          logInfo(s"Create topic $topic")
-        case 1 if topics.nonEmpty => // Delete an existing topic
-          val topic = topics.remove(Random.nextInt(topics.size))
-          testUtils.deleteTopic(topic)
-          logInfo(s"Delete topic $topic")
-          deletedTopics += topic
-        case 2 if deletedTopics.nonEmpty => // Recreate a topic that was deleted.
-          val topic = deletedTopics.toSeq(Random.nextInt(deletedTopics.size))
-          deletedTopics -= topic
-          topics += topic
-          // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
-          // chance that a topic will be recreated after deletion due to the asynchronous update.
-          // Hence, always overwrite to handle this race condition.
-          testUtils.createTopic(topic, partitions = 1, overwrite = true)
-          logInfo(s"Create topic $topic")
-        case 3 =>
-          Thread.sleep(1000)
-        case _ => // Push random messages
-          for (topic <- topics) {
-            val size = Random.nextInt(10)
-            for (_ <- 0 until size) {
-              testUtils.sendMessages(topic, Array(Random.nextInt(10).toString))
-            }
-          }
-      }
-      // `failOnDataLoss` is `false`, we should not fail the query
-      if (query.exception.nonEmpty) {
-        throw query.exception.get
-      }
-    }
-
-    query.stop()
-    // `failOnDataLoss` is `false`, we should not fail the query
-    if (query.exception.nonEmpty) {
-      throw query.exception.get
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f24fd7f..e75e1d6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1146,10 +1146,20 @@ object SQLConf {
   val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
     .internal()
     .doc("A comma-separated list of fully qualified data source register class names for which" +
-      " StreamWriteSupport is disabled. Writes to these sources will fail back to the V1 Sink.")
+      " StreamWriteSupport is disabled. Writes to these sources will fall back to the V1 Sinks.")
     .stringConf
     .createWithDefault("")
 
+  val DISABLED_V2_STREAMING_MICROBATCH_READERS =
+    buildConf("spark.sql.streaming.disabledV2MicroBatchReaders")
+      .internal()
+      .doc(
+        "A comma-separated list of fully qualified data source register class names for which " +
+          "MicroBatchReadSupport is disabled. Reads from these sources will fall back to the " +
+          "V1 Sources.")
+      .stringConf
+      .createWithDefault("")
+
   object PartitionOverwriteMode extends Enumeration {
     val STATIC, DYNAMIC = Value
   }
@@ -1525,6 +1535,9 @@ class SQLConf extends Serializable with Logging {
 
   def disabledV2StreamingWriters: String = getConf(DISABLED_V2_STREAMING_WRITERS)
 
+  def disabledV2StreamingMicroBatchReaders: String =
+    getConf(DISABLED_V2_STREAMING_MICROBATCH_READERS)
+
   def concatBinaryAsString: Boolean = getConf(CONCAT_BINARY_AS_STRING)
 
   def eltOutputAsString: Boolean = getConf(ELT_OUTPUT_AS_STRING)
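For illustration only, a minimal sketch (not part of this commit) of how the new conf could be set to opt the Kafka provider out of the V2 micro-batch path, so that reads fall back to the V1 source; the class name below is the register class touched by this change:

    // Sketch only: disable MicroBatchReadSupport for the Kafka provider.
    // MicroBatchExecution (see the diff below) will then fall back to the V1 Source.
    spark.conf.set(
      "spark.sql.streaming.disabledV2MicroBatchReaders",
      "org.apache.spark.sql.kafka010.KafkaSourceProvider")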

http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index ac73ba3..8465501 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -72,27 +72,36 @@ class MicroBatchExecution(
     // Note that we have to use the previous `output` as attributes in StreamingExecutionRelation,
     // since the existing logical plan has already used those attributes. The per-microbatch
     // transformation is responsible for replacing attributes with their final values.
+
+    val disabledSources =
+      sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders.split(",")
+
     val _logicalPlan = analyzedPlan.transform {
-      case streamingRelation@StreamingRelation(dataSource, _, output) =>
+      case streamingRelation@StreamingRelation(dataSourceV1, sourceName, output) =>
         toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
           // Materialize source to avoid creating it in every batch
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
-          val source = dataSource.createSource(metadataPath)
+          val source = dataSourceV1.createSource(metadataPath)
           nextSourceId += 1
+          logInfo(s"Using Source [$source] from DataSourceV1 named '$sourceName' [$dataSourceV1]")
           StreamingExecutionRelation(source, output)(sparkSession)
         })
-      case s @ StreamingRelationV2(source: MicroBatchReadSupport, _, options, output, _) =>
+      case s @ StreamingRelationV2(
+        dataSourceV2: MicroBatchReadSupport, sourceName, options, output, _) if
+          !disabledSources.contains(dataSourceV2.getClass.getCanonicalName) =>
         v2ToExecutionRelationMap.getOrElseUpdate(s, {
           // Materialize source to avoid creating it in every batch
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
-          val reader = source.createMicroBatchReader(
+          val reader = dataSourceV2.createMicroBatchReader(
             Optional.empty(), // user specified schema
             metadataPath,
             new DataSourceOptions(options.asJava))
           nextSourceId += 1
+          logInfo(s"Using MicroBatchReader [$reader] from " +
+            s"DataSourceV2 named '$sourceName' [$dataSourceV2]")
           StreamingExecutionRelation(reader, output)(sparkSession)
         })
-      case s @ StreamingRelationV2(_, sourceName, _, output, v1Relation) =>
+      case s @ StreamingRelationV2(dataSourceV2, sourceName, _, output, v1Relation) =>
         v2ToExecutionRelationMap.getOrElseUpdate(s, {
           // Materialize source to avoid creating it in every batch
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
@@ -102,6 +111,7 @@ class MicroBatchExecution(
           }
           val source = v1Relation.get.dataSource.createSource(metadataPath)
           nextSourceId += 1
+          logInfo(s"Using Source [$source] from DataSourceV2 named '$sourceName' [$dataSourceV2]")
           StreamingExecutionRelation(source, output)(sparkSession)
         })
     }
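To make the rewritten cases concrete, a small sketch of a streaming read whose analyzed plan contains a StreamingRelationV2 for the Kafka provider (broker address and topic are placeholders). With MicroBatchReadSupport enabled (second case above) the transform creates a micro-batch reader; with the provider listed in spark.sql.streaming.disabledV2MicroBatchReaders it takes the last case and uses the V1 source from v1Relation:

    // Sketch only: a query that exercises the transform above.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")  // placeholder address
      .option("subscribe", "some-topic")                 // placeholder topic
      .load()
      .selectExpr("CAST(value AS STRING)")

    val query = df.writeStream
      .format("console")
      .start()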




[2/2] spark git commit: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by td...@apache.org.
[SPARK-23362][SS] Migrate Kafka Microbatch source to v2

## What changes were proposed in this pull request?
Migrating KafkaSource (with data source v1) to KafkaMicroBatchReader (with data source v2).

Performance comparison:
In a unit test with an in-process Kafka broker, I tested the read throughput of V1 and V2 using 20M records in a single partition; the two were comparable.

## How was this patch tested?
Existing tests, a few of which were modified to be more thorough than before.

Author: Tathagata Das <ta...@gmail.com>

Closes #20554 from tdas/SPARK-23362.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a73aa31
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a73aa31
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a73aa31

Branch: refs/heads/master
Commit: 0a73aa31f41c83503d5d99eff3c9d7b406014ab3
Parents: c5857e4
Author: Tathagata Das <ta...@gmail.com>
Authored: Fri Feb 16 14:30:19 2018 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Feb 16 14:30:19 2018 -0800

----------------------------------------------------------------------
 dev/.rat-excludes                               |    1 +
 .../sql/kafka010/CachedKafkaConsumer.scala      |    2 +-
 .../sql/kafka010/KafkaContinuousReader.scala    |   29 +-
 .../sql/kafka010/KafkaMicroBatchReader.scala    |  403 ++++++
 .../KafkaRecordToUnsafeRowConverter.scala       |   52 +
 .../apache/spark/sql/kafka010/KafkaSource.scala |   19 +-
 .../sql/kafka010/KafkaSourceProvider.scala      |   70 +-
 ...fka-source-initial-offset-future-version.bin |    2 +
 ...afka-source-initial-offset-version-2.1.0.bin |    2 +-
 .../kafka010/KafkaMicroBatchSourceSuite.scala   | 1222 ++++++++++++++++++
 .../spark/sql/kafka010/KafkaSourceSuite.scala   | 1122 ----------------
 .../org/apache/spark/sql/internal/SQLConf.scala |   15 +-
 .../streaming/MicroBatchExecution.scala         |   20 +-
 13 files changed, 1786 insertions(+), 1173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/dev/.rat-excludes
----------------------------------------------------------------------
diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index 243fbe3..9552d00 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -105,3 +105,4 @@ META-INF/*
 spark-warehouse
 structured-streaming/*
 kafka-source-initial-offset-version-2.1.0.bin
+kafka-source-initial-offset-future-version.bin

http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 90ed7b1..e97881c 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaSource._
+import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.util.UninterruptibleThread
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index b049a05..97a0f66 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
 import org.apache.spark.sql.types.StructType
@@ -187,13 +187,9 @@ class KafkaContinuousDataReader(
     kafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
     failOnDataLoss: Boolean) extends ContinuousDataReader[UnsafeRow] {
-  private val topic = topicPartition.topic
-  private val kafkaPartition = topicPartition.partition
-  private val consumer = CachedKafkaConsumer.createUncached(topic, kafkaPartition, kafkaParams)
-
-  private val sharedRow = new UnsafeRow(7)
-  private val bufferHolder = new BufferHolder(sharedRow)
-  private val rowWriter = new UnsafeRowWriter(bufferHolder, 7)
+  private val consumer =
+    CachedKafkaConsumer.createUncached(topicPartition.topic, topicPartition.partition, kafkaParams)
+  private val converter = new KafkaRecordToUnsafeRowConverter
 
   private var nextKafkaOffset = startOffset
   private var currentRecord: ConsumerRecord[Array[Byte], Array[Byte]] = _
@@ -232,22 +228,7 @@ class KafkaContinuousDataReader(
   }
 
   override def get(): UnsafeRow = {
-    bufferHolder.reset()
-
-    if (currentRecord.key == null) {
-      rowWriter.setNullAt(0)
-    } else {
-      rowWriter.write(0, currentRecord.key)
-    }
-    rowWriter.write(1, currentRecord.value)
-    rowWriter.write(2, UTF8String.fromString(currentRecord.topic))
-    rowWriter.write(3, currentRecord.partition)
-    rowWriter.write(4, currentRecord.offset)
-    rowWriter.write(5,
-      DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(currentRecord.timestamp)))
-    rowWriter.write(6, currentRecord.timestampType.id)
-    sharedRow.setTotalSize(bufferHolder.totalSize)
-    sharedRow
+    converter.toUnsafeRow(currentRecord)
   }
 
   override def getOffset(): KafkaSourcePartitionOffset = {
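The row-building logic removed above moves into the new KafkaRecordToUnsafeRowConverter so that it can be shared by the continuous and micro-batch readers. As a rough sketch reconstructed from the removed inline code (the committed KafkaRecordToUnsafeRowConverter.scala, see the diffstat below, may differ in detail):

    // Sketch reconstructed from the removed inline code above; not the committed file.
    import org.apache.kafka.clients.consumer.ConsumerRecord

    import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
    import org.apache.spark.sql.catalyst.util.DateTimeUtils
    import org.apache.spark.unsafe.types.UTF8String

    private[kafka010] class KafkaRecordToUnsafeRowConverter {
      private val sharedRow = new UnsafeRow(7)
      private val bufferHolder = new BufferHolder(sharedRow)
      private val rowWriter = new UnsafeRowWriter(bufferHolder, 7)

      def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
        bufferHolder.reset()
        if (record.key == null) {
          rowWriter.setNullAt(0)
        } else {
          rowWriter.write(0, record.key)
        }
        rowWriter.write(1, record.value)
        rowWriter.write(2, UTF8String.fromString(record.topic))
        rowWriter.write(3, record.partition)
        rowWriter.write(4, record.offset)
        rowWriter.write(5,
          DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(record.timestamp)))
        rowWriter.write(6, record.timestampType.id)
        sharedRow.setTotalSize(bufferHolder.totalSize)
        sharedRow
      }
    }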

http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
new file mode 100644
index 0000000..fb647ca
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the user
+ * must make sure all messages in a topic have been processed when deleting a topic.
+ *
+ * There is a known issue caused by KAFKA-1894: a query using Kafka may not be able to stop.
+ * To avoid this issue, make sure the query is stopped before stopping the Kafka brokers, and
+ * do not use incorrect broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+    kafkaOffsetReader: KafkaOffsetReader,
+    executorKafkaParams: ju.Map[String, Object],
+    options: DataSourceOptions,
+    metadataPath: String,
+    startingOffsets: KafkaOffsetRangeLimit,
+    failOnDataLoss: Boolean)
+  extends MicroBatchReader with SupportsScanUnsafeRow with Logging {
+
+  type PartitionOffsetMap = Map[TopicPartition, Long]
+
+  private var startPartitionOffsets: PartitionOffsetMap = _
+  private var endPartitionOffsets: PartitionOffsetMap = _
+
+  private val pollTimeoutMs = options.getLong(
+    "kafkaConsumer.pollTimeoutMs",
+    SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+  private val maxOffsetsPerTrigger =
+    Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
+
+  override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
+    // Make sure initialPartitionOffsets is initialized
+    initialPartitionOffsets
+
+    startPartitionOffsets = Option(start.orElse(null))
+        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+        .getOrElse(initialPartitionOffsets)
+
+    endPartitionOffsets = Option(end.orElse(null))
+        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+        .getOrElse {
+          val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+          maxOffsetsPerTrigger.map { maxOffsets =>
+            rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
+          }.getOrElse {
+            latestPartitionOffsets
+          }
+        }
+  }
+
+  override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
+    // Find the new partitions, and get their earliest offsets
+    val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
+    val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+    if (newPartitionOffsets.keySet != newPartitions) {
+      // We cannot get the starting offsets for some partitions, which means they were deleted.
+      val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
+      reportDataLoss(
+        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+    }
+    logInfo(s"Partitions added: $newPartitionOffsets")
+    newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+      reportDataLoss(
+        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+    }
+
+    // Find deleted partitions, and report data loss if required
+    val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
+    if (deletedPartitions.nonEmpty) {
+      reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
+    }
+
+    // Use the end-offset partitions to calculate offset ranges, so that partitions that have
+    // been deleted are ignored
+    val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
+      // Ignore partitions whose starting offsets we don't know.
+      newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
+    }.toSeq
+    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+
+    val sortedExecutors = getSortedExecutorList()
+    val numExecutors = sortedExecutors.length
+    logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
+
+    // Calculate offset ranges
+    val factories = topicPartitions.flatMap { tp =>
+      val fromOffset = startPartitionOffsets.get(tp).getOrElse {
+        newPartitionOffsets.getOrElse(
+        tp, {
+          // This should not happen since newPartitionOffsets contains all partitions not in
+          // startPartitionOffsets
+          throw new IllegalStateException(s"$tp doesn't have a from offset")
+        })
+      }
+      val untilOffset = endPartitionOffsets(tp)
+
+      if (untilOffset >= fromOffset) {
+        // This allows cached KafkaConsumers in the executors to be re-used to read the same
+        // partition in every batch.
+        val preferredLoc = if (numExecutors > 0) {
+          Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
+        } else None
+        val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
+        Some(
+          new KafkaMicroBatchDataReaderFactory(
+            range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
+      } else {
+        reportDataLoss(
+          s"Partition $tp's offset was changed from " +
+            s"$fromOffset to $untilOffset, some data may have been missed")
+        None
+      }
+    }
+    factories.map(_.asInstanceOf[DataReaderFactory[UnsafeRow]]).asJava
+  }
+
+  override def getStartOffset: Offset = {
+    KafkaSourceOffset(startPartitionOffsets)
+  }
+
+  override def getEndOffset: Offset = {
+    KafkaSourceOffset(endPartitionOffsets)
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+  }
+
+  override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
+
+  override def commit(end: Offset): Unit = {}
+
+  override def stop(): Unit = {
+    kafkaOffsetReader.close()
+  }
+
+  override def toString(): String = s"Kafka[$kafkaOffsetReader]"
+
+  /**
+   * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
+   * the checkpoint.
+   */
+  private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
+    // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
+    // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
+    // (KAFKA-1894).
+    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+
+    // SparkSession is required for getting Hadoop configuration for writing to checkpoints
+    assert(SparkSession.getActiveSession.nonEmpty)
+
+    val metadataLog =
+      new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
+    metadataLog.get(0).getOrElse {
+      val offsets = startingOffsets match {
+        case EarliestOffsetRangeLimit =>
+          KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
+        case LatestOffsetRangeLimit =>
+          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+        case SpecificOffsetRangeLimit(p) =>
+          kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
+      }
+      metadataLog.add(0, offsets)
+      logInfo(s"Initial offsets: $offsets")
+      offsets
+    }.partitionToOffsets
+  }
+
+  /** Proportionally distribute the limit number of offsets among topic partitions (a standalone sketch of this proration follows this file's diff). */
+  private def rateLimit(
+      limit: Long,
+      from: PartitionOffsetMap,
+      until: PartitionOffsetMap): PartitionOffsetMap = {
+    val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+    val sizes = until.flatMap {
+      case (tp, end) =>
+        // If begin isn't defined, something's wrong; let the downstream alert logic handle it
+        from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
+          val size = end - begin
+          logDebug(s"rateLimit $tp size is $size")
+          if (size > 0) Some(tp -> size) else None
+        }
+    }
+    val total = sizes.values.sum.toDouble
+    if (total < 1) {
+      until
+    } else {
+      until.map {
+        case (tp, end) =>
+          tp -> sizes.get(tp).map { size =>
+            val begin = from.get(tp).getOrElse(fromNew(tp))
+            val prorate = limit * (size / total)
+            // Don't completely starve small topicpartitions
+            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+            // Paranoia, make sure not to return an offset that's past end
+            Math.min(end, off)
+          }.getOrElse(end)
+      }
+    }
+  }
+
+  private def getSortedExecutorList(): Array[String] = {
+
+    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
+      if (a.host == b.host) {
+        a.executorId > b.executorId
+      } else {
+        a.host > b.host
+      }
+    }
+
+    val bm = SparkEnv.get.blockManager
+    bm.master.getPeers(bm.blockManagerId).toArray
+      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
+      .sortWith(compare)
+      .map(_.toString)
+  }
+
+  /**
+   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+   * Otherwise, just log a warning.
+   */
+  private def reportDataLoss(message: String): Unit = {
+    if (failOnDataLoss) {
+      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+    } else {
+      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+    }
+  }
+
+  /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. */
+  class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String)
+    extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
+
+    val VERSION = 1
+
+    override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+      out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+      val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+      writer.write("v" + VERSION + "\n")
+      writer.write(metadata.json)
+      writer.flush
+    }
+
+    override def deserialize(in: InputStream): KafkaSourceOffset = {
+      in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
+      val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+      // HDFSMetadataLog guarantees that it never creates a partial file.
+      assert(content.length != 0)
+      if (content(0) == 'v') {
+        val indexOfNewLine = content.indexOf("\n")
+        if (indexOfNewLine > 0) {
+          val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
+          KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
+        } else {
+          throw new IllegalStateException(
+            s"Log file was malformed: failed to detect the log file version line.")
+        }
+      } else {
+        // The log was generated by Spark 2.1.0
+        KafkaSourceOffset(SerializedOffset(content))
+      }
+    }
+  }
+}
+
+/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] class KafkaMicroBatchDataReaderFactory(
+    range: KafkaOffsetRange,
+    preferredLoc: Option[String],
+    executorKafkaParams: ju.Map[String, Object],
+    pollTimeoutMs: Long,
+    failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {
+
+  override def preferredLocations(): Array[String] = preferredLoc.toArray
+
+  override def createDataReader(): DataReader[UnsafeRow] = new KafkaMicroBatchDataReader(
+    range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
+}
+
+/** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] class KafkaMicroBatchDataReader(
+    offsetRange: KafkaOffsetRange,
+    executorKafkaParams: ju.Map[String, Object],
+    pollTimeoutMs: Long,
+    failOnDataLoss: Boolean) extends DataReader[UnsafeRow] with Logging {
+
+  private val consumer = CachedKafkaConsumer.getOrCreate(
+    offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+  private val rangeToRead = resolveRange(offsetRange)
+  private val converter = new KafkaRecordToUnsafeRowConverter
+
+  private var nextOffset = rangeToRead.fromOffset
+  private var nextRow: UnsafeRow = _
+
+  override def next(): Boolean = {
+    if (nextOffset < rangeToRead.untilOffset) {
+      val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
+      if (record != null) {
+        nextRow = converter.toUnsafeRow(record)
+        true
+      } else {
+        false
+      }
+    } else {
+      false
+    }
+  }
+
+  override def get(): UnsafeRow = {
+    assert(nextRow != null)
+    nextOffset += 1
+    nextRow
+  }
+
+  override def close(): Unit = {
+    // Indicate that we're no longer using this consumer
+    CachedKafkaConsumer.releaseKafkaConsumer(
+      offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+  }
+
+  private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
+    if (range.fromOffset < 0 || range.untilOffset < 0) {
+      // Late bind the offset range
+      val availableOffsetRange = consumer.getAvailableOffsetRange()
+      val fromOffset = if (range.fromOffset < 0) {
+        assert(range.fromOffset == KafkaOffsetRangeLimit.EARLIEST,
+          s"earliest offset ${range.fromOffset} does not equal ${KafkaOffsetRangeLimit.EARLIEST}")
+        availableOffsetRange.earliest
+      } else {
+        range.fromOffset
+      }
+      val untilOffset = if (range.untilOffset < 0) {
+        assert(range.untilOffset == KafkaOffsetRangeLimit.LATEST,
+          s"latest offset ${range.untilOffset} does not equal ${KafkaOffsetRangeLimit.LATEST}")
+        availableOffsetRange.latest
+      } else {
+        range.untilOffset
+      }
+      KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset)
+    } else {
+      range
+    }
+  }
+}
+
+private[kafka010] case class KafkaOffsetRange(
+  topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long)
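A standalone sketch of the proration performed by rateLimit above, runnable outside Spark; the object name, partition labels, and backlog sizes are made up for illustration and are not part of this patch:

object RateLimitSketch {
  // Prorate `limit` offsets across partitions by backlog size, mirroring the arithmetic in
  // KafkaMicroBatchReader.rateLimit (without the Kafka lookups).
  def rateLimit(
      limit: Long,
      from: Map[String, Long],
      until: Map[String, Long]): Map[String, Long] = {
    val sizes = until.flatMap { case (tp, end) =>
      from.get(tp).flatMap { begin =>
        val size = end - begin
        if (size > 0) Some(tp -> size) else None
      }
    }
    val total = sizes.values.sum.toDouble
    if (total < 1) {
      until
    } else {
      until.map { case (tp, end) =>
        tp -> sizes.get(tp).map { size =>
          val prorate = limit * (size / total)
          // Don't completely starve small partitions, and never go past `end`.
          val off = from(tp) + (if (prorate < 1) math.ceil(prorate) else math.floor(prorate)).toLong
          math.min(end, off)
        }.getOrElse(end)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val from = Map("t-0" -> 0L, "t-1" -> 0L, "t-2" -> 0L)
    val until = Map("t-0" -> 100L, "t-1" -> 10L, "t-2" -> 1L)
    println(rateLimit(10, from, until)) // Map(t-0 -> 9, t-1 -> 1, t-2 -> 1)
  }
}

With limit = 10 and backlogs of 100, 10, and 1, the large partition gets floor(9.0) = 9 new offsets while the two small ones are rounded up to 1 each, the same shape as the maxOffsetsPerTrigger expectations in the test suite further down.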

http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
new file mode 100644
index 0000000..1acdd56
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.unsafe.types.UTF8String
+
+/** A simple class for converting a Kafka ConsumerRecord to an UnsafeRow (column layout sketched after this file's diff). */
+private[kafka010] class KafkaRecordToUnsafeRowConverter {
+  private val sharedRow = new UnsafeRow(7)
+  private val bufferHolder = new BufferHolder(sharedRow)
+  private val rowWriter = new UnsafeRowWriter(bufferHolder, 7)
+
+  def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
+    bufferHolder.reset()
+
+    if (record.key == null) {
+      rowWriter.setNullAt(0)
+    } else {
+      rowWriter.write(0, record.key)
+    }
+    rowWriter.write(1, record.value)
+    rowWriter.write(2, UTF8String.fromString(record.topic))
+    rowWriter.write(3, record.partition)
+    rowWriter.write(4, record.offset)
+    rowWriter.write(
+      5,
+      DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(record.timestamp)))
+    rowWriter.write(6, record.timestampType.id)
+    sharedRow.setTotalSize(bufferHolder.totalSize)
+    sharedRow
+  }
+}
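For reference, the seven fields written above correspond to the Kafka source schema; below is a minimal sketch of the equivalent StructType, intended for a spark-shell session and illustrative only, not code from this patch:

import org.apache.spark.sql.types._

// Columns in the order KafkaRecordToUnsafeRowConverter writes them (fields 0 to 6).
val kafkaRecordSchema = StructType(Seq(
  StructField("key", BinaryType),           // 0: record key, may be null
  StructField("value", BinaryType),         // 1: record value
  StructField("topic", StringType),         // 2: topic name
  StructField("partition", IntegerType),    // 3: partition id
  StructField("offset", LongType),          // 4: offset within the partition
  StructField("timestamp", TimestampType),  // 5: record timestamp
  StructField("timestampType", IntegerType) // 6: timestamp type id
))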

http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 169a5d0..1c7b3a2 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.kafka010.KafkaSource._
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -306,7 +307,7 @@ private[kafka010] class KafkaSource(
     kafkaReader.close()
   }
 
-  override def toString(): String = s"KafkaSource[$kafkaReader]"
+  override def toString(): String = s"KafkaSourceV1[$kafkaReader]"
 
   /**
    * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
@@ -323,22 +324,6 @@ private[kafka010] class KafkaSource(
 
 /** Companion object for the [[KafkaSource]]. */
 private[kafka010] object KafkaSource {
-  val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
-    """
-      |Some data may have been lost because they are not available in Kafka any more; either the
-      | data was aged out by Kafka or the topic may have been deleted before all the data in the
-      | topic was processed. If you want your streaming query to fail on such cases, set the source
-      | option "failOnDataLoss" to "true".
-    """.stripMargin
-
-  val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE =
-    """
-      |Some data may have been lost because they are not available in Kafka any more; either the
-      | data was aged out by Kafka or the topic may have been deleted before all the data in the
-      | topic was processed. If you don't want your streaming query to fail on such cases, set the
-      | source option "failOnDataLoss" to "false".
-    """.stripMargin
-
   private[kafka010] val VERSION = 1
 
   def getSortedExecutorList(sc: SparkContext): Array[String] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index d4fa035..0aa64a6 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -30,13 +30,13 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
 /**
- * The provider class for the [[KafkaSource]]. This provider is designed such that it throws
+ * The provider class for all Kafka readers and writers. It is designed such that it throws
  * IllegalArgumentException when the Kafka Dataset is created, so that it can catch
  * missing options even before the query is started.
  */
@@ -47,6 +47,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     with CreatableRelationProvider
     with StreamWriteSupport
     with ContinuousReadSupport
+    with MicroBatchReadSupport
     with Logging {
   import KafkaSourceProvider._
 
@@ -105,6 +106,52 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       failOnDataLoss(caseInsensitiveParams))
   }
 
+  /**
+   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader]] to read batches
+   * of Kafka data in a micro-batch streaming query.
+   */
+  override def createMicroBatchReader(
+      schema: Optional[StructType],
+      metadataPath: String,
+      options: DataSourceOptions): KafkaMicroBatchReader = {
+
+    val parameters = options.asMap().asScala.toMap
+    validateStreamOptions(parameters)
+    // Each running query should use its own group id. Otherwise, the query may be only assigned
+    // partial data since Kafka will assign partitions to multiple consumers having the same group
+    // id. Hence, we should generate a unique id for each query.
+    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
+
+    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
+    val specifiedKafkaParams =
+      parameters
+        .keySet
+        .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
+        .map { k => k.drop(6).toString -> parameters(k) }
+        .toMap
+
+    val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
+      STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
+
+    val kafkaOffsetReader = new KafkaOffsetReader(
+      strategy(caseInsensitiveParams),
+      kafkaParamsForDriver(specifiedKafkaParams),
+      parameters,
+      driverGroupIdPrefix = s"$uniqueGroupId-driver")
+
+    new KafkaMicroBatchReader(
+      kafkaOffsetReader,
+      kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
+      options,
+      metadataPath,
+      startingStreamOffsets,
+      failOnDataLoss(caseInsensitiveParams))
+  }
+
+  /**
+   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader]] to read
+   * Kafka data in a continuous streaming query.
+   */
   override def createContinuousReader(
       schema: Optional[StructType],
       metadataPath: String,
@@ -408,8 +455,27 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
   private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
   private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
+
   val TOPIC_OPTION_KEY = "topic"
 
+  val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
+    """
+      |Some data may have been lost because they are not available in Kafka any more; either the
+      | data was aged out by Kafka or the topic may have been deleted before all the data in the
+      | topic was processed. If you want your streaming query to fail on such cases, set the source
+      | option "failOnDataLoss" to "true".
+    """.stripMargin
+
+  val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE =
+    """
+      |Some data may have been lost because they are not available in Kafka any more; either the
+      | data was aged out by Kafka or the topic may have been deleted before all the data in the
+      | topic was processed. If you don't want your streaming query to fail on such cases, set the
+      | source option "failOnDataLoss" to "false".
+    """.stripMargin
+
+
+
   private val deserClassName = classOf[ByteArrayDeserializer].getName
 
   def getKafkaOffsetRangeLimit(
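A minimal user-facing sketch of how the new createMicroBatchReader above is exercised through the DataStreamReader API; `spark` is assumed to be an active SparkSession, and the broker address and topic name are placeholders:

// Broker address and topic are placeholders.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", "1000")   // handled by KafkaMicroBatchReader.rateLimit
  .option("failOnDataLoss", "false")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

Options prefixed with "kafka." have that prefix stripped and are passed through to the Kafka consumers, while the remaining options are interpreted by KafkaSourceProvider itself.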

http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-future-version.bin
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-future-version.bin b/external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-future-version.bin
new file mode 100644
index 0000000..d530773
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-future-version.bin
@@ -0,0 +1,2 @@
+0v99999
+{"kafka-initial-offset-future-version":{"2":2,"1":1,"0":0}}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin b/external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin
index ae928e7..8c78d9e 100644
--- a/external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin
+++ b/external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin
@@ -1 +1 @@
-2{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}
\ No newline at end of file
+2{"kafka-initial-offset-2-1-0":{"2":2,"1":1,"0":0}}
\ No newline at end of file
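To make the two initial-offset resources above easier to read: on deserialization the first byte is skipped (newer writers put a zero byte there for Spark 2.1.0 compatibility, SPARK-19517), and the remaining content is either a version line such as "v1" followed by the offset JSON, or, for files written by Spark 2.1.0, the JSON alone. A small standalone sketch of that parse, mirroring KafkaSourceInitialOffsetWriter.deserialize rather than reusing it (the path argument is a placeholder):

import java.io.{FileInputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

import org.apache.commons.io.IOUtils

// Returns (optional version line, offset JSON) from an initial-offset checkpoint file.
def readInitialOffsetFile(path: String): (Option[String], String) = {
  val in = new FileInputStream(path)
  try {
    in.read() // skip the first byte (zero byte written for Spark 2.1.0 compatibility)
    val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
    if (content.startsWith("v") && content.contains("\n")) {
      val newline = content.indexOf("\n")
      (Some(content.substring(0, newline)), content.substring(newline + 1))
    } else {
      (None, content) // Spark 2.1.0 format: no version line
    }
  } finally {
    in.close()
  }
}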

http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
new file mode 100644
index 0000000..ed4ecfe
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -0,0 +1,1222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+import java.util.{Locale, Properties}
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.io.Source
+import scala.util.Random
+
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.kafka.common.TopicPartition
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Dataset, ForeachWriter}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.functions.{count, window}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider._
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
+
+  protected var testUtils: KafkaTestUtils = _
+
+  override val streamingTimeout = 30.seconds
+
+  protected val brokerProps = Map[String, Object]()
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils(brokerProps)
+    testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+    if (testUtils != null) {
+      testUtils.teardown()
+      testUtils = null
+    }
+    super.afterAll()
+  }
+
+  protected def makeSureGetOffsetCalled = AssertOnQuery { q =>
+    // Because KafkaSource's initialPartitionOffsets is set lazily, we need to make sure
+    // its "getOffset" is called before pushing any data. Otherwise, because of the race condition,
+    // we don't know which data should be fetched when `startingOffsets` is latest.
+    q match {
+      case c: ContinuousExecution => c.awaitEpoch(0)
+      case m: MicroBatchExecution => m.processAllAvailable()
+    }
+    true
+  }
+
+  protected def setTopicPartitions(topic: String, newCount: Int, query: StreamExecution) : Unit = {
+    testUtils.addPartitions(topic, newCount)
+  }
+
+  /**
+   * Add data to Kafka.
+   *
+   * `topicAction` can be used to run actions for each topic before inserting data.
+   */
+  case class AddKafkaData(topics: Set[String], data: Int*)
+    (implicit ensureDataInMultiplePartition: Boolean = false,
+      concurrent: Boolean = false,
+      message: String = "",
+      topicAction: (String, Option[Int]) => Unit = (_, _) => {}) extends AddData {
+
+    override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
+      query match {
+        // Make sure no Spark job is running when deleting a topic
+        case Some(m: MicroBatchExecution) => m.processAllAvailable()
+        case _ =>
+      }
+
+      val existingTopics = testUtils.getAllTopicsAndPartitionSize().toMap
+      val newTopics = topics.diff(existingTopics.keySet)
+      for (newTopic <- newTopics) {
+        topicAction(newTopic, None)
+      }
+      for (existingTopicPartitions <- existingTopics) {
+        topicAction(existingTopicPartitions._1, Some(existingTopicPartitions._2))
+      }
+
+      require(
+        query.nonEmpty,
+        "Cannot add data when there is no query for finding the active kafka source")
+
+      val sources = {
+        query.get.logicalPlan.collect {
+          case StreamingExecutionRelation(source: KafkaSource, _) => source
+          case StreamingExecutionRelation(source: KafkaMicroBatchReader, _) => source
+        } ++ (query.get.lastExecution match {
+          case null => Seq()
+          case e => e.logical.collect {
+            case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
+          }
+        })
+      }.distinct
+
+      if (sources.isEmpty) {
+        throw new Exception(
+          "Could not find Kafka source in the StreamExecution logical plan to add data to")
+      } else if (sources.size > 1) {
+        throw new Exception(
+          "Could not select the Kafka source in the StreamExecution logical plan as there" +
+            "are multiple Kafka sources:\n\t" + sources.mkString("\n\t"))
+      }
+      val kafkaSource = sources.head
+      val topic = topics.toSeq(Random.nextInt(topics.size))
+      val sentMetadata = testUtils.sendMessages(topic, data.map { _.toString }.toArray)
+
+      def metadataToStr(m: (String, RecordMetadata)): String = {
+        s"Sent ${m._1} to partition ${m._2.partition()}, offset ${m._2.offset()}"
+      }
+      // Verify that the test data gets inserted into multiple partitions
+      if (ensureDataInMultiplePartition) {
+        require(
+          sentMetadata.groupBy(_._2.partition).size > 1,
+          s"Added data does not test multiple partitions: ${sentMetadata.map(metadataToStr)}")
+      }
+
+      val offset = KafkaSourceOffset(testUtils.getLatestOffsets(topics))
+      logInfo(s"Added data, expected offset $offset")
+      (kafkaSource, offset)
+    }
+
+    override def toString: String =
+      s"AddKafkaData(topics = $topics, data = $data, message = $message)"
+  }
+
+  private val topicId = new AtomicInteger(0)
+  protected def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
+}
+
+abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
+
+  import testImplicits._
+
+  test("(de)serialization of initial offsets") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+
+    testStream(reader.load)(
+      makeSureGetOffsetCalled,
+      StopStream,
+      StartStream(),
+      StopStream)
+  }
+
+  test("maxOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 10)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+
+    val waitUntilBatchProcessed = AssertOnQuery { q =>
+      eventually(Timeout(streamingTimeout)) {
+        if (!q.exception.isDefined) {
+          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+        }
+      }
+      if (q.exception.isDefined) {
+        throw q.exception.get
+      }
+      true
+    }
+
+    testStream(mapped)(
+      StartStream(ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      // 1 from smallest, 1 from middle, 8 from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // smallest now empty, 1 more from middle, 9 more from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 110, 111, 112, 113, 114, 115, 116
+      ),
+      StopStream,
+      StartStream(ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      // smallest now empty, 1 more from middle, 9 more from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
+        12, 117, 118, 119, 120, 121, 122, 123, 124, 125
+      ),
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // smallest now empty, 1 more from middle, 9 more from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
+        12, 117, 118, 119, 120, 121, 122, 123, 124, 125,
+        13, 126, 127, 128, 129, 130, 131, 132, 133, 134
+      )
+    )
+  }
+
+  test("input row metrics") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+    testUtils.sendMessages(topic, Array("-1"))
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("subscribe", topic)
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+
+    val mapped = kafka.map(kv => kv._2.toInt + 1)
+    testStream(mapped)(
+      StartStream(trigger = ProcessingTime(1)),
+      makeSureGetOffsetCalled,
+      AddKafkaData(Set(topic), 1, 2, 3),
+      CheckAnswer(2, 3, 4),
+      AssertOnQuery { query =>
+        val recordsRead = query.recentProgress.map(_.numInputRows).sum
+        recordsRead == 3
+      }
+    )
+  }
+
+  test("subscribing topic by pattern with topic deletions") {
+    val topicPrefix = newTopic()
+    val topic = topicPrefix + "-seems"
+    val topic2 = topicPrefix + "-bad"
+    testUtils.createTopic(topic, partitions = 5)
+    testUtils.sendMessages(topic, Array("-1"))
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("subscribePattern", s"$topicPrefix-.*")
+      .option("failOnDataLoss", "false")
+
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      AddKafkaData(Set(topic), 1, 2, 3),
+      CheckAnswer(2, 3, 4),
+      Assert {
+        testUtils.deleteTopic(topic)
+        testUtils.createTopic(topic2, partitions = 5)
+        true
+      },
+      AddKafkaData(Set(topic2), 4, 5, 6),
+      CheckAnswer(2, 3, 4, 5, 6, 7)
+    )
+  }
+
+  test("ensure that initial offset are written with an extra byte in the beginning (SPARK-19517)") {
+    withTempDir { metadataPath =>
+      val topic = "kafka-initial-offset-current"
+      testUtils.createTopic(topic, partitions = 1)
+
+      val initialOffsetFile = Paths.get(s"${metadataPath.getAbsolutePath}/sources/0/0").toFile
+
+      val df = spark
+        .readStream
+        .format("kafka")
+        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+        .option("subscribe", topic)
+        .option("startingOffsets", s"earliest")
+        .load()
+
+      // Test that the written initial offset file has a zero byte at the beginning, so that
+      // Spark 2.1.0 can read the offsets (see SPARK-19517)
+      testStream(df)(
+        StartStream(checkpointLocation = metadataPath.getAbsolutePath),
+        makeSureGetOffsetCalled)
+
+      val binarySource = Source.fromFile(initialOffsetFile)
+      try {
+        assert(binarySource.next().toInt == 0)  // first byte is binary 0
+      } finally {
+        binarySource.close()
+      }
+    }
+  }
+
+  test("deserialization of initial offset written by Spark 2.1.0 (SPARK-19517)") {
+    withTempDir { metadataPath =>
+      val topic = "kafka-initial-offset-2-1-0"
+      testUtils.createTopic(topic, partitions = 3)
+      testUtils.sendMessages(topic, Array("0", "1", "2"), Some(0))
+      testUtils.sendMessages(topic, Array("0", "10", "20"), Some(1))
+      testUtils.sendMessages(topic, Array("0", "100", "200"), Some(2))
+
+      // Copy the initial offset file into the right location inside the checkpoint root directory
+      // such that the Kafka source can read it for initial offsets.
+      val from = new File(
+        getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").toURI).toPath
+      val to = Paths.get(s"${metadataPath.getAbsolutePath}/sources/0/0")
+      Files.createDirectories(to.getParent)
+      Files.copy(from, to)
+
+      val df = spark
+        .readStream
+        .format("kafka")
+        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+        .option("subscribe", topic)
+        .option("startingOffsets", s"earliest")
+        .load()
+        .selectExpr("CAST(value AS STRING)")
+        .as[String]
+        .map(_.toInt)
+
+      // Test that the query starts from the expected initial offset (i.e. read older offsets,
+      // even though startingOffsets is latest).
+      testStream(df)(
+        StartStream(checkpointLocation = metadataPath.getAbsolutePath),
+        AddKafkaData(Set(topic), 1000),
+        CheckAnswer(0, 1, 2, 10, 20, 200, 1000))
+    }
+  }
+
+  test("deserialization of initial offset written by future version") {
+    withTempDir { metadataPath =>
+      val topic = "kafka-initial-offset-future-version"
+      testUtils.createTopic(topic, partitions = 3)
+
+      // Copy the initial offset file into the right location inside the checkpoint root directory
+      // such that the Kafka source can read it for initial offsets.
+      val from = new File(
+        getClass.getResource("/kafka-source-initial-offset-future-version.bin").toURI).toPath
+      val to = Paths.get(s"${metadataPath.getAbsolutePath}/sources/0/0")
+      Files.createDirectories(to.getParent)
+      Files.copy(from, to)
+
+      val df = spark
+        .readStream
+        .format("kafka")
+        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+        .option("subscribe", topic)
+        .load()
+        .selectExpr("CAST(value AS STRING)")
+        .as[String]
+        .map(_.toInt)
+
+      testStream(df)(
+        StartStream(checkpointLocation = metadataPath.getAbsolutePath),
+        ExpectFailure[IllegalStateException](e => {
+          Seq(
+            s"maximum supported log version is v1, but encountered v99999",
+            "produced by a newer version of Spark and cannot be read by this version"
+          ).foreach { message =>
+            assert(e.toString.contains(message))
+          }
+        }))
+    }
+  }
+
+  test("KafkaSource with watermark") {
+    val now = System.currentTimeMillis()
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, Array(1).map(_.toString))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("startingOffsets", s"earliest")
+      .option("subscribe", topic)
+      .load()
+
+    val windowedAggregation = kafka
+      .withWatermark("timestamp", "10 seconds")
+      .groupBy(window($"timestamp", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start") as 'window, $"count")
+
+    val query = windowedAggregation
+      .writeStream
+      .format("memory")
+      .outputMode("complete")
+      .queryName("kafkaWatermark")
+      .start()
+    query.processAllAvailable()
+    val rows = spark.table("kafkaWatermark").collect()
+    assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
+    val row = rows(0)
+    // We cannot check the exact window start time as it depends on the time that messages were
+    // inserted by the producer. So here we just use a lower bound to make sure the internal
+    // conversion works.
+    assert(
+      row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000,
+      s"Unexpected results: $row")
+    assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row")
+    query.stop()
+  }
+
+  test("delete a topic when a Spark job is running") {
+    KafkaSourceSuite.collectedData.clear()
+
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("subscribe", topic)
+      // If a topic is deleted and we try to poll data starting from offset 0,
+      // the Kafka consumer will just block until timeout and return an empty result.
+      // So set the timeout to 1 second to make this test fast.
+      .option("kafkaConsumer.pollTimeoutMs", "1000")
+      .option("startingOffsets", "earliest")
+      .option("failOnDataLoss", "false")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    KafkaSourceSuite.globalTestUtils = testUtils
+    // The following ForeachWriter will delete the topic before fetching data from Kafka
+    // in executors.
+    val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] {
+      override def open(partitionId: Long, version: Long): Boolean = {
+        KafkaSourceSuite.globalTestUtils.deleteTopic(topic)
+        true
+      }
+
+      override def process(value: Int): Unit = {
+        KafkaSourceSuite.collectedData.add(value)
+      }
+
+      override def close(errorOrNull: Throwable): Unit = {}
+    }).start()
+    query.processAllAvailable()
+    query.stop()
+    // `failOnDataLoss` is `false`, we should not fail the query
+    assert(query.exception.isEmpty)
+  }
+
+  test("SPARK-22956: currentPartitionOffsets should be set when no new data comes in") {
+    def getSpecificDF(range: Range.Inclusive): org.apache.spark.sql.Dataset[Int] = {
+      val topic = newTopic()
+      testUtils.createTopic(topic, partitions = 1)
+      testUtils.sendMessages(topic, range.map(_.toString).toArray, Some(0))
+
+      val reader = spark
+        .readStream
+        .format("kafka")
+        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+        .option("kafka.metadata.max.age.ms", "1")
+        .option("maxOffsetsPerTrigger", 5)
+        .option("subscribe", topic)
+        .option("startingOffsets", "earliest")
+
+      reader.load()
+        .selectExpr("CAST(value AS STRING)")
+        .as[String]
+        .map(k => k.toInt)
+    }
+
+    val df1 = getSpecificDF(0 to 9)
+    val df2 = getSpecificDF(100 to 199)
+
+    val kafka = df1.union(df2)
+
+    val clock = new StreamManualClock
+
+    val waitUntilBatchProcessed = AssertOnQuery { q =>
+      eventually(Timeout(streamingTimeout)) {
+        if (!q.exception.isDefined) {
+          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+        }
+      }
+      if (q.exception.isDefined) {
+        throw q.exception.get
+      }
+      true
+    }
+
+    testStream(kafka)(
+      StartStream(ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      // 5 from smaller topic, 5 from bigger one
+      CheckLastBatch((0 to 4) ++ (100 to 104): _*),
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // 5 from smaller topic, 5 from bigger one
+      CheckLastBatch((5 to 9) ++ (105 to 109): _*),
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // smaller topic empty, 5 from bigger one
+      CheckLastBatch(110 to 114: _*),
+      StopStream,
+      StartStream(ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      // smallest now empty, 5 from bigger one
+      CheckLastBatch(115 to 119: _*),
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // smallest now empty, 5 from bigger one
+      CheckLastBatch(120 to 124: _*)
+    )
+  }
+
+  test("ensure stream-stream self-join generates only one offset in offset log") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 2)
+    require(testUtils.getLatestOffsets(Set(topic)).size === 2)
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("subscribe", topic)
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .load()
+
+    val values = kafka
+      .selectExpr("CAST(CAST(value AS STRING) AS INT) AS value",
+        "CAST(CAST(value AS STRING) AS INT) % 5 AS key")
+
+    val join = values.join(values, "key")
+
+    testStream(join)(
+      makeSureGetOffsetCalled,
+      AddKafkaData(Set(topic), 1, 2),
+      CheckAnswer((1, 1, 1), (2, 2, 2)),
+      AddKafkaData(Set(topic), 6, 3),
+      CheckAnswer((1, 1, 1), (2, 2, 2), (3, 3, 3), (1, 6, 1), (1, 1, 6), (1, 6, 6))
+    )
+  }
+}
+
+
+class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase {
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.conf.set(
+      "spark.sql.streaming.disabledV2MicroBatchReaders",
+      classOf[KafkaSourceProvider].getCanonicalName)
+  }
+
+  test("V1 Source is used when disabled through SQLConf") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("subscribePattern", s"$topic.*")
+      .load()
+
+    testStream(kafka)(
+      makeSureGetOffsetCalled,
+      AssertOnQuery { query =>
+        query.logicalPlan.collect {
+          case StreamingExecutionRelation(_: KafkaSource, _) => true
+        }.nonEmpty
+      }
+    )
+  }
+}
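The fallback exercised above relies on the spark.sql.streaming.disabledV2MicroBatchReaders conf; a minimal sketch of setting it from user code, assuming an active SparkSession `spark` (the conf value appears to be a comma-separated list of fully qualified provider class names):

// Disable the V2 micro-batch reader for the Kafka provider so that
// micro-batch queries fall back to the V1 KafkaSource.
spark.conf.set(
  "spark.sql.streaming.disabledV2MicroBatchReaders",
  "org.apache.spark.sql.kafka010.KafkaSourceProvider")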
+
+class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
+
+  test("V2 Source is used by default") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("subscribePattern", s"$topic.*")
+      .load()
+
+    testStream(kafka)(
+      makeSureGetOffsetCalled,
+      AssertOnQuery { query =>
+        query.logicalPlan.collect {
+          case StreamingExecutionRelation(_: KafkaMicroBatchReader, _) => true
+        }.nonEmpty
+      }
+    )
+  }
+}
+
+abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
+
+  import testImplicits._
+
+  test("cannot stop Kafka stream") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+    testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("subscribePattern", s"$topic.*")
+
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      StopStream
+    )
+  }
+
+  for (failOnDataLoss <- Seq(true, false)) {
+    test(s"assign from latest offsets (failOnDataLoss: $failOnDataLoss)") {
+      val topic = newTopic()
+      testFromLatestOffsets(
+        topic,
+        addPartitions = false,
+        failOnDataLoss = failOnDataLoss,
+        "assign" -> assignString(topic, 0 to 4))
+    }
+
+    test(s"assign from earliest offsets (failOnDataLoss: $failOnDataLoss)") {
+      val topic = newTopic()
+      testFromEarliestOffsets(
+        topic,
+        addPartitions = false,
+        failOnDataLoss = failOnDataLoss,
+        "assign" -> assignString(topic, 0 to 4))
+    }
+
+    test(s"assign from specific offsets (failOnDataLoss: $failOnDataLoss)") {
+      val topic = newTopic()
+      testFromSpecificOffsets(
+        topic,
+        failOnDataLoss = failOnDataLoss,
+        "assign" -> assignString(topic, 0 to 4),
+        "failOnDataLoss" -> failOnDataLoss.toString)
+    }
+
+    test(s"subscribing topic by name from latest offsets (failOnDataLoss: $failOnDataLoss)") {
+      val topic = newTopic()
+      testFromLatestOffsets(
+        topic,
+        addPartitions = true,
+        failOnDataLoss = failOnDataLoss,
+        "subscribe" -> topic)
+    }
+
+    test(s"subscribing topic by name from earliest offsets (failOnDataLoss: $failOnDataLoss)") {
+      val topic = newTopic()
+      testFromEarliestOffsets(
+        topic,
+        addPartitions = true,
+        failOnDataLoss = failOnDataLoss,
+        "subscribe" -> topic)
+    }
+
+    test(s"subscribing topic by name from specific offsets (failOnDataLoss: $failOnDataLoss)") {
+      val topic = newTopic()
+      testFromSpecificOffsets(topic, failOnDataLoss = failOnDataLoss, "subscribe" -> topic)
+    }
+
+    test(s"subscribing topic by pattern from latest offsets (failOnDataLoss: $failOnDataLoss)") {
+      val topicPrefix = newTopic()
+      val topic = topicPrefix + "-suffix"
+      testFromLatestOffsets(
+        topic,
+        addPartitions = true,
+        failOnDataLoss = failOnDataLoss,
+        "subscribePattern" -> s"$topicPrefix-.*")
+    }
+
+    test(s"subscribing topic by pattern from earliest offsets (failOnDataLoss: $failOnDataLoss)") {
+      val topicPrefix = newTopic()
+      val topic = topicPrefix + "-suffix"
+      testFromEarliestOffsets(
+        topic,
+        addPartitions = true,
+        failOnDataLoss = failOnDataLoss,
+        "subscribePattern" -> s"$topicPrefix-.*")
+    }
+
+    test(s"subscribing topic by pattern from specific offsets (failOnDataLoss: $failOnDataLoss)") {
+      val topicPrefix = newTopic()
+      val topic = topicPrefix + "-suffix"
+      testFromSpecificOffsets(
+        topic,
+        failOnDataLoss = failOnDataLoss,
+        "subscribePattern" -> s"$topicPrefix-.*")
+    }
+  }
+
+  test("bad source options") {
+    def testBadOptions(options: (String, String)*)(expectedMsgs: String*): Unit = {
+      val ex = intercept[IllegalArgumentException] {
+        val reader = spark
+          .readStream
+          .format("kafka")
+        options.foreach { case (k, v) => reader.option(k, v) }
+        reader.load()
+      }
+      expectedMsgs.foreach { m =>
+        assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(m.toLowerCase(Locale.ROOT)))
+      }
+    }
+
+    // Specifying an ending offset
+    testBadOptions("endingOffsets" -> "latest")("Ending offset not valid in streaming queries")
+
+    // No strategy specified
+    testBadOptions()("options must be specified", "subscribe", "subscribePattern")
+
+    // Multiple strategies specified
+    testBadOptions("subscribe" -> "t", "subscribePattern" -> "t.*")(
+      "only one", "options can be specified")
+
+    testBadOptions("subscribe" -> "t", "assign" -> """{"a":[0]}""")(
+      "only one", "options can be specified")
+
+    testBadOptions("assign" -> "")("no topicpartitions to assign")
+    testBadOptions("subscribe" -> "")("no topics to subscribe")
+    testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty")
+  }
+
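+  // The source manages group ids, offset resets, commits, interceptors and deserialization
+  // itself, so user-provided values for these kafka.* configs are rejected as "not supported".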
+  test("unsupported kafka configs") {
+    def testUnsupportedConfig(key: String, value: String = "someValue"): Unit = {
+      val ex = intercept[IllegalArgumentException] {
+        val reader = spark
+          .readStream
+          .format("kafka")
+          .option("subscribe", "topic")
+          .option("kafka.bootstrap.servers", "somehost")
+          .option(s"$key", value)
+        reader.load()
+      }
+      assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("not supported"))
+    }
+
+    testUnsupportedConfig("kafka.group.id")
+    testUnsupportedConfig("kafka.auto.offset.reset")
+    testUnsupportedConfig("kafka.enable.auto.commit")
+    testUnsupportedConfig("kafka.interceptor.classes")
+    testUnsupportedConfig("kafka.key.deserializer")
+    testUnsupportedConfig("kafka.value.deserializer")
+
+    testUnsupportedConfig("kafka.auto.offset.reset", "none")
+    testUnsupportedConfig("kafka.auto.offset.reset", "someValue")
+    testUnsupportedConfig("kafka.auto.offset.reset", "earliest")
+    testUnsupportedConfig("kafka.auto.offset.reset", "latest")
+  }
+
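+  // getKafkaOffsetRangeLimit parses the option value case-insensitively; when the option is
+  // absent it falls back to the supplied default, which is what the second loop verifies.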
+  test("get offsets from case insensitive parameters") {
+    for ((optionKey, optionValue, answer) <- Seq(
+      (STARTING_OFFSETS_OPTION_KEY, "earLiEst", EarliestOffsetRangeLimit),
+      (ENDING_OFFSETS_OPTION_KEY, "laTest", LatestOffsetRangeLimit),
+      (STARTING_OFFSETS_OPTION_KEY, """{"topic-A":{"0":23}}""",
+        SpecificOffsetRangeLimit(Map(new TopicPartition("topic-A", 0) -> 23))))) {
+      val offset = getKafkaOffsetRangeLimit(Map(optionKey -> optionValue), optionKey, answer)
+      assert(offset === answer)
+    }
+
+    for ((optionKey, answer) <- Seq(
+      (STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit),
+      (ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit))) {
+      val offset = getKafkaOffsetRangeLimit(Map.empty, optionKey, answer)
+      assert(offset === answer)
+    }
+  }
+
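+  // Builds the JSON value for the "assign" option; for partitions 0 to 4 of a topic it looks
+  // roughly like {"topic":[0,1,2,3,4]}.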
+  private def assignString(topic: String, partitions: Iterable[Int]): String = {
+    JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p)))
+  }
+
+  private def testFromSpecificOffsets(
+      topic: String,
+      failOnDataLoss: Boolean,
+      options: (String, String)*): Unit = {
+    val partitionOffsets = Map(
+      new TopicPartition(topic, 0) -> -2L,
+      new TopicPartition(topic, 1) -> -1L,
+      new TopicPartition(topic, 2) -> 0L,
+      new TopicPartition(topic, 3) -> 1L,
+      new TopicPartition(topic, 4) -> 2L
+    )
+    val startingOffsets = JsonUtils.partitionOffsets(partitionOffsets)
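+    // The JSON passed to "startingOffsets" looks roughly like
+    // {"<topic>":{"0":-2,"1":-1,"2":0,"3":1,"4":2}}, where -2 and -1 are the sentinel values
+    // for "earliest" and "latest" respectively.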
+
+    testUtils.createTopic(topic, partitions = 5)
+    // part 0 starts at earliest, these should all be seen
+    testUtils.sendMessages(topic, Array(-20, -21, -22).map(_.toString), Some(0))
+    // part 1 starts at latest, these should all be skipped
+    testUtils.sendMessages(topic, Array(-10, -11, -12).map(_.toString), Some(1))
+    // part 2 starts at 0, these should all be seen
+    testUtils.sendMessages(topic, Array(0, 1, 2).map(_.toString), Some(2))
+    // part 3 starts at 1, first should be skipped
+    testUtils.sendMessages(topic, Array(10, 11, 12).map(_.toString), Some(3))
+    // part 4 starts at 2, first and second should be skipped
+    testUtils.sendMessages(topic, Array(20, 21, 22).map(_.toString), Some(4))
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("startingOffsets", startingOffsets)
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("failOnDataLoss", failOnDataLoss.toString)
+    options.foreach { case (k, v) => reader.option(k, v) }
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      Execute { q =>
+        // wait to reach the last offset in every partition
+        q.awaitOffset(0, KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L)))
+      },
+      CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22),
+      StopStream,
+      StartStream(),
+      CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22), // Should get the data back on recovery
+      AddKafkaData(Set(topic), 30, 31, 32, 33, 34)(ensureDataInMultiplePartition = true),
+      CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22, 30, 31, 32, 33, 34),
+      StopStream
+    )
+  }
+
+  test("Kafka column types") {
+    val now = System.currentTimeMillis()
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, Array(1).map(_.toString))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("startingOffsets", s"earliest")
+      .option("subscribe", topic)
+      .load()
+
+    val query = kafka
+      .writeStream
+      .format("memory")
+      .queryName("kafkaColumnTypes")
+      .trigger(defaultTrigger)
+      .start()
+    eventually(timeout(streamingTimeout)) {
+      assert(spark.table("kafkaColumnTypes").count == 1,
+        s"Unexpected results: ${spark.table("kafkaColumnTypes").collectAsList()}")
+    }
+    val row = spark.table("kafkaColumnTypes").head()
+    assert(row.getAs[Array[Byte]]("key") === null, s"Unexpected results: $row")
+    assert(row.getAs[Array[Byte]]("value") === "1".getBytes(UTF_8), s"Unexpected results: $row")
+    assert(row.getAs[String]("topic") === topic, s"Unexpected results: $row")
+    assert(row.getAs[Int]("partition") === 0, s"Unexpected results: $row")
+    assert(row.getAs[Long]("offset") === 0L, s"Unexpected results: $row")
+    // We cannot check the exact timestamp as it's the time that messages were inserted by the
+    // producer. So here we just use a lower bound to make sure the internal conversion works.
+    assert(row.getAs[java.sql.Timestamp]("timestamp").getTime >= now, s"Unexpected results: $row")
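+    // timestampType 0 corresponds to Kafka's TimestampType.CREATE_TIME.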
+    assert(row.getAs[Int]("timestampType") === 0, s"Unexpected results: $row")
+    query.stop()
+  }
+
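+  // Starts the query from "latest" offsets: the seed "-1" message written before the query
+  // starts is skipped, so only data added afterwards shows up in the answers below.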
+  private def testFromLatestOffsets(
+      topic: String,
+      addPartitions: Boolean,
+      failOnDataLoss: Boolean,
+      options: (String, String)*): Unit = {
+    testUtils.createTopic(topic, partitions = 5)
+    testUtils.sendMessages(topic, Array("-1"))
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("startingOffsets", s"latest")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("failOnDataLoss", failOnDataLoss.toString)
+    options.foreach { case (k, v) => reader.option(k, v) }
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      AddKafkaData(Set(topic), 1, 2, 3),
+      CheckAnswer(2, 3, 4),
+      StopStream,
+      StartStream(),
+      CheckAnswer(2, 3, 4), // Should get the data back on recovery
+      StopStream,
+      AddKafkaData(Set(topic), 4, 5, 6), // Add data when stream is stopped
+      StartStream(),
+      CheckAnswer(2, 3, 4, 5, 6, 7), // Should get the added data
+      AddKafkaData(Set(topic), 7, 8),
+      CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
+      AssertOnQuery("Add partitions") { query: StreamExecution =>
+        if (addPartitions) setTopicPartitions(topic, 10, query)
+        true
+      },
+      AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16),
+      CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
+    )
+  }
+
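+  // Starts the query from "earliest" offsets: the seed messages 1 to 3 written before the
+  // query starts are read too, which is why 2, 3 and 4 appear in the first answer.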
+  private def testFromEarliestOffsets(
+      topic: String,
+      addPartitions: Boolean,
+      failOnDataLoss: Boolean,
+      options: (String, String)*): Unit = {
+    testUtils.createTopic(topic, partitions = 5)
+    testUtils.sendMessages(topic, (1 to 3).map { _.toString }.toArray)
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    val reader = spark.readStream
+    reader
+      .format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$"))
+      .option("startingOffsets", s"earliest")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("failOnDataLoss", failOnDataLoss.toString)
+    options.foreach { case (k, v) => reader.option(k, v) }
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+    testStream(mapped)(
+      AddKafkaData(Set(topic), 4, 5, 6), // Add data when stream is stopped
+      CheckAnswer(2, 3, 4, 5, 6, 7),
+      StopStream,
+      StartStream(),
+      CheckAnswer(2, 3, 4, 5, 6, 7),
+      StopStream,
+      AddKafkaData(Set(topic), 7, 8),
+      StartStream(),
+      CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
+      AssertOnQuery("Add partitions") { query: StreamExecution =>
+        if (addPartitions) setTopicPartitions(topic, 10, query)
+        true
+      },
+      AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16),
+      CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
+    )
+  }
+}
+
+object KafkaSourceSuite {
+  @volatile var globalTestUtils: KafkaTestUtils = _
+  val collectedData = new ConcurrentLinkedQueue[Any]()
+}
+
+
+class KafkaSourceStressSuite extends KafkaSourceTest {
+
+  import testImplicits._
+
+  val topicId = new AtomicInteger(1)
+
+  @volatile var topics: Seq[String] = (1 to 5).map(_ => newStressTopic)
+
+  def newStressTopic: String = s"stress${topicId.getAndIncrement()}"
+
+  private def nextInt(start: Int, end: Int): Int = {
+    // Random integer in [start, end).
+    start + Random.nextInt(end - start)
+  }
+
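+  // Randomly interleaves adding topics, deleting topics, adding partitions and plain data
+  // additions over 50 iterations, verifying the results stay correct throughout.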
+  test("stress test with multiple topics and partitions")  {
+    topics.foreach { topic =>
+      testUtils.createTopic(topic, partitions = nextInt(1, 6))
+      testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray)
+    }
+
+    // Create Kafka source that reads from latest offset
+    val kafka =
+      spark.readStream
+        .format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$"))
+        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+        .option("kafka.metadata.max.age.ms", "1")
+        .option("subscribePattern", "stress.*")
+        .option("failOnDataLoss", "false")
+        .load()
+        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+        .as[(String, String)]
+
+    val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+    runStressTest(
+      mapped,
+      Seq(makeSureGetOffsetCalled),
+      (d, running) => {
+        Random.nextInt(5) match {
+          case 0 => // Add a new topic
+            val addedTopic = newStressTopic
+            topics = topics ++ Seq(addedTopic)
+            AddKafkaData(topics.toSet, d: _*)(message = s"Add topic $addedTopic",
+              topicAction = (topic, partition) => {
+                if (partition.isEmpty) {
+                  testUtils.createTopic(topic, partitions = nextInt(1, 6))
+                }
+              })
+          case 1 if running =>
+            // Only delete a topic while the query is running; otherwise we may lose data and
+            // be unable to check correctness.
+            val deletedTopic = topics(Random.nextInt(topics.size))
+            if (deletedTopic != topics.head) {
+              topics = topics.filterNot(_ == deletedTopic)
+            }
+            AddKafkaData(topics.toSet, d: _*)(message = s"Delete topic $deletedTopic",
+              topicAction = (topic, partition) => {
+                // Never remove the first topic to make sure we have at least one topic
+                if (topic == deletedTopic && deletedTopic != topics.head) {
+                  testUtils.deleteTopic(deletedTopic)
+                }
+              })
+          case 2 => // Add new partitions
+            AddKafkaData(topics.toSet, d: _*)(message = "Add partition",
+              topicAction = (topic, partition) => {
+                testUtils.addPartitions(topic, partition.get + nextInt(1, 6))
+              })
+          case _ => // Just add new data
+            AddKafkaData(topics.toSet, d: _*)
+        }
+      },
+      iterations = 50)
+  }
+}
+
+class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with SharedSQLContext {
+
+  import testImplicits._
+
+  private var testUtils: KafkaTestUtils = _
+
+  private val topicId = new AtomicInteger(0)
+
+  private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
+
+  override def createSparkSession(): TestSparkSession = {
+    // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
+    new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf))
+  }
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils {
+      override def brokerConfiguration: Properties = {
+        val props = super.brokerConfiguration
+        // Try to make Kafka clean up messages as fast as possible. However, there is a
+        // hard-coded 30-second delay (kafka.log.LogManager.InitialTaskDelayMs), so this test
+        // needs to run for at least 30 seconds.
+        props.put("log.cleaner.backoff.ms", "100")
+        props.put("log.segment.bytes", "40")
+        props.put("log.retention.bytes", "40")
+        props.put("log.retention.check.interval.ms", "100")
+        props.put("delete.retention.ms", "10")
+        props.put("log.flush.scheduler.interval.ms", "10")
+        props
+      }
+    }
+    testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+    if (testUtils != null) {
+      testUtils.teardown()
+      testUtils = null
+    }
+    super.afterAll()
+  }
+
+  protected def startStream(ds: Dataset[Int]) = {
+    ds.writeStream.foreach(new ForeachWriter[Int] {
+
+      override def open(partitionId: Long, version: Long): Boolean = {
+        true
+      }
+
+      override def process(value: Int): Unit = {
+        // Slow down the processing speed so that messages may be aged out.
+        Thread.sleep(Random.nextInt(500))
+      }
+
+      override def close(errorOrNull: Throwable): Unit = {
+      }
+    }).start()
+  }
+
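+  // The aggressive retention settings above age messages out while the ForeachWriter sleeps on
+  // each record, so data is lost mid-query; with failOnDataLoss=false the query must not fail.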
+  test("stress test for failOnDataLoss=false") {
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("subscribePattern", "failOnDataLoss.*")
+      .option("startingOffsets", "earliest")
+      .option("failOnDataLoss", "false")
+      .option("fetchOffset.retryIntervalMs", "3000")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val query = startStream(kafka.map(kv => kv._2.toInt))
+
+    val testTime = 1.minutes
+    val startTime = System.currentTimeMillis()
+    // Track the current existing topics
+    val topics = mutable.ArrayBuffer[String]()
+    // Track topics that have been deleted
+    val deletedTopics = mutable.Set[String]()
+    while (System.currentTimeMillis() - startTime < testTime.toMillis) {
+      Random.nextInt(10) match {
+        case 0 => // Create a new topic
+          val topic = newTopic()
+          topics += topic
+          // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
+          // chance that a topic will be recreated after deletion due to the asynchronous update.
+          // Hence, always overwrite to handle this race condition.
+          testUtils.createTopic(topic, partitions = 1, overwrite = true)
+          logInfo(s"Create topic $topic")
+        case 1 if topics.nonEmpty => // Delete an existing topic
+          val topic = topics.remove(Random.nextInt(topics.size))
+          testUtils.deleteTopic(topic)
+          logInfo(s"Delete topic $topic")
+          deletedTopics += topic
+        case 2 if deletedTopics.nonEmpty => // Recreate a topic that was deleted.
+          val topic = deletedTopics.toSeq(Random.nextInt(deletedTopics.size))
+          deletedTopics -= topic
+          topics += topic
+          // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
+          // chance that a topic will be recreated after deletion due to the asynchronous update.
+          // Hence, always overwrite to handle this race condition.
+          testUtils.createTopic(topic, partitions = 1, overwrite = true)
+          logInfo(s"Create topic $topic")
+        case 3 =>
+          Thread.sleep(1000)
+        case _ => // Push random messages
+          for (topic <- topics) {
+            val size = Random.nextInt(10)
+            for (_ <- 0 until size) {
+              testUtils.sendMessages(topic, Array(Random.nextInt(10).toString))
+            }
+          }
+      }
+      // `failOnDataLoss` is `false`, we should not fail the query
+      if (query.exception.nonEmpty) {
+        throw query.exception.get
+      }
+    }
+
+    query.stop()
+    // `failOnDataLoss` is `false`, we should not fail the query
+    if (query.exception.nonEmpty) {
+      throw query.exception.get
+    }
+  }
+}

