You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/09/12 18:25:35 UTC

[7/7] spark git commit: [SPARK-24882][SQL] Revert [] improve data source v2 API from branch 2.4

[SPARK-24882][SQL] Revert [] improve data source v2 API from branch 2.4

## What changes were proposed in this pull request?

As discussed on the dev list, we don't want to include https://github.com/apache/spark/pull/22009 in Spark 2.4, because it requires data source v2 users to change their implementations extensively, and they would need to change them again in the next release.

## How was this patch tested?

existing tests

Author: Wenchen Fan <we...@databricks.com>

Closes #22388 from cloud-fan/revert.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15d2e9d7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15d2e9d7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15d2e9d7

Branch: refs/heads/branch-2.4
Commit: 15d2e9d7d2f0d5ecefd69bdc3f8a149670b05e79
Parents: 4c1428f
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Sep 12 11:25:24 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Sep 12 11:25:24 2018 -0700

----------------------------------------------------------------------
 .../kafka010/KafkaContinuousReadSupport.scala   | 255 -----------
 .../sql/kafka010/KafkaContinuousReader.scala    | 248 +++++++++++
 .../kafka010/KafkaMicroBatchReadSupport.scala   | 377 ----------------
 .../sql/kafka010/KafkaMicroBatchReader.scala    | 382 ++++++++++++++++
 .../sql/kafka010/KafkaSourceProvider.scala      |  37 +-
 .../spark/sql/kafka010/KafkaStreamWriter.scala  | 118 +++++
 .../kafka010/KafkaStreamingWriteSupport.scala   | 118 -----
 .../kafka010/KafkaContinuousSourceSuite.scala   |  12 +-
 .../sql/kafka010/KafkaContinuousTest.scala      |   8 +-
 .../kafka010/KafkaMicroBatchSourceSuite.scala   |  35 +-
 .../sources/v2/BatchReadSupportProvider.java    |  61 ---
 .../sources/v2/BatchWriteSupportProvider.java   |  59 ---
 .../sql/sources/v2/ContinuousReadSupport.java   |  46 ++
 .../v2/ContinuousReadSupportProvider.java       |  70 ---
 .../spark/sql/sources/v2/DataSourceV2.java      |  10 +-
 .../sql/sources/v2/MicroBatchReadSupport.java   |  52 +++
 .../v2/MicroBatchReadSupportProvider.java       |  70 ---
 .../spark/sql/sources/v2/ReadSupport.java       |  65 +++
 .../sql/sources/v2/SessionConfigSupport.java    |   7 +-
 .../sql/sources/v2/StreamWriteSupport.java      |  52 +++
 .../v2/StreamingWriteSupportProvider.java       |  54 ---
 .../spark/sql/sources/v2/WriteSupport.java      |  53 +++
 .../sql/sources/v2/reader/BatchReadSupport.java |  51 ---
 .../v2/reader/ContinuousInputPartition.java     |  35 ++
 .../sql/sources/v2/reader/DataSourceReader.java |  75 ++++
 .../sql/sources/v2/reader/InputPartition.java   |  26 +-
 .../sources/v2/reader/InputPartitionReader.java |  53 +++
 .../sql/sources/v2/reader/PartitionReader.java  |  49 ---
 .../v2/reader/PartitionReaderFactory.java       |  66 ---
 .../sql/sources/v2/reader/ReadSupport.java      |  50 ---
 .../spark/sql/sources/v2/reader/ScanConfig.java |  45 --
 .../sources/v2/reader/ScanConfigBuilder.java    |  30 --
 .../spark/sql/sources/v2/reader/Statistics.java |   2 +-
 .../v2/reader/SupportsPushDownFilters.java      |   6 +-
 .../reader/SupportsPushDownRequiredColumns.java |   8 +-
 .../v2/reader/SupportsReportPartitioning.java   |  12 +-
 .../v2/reader/SupportsReportStatistics.java     |  14 +-
 .../v2/reader/SupportsScanColumnarBatch.java    |  53 +++
 .../partitioning/ClusteredDistribution.java     |   4 +-
 .../v2/reader/partitioning/Distribution.java    |   6 +-
 .../v2/reader/partitioning/Partitioning.java    |   5 +-
 .../ContinuousInputPartitionReader.java         |  36 ++
 .../streaming/ContinuousPartitionReader.java    |  37 --
 .../ContinuousPartitionReaderFactory.java       |  40 --
 .../reader/streaming/ContinuousReadSupport.java |  77 ----
 .../v2/reader/streaming/ContinuousReader.java   |  79 ++++
 .../reader/streaming/MicroBatchReadSupport.java |  60 ---
 .../v2/reader/streaming/MicroBatchReader.java   |  75 ++++
 .../sql/sources/v2/reader/streaming/Offset.java |   4 +-
 .../reader/streaming/StreamingReadSupport.java  |  49 ---
 .../sources/v2/writer/BatchWriteSupport.java    | 101 -----
 .../sql/sources/v2/writer/DataSourceWriter.java | 116 +++++
 .../spark/sql/sources/v2/writer/DataWriter.java |  16 +-
 .../sources/v2/writer/DataWriterFactory.java    |  23 +-
 .../sources/v2/writer/WriterCommitMessage.java  |   9 +-
 .../v2/writer/streaming/StreamWriter.java       |  71 +++
 .../streaming/StreamingDataWriterFactory.java   |  59 ---
 .../writer/streaming/StreamingWriteSupport.java |  71 ---
 .../org/apache/spark/sql/DataFrameReader.scala  |   4 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |   8 +-
 .../datasources/v2/DataSourceRDD.scala          |  44 +-
 .../datasources/v2/DataSourceV2Relation.scala   |  72 ++-
 .../datasources/v2/DataSourceV2ScanExec.scala   |  65 +--
 .../datasources/v2/DataSourceV2Strategy.scala   |  49 +--
 .../datasources/v2/DataSourceV2Utils.scala      |   9 -
 .../v2/WriteToDataSourceV2Exec.scala            |  40 +-
 .../streaming/MicroBatchExecution.scala         |  91 ++--
 .../execution/streaming/ProgressReporter.scala  |   8 +-
 .../SimpleStreamingScanConfigBuilder.scala      |  40 --
 .../execution/streaming/StreamingRelation.scala |   6 +-
 .../spark/sql/execution/streaming/console.scala |  14 +-
 .../continuous/ContinuousDataSourceRDD.scala    |  37 +-
 .../continuous/ContinuousExecution.scala        |  51 +--
 .../continuous/ContinuousQueuedDataReader.scala |  29 +-
 .../continuous/ContinuousRateStreamSource.scala |  60 +--
 .../continuous/ContinuousTextSocketSource.scala |  72 ++-
 .../continuous/ContinuousWriteRDD.scala         |   7 +-
 .../streaming/continuous/EpochCoordinator.scala |  18 +-
 .../WriteToContinuousDataSource.scala           |   4 +-
 .../WriteToContinuousDataSourceExec.scala       |  10 +-
 .../spark/sql/execution/streaming/memory.scala  |  51 ++-
 .../streaming/sources/ConsoleWriteSupport.scala |  71 ---
 .../streaming/sources/ConsoleWriter.scala       |  72 +++
 .../sources/ContinuousMemoryStream.scala        |  76 ++--
 .../sources/ForeachWriteSupportProvider.scala   | 140 ------
 .../sources/ForeachWriterProvider.scala         | 139 ++++++
 .../sources/MicroBatchWritSupport.scala         |  51 ---
 .../streaming/sources/MicroBatchWriter.scala    |  37 ++
 .../sources/PackedRowWriterFactory.scala        |   9 +-
 .../RateControlMicroBatchReadSupport.scala      |  31 --
 .../RateStreamMicroBatchReadSupport.scala       | 215 ---------
 .../sources/RateStreamMicroBatchReader.scala    | 220 ++++++++++
 .../streaming/sources/RateStreamProvider.scala  |  27 +-
 .../execution/streaming/sources/memoryV2.scala  |  35 +-
 .../execution/streaming/sources/socket.scala    | 114 ++---
 .../spark/sql/streaming/DataStreamReader.scala  |  52 +--
 .../spark/sql/streaming/DataStreamWriter.scala  |   9 +-
 .../sql/streaming/StreamingQueryManager.scala   |   4 +-
 .../sources/v2/JavaAdvancedDataSourceV2.java    | 147 +++----
 .../sql/sources/v2/JavaBatchDataSourceV2.java   | 114 +++++
 .../sources/v2/JavaColumnarDataSourceV2.java    | 114 -----
 .../v2/JavaPartitionAwareDataSource.java        |  81 ++--
 .../v2/JavaSchemaRequiredDataSource.java        |  26 +-
 .../sql/sources/v2/JavaSimpleDataSourceV2.java  |  68 ++-
 .../sql/sources/v2/JavaSimpleReadSupport.java   |  99 -----
 ....apache.spark.sql.sources.DataSourceRegister |   4 +-
 .../execution/streaming/MemorySinkV2Suite.scala |   2 +-
 .../sources/ConsoleWriteSupportSuite.scala      | 151 -------
 .../streaming/sources/ConsoleWriterSuite.scala  | 153 +++++++
 .../sources/RateStreamProviderSuite.scala       |  84 ++--
 .../sources/TextSocketStreamSuite.scala         |  81 ++--
 .../sql/sources/v2/DataSourceV2Suite.scala      | 438 +++++++++----------
 .../sources/v2/SimpleWritableDataSource.scala   | 110 +++--
 .../apache/spark/sql/streaming/StreamTest.scala |   2 +-
 .../streaming/StreamingQueryListenerSuite.scala |   4 +-
 .../sql/streaming/StreamingQuerySuite.scala     |  58 ++-
 .../ContinuousQueuedDataReaderSuite.scala       |  45 +-
 .../streaming/continuous/ContinuousSuite.scala  |   2 +-
 .../continuous/EpochCoordinatorSuite.scala      |  18 +-
 .../sources/StreamingDataSourceV2Suite.scala    |  95 ++--
 120 files changed, 3619 insertions(+), 4070 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
deleted file mode 100644
index 1753a28..0000000
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import java.{util => ju}
-import java.util.concurrent.TimeoutException
-
-import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetOutOfRangeException}
-import org.apache.kafka.common.TopicPartition
-
-import org.apache.spark.TaskContext
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
-import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming._
-import org.apache.spark.sql.types.StructType
-
-/**
- * A [[ContinuousReadSupport]] for data from kafka.
- *
- * @param offsetReader  a reader used to get kafka offsets. Note that the actual data will be
- *                      read by per-task consumers generated later.
- * @param kafkaParams   String params for per-task Kafka consumers.
- * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceOptions]] params which
- *                      are not Kafka consumer params.
- * @param metadataPath Path to a directory this reader can use for writing metadata.
- * @param initialOffsets The Kafka offsets to start reading data at.
- * @param failOnDataLoss Flag indicating whether reading should fail in data loss
- *                       scenarios, where some offsets after the specified initial ones can't be
- *                       properly read.
- */
-class KafkaContinuousReadSupport(
-    offsetReader: KafkaOffsetReader,
-    kafkaParams: ju.Map[String, Object],
-    sourceOptions: Map[String, String],
-    metadataPath: String,
-    initialOffsets: KafkaOffsetRangeLimit,
-    failOnDataLoss: Boolean)
-  extends ContinuousReadSupport with Logging {
-
-  private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
-
-  override def initialOffset(): Offset = {
-    val offsets = initialOffsets match {
-      case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
-      case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
-      case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
-    }
-    logInfo(s"Initial offsets: $offsets")
-    offsets
-  }
-
-  override def fullSchema(): StructType = KafkaOffsetReader.kafkaSchema
-
-  override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = {
-    new KafkaContinuousScanConfigBuilder(fullSchema(), start, offsetReader, reportDataLoss)
-  }
-
-  override def deserializeOffset(json: String): Offset = {
-    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
-  }
-
-  override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
-    val startOffsets = config.asInstanceOf[KafkaContinuousScanConfig].startOffsets
-    startOffsets.toSeq.map {
-      case (topicPartition, start) =>
-        KafkaContinuousInputPartition(
-          topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
-    }.toArray
-  }
-
-  override def createContinuousReaderFactory(
-      config: ScanConfig): ContinuousPartitionReaderFactory = {
-    KafkaContinuousReaderFactory
-  }
-
-  /** Stop this source and free any resources it has allocated. */
-  def stop(): Unit = synchronized {
-    offsetReader.close()
-  }
-
-  override def commit(end: Offset): Unit = {}
-
-  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
-    val mergedMap = offsets.map {
-      case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
-    }.reduce(_ ++ _)
-    KafkaSourceOffset(mergedMap)
-  }
-
-  override def needsReconfiguration(config: ScanConfig): Boolean = {
-    val knownPartitions = config.asInstanceOf[KafkaContinuousScanConfig].knownPartitions
-    offsetReader.fetchLatestOffsets().keySet != knownPartitions
-  }
-
-  override def toString(): String = s"KafkaSource[$offsetReader]"
-
-  /**
-   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
-   * Otherwise, just log a warning.
-   */
-  private def reportDataLoss(message: String): Unit = {
-    if (failOnDataLoss) {
-      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
-    } else {
-      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
-    }
-  }
-}
-
-/**
- * An input partition for continuous Kafka processing. This will be serialized and transformed
- * into a full reader on executors.
- *
- * @param topicPartition The (topic, partition) pair this task is responsible for.
- * @param startOffset The offset to start reading from within the partition.
- * @param kafkaParams Kafka consumer params to use.
- * @param pollTimeoutMs The timeout for Kafka consumer polling.
- * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
- *                       are skipped.
- */
-case class KafkaContinuousInputPartition(
-    topicPartition: TopicPartition,
-    startOffset: Long,
-    kafkaParams: ju.Map[String, Object],
-    pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends InputPartition
-
-object KafkaContinuousReaderFactory extends ContinuousPartitionReaderFactory {
-  override def createReader(partition: InputPartition): ContinuousPartitionReader[InternalRow] = {
-    val p = partition.asInstanceOf[KafkaContinuousInputPartition]
-    new KafkaContinuousPartitionReader(
-      p.topicPartition, p.startOffset, p.kafkaParams, p.pollTimeoutMs, p.failOnDataLoss)
-  }
-}
-
-class KafkaContinuousScanConfigBuilder(
-    schema: StructType,
-    startOffset: Offset,
-    offsetReader: KafkaOffsetReader,
-    reportDataLoss: String => Unit)
-  extends ScanConfigBuilder {
-
-  override def build(): ScanConfig = {
-    val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(startOffset)
-
-    val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet
-    val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet)
-    val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq)
-
-    val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
-    if (deletedPartitions.nonEmpty) {
-      reportDataLoss(s"Some partitions were deleted: $deletedPartitions")
-    }
-
-    val startOffsets = newPartitionOffsets ++
-      oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_))
-    KafkaContinuousScanConfig(schema, startOffsets)
-  }
-}
-
-case class KafkaContinuousScanConfig(
-    readSchema: StructType,
-    startOffsets: Map[TopicPartition, Long])
-  extends ScanConfig {
-
-  // Created when building the scan config builder. If this diverges from the partitions at the
-  // latest offsets, we need to reconfigure the kafka read support.
-  def knownPartitions: Set[TopicPartition] = startOffsets.keySet
-}
-
-/**
- * A per-task data reader for continuous Kafka processing.
- *
- * @param topicPartition The (topic, partition) pair this data reader is responsible for.
- * @param startOffset The offset to start reading from within the partition.
- * @param kafkaParams Kafka consumer params to use.
- * @param pollTimeoutMs The timeout for Kafka consumer polling.
- * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
- *                       are skipped.
- */
-class KafkaContinuousPartitionReader(
-    topicPartition: TopicPartition,
-    startOffset: Long,
-    kafkaParams: ju.Map[String, Object],
-    pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends ContinuousPartitionReader[InternalRow] {
-  private val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = false)
-  private val converter = new KafkaRecordToUnsafeRowConverter
-
-  private var nextKafkaOffset = startOffset
-  private var currentRecord: ConsumerRecord[Array[Byte], Array[Byte]] = _
-
-  override def next(): Boolean = {
-    var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
-    while (r == null) {
-      if (TaskContext.get().isInterrupted() || TaskContext.get().isCompleted()) return false
-      // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
-      // interrupt points to end the query rather than waiting for new data that might never come.
-      try {
-        r = consumer.get(
-          nextKafkaOffset,
-          untilOffset = Long.MaxValue,
-          pollTimeoutMs,
-          failOnDataLoss)
-      } catch {
-        // We didn't read within the timeout. We're supposed to block indefinitely for new data, so
-        // swallow and ignore this.
-        case _: TimeoutException | _: org.apache.kafka.common.errors.TimeoutException =>
-
-        // This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range,
-        // or if it's the endpoint of the data range (i.e. the "true" next offset).
-        case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
-          val range = consumer.getAvailableOffsetRange()
-          if (range.latest >= nextKafkaOffset && range.earliest <= nextKafkaOffset) {
-            // retry
-          } else {
-            throw e
-          }
-      }
-    }
-    nextKafkaOffset = r.offset + 1
-    currentRecord = r
-    true
-  }
-
-  override def get(): UnsafeRow = {
-    converter.toUnsafeRow(currentRecord)
-  }
-
-  override def getOffset(): KafkaSourcePartitionOffset = {
-    KafkaSourcePartitionOffset(topicPartition, nextKafkaOffset)
-  }
-
-  override def close(): Unit = {
-    consumer.release()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
new file mode 100644
index 0000000..8ce56a2
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeoutException
+
+import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetOutOfRangeException}
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A [[ContinuousReader]] for data from kafka.
+ *
+ * @param offsetReader  a reader used to get kafka offsets. Note that the actual data will be
+ *                      read by per-task consumers generated later.
+ * @param kafkaParams   String params for per-task Kafka consumers.
+ * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceOptions]] params which
+ *                      are not Kafka consumer params.
+ * @param metadataPath Path to a directory this reader can use for writing metadata.
+ * @param initialOffsets The Kafka offsets to start reading data at.
+ * @param failOnDataLoss Flag indicating whether reading should fail in data loss
+ *                       scenarios, where some offsets after the specified initial ones can't be
+ *                       properly read.
+ */
+class KafkaContinuousReader(
+    offsetReader: KafkaOffsetReader,
+    kafkaParams: ju.Map[String, Object],
+    sourceOptions: Map[String, String],
+    metadataPath: String,
+    initialOffsets: KafkaOffsetRangeLimit,
+    failOnDataLoss: Boolean)
+  extends ContinuousReader with Logging {
+
+  private lazy val session = SparkSession.getActiveSession.get
+  private lazy val sc = session.sparkContext
+
+  private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
+
+  // Initialized when creating reader factories. If this diverges from the partitions at the latest
+  // offsets, we need to reconfigure.
+  // Exposed outside this object only for unit tests.
+  @volatile private[sql] var knownPartitions: Set[TopicPartition] = _
+
+  override def readSchema: StructType = KafkaOffsetReader.kafkaSchema
+
+  private var offset: Offset = _
+  override def setStartOffset(start: ju.Optional[Offset]): Unit = {
+    offset = start.orElse {
+      val offsets = initialOffsets match {
+        case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
+        case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
+        case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
+      }
+      logInfo(s"Initial offsets: $offsets")
+      offsets
+    }
+  }
+
+  override def getStartOffset(): Offset = offset
+
+  override def deserializeOffset(json: String): Offset = {
+    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+  }
+
+  override def planInputPartitions(): ju.List[InputPartition[InternalRow]] = {
+    import scala.collection.JavaConverters._
+
+    val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(offset)
+
+    val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet
+    val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet)
+    val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+
+    val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
+    if (deletedPartitions.nonEmpty) {
+      reportDataLoss(s"Some partitions were deleted: $deletedPartitions")
+    }
+
+    val startOffsets = newPartitionOffsets ++
+      oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_))
+    knownPartitions = startOffsets.keySet
+
+    startOffsets.toSeq.map {
+      case (topicPartition, start) =>
+        KafkaContinuousInputPartition(
+          topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss
+        ): InputPartition[InternalRow]
+    }.asJava
+  }
+
+  /** Stop this source and free any resources it has allocated. */
+  def stop(): Unit = synchronized {
+    offsetReader.close()
+  }
+
+  override def commit(end: Offset): Unit = {}
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+    val mergedMap = offsets.map {
+      case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+    }.reduce(_ ++ _)
+    KafkaSourceOffset(mergedMap)
+  }
+
+  override def needsReconfiguration(): Boolean = {
+    knownPartitions != null && offsetReader.fetchLatestOffsets().keySet != knownPartitions
+  }
+
+  override def toString(): String = s"KafkaSource[$offsetReader]"
+
+  /**
+   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+   * Otherwise, just log a warning.
+   */
+  private def reportDataLoss(message: String): Unit = {
+    if (failOnDataLoss) {
+      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+    } else {
+      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+    }
+  }
+}
+
+/**
+ * An input partition for continuous Kafka processing. This will be serialized and transformed
+ * into a full reader on executors.
+ *
+ * @param topicPartition The (topic, partition) pair this task is responsible for.
+ * @param startOffset The offset to start reading from within the partition.
+ * @param kafkaParams Kafka consumer params to use.
+ * @param pollTimeoutMs The timeout for Kafka consumer polling.
+ * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
+ *                       are skipped.
+ */
+case class KafkaContinuousInputPartition(
+    topicPartition: TopicPartition,
+    startOffset: Long,
+    kafkaParams: ju.Map[String, Object],
+    pollTimeoutMs: Long,
+    failOnDataLoss: Boolean) extends ContinuousInputPartition[InternalRow] {
+
+  override def createContinuousReader(
+      offset: PartitionOffset): InputPartitionReader[InternalRow] = {
+    val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset]
+    require(kafkaOffset.topicPartition == topicPartition,
+      s"Expected topicPartition: $topicPartition, but got: ${kafkaOffset.topicPartition}")
+    new KafkaContinuousInputPartitionReader(
+      topicPartition, kafkaOffset.partitionOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
+  }
+
+  override def createPartitionReader(): KafkaContinuousInputPartitionReader = {
+    new KafkaContinuousInputPartitionReader(
+      topicPartition, startOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
+  }
+}
+
+/**
+ * A per-task data reader for continuous Kafka processing.
+ *
+ * @param topicPartition The (topic, partition) pair this data reader is responsible for.
+ * @param startOffset The offset to start reading from within the partition.
+ * @param kafkaParams Kafka consumer params to use.
+ * @param pollTimeoutMs The timeout for Kafka consumer polling.
+ * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
+ *                       are skipped.
+ */
+class KafkaContinuousInputPartitionReader(
+    topicPartition: TopicPartition,
+    startOffset: Long,
+    kafkaParams: ju.Map[String, Object],
+    pollTimeoutMs: Long,
+    failOnDataLoss: Boolean) extends ContinuousInputPartitionReader[InternalRow] {
+  private val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = false)
+  private val converter = new KafkaRecordToUnsafeRowConverter
+
+  private var nextKafkaOffset = startOffset
+  private var currentRecord: ConsumerRecord[Array[Byte], Array[Byte]] = _
+
+  override def next(): Boolean = {
+    var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
+    while (r == null) {
+      if (TaskContext.get().isInterrupted() || TaskContext.get().isCompleted()) return false
+      // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
+      // interrupt points to end the query rather than waiting for new data that might never come.
+      try {
+        r = consumer.get(
+          nextKafkaOffset,
+          untilOffset = Long.MaxValue,
+          pollTimeoutMs,
+          failOnDataLoss)
+      } catch {
+        // We didn't read within the timeout. We're supposed to block indefinitely for new data, so
+        // swallow and ignore this.
+        case _: TimeoutException | _: org.apache.kafka.common.errors.TimeoutException =>
+
+        // This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range,
+        // or if it's the endpoint of the data range (i.e. the "true" next offset).
+        case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
+          val range = consumer.getAvailableOffsetRange()
+          if (range.latest >= nextKafkaOffset && range.earliest <= nextKafkaOffset) {
+            // retry
+          } else {
+            throw e
+          }
+      }
+    }
+    nextKafkaOffset = r.offset + 1
+    currentRecord = r
+    true
+  }
+
+  override def get(): UnsafeRow = {
+    converter.toUnsafeRow(currentRecord)
+  }
+
+  override def getOffset(): KafkaSourcePartitionOffset = {
+    KafkaSourcePartitionOffset(topicPartition, nextKafkaOffset)
+  }
+
+  override def close(): Unit = {
+    consumer.release()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
deleted file mode 100644
index bb4de67..0000000
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import java.{util => ju}
-import java.io._
-import java.nio.charset.StandardCharsets
-
-import org.apache.commons.io.IOUtils
-
-import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.ExecutorCacheTaskLocation
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset, SimpleStreamingScanConfig, SimpleStreamingScanConfigBuilder}
-import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchReadSupport
-import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
-import org.apache.spark.sql.sources.v2.DataSourceOptions
-import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset}
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.UninterruptibleThread
-
-/**
- * A [[MicroBatchReadSupport]] that reads data from Kafka.
- *
- * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
- * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
- * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
- * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done keep it consistent
- * with the semantics of `KafkaConsumer.position()`.
- *
- * Zero data lost is not guaranteed when topics are deleted. If zero data lost is critical, the user
- * must make sure all messages in a topic have been processed when deleting a topic.
- *
- * There is a known issue caused by KAFKA-1894: the query using Kafka maybe cannot be stopped.
- * To avoid this issue, you should make sure stopping the query before stopping the Kafka brokers
- * and not use wrong broker addresses.
- */
-private[kafka010] class KafkaMicroBatchReadSupport(
-    kafkaOffsetReader: KafkaOffsetReader,
-    executorKafkaParams: ju.Map[String, Object],
-    options: DataSourceOptions,
-    metadataPath: String,
-    startingOffsets: KafkaOffsetRangeLimit,
-    failOnDataLoss: Boolean) extends RateControlMicroBatchReadSupport with Logging {
-
-  private val pollTimeoutMs = options.getLong(
-    "kafkaConsumer.pollTimeoutMs",
-    SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
-
-  private val maxOffsetsPerTrigger =
-    Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
-
-  private val rangeCalculator = KafkaOffsetRangeCalculator(options)
-
-  private var endPartitionOffsets: KafkaSourceOffset = _
-
-  /**
-   * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
-   * called in StreamExecutionThread. Otherwise, interrupting a thread while running
-   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
-   */
-  override def initialOffset(): Offset = {
-    KafkaSourceOffset(getOrCreateInitialPartitionOffsets())
-  }
-
-  override def latestOffset(start: Offset): Offset = {
-    val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
-    val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
-    endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets =>
-      rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
-    }.getOrElse {
-      latestPartitionOffsets
-    })
-    endPartitionOffsets
-  }
-
-  override def fullSchema(): StructType = KafkaOffsetReader.kafkaSchema
-
-  override def newScanConfigBuilder(start: Offset, end: Offset): ScanConfigBuilder = {
-    new SimpleStreamingScanConfigBuilder(fullSchema(), start, Some(end))
-  }
-
-  override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
-    val sc = config.asInstanceOf[SimpleStreamingScanConfig]
-    val startPartitionOffsets = sc.start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
-    val endPartitionOffsets = sc.end.get.asInstanceOf[KafkaSourceOffset].partitionToOffsets
-
-    // Find the new partitions, and get their earliest offsets
-    val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
-    val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
-    if (newPartitionInitialOffsets.keySet != newPartitions) {
-      // We cannot get from offsets for some partitions. It means they got deleted.
-      val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
-      reportDataLoss(
-        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
-    }
-    logInfo(s"Partitions added: $newPartitionInitialOffsets")
-    newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
-      reportDataLoss(
-        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
-    }
-
-    // Find deleted partitions, and report data loss if required
-    val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
-    if (deletedPartitions.nonEmpty) {
-      reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
-    }
-
-    // Use the end partitions to calculate offset ranges to ignore partitions that have
-    // been deleted
-    val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
-      // Ignore partitions that we don't know the from offsets.
-      newPartitionInitialOffsets.contains(tp) || startPartitionOffsets.contains(tp)
-    }.toSeq
-    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
-
-    // Calculate offset ranges
-    val offsetRanges = rangeCalculator.getRanges(
-      fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
-      untilOffsets = endPartitionOffsets,
-      executorLocations = getSortedExecutorList())
-
-    // Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
-    // that is, concurrent tasks will not read the same TopicPartitions.
-    val reuseKafkaConsumer = offsetRanges.map(_.topicPartition).toSet.size == offsetRanges.size
-
-    // Generate factories based on the offset ranges
-    offsetRanges.map { range =>
-      KafkaMicroBatchInputPartition(
-        range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
-    }.toArray
-  }
-
-  override def createReaderFactory(config: ScanConfig): PartitionReaderFactory = {
-    KafkaMicroBatchReaderFactory
-  }
-
-  override def deserializeOffset(json: String): Offset = {
-    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
-  }
-
-  override def commit(end: Offset): Unit = {}
-
-  override def stop(): Unit = {
-    kafkaOffsetReader.close()
-  }
-
-  override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"
-
-  /**
-   * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
-   * the checkpoint.
-   */
-  private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
-    // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
-    // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
-    // (KAFKA-1894).
-    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
-
-    // SparkSession is required for getting Hadoop configuration for writing to checkpoints
-    assert(SparkSession.getActiveSession.nonEmpty)
-
-    val metadataLog =
-      new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
-    metadataLog.get(0).getOrElse {
-      val offsets = startingOffsets match {
-        case EarliestOffsetRangeLimit =>
-          KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
-        case LatestOffsetRangeLimit =>
-          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
-        case SpecificOffsetRangeLimit(p) =>
-          kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
-      }
-      metadataLog.add(0, offsets)
-      logInfo(s"Initial offsets: $offsets")
-      offsets
-    }.partitionToOffsets
-  }
-
-  /** Proportionally distribute limit number of offsets among topicpartitions */
-  private def rateLimit(
-      limit: Long,
-      from: PartitionOffsetMap,
-      until: PartitionOffsetMap): PartitionOffsetMap = {
-    val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
-    val sizes = until.flatMap {
-      case (tp, end) =>
-        // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
-        from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
-          val size = end - begin
-          logDebug(s"rateLimit $tp size is $size")
-          if (size > 0) Some(tp -> size) else None
-        }
-    }
-    val total = sizes.values.sum.toDouble
-    if (total < 1) {
-      until
-    } else {
-      until.map {
-        case (tp, end) =>
-          tp -> sizes.get(tp).map { size =>
-            val begin = from.get(tp).getOrElse(fromNew(tp))
-            val prorate = limit * (size / total)
-            // Don't completely starve small topicpartitions
-            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
-            // Paranoia, make sure not to return an offset that's past end
-            Math.min(end, off)
-          }.getOrElse(end)
-      }
-    }
-  }
-
-  private def getSortedExecutorList(): Array[String] = {
-
-    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
-      if (a.host == b.host) {
-        a.executorId > b.executorId
-      } else {
-        a.host > b.host
-      }
-    }
-
-    val bm = SparkEnv.get.blockManager
-    bm.master.getPeers(bm.blockManagerId).toArray
-      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
-      .sortWith(compare)
-      .map(_.toString)
-  }
-
-  /**
-   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
-   * Otherwise, just log a warning.
-   */
-  private def reportDataLoss(message: String): Unit = {
-    if (failOnDataLoss) {
-      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
-    } else {
-      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
-    }
-  }
-
-  /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. */
-  class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String)
-    extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
-
-    val VERSION = 1
-
-    override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
-      out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
-      val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
-      writer.write("v" + VERSION + "\n")
-      writer.write(metadata.json)
-      writer.flush
-    }
-
-    override def deserialize(in: InputStream): KafkaSourceOffset = {
-      in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
-      val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
-      // HDFSMetadataLog guarantees that it never creates a partial file.
-      assert(content.length != 0)
-      if (content(0) == 'v') {
-        val indexOfNewLine = content.indexOf("\n")
-        if (indexOfNewLine > 0) {
-          val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
-          KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
-        } else {
-          throw new IllegalStateException(
-            s"Log file was malformed: failed to detect the log file version line.")
-        }
-      } else {
-        // The log was generated by Spark 2.1.0
-        KafkaSourceOffset(SerializedOffset(content))
-      }
-    }
-  }
-}
-
-/** A [[InputPartition]] for reading Kafka data in a micro-batch streaming query. */
-private[kafka010] case class KafkaMicroBatchInputPartition(
-    offsetRange: KafkaOffsetRange,
-    executorKafkaParams: ju.Map[String, Object],
-    pollTimeoutMs: Long,
-    failOnDataLoss: Boolean,
-    reuseKafkaConsumer: Boolean) extends InputPartition
-
-private[kafka010] object KafkaMicroBatchReaderFactory extends PartitionReaderFactory {
-  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
-    val p = partition.asInstanceOf[KafkaMicroBatchInputPartition]
-    KafkaMicroBatchPartitionReader(p.offsetRange, p.executorKafkaParams, p.pollTimeoutMs,
-      p.failOnDataLoss, p.reuseKafkaConsumer)
-  }
-}
-
-/** A [[PartitionReader]] for reading Kafka data in a micro-batch streaming query. */
-private[kafka010] case class KafkaMicroBatchPartitionReader(
-    offsetRange: KafkaOffsetRange,
-    executorKafkaParams: ju.Map[String, Object],
-    pollTimeoutMs: Long,
-    failOnDataLoss: Boolean,
-    reuseKafkaConsumer: Boolean) extends PartitionReader[InternalRow] with Logging {
-
-  private val consumer = KafkaDataConsumer.acquire(
-    offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)
-
-  private val rangeToRead = resolveRange(offsetRange)
-  private val converter = new KafkaRecordToUnsafeRowConverter
-
-  private var nextOffset = rangeToRead.fromOffset
-  private var nextRow: UnsafeRow = _
-
-  override def next(): Boolean = {
-    if (nextOffset < rangeToRead.untilOffset) {
-      val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
-      if (record != null) {
-        nextRow = converter.toUnsafeRow(record)
-        nextOffset = record.offset + 1
-        true
-      } else {
-        false
-      }
-    } else {
-      false
-    }
-  }
-
-  override def get(): UnsafeRow = {
-    assert(nextRow != null)
-    nextRow
-  }
-
-  override def close(): Unit = {
-    consumer.release()
-  }
-
-  private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
-    if (range.fromOffset < 0 || range.untilOffset < 0) {
-      // Late bind the offset range
-      val availableOffsetRange = consumer.getAvailableOffsetRange()
-      val fromOffset = if (range.fromOffset < 0) {
-        assert(range.fromOffset == KafkaOffsetRangeLimit.EARLIEST,
-          s"earliest offset ${range.fromOffset} does not equal ${KafkaOffsetRangeLimit.EARLIEST}")
-        availableOffsetRange.earliest
-      } else {
-        range.fromOffset
-      }
-      val untilOffset = if (range.untilOffset < 0) {
-        assert(range.untilOffset == KafkaOffsetRangeLimit.LATEST,
-          s"latest offset ${range.untilOffset} does not equal ${KafkaOffsetRangeLimit.LATEST}")
-        availableOffsetRange.latest
-      } else {
-        range.untilOffset
-      }
-      KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset, None)
-    } else {
-      range
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
new file mode 100644
index 0000000..8cc989f
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the user
+ * must make sure all messages in a topic have been processed before deleting the topic.
+ *
+ * There is a known issue caused by KAFKA-1894: a query using Kafka may be impossible to stop.
+ * To avoid this issue, make sure to stop the query before stopping the Kafka brokers, and do not
+ * use wrong broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+    kafkaOffsetReader: KafkaOffsetReader,
+    executorKafkaParams: ju.Map[String, Object],
+    options: DataSourceOptions,
+    metadataPath: String,
+    startingOffsets: KafkaOffsetRangeLimit,
+    failOnDataLoss: Boolean)
+  extends MicroBatchReader with Logging {
+
+  private var startPartitionOffsets: PartitionOffsetMap = _
+  private var endPartitionOffsets: PartitionOffsetMap = _
+
+  private val pollTimeoutMs = options.getLong(
+    "kafkaConsumer.pollTimeoutMs",
+    SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
+
+  private val maxOffsetsPerTrigger =
+    Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+  private val rangeCalculator = KafkaOffsetRangeCalculator(options)
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
+
+  override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
+    // Make sure initialPartitionOffsets is initialized
+    initialPartitionOffsets
+
+    startPartitionOffsets = Option(start.orElse(null))
+        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+        .getOrElse(initialPartitionOffsets)
+
+    endPartitionOffsets = Option(end.orElse(null))
+        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+        .getOrElse {
+          val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+          maxOffsetsPerTrigger.map { maxOffsets =>
+            rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
+          }.getOrElse {
+            latestPartitionOffsets
+          }
+        }
+  }
+
+  override def planInputPartitions(): ju.List[InputPartition[InternalRow]] = {
+    // Find the new partitions, and get their earliest offsets
+    val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
+    val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+    if (newPartitionInitialOffsets.keySet != newPartitions) {
+      // No earliest offset came back for some new partitions, which means they got deleted.
+      val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
+      reportDataLoss(
+        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+    }
+    logInfo(s"Partitions added: $newPartitionInitialOffsets")
+    newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+      reportDataLoss(
+        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+    }
+
+    // Find deleted partitions, and report data loss if required
+    val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
+    if (deletedPartitions.nonEmpty) {
+      reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
+    }
+
+    // Use the end partitions to calculate offset ranges to ignore partitions that have
+    // been deleted
+    val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
+      // Ignore partitions whose starting offsets are unknown.
+      newPartitionInitialOffsets.contains(tp) || startPartitionOffsets.contains(tp)
+    }.toSeq
+    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+
+    // Calculate offset ranges
+    val offsetRanges = rangeCalculator.getRanges(
+      fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
+      untilOffsets = endPartitionOffsets,
+      executorLocations = getSortedExecutorList())
+
+    // Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
+    // that is, concurrent tasks will not read the same TopicPartitions.
+    val reuseKafkaConsumer = offsetRanges.map(_.topicPartition).toSet.size == offsetRanges.size
+
+    // Generate one input partition per offset range
+    offsetRanges.map { range =>
+      new KafkaMicroBatchInputPartition(
+        range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer
+      ): InputPartition[InternalRow]
+    }.asJava
+  }
+
+  override def getStartOffset: Offset = {
+    KafkaSourceOffset(startPartitionOffsets)
+  }
+
+  override def getEndOffset: Offset = {
+    KafkaSourceOffset(endPartitionOffsets)
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+  }
+
+  override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
+
+  override def commit(end: Offset): Unit = {}
+
+  override def stop(): Unit = {
+    kafkaOffsetReader.close()
+  }
+
+  override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"
+
+  /**
+   * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
+   * the checkpoint.
+   */
+  private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
+    // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
+    // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
+    // (KAFKA-1894).
+    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+
+    // SparkSession is required for getting Hadoop configuration for writing to checkpoints
+    assert(SparkSession.getActiveSession.nonEmpty)
+
+    val metadataLog =
+      new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
+    metadataLog.get(0).getOrElse {
+      val offsets = startingOffsets match {
+        case EarliestOffsetRangeLimit =>
+          KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
+        case LatestOffsetRangeLimit =>
+          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+        case SpecificOffsetRangeLimit(p) =>
+          kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
+      }
+      metadataLog.add(0, offsets)
+      logInfo(s"Initial offsets: $offsets")
+      offsets
+    }.partitionToOffsets
+  }
+
+  /** Proportionally distribute up to `limit` offsets among the topic partitions. */
+  private def rateLimit(
+      limit: Long,
+      from: PartitionOffsetMap,
+      until: PartitionOffsetMap): PartitionOffsetMap = {
+    val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+    val sizes = until.flatMap {
+      case (tp, end) =>
+        // If begin isn't defined, something's wrong; planInputPartitions reports the data loss
+        from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
+          val size = end - begin
+          logDebug(s"rateLimit $tp size is $size")
+          if (size > 0) Some(tp -> size) else None
+        }
+    }
+    val total = sizes.values.sum.toDouble
+    if (total < 1) {
+      until
+    } else {
+      until.map {
+        case (tp, end) =>
+          tp -> sizes.get(tp).map { size =>
+            val begin = from.get(tp).getOrElse(fromNew(tp))
+            val prorate = limit * (size / total)
+            // Round up when the share is below 1 so small topic partitions are not starved
+            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+            // Paranoia, make sure not to return an offset that's past end
+            Math.min(end, off)
+          }.getOrElse(end)
+      }
+    }
+  }
+
+  private def getSortedExecutorList(): Array[String] = {
+
+    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
+      if (a.host == b.host) {
+        a.executorId > b.executorId
+      } else {
+        a.host > b.host
+      }
+    }
+
+    val bm = SparkEnv.get.blockManager
+    bm.master.getPeers(bm.blockManagerId).toArray
+      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
+      .sortWith(compare)
+      .map(_.toString)
+  }
+
+  /**
+   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+   * Otherwise, just log a warning.
+   */
+  private def reportDataLoss(message: String): Unit = {
+    if (failOnDataLoss) {
+      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+    } else {
+      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+    }
+  }
+
+  /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. */
+  class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String)
+    extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
+
+    val VERSION = 1
+
+    override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+      out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+      val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+      writer.write("v" + VERSION + "\n")
+      writer.write(metadata.json)
+      writer.flush
+    }
+
+    override def deserialize(in: InputStream): KafkaSourceOffset = {
+      in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
+      val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+      // HDFSMetadataLog guarantees that it never creates a partial file.
+      assert(content.length != 0)
+      if (content(0) == 'v') {
+        val indexOfNewLine = content.indexOf("\n")
+        if (indexOfNewLine > 0) {
+          val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
+          KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
+        } else {
+          throw new IllegalStateException(
+            s"Log file was malformed: failed to detect the log file version line.")
+        }
+      } else {
+        // The log was generated by Spark 2.1.0
+        KafkaSourceOffset(SerializedOffset(content))
+      }
+    }
+  }
+}
+
+/** An [[InputPartition]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] case class KafkaMicroBatchInputPartition(
+    offsetRange: KafkaOffsetRange,
+    executorKafkaParams: ju.Map[String, Object],
+    pollTimeoutMs: Long,
+    failOnDataLoss: Boolean,
+    reuseKafkaConsumer: Boolean) extends InputPartition[InternalRow] {
+
+  override def preferredLocations(): Array[String] = offsetRange.preferredLoc.toArray
+
+  override def createPartitionReader(): InputPartitionReader[InternalRow] =
+    new KafkaMicroBatchInputPartitionReader(offsetRange, executorKafkaParams, pollTimeoutMs,
+      failOnDataLoss, reuseKafkaConsumer)
+}
+
+/** An [[InputPartitionReader]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] case class KafkaMicroBatchInputPartitionReader(
+    offsetRange: KafkaOffsetRange,
+    executorKafkaParams: ju.Map[String, Object],
+    pollTimeoutMs: Long,
+    failOnDataLoss: Boolean,
+    reuseKafkaConsumer: Boolean) extends InputPartitionReader[InternalRow] with Logging {
+
+  private val consumer = KafkaDataConsumer.acquire(
+    offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)
+
+  private val rangeToRead = resolveRange(offsetRange)
+  private val converter = new KafkaRecordToUnsafeRowConverter
+
+  private var nextOffset = rangeToRead.fromOffset
+  private var nextRow: UnsafeRow = _
+
+  override def next(): Boolean = {
+    if (nextOffset < rangeToRead.untilOffset) {
+      val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
+      if (record != null) {
+        nextRow = converter.toUnsafeRow(record)
+        nextOffset = record.offset + 1
+        true
+      } else {
+        false
+      }
+    } else {
+      false
+    }
+  }
+
+  override def get(): UnsafeRow = {
+    assert(nextRow != null)
+    nextRow
+  }
+
+  override def close(): Unit = {
+    consumer.release()
+  }
+
+  private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
+    if (range.fromOffset < 0 || range.untilOffset < 0) {
+      // Late bind the offset range
+      val availableOffsetRange = consumer.getAvailableOffsetRange()
+      val fromOffset = if (range.fromOffset < 0) {
+        assert(range.fromOffset == KafkaOffsetRangeLimit.EARLIEST,
+          s"earliest offset ${range.fromOffset} does not equal ${KafkaOffsetRangeLimit.EARLIEST}")
+        availableOffsetRange.earliest
+      } else {
+        range.fromOffset
+      }
+      val untilOffset = if (range.untilOffset < 0) {
+        assert(range.untilOffset == KafkaOffsetRangeLimit.LATEST,
+          s"latest offset ${range.untilOffset} does not equal ${KafkaOffsetRangeLimit.LATEST}")
+        availableOffsetRange.latest
+      } else {
+        range.untilOffset
+      }
+      KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset, None)
+    } else {
+      range
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 28c9853..d225c1e 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -30,8 +30,9 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
@@ -45,9 +46,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     with StreamSinkProvider
     with RelationProvider
     with CreatableRelationProvider
-    with StreamingWriteSupportProvider
-    with ContinuousReadSupportProvider
-    with MicroBatchReadSupportProvider
+    with StreamWriteSupport
+    with ContinuousReadSupport
+    with MicroBatchReadSupport
     with Logging {
   import KafkaSourceProvider._
 
@@ -107,12 +108,13 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }
 
   /**
-   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to read
-   * batches of Kafka data in a micro-batch streaming query.
+   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader]] to read batches
+   * of Kafka data in a micro-batch streaming query.
    */
-  override def createMicroBatchReadSupport(
+  override def createMicroBatchReader(
+      schema: Optional[StructType],
       metadataPath: String,
-      options: DataSourceOptions): KafkaMicroBatchReadSupport = {
+      options: DataSourceOptions): KafkaMicroBatchReader = {
 
     val parameters = options.asMap().asScala.toMap
     validateStreamOptions(parameters)
@@ -138,7 +140,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       parameters,
       driverGroupIdPrefix = s"$uniqueGroupId-driver")
 
-    new KafkaMicroBatchReadSupport(
+    new KafkaMicroBatchReader(
       kafkaOffsetReader,
       kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
       options,
@@ -148,12 +150,13 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }
 
   /**
-   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to read
+   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader]] to read
    * Kafka data in a continuous streaming query.
    */
-  override def createContinuousReadSupport(
+  override def createContinuousReader(
+      schema: Optional[StructType],
       metadataPath: String,
-      options: DataSourceOptions): KafkaContinuousReadSupport = {
+      options: DataSourceOptions): KafkaContinuousReader = {
     val parameters = options.asMap().asScala.toMap
     validateStreamOptions(parameters)
     // Each running query should use its own group id. Otherwise, the query may be only assigned
@@ -178,7 +181,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       parameters,
       driverGroupIdPrefix = s"$uniqueGroupId-driver")
 
-    new KafkaContinuousReadSupport(
+    new KafkaContinuousReader(
       kafkaOffsetReader,
       kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
       parameters,
@@ -267,11 +270,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     }
   }
 
-  override def createStreamingWriteSupport(
+  override def createStreamWriter(
       queryId: String,
       schema: StructType,
       mode: OutputMode,
-      options: DataSourceOptions): StreamingWriteSupport = {
+      options: DataSourceOptions): StreamWriter = {
     import scala.collection.JavaConverters._
 
     val spark = SparkSession.getActiveSession.get
@@ -282,7 +285,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     KafkaWriter.validateQuery(
       schema.toAttributes, new java.util.HashMap[String, Object](producerParams.asJava), topic)
 
-    new KafkaStreamingWriteSupport(topic, producerParams, schema)
+    new KafkaStreamWriter(topic, producerParams, schema)
   }
 
   private def strategy(caseInsensitiveParams: Map[String, String]) =

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
new file mode 100644
index 0000000..97c577d
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
+ * don't need to really send one.
+ */
+case object KafkaWriterCommitMessage extends WriterCommitMessage
+
+/**
+ * A [[StreamWriter]] for Kafka writing. Responsible for generating the writer factory.
+ *
+ * @param topic The topic this writer is responsible for. If None, topic will be inferred from
+ *              a `topic` field in the incoming data.
+ * @param producerParams Parameters for Kafka producers in each task.
+ * @param schema The schema of the input data.
+ */
+class KafkaStreamWriter(
+    topic: Option[String], producerParams: Map[String, String], schema: StructType)
+  extends StreamWriter {
+
+  validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic)
+
+  override def createWriterFactory(): KafkaStreamWriterFactory =
+    KafkaStreamWriterFactory(topic, producerParams, schema)
+
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+}
+
+/**
+ * A [[DataWriterFactory]] for Kafka writing. Will be serialized and sent to executors to
+ * generate the per-task data writers.
+ * @param topic The topic that should be written to. If None, topic will be inferred from
+ *              a `topic` field in the incoming data.
+ * @param producerParams Parameters for Kafka producers in each task.
+ * @param schema The schema of the input data.
+ */
+case class KafkaStreamWriterFactory(
+    topic: Option[String], producerParams: Map[String, String], schema: StructType)
+  extends DataWriterFactory[InternalRow] {
+
+  override def createDataWriter(
+      partitionId: Int,
+      taskId: Long,
+      epochId: Long): DataWriter[InternalRow] = {
+    new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes)
+  }
+}
+
+/**
+ * A [[DataWriter]] for Kafka writing. One data writer will be created in each partition to
+ * process incoming rows.
+ *
+ * @param targetTopic The topic that this data writer is targeting. If None, topic will be inferred
+ *                    from a `topic` field in the incoming data.
+ * @param producerParams Parameters to use for the Kafka producer.
+ * @param inputSchema The attributes in the input data.
+ */
+class KafkaStreamDataWriter(
+    targetTopic: Option[String], producerParams: Map[String, String], inputSchema: Seq[Attribute])
+  extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {
+  import scala.collection.JavaConverters._
+
+  private lazy val producer = CachedKafkaProducer.getOrCreate(
+    new java.util.HashMap[String, Object](producerParams.asJava))
+
+  def write(row: InternalRow): Unit = {
+    checkForErrors()
+    sendRow(row, producer)
+  }
+
+  def commit(): WriterCommitMessage = {
+    // Send is asynchronous, but we can't commit until all rows are actually in Kafka.
+    // This requires flushing and then checking that no callbacks produced errors.
+    // We also check for errors beforehand to fail as soon as possible - the check is cheap.
+    checkForErrors()
+    producer.flush()
+    checkForErrors()
+    KafkaWriterCommitMessage
+  }
+
+  def abort(): Unit = {}
+
+  def close(): Unit = {
+    checkForErrors()
+    if (producer != null) {
+      producer.flush()
+      checkForErrors()
+      CachedKafkaProducer.close(new java.util.HashMap[String, Object](producerParams.asJava))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
deleted file mode 100644
index 927c56d..0000000
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
-import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
-import org.apache.spark.sql.types.StructType
-
-/**
- * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
- * don't need to really send one.
- */
-case object KafkaWriterCommitMessage extends WriterCommitMessage
-
-/**
- * A [[StreamingWriteSupport]] for Kafka writing. Responsible for generating the writer factory.
- *
- * @param topic The topic this writer is responsible for. If None, topic will be inferred from
- *              a `topic` field in the incoming data.
- * @param producerParams Parameters for Kafka producers in each task.
- * @param schema The schema of the input data.
- */
-class KafkaStreamingWriteSupport(
-    topic: Option[String], producerParams: Map[String, String], schema: StructType)
-  extends StreamingWriteSupport {
-
-  validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic)
-
-  override def createStreamingWriterFactory(): KafkaStreamWriterFactory =
-    KafkaStreamWriterFactory(topic, producerParams, schema)
-
-  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
-  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
-}
-
-/**
- * A [[StreamingDataWriterFactory]] for Kafka writing. Will be serialized and sent to executors to
- * generate the per-task data writers.
- * @param topic The topic that should be written to. If None, topic will be inferred from
- *              a `topic` field in the incoming data.
- * @param producerParams Parameters for Kafka producers in each task.
- * @param schema The schema of the input data.
- */
-case class KafkaStreamWriterFactory(
-    topic: Option[String], producerParams: Map[String, String], schema: StructType)
-  extends StreamingDataWriterFactory {
-
-  override def createWriter(
-      partitionId: Int,
-      taskId: Long,
-      epochId: Long): DataWriter[InternalRow] = {
-    new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes)
-  }
-}
-
-/**
- * A [[DataWriter]] for Kafka writing. One data writer will be created in each partition to
- * process incoming rows.
- *
- * @param targetTopic The topic that this data writer is targeting. If None, topic will be inferred
- *                    from a `topic` field in the incoming data.
- * @param producerParams Parameters to use for the Kafka producer.
- * @param inputSchema The attributes in the input data.
- */
-class KafkaStreamDataWriter(
-    targetTopic: Option[String], producerParams: Map[String, String], inputSchema: Seq[Attribute])
-  extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {
-  import scala.collection.JavaConverters._
-
-  private lazy val producer = CachedKafkaProducer.getOrCreate(
-    new java.util.HashMap[String, Object](producerParams.asJava))
-
-  def write(row: InternalRow): Unit = {
-    checkForErrors()
-    sendRow(row, producer)
-  }
-
-  def commit(): WriterCommitMessage = {
-    // Send is asynchronous, but we can't commit until all rows are actually in Kafka.
-    // This requires flushing and then checking that no callbacks produced errors.
-    // We also check for errors before to fail as soon as possible - the check is cheap.
-    checkForErrors()
-    producer.flush()
-    checkForErrors()
-    KafkaWriterCommitMessage
-  }
-
-  def abort(): Unit = {}
-
-  def close(): Unit = {
-    checkForErrors()
-    if (producer != null) {
-      producer.flush()
-      checkForErrors()
-      CachedKafkaProducer.close(new java.util.HashMap[String, Object](producerParams.asJava))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index af51021..a0e5818 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
 import org.apache.kafka.clients.producer.ProducerRecord
 
 import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.streaming.Trigger
 
@@ -207,13 +207,11 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
         testUtils.createTopic(topic2, partitions = 5)
         eventually(timeout(streamingTimeout)) {
           assert(
-            query.lastExecution.executedPlan.collectFirst {
-              case scan: DataSourceV2ScanExec
-                if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
-                scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
-            }.exists { config =>
+            query.lastExecution.logical.collectFirst {
+              case StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r
+            }.exists { r =>
               // Ensure the new topic is present and the old topic is gone.
-              config.knownPartitions.exists(_.topic == topic2)
+              r.knownPartitions.exists(_.topic == topic2)
             },
             s"query never reconfigured to new topic $topic2")
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
index fa6bdc2..fa1468a 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.StreamExecution
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.streaming.Trigger
@@ -46,10 +46,8 @@ trait KafkaContinuousTest extends KafkaSourceTest {
     testUtils.addPartitions(topic, newCount)
     eventually(timeout(streamingTimeout)) {
       assert(
-        query.lastExecution.executedPlan.collectFirst {
-          case scan: DataSourceV2ScanExec
-              if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
-            scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
+        query.lastExecution.logical.collectFirst {
+          case StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r
         }.exists(_.knownPartitions.size == newCount),
         s"query never reconfigured to $newCount partitions")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 8e246db..65615fd 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
 import java.io._
 import java.nio.charset.StandardCharsets.UTF_8
 import java.nio.file.{Files, Paths}
-import java.util.Locale
+import java.util.{Locale, Optional}
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicInteger
 
@@ -28,7 +28,7 @@ import scala.collection.JavaConverters._
 import scala.io.Source
 import scala.util.Random
 
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
+import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.TopicPartition
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
@@ -40,9 +40,11 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
 
 abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with KafkaTest {
 
@@ -112,16 +114,14 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf
         query.nonEmpty,
         "Cannot add data when there is no query for finding the active kafka source")
 
-      val sources: Seq[BaseStreamingSource] = {
+      val sources = {
         query.get.logicalPlan.collect {
           case StreamingExecutionRelation(source: KafkaSource, _) => source
-          case StreamingExecutionRelation(source: KafkaMicroBatchReadSupport, _) => source
+          case StreamingExecutionRelation(source: KafkaMicroBatchReader, _) => source
         } ++ (query.get.lastExecution match {
           case null => Seq()
           case e => e.logical.collect {
-            case r: StreamingDataSourceV2Relation
-                if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
-              r.readSupport.asInstanceOf[KafkaContinuousReadSupport]
+            case StreamingDataSourceV2Relation(_, _, _, reader: KafkaContinuousReader) => reader
           }
         })
       }.distinct
@@ -905,7 +905,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       makeSureGetOffsetCalled,
       AssertOnQuery { query =>
         query.logicalPlan.collect {
-          case StreamingExecutionRelation(_: KafkaMicroBatchReadSupport, _) => true
+          case StreamingExecutionRelation(_: KafkaMicroBatchReader, _) => true
         }.nonEmpty
       }
     )
@@ -930,16 +930,17 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
           "kafka.bootstrap.servers" -> testUtils.brokerAddress,
           "subscribe" -> topic
         ) ++ Option(minPartitions).map { p => "minPartitions" -> p}
-        val readSupport = provider.createMicroBatchReadSupport(
-          dir.getAbsolutePath, new DataSourceOptions(options.asJava))
-        val config = readSupport.newScanConfigBuilder(
-          KafkaSourceOffset(Map(tp -> 0L)),
-          KafkaSourceOffset(Map(tp -> 100L))).build()
-        val inputPartitions = readSupport.planInputPartitions(config)
+        val reader = provider.createMicroBatchReader(
+          Optional.empty[StructType], dir.getAbsolutePath, new DataSourceOptions(options.asJava))
+        reader.setOffsetRange(
+          Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 0L))),
+          Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 100L)))
+        )
+        val factories = reader.planInputPartitions().asScala
           .map(_.asInstanceOf[KafkaMicroBatchInputPartition])
-        withClue(s"minPartitions = $minPartitions generated factories $inputPartitions\n\t") {
-          assert(inputPartitions.size == numPartitionsGenerated)
-          inputPartitions.foreach { f => assert(f.reuseKafkaConsumer == reusesConsumers) }
+        withClue(s"minPartitions = $minPartitions generated factories $factories\n\t") {
+          assert(factories.size == numPartitionsGenerated)
+          factories.foreach { f => assert(f.reuseKafkaConsumer == reusesConsumers) }
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org