You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/09/12 18:25:31 UTC

[3/7] spark git commit: [SPARK-24882][SQL] Revert [] improve data source v2 API from branch 2.4

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWritSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWritSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWritSupport.scala
deleted file mode 100644
index 9f88416..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWritSupport.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming.sources
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources.v2.writer.{BatchWriteSupport, DataWriter, DataWriterFactory, WriterCommitMessage}
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
-
-/**
- * A [[BatchWriteSupport]] used to hook V2 stream writers into a microbatch plan. It implements
- * the non-streaming interface, forwarding the epoch ID determined at construction to a wrapped
- * streaming write support.
- */
-class MicroBatchWritSupport(eppchId: Long, val writeSupport: StreamingWriteSupport)
-  extends BatchWriteSupport {
-
-  override def commit(messages: Array[WriterCommitMessage]): Unit = {
-    writeSupport.commit(eppchId, messages)
-  }
-
-  override def abort(messages: Array[WriterCommitMessage]): Unit = {
-    writeSupport.abort(eppchId, messages)
-  }
-
-  override def createBatchWriterFactory(): DataWriterFactory = {
-    new MicroBatchWriterFactory(eppchId, writeSupport.createStreamingWriterFactory())
-  }
-}
-
-class MicroBatchWriterFactory(epochId: Long, streamingWriterFactory: StreamingDataWriterFactory)
-  extends DataWriterFactory {
-
-  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
-    streamingWriterFactory.createWriter(partitionId, taskId, epochId)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriter.scala
new file mode 100644
index 0000000..2d43a7b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriter.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+
+/**
+ * A [[DataSourceWriter]] used to hook V2 stream writers into a microbatch plan. It implements
+ * the non-streaming interface, forwarding the batch ID determined at construction to a wrapped
+ * streaming writer.
+ */
+class MicroBatchWriter(batchId: Long, val writer: StreamWriter) extends DataSourceWriter {
+  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+    writer.commit(batchId, messages)
+  }
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = writer.abort(batchId, messages)
+
+  override def createWriterFactory(): DataWriterFactory[InternalRow] = writer.createWriterFactory()
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
index ac3c71c..f26e11d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
@@ -21,18 +21,17 @@ import scala.collection.mutable
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources.v2.writer.{BatchWriteSupport, DataWriter, DataWriterFactory, WriterCommitMessage}
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingDataWriterFactory
+import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriter, DataWriterFactory, WriterCommitMessage}
 
 /**
  * A simple [[DataWriterFactory]] whose tasks just pack rows into the commit message for delivery
- * to a [[BatchWriteSupport]] on the driver.
+ * to a [[DataSourceWriter]] on the driver.
  *
  * Note that, because it sends all rows to the driver, this factory will generally be unsuitable
  * for production-quality sinks. It's intended for use in tests.
  */
-case object PackedRowWriterFactory extends StreamingDataWriterFactory {
-  override def createWriter(
+case object PackedRowWriterFactory extends DataWriterFactory[InternalRow] {
+  override def createDataWriter(
       partitionId: Int,
       taskId: Long,
       epochId: Long): DataWriter[InternalRow] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchReadSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchReadSupport.scala
deleted file mode 100644
index 90680ea..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchReadSupport.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming.sources
-
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset}
-
-// A special `MicroBatchReadSupport` that can get latestOffset with a start offset.
-trait RateControlMicroBatchReadSupport extends MicroBatchReadSupport {
-
-  override def latestOffset(): Offset = {
-    throw new IllegalAccessException(
-      "latestOffset should not be called for RateControlMicroBatchReadSupport")
-  }
-
-  def latestOffset(start: Offset): Offset
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReadSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReadSupport.scala
deleted file mode 100644
index f536404..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReadSupport.scala
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming.sources
-
-import java.io._
-import java.nio.charset.StandardCharsets
-import java.util.concurrent.TimeUnit
-
-import org.apache.commons.io.IOUtils
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.sources.v2.DataSourceOptions
-import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset}
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.{ManualClock, SystemClock}
-
-class RateStreamMicroBatchReadSupport(options: DataSourceOptions, checkpointLocation: String)
-  extends MicroBatchReadSupport with Logging {
-  import RateStreamProvider._
-
-  private[sources] val clock = {
-    // The option to use a manual clock is provided only for unit testing purposes.
-    if (options.getBoolean("useManualClock", false)) new ManualClock else new SystemClock
-  }
-
-  private val rowsPerSecond =
-    options.get(ROWS_PER_SECOND).orElse("1").toLong
-
-  private val rampUpTimeSeconds =
-    Option(options.get(RAMP_UP_TIME).orElse(null.asInstanceOf[String]))
-      .map(JavaUtils.timeStringAsSec(_))
-      .getOrElse(0L)
-
-  private val maxSeconds = Long.MaxValue / rowsPerSecond
-
-  if (rampUpTimeSeconds > maxSeconds) {
-    throw new ArithmeticException(
-      s"Integer overflow. Max offset with $rowsPerSecond rowsPerSecond" +
-        s" is $maxSeconds, but 'rampUpTimeSeconds' is $rampUpTimeSeconds.")
-  }
-
-  private[sources] val creationTimeMs = {
-    val session = SparkSession.getActiveSession.orElse(SparkSession.getDefaultSession)
-    require(session.isDefined)
-
-    val metadataLog =
-      new HDFSMetadataLog[LongOffset](session.get, checkpointLocation) {
-        override def serialize(metadata: LongOffset, out: OutputStream): Unit = {
-          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
-          writer.write("v" + VERSION + "\n")
-          writer.write(metadata.json)
-          writer.flush
-        }
-
-        override def deserialize(in: InputStream): LongOffset = {
-          val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
-          // HDFSMetadataLog guarantees that it never creates a partial file.
-          assert(content.length != 0)
-          if (content(0) == 'v') {
-            val indexOfNewLine = content.indexOf("\n")
-            if (indexOfNewLine > 0) {
-              parseVersion(content.substring(0, indexOfNewLine), VERSION)
-              LongOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
-            } else {
-              throw new IllegalStateException(
-                s"Log file was malformed: failed to detect the log file version line.")
-            }
-          } else {
-            throw new IllegalStateException(
-              s"Log file was malformed: failed to detect the log file version line.")
-          }
-        }
-      }
-
-    metadataLog.get(0).getOrElse {
-      val offset = LongOffset(clock.getTimeMillis())
-      metadataLog.add(0, offset)
-      logInfo(s"Start time: $offset")
-      offset
-    }.offset
-  }
-
-  @volatile private var lastTimeMs: Long = creationTimeMs
-
-  override def initialOffset(): Offset = LongOffset(0L)
-
-  override def latestOffset(): Offset = {
-    val now = clock.getTimeMillis()
-    if (lastTimeMs < now) {
-      lastTimeMs = now
-    }
-    LongOffset(TimeUnit.MILLISECONDS.toSeconds(lastTimeMs - creationTimeMs))
-  }
-
-  override def deserializeOffset(json: String): Offset = {
-    LongOffset(json.toLong)
-  }
-
-  override def fullSchema(): StructType = SCHEMA
-
-  override def newScanConfigBuilder(start: Offset, end: Offset): ScanConfigBuilder = {
-    new SimpleStreamingScanConfigBuilder(fullSchema(), start, Some(end))
-  }
-
-  override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
-    val sc = config.asInstanceOf[SimpleStreamingScanConfig]
-    val startSeconds = sc.start.asInstanceOf[LongOffset].offset
-    val endSeconds = sc.end.get.asInstanceOf[LongOffset].offset
-    assert(startSeconds <= endSeconds, s"startSeconds($startSeconds) > endSeconds($endSeconds)")
-    if (endSeconds > maxSeconds) {
-      throw new ArithmeticException("Integer overflow. Max offset with " +
-        s"$rowsPerSecond rowsPerSecond is $maxSeconds, but it's $endSeconds now.")
-    }
-    // Fix "lastTimeMs" for recovery
-    if (lastTimeMs < TimeUnit.SECONDS.toMillis(endSeconds) + creationTimeMs) {
-      lastTimeMs = TimeUnit.SECONDS.toMillis(endSeconds) + creationTimeMs
-    }
-    val rangeStart = valueAtSecond(startSeconds, rowsPerSecond, rampUpTimeSeconds)
-    val rangeEnd = valueAtSecond(endSeconds, rowsPerSecond, rampUpTimeSeconds)
-    logDebug(s"startSeconds: $startSeconds, endSeconds: $endSeconds, " +
-      s"rangeStart: $rangeStart, rangeEnd: $rangeEnd")
-
-    if (rangeStart == rangeEnd) {
-      return Array.empty
-    }
-
-    val localStartTimeMs = creationTimeMs + TimeUnit.SECONDS.toMillis(startSeconds)
-    val relativeMsPerValue =
-      TimeUnit.SECONDS.toMillis(endSeconds - startSeconds).toDouble / (rangeEnd - rangeStart)
-    val numPartitions = {
-      val activeSession = SparkSession.getActiveSession
-      require(activeSession.isDefined)
-      Option(options.get(NUM_PARTITIONS).orElse(null.asInstanceOf[String]))
-        .map(_.toInt)
-        .getOrElse(activeSession.get.sparkContext.defaultParallelism)
-    }
-
-    (0 until numPartitions).map { p =>
-      new RateStreamMicroBatchInputPartition(
-        p, numPartitions, rangeStart, rangeEnd, localStartTimeMs, relativeMsPerValue)
-    }.toArray
-  }
-
-  override def createReaderFactory(config: ScanConfig): PartitionReaderFactory = {
-    RateStreamMicroBatchReaderFactory
-  }
-
-  override def commit(end: Offset): Unit = {}
-
-  override def stop(): Unit = {}
-
-  override def toString: String = s"RateStreamV2[rowsPerSecond=$rowsPerSecond, " +
-    s"rampUpTimeSeconds=$rampUpTimeSeconds, " +
-    s"numPartitions=${options.get(NUM_PARTITIONS).orElse("default")}"
-}
-
-case class RateStreamMicroBatchInputPartition(
-    partitionId: Int,
-    numPartitions: Int,
-    rangeStart: Long,
-    rangeEnd: Long,
-    localStartTimeMs: Long,
-    relativeMsPerValue: Double) extends InputPartition
-
-object RateStreamMicroBatchReaderFactory extends PartitionReaderFactory {
-  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
-    val p = partition.asInstanceOf[RateStreamMicroBatchInputPartition]
-    new RateStreamMicroBatchPartitionReader(p.partitionId, p.numPartitions, p.rangeStart,
-      p.rangeEnd, p.localStartTimeMs, p.relativeMsPerValue)
-  }
-}
-
-class RateStreamMicroBatchPartitionReader(
-    partitionId: Int,
-    numPartitions: Int,
-    rangeStart: Long,
-    rangeEnd: Long,
-    localStartTimeMs: Long,
-    relativeMsPerValue: Double) extends PartitionReader[InternalRow] {
-  private var count: Long = 0
-
-  override def next(): Boolean = {
-    rangeStart + partitionId + numPartitions * count < rangeEnd
-  }
-
-  override def get(): InternalRow = {
-    val currValue = rangeStart + partitionId + numPartitions * count
-    count += 1
-    val relative = math.round((currValue - rangeStart) * relativeMsPerValue)
-    InternalRow(DateTimeUtils.fromMillis(relative + localStartTimeMs), currValue)
-  }
-
-  override def close(): Unit = {}
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
new file mode 100644
index 0000000..9e0d954
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.Optional
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{ManualClock, SystemClock}
+
+class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation: String)
+  extends MicroBatchReader with Logging {
+  import RateStreamProvider._
+
+  private[sources] val clock = {
+    // The option to use a manual clock is provided only for unit testing purposes.
+    if (options.getBoolean("useManualClock", false)) new ManualClock else new SystemClock
+  }
+
+  private val rowsPerSecond =
+    options.get(ROWS_PER_SECOND).orElse("1").toLong
+
+  private val rampUpTimeSeconds =
+    Option(options.get(RAMP_UP_TIME).orElse(null.asInstanceOf[String]))
+      .map(JavaUtils.timeStringAsSec(_))
+      .getOrElse(0L)
+
+  private val maxSeconds = Long.MaxValue / rowsPerSecond
+
+  if (rampUpTimeSeconds > maxSeconds) {
+    throw new ArithmeticException(
+      s"Integer overflow. Max offset with $rowsPerSecond rowsPerSecond" +
+        s" is $maxSeconds, but 'rampUpTimeSeconds' is $rampUpTimeSeconds.")
+  }
+
+  private[sources] val creationTimeMs = {
+    val session = SparkSession.getActiveSession.orElse(SparkSession.getDefaultSession)
+    require(session.isDefined)
+
+    val metadataLog =
+      new HDFSMetadataLog[LongOffset](session.get, checkpointLocation) {
+        override def serialize(metadata: LongOffset, out: OutputStream): Unit = {
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write("v" + VERSION + "\n")
+          writer.write(metadata.json)
+          writer.flush
+        }
+
+        override def deserialize(in: InputStream): LongOffset = {
+          val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+          // HDFSMetadataLog guarantees that it never creates a partial file.
+          assert(content.length != 0)
+          if (content(0) == 'v') {
+            val indexOfNewLine = content.indexOf("\n")
+            if (indexOfNewLine > 0) {
+              parseVersion(content.substring(0, indexOfNewLine), VERSION)
+              LongOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
+            } else {
+              throw new IllegalStateException(
+                s"Log file was malformed: failed to detect the log file version line.")
+            }
+          } else {
+            throw new IllegalStateException(
+              s"Log file was malformed: failed to detect the log file version line.")
+          }
+        }
+      }
+
+    metadataLog.get(0).getOrElse {
+      val offset = LongOffset(clock.getTimeMillis())
+      metadataLog.add(0, offset)
+      logInfo(s"Start time: $offset")
+      offset
+    }.offset
+  }
+
+  @volatile private var lastTimeMs: Long = creationTimeMs
+
+  private var start: LongOffset = _
+  private var end: LongOffset = _
+
+  override def readSchema(): StructType = SCHEMA
+
+  override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {
+    this.start = start.orElse(LongOffset(0L)).asInstanceOf[LongOffset]
+    this.end = end.orElse {
+      val now = clock.getTimeMillis()
+      if (lastTimeMs < now) {
+        lastTimeMs = now
+      }
+      LongOffset(TimeUnit.MILLISECONDS.toSeconds(lastTimeMs - creationTimeMs))
+    }.asInstanceOf[LongOffset]
+  }
+
+  override def getStartOffset(): Offset = {
+    if (start == null) throw new IllegalStateException("start offset not set")
+    start
+  }
+  override def getEndOffset(): Offset = {
+    if (end == null) throw new IllegalStateException("end offset not set")
+    end
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+    LongOffset(json.toLong)
+  }
+
+  override def planInputPartitions(): java.util.List[InputPartition[InternalRow]] = {
+    val startSeconds = LongOffset.convert(start).map(_.offset).getOrElse(0L)
+    val endSeconds = LongOffset.convert(end).map(_.offset).getOrElse(0L)
+    assert(startSeconds <= endSeconds, s"startSeconds($startSeconds) > endSeconds($endSeconds)")
+    if (endSeconds > maxSeconds) {
+      throw new ArithmeticException("Integer overflow. Max offset with " +
+        s"$rowsPerSecond rowsPerSecond is $maxSeconds, but it's $endSeconds now.")
+    }
+    // Fix "lastTimeMs" for recovery
+    if (lastTimeMs < TimeUnit.SECONDS.toMillis(endSeconds) + creationTimeMs) {
+      lastTimeMs = TimeUnit.SECONDS.toMillis(endSeconds) + creationTimeMs
+    }
+    val rangeStart = valueAtSecond(startSeconds, rowsPerSecond, rampUpTimeSeconds)
+    val rangeEnd = valueAtSecond(endSeconds, rowsPerSecond, rampUpTimeSeconds)
+    logDebug(s"startSeconds: $startSeconds, endSeconds: $endSeconds, " +
+      s"rangeStart: $rangeStart, rangeEnd: $rangeEnd")
+
+    if (rangeStart == rangeEnd) {
+      return List.empty.asJava
+    }
+
+    val localStartTimeMs = creationTimeMs + TimeUnit.SECONDS.toMillis(startSeconds)
+    val relativeMsPerValue =
+      TimeUnit.SECONDS.toMillis(endSeconds - startSeconds).toDouble / (rangeEnd - rangeStart)
+    val numPartitions = {
+      val activeSession = SparkSession.getActiveSession
+      require(activeSession.isDefined)
+      Option(options.get(NUM_PARTITIONS).orElse(null.asInstanceOf[String]))
+        .map(_.toInt)
+        .getOrElse(activeSession.get.sparkContext.defaultParallelism)
+    }
+
+    (0 until numPartitions).map { p =>
+      new RateStreamMicroBatchInputPartition(
+        p, numPartitions, rangeStart, rangeEnd, localStartTimeMs, relativeMsPerValue)
+        : InputPartition[InternalRow]
+    }.toList.asJava
+  }
+
+  override def commit(end: Offset): Unit = {}
+
+  override def stop(): Unit = {}
+
+  override def toString: String = s"RateStreamV2[rowsPerSecond=$rowsPerSecond, " +
+    s"rampUpTimeSeconds=$rampUpTimeSeconds, " +
+    s"numPartitions=${options.get(NUM_PARTITIONS).orElse("default")}"
+}
+
+class RateStreamMicroBatchInputPartition(
+    partitionId: Int,
+    numPartitions: Int,
+    rangeStart: Long,
+    rangeEnd: Long,
+    localStartTimeMs: Long,
+    relativeMsPerValue: Double) extends InputPartition[InternalRow] {
+
+  override def createPartitionReader(): InputPartitionReader[InternalRow] =
+    new RateStreamMicroBatchInputPartitionReader(partitionId, numPartitions, rangeStart, rangeEnd,
+      localStartTimeMs, relativeMsPerValue)
+}
+
+class RateStreamMicroBatchInputPartitionReader(
+    partitionId: Int,
+    numPartitions: Int,
+    rangeStart: Long,
+    rangeEnd: Long,
+    localStartTimeMs: Long,
+    relativeMsPerValue: Double) extends InputPartitionReader[InternalRow] {
+  private var count: Long = 0
+
+  override def next(): Boolean = {
+    rangeStart + partitionId + numPartitions * count < rangeEnd
+  }
+
+  override def get(): InternalRow = {
+    val currValue = rangeStart + partitionId + numPartitions * count
+    count += 1
+    val relative = math.round((currValue - rangeStart) * relativeMsPerValue)
+    InternalRow(DateTimeUtils.fromMillis(relative + localStartTimeMs), currValue)
+  }
+
+  override def close(): Unit = {}
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
index 6942dfb..6bdd492 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark.sql.execution.streaming.sources
 
+import java.util.Optional
+
 import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReadSupport
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader}
 import org.apache.spark.sql.types._
 
 /**
@@ -39,12 +42,13 @@ import org.apache.spark.sql.types._
  *    be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed.
  */
 class RateStreamProvider extends DataSourceV2
-  with MicroBatchReadSupportProvider with ContinuousReadSupportProvider with DataSourceRegister {
+  with MicroBatchReadSupport with ContinuousReadSupport with DataSourceRegister {
   import RateStreamProvider._
 
-  override def createMicroBatchReadSupport(
+  override def createMicroBatchReader(
+      schema: Optional[StructType],
       checkpointLocation: String,
-      options: DataSourceOptions): MicroBatchReadSupport = {
+      options: DataSourceOptions): MicroBatchReader = {
       if (options.get(ROWS_PER_SECOND).isPresent) {
       val rowsPerSecond = options.get(ROWS_PER_SECOND).get().toLong
       if (rowsPerSecond <= 0) {
@@ -70,14 +74,17 @@ class RateStreamProvider extends DataSourceV2
       }
     }
 
-    new RateStreamMicroBatchReadSupport(options, checkpointLocation)
+    if (schema.isPresent) {
+      throw new AnalysisException("The rate source does not support a user-specified schema.")
+    }
+
+    new RateStreamMicroBatchReader(options, checkpointLocation)
   }
 
-  override def createContinuousReadSupport(
+  override def createContinuousReader(
+     schema: Optional[StructType],
      checkpointLocation: String,
-     options: DataSourceOptions): ContinuousReadSupport = {
-    new RateStreamContinuousReadSupport(options)
-  }
+     options: DataSourceOptions): ContinuousReader = new RateStreamContinuousReader(options)
 
   override def shortName(): String = "rate"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index c50dc7b..cb76e86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -32,9 +32,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
 import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamingWriteSupportProvider}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
@@ -42,15 +42,13 @@ import org.apache.spark.sql.types.StructType
  * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
  * tests and does not provide durability.
  */
-class MemorySinkV2 extends DataSourceV2 with StreamingWriteSupportProvider
-  with MemorySinkBase with Logging {
-
-  override def createStreamingWriteSupport(
+class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with MemorySinkBase with Logging {
+  override def createStreamWriter(
       queryId: String,
       schema: StructType,
       mode: OutputMode,
-      options: DataSourceOptions): StreamingWriteSupport = {
-    new MemoryStreamingWriteSupport(this, mode, schema)
+      options: DataSourceOptions): StreamWriter = {
+    new MemoryStreamWriter(this, mode, schema)
   }
 
   private case class AddedData(batchId: Long, data: Array[Row])
@@ -122,13 +120,10 @@ class MemorySinkV2 extends DataSourceV2 with StreamingWriteSupportProvider
 case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row])
   extends WriterCommitMessage {}
 
-class MemoryStreamingWriteSupport(
-    val sink: MemorySinkV2, outputMode: OutputMode, schema: StructType)
-  extends StreamingWriteSupport {
+class MemoryStreamWriter(val sink: MemorySinkV2, outputMode: OutputMode, schema: StructType)
+  extends StreamWriter {
 
-  override def createStreamingWriterFactory: MemoryWriterFactory = {
-    MemoryWriterFactory(outputMode, schema)
-  }
+  override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode, schema)
 
   override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
     val newRows = messages.flatMap {
@@ -143,19 +138,13 @@ class MemoryStreamingWriteSupport(
 }
 
 case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType)
-  extends DataWriterFactory with StreamingDataWriterFactory {
+  extends DataWriterFactory[InternalRow] {
 
-  override def createWriter(
-      partitionId: Int,
-      taskId: Long): DataWriter[InternalRow] = {
-    new MemoryDataWriter(partitionId, outputMode, schema)
-  }
-
-  override def createWriter(
+  override def createDataWriter(
       partitionId: Int,
       taskId: Long,
       epochId: Long): DataWriter[InternalRow] = {
-    createWriter(partitionId, taskId)
+    new MemoryDataWriter(partitionId, outputMode, schema)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
index b2a573e..874c479 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
@@ -20,10 +20,11 @@ package org.apache.spark.sql.execution.streaming.sources
 import java.io.{BufferedReader, InputStreamReader, IOException}
 import java.net.Socket
 import java.text.SimpleDateFormat
-import java.util.{Calendar, Locale}
+import java.util.{Calendar, List => JList, Locale, Optional}
 import java.util.concurrent.atomic.AtomicBoolean
 import javax.annotation.concurrent.GuardedBy
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 import scala.util.{Failure, Success, Try}
 
@@ -31,15 +32,16 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.streaming.{LongOffset, SimpleStreamingScanConfig, SimpleStreamingScanConfigBuilder}
-import org.apache.spark.sql.execution.streaming.continuous.TextSocketContinuousReadSupport
+import org.apache.spark.sql.execution.streaming.LongOffset
+import org.apache.spark.sql.execution.streaming.continuous.TextSocketContinuousReader
 import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions, DataSourceV2, MicroBatchReadSupportProvider}
-import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchReadSupport, Offset}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, DataSourceV2, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader, Offset}
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
 
+// Shared object for micro-batch and continuous reader
 object TextSocketReader {
   val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
   val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
@@ -48,12 +50,14 @@ object TextSocketReader {
 }
 
 /**
- * A MicroBatchReadSupport that reads text lines through a TCP socket, designed only for tutorials
- * and debugging. This MicroBatchReadSupport will *not* work in production applications due to
- * multiple reasons, including no support for fault recovery.
+ * A MicroBatchReader that reads text lines through a TCP socket, designed only for tutorials and
+ * debugging. This MicroBatchReader will *not* work in production applications due to multiple
+ * reasons, including no support for fault recovery.
  */
-class TextSocketMicroBatchReadSupport(options: DataSourceOptions)
-  extends MicroBatchReadSupport with Logging {
+class TextSocketMicroBatchReader(options: DataSourceOptions) extends MicroBatchReader with Logging {
+
+  private var startOffset: Offset = _
+  private var endOffset: Offset = _
 
   private val host: String = options.get("host").get()
   private val port: Int = options.get("port").get().toInt
@@ -99,7 +103,7 @@ class TextSocketMicroBatchReadSupport(options: DataSourceOptions)
               logWarning(s"Stream closed by $host:$port")
               return
             }
-            TextSocketMicroBatchReadSupport.this.synchronized {
+            TextSocketMicroBatchReader.this.synchronized {
               val newData = (
                 UTF8String.fromString(line),
                 DateTimeUtils.fromMillis(Calendar.getInstance().getTimeInMillis)
@@ -116,15 +120,24 @@ class TextSocketMicroBatchReadSupport(options: DataSourceOptions)
     readThread.start()
   }
 
-  override def initialOffset(): Offset = LongOffset(-1L)
+  override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = synchronized {
+    startOffset = start.orElse(LongOffset(-1L))
+    endOffset = end.orElse(currentOffset)
+  }
 
-  override def latestOffset(): Offset = currentOffset
+  override def getStartOffset(): Offset = {
+    Option(startOffset).getOrElse(throw new IllegalStateException("start offset not set"))
+  }
+
+  override def getEndOffset(): Offset = {
+    Option(endOffset).getOrElse(throw new IllegalStateException("end offset not set"))
+  }
 
   override def deserializeOffset(json: String): Offset = {
     LongOffset(json.toLong)
   }
 
-  override def fullSchema(): StructType = {
+  override def readSchema(): StructType = {
     if (options.getBoolean("includeTimestamp", false)) {
       TextSocketReader.SCHEMA_TIMESTAMP
     } else {
@@ -132,14 +145,12 @@ class TextSocketMicroBatchReadSupport(options: DataSourceOptions)
     }
   }
 
-  override def newScanConfigBuilder(start: Offset, end: Offset): ScanConfigBuilder = {
-    new SimpleStreamingScanConfigBuilder(fullSchema(), start, Some(end))
-  }
+  override def planInputPartitions(): JList[InputPartition[InternalRow]] = {
+    assert(startOffset != null && endOffset != null,
+      "start offset and end offset should already be set before create read tasks.")
 
-  override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
-    val sc = config.asInstanceOf[SimpleStreamingScanConfig]
-    val startOrdinal = sc.start.asInstanceOf[LongOffset].offset.toInt + 1
-    val endOrdinal = sc.end.get.asInstanceOf[LongOffset].offset.toInt + 1
+    val startOrdinal = LongOffset.convert(startOffset).get.offset.toInt + 1
+    val endOrdinal = LongOffset.convert(endOffset).get.offset.toInt + 1
 
     // Internal buffer only holds the batches after lastOffsetCommitted
     val rawList = synchronized {
@@ -161,29 +172,26 @@ class TextSocketMicroBatchReadSupport(options: DataSourceOptions)
       slices(idx % numPartitions).append(r)
     }
 
-    slices.map(TextSocketInputPartition)
-  }
+    (0 until numPartitions).map { i =>
+      val slice = slices(i)
+      new InputPartition[InternalRow] {
+        override def createPartitionReader(): InputPartitionReader[InternalRow] =
+          new InputPartitionReader[InternalRow] {
+            private var currentIdx = -1
 
-  override def createReaderFactory(config: ScanConfig): PartitionReaderFactory = {
-    new PartitionReaderFactory {
-      override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
-        val slice = partition.asInstanceOf[TextSocketInputPartition].slice
-        new PartitionReader[InternalRow] {
-          private var currentIdx = -1
+            override def next(): Boolean = {
+              currentIdx += 1
+              currentIdx < slice.size
+            }
 
-          override def next(): Boolean = {
-            currentIdx += 1
-            currentIdx < slice.size
-          }
+            override def get(): InternalRow = {
+              InternalRow(slice(currentIdx)._1, slice(currentIdx)._2)
+            }
 
-          override def get(): InternalRow = {
-            InternalRow(slice(currentIdx)._1, slice(currentIdx)._2)
+            override def close(): Unit = {}
           }
-
-          override def close(): Unit = {}
-        }
       }
-    }
+    }.toList.asJava
   }
 
   override def commit(end: Offset): Unit = synchronized {
@@ -219,11 +227,8 @@ class TextSocketMicroBatchReadSupport(options: DataSourceOptions)
   override def toString: String = s"TextSocketV2[host: $host, port: $port]"
 }
 
-case class TextSocketInputPartition(slice: ListBuffer[(UTF8String, Long)]) extends InputPartition
-
 class TextSocketSourceProvider extends DataSourceV2
-  with MicroBatchReadSupportProvider with ContinuousReadSupportProvider
-  with DataSourceRegister with Logging {
+  with MicroBatchReadSupport with ContinuousReadSupport with DataSourceRegister with Logging {
 
   private def checkParameters(params: DataSourceOptions): Unit = {
     logWarning("The socket source should not be used for production applications! " +
@@ -243,18 +248,27 @@ class TextSocketSourceProvider extends DataSourceV2
     }
   }
 
-  override def createMicroBatchReadSupport(
+  override def createMicroBatchReader(
+      schema: Optional[StructType],
       checkpointLocation: String,
-      options: DataSourceOptions): MicroBatchReadSupport = {
+      options: DataSourceOptions): MicroBatchReader = {
     checkParameters(options)
-    new TextSocketMicroBatchReadSupport(options)
+    if (schema.isPresent) {
+      throw new AnalysisException("The socket source does not support a user-specified schema.")
+    }
+
+    new TextSocketMicroBatchReader(options)
   }
 
-  override def createContinuousReadSupport(
+  override def createContinuousReader(
+      schema: Optional[StructType],
       checkpointLocation: String,
-      options: DataSourceOptions): ContinuousReadSupport = {
+      options: DataSourceOptions): ContinuousReader = {
     checkParameters(options)
-    new TextSocketContinuousReadSupport(options)
+    if (schema.isPresent) {
+      throw new AnalysisException("The socket source does not support a user-specified schema.")
+    }
+    new TextSocketContinuousReader(options)
   }
 
   /** String that represents the format that this data source provider uses. */

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 2a4db4a..7eb5db5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.streaming
 
-import java.util.Locale
+import java.util.{Locale, Optional}
 
 import scala.collection.JavaConverters._
 
@@ -28,8 +28,8 @@ import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
 import org.apache.spark.sql.sources.StreamSourceProvider
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions, MicroBatchReadSupportProvider}
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
@@ -172,21 +172,19 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
       case _ => None
     }
     ds match {
-      case s: MicroBatchReadSupportProvider =>
-        var tempReadSupport: MicroBatchReadSupport = null
+      case s: MicroBatchReadSupport =>
+        var tempReader: MicroBatchReader = null
         val schema = try {
-          val tmpCheckpointPath = Utils.createTempDir(namePrefix = s"tempCP").getCanonicalPath
-          tempReadSupport = if (userSpecifiedSchema.isDefined) {
-            s.createMicroBatchReadSupport(userSpecifiedSchema.get, tmpCheckpointPath, options)
-          } else {
-            s.createMicroBatchReadSupport(tmpCheckpointPath, options)
-          }
-          tempReadSupport.fullSchema()
+          tempReader = s.createMicroBatchReader(
+            Optional.ofNullable(userSpecifiedSchema.orNull),
+            Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
+            options)
+          tempReader.readSchema()
         } finally {
           // Stop tempReader to avoid side-effect thing
-          if (tempReadSupport != null) {
-            tempReadSupport.stop()
-            tempReadSupport = null
+          if (tempReader != null) {
+            tempReader.stop()
+            tempReader = null
           }
         }
         Dataset.ofRows(
@@ -194,28 +192,16 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
           StreamingRelationV2(
             s, source, extraOptions.toMap,
             schema.toAttributes, v1Relation)(sparkSession))
-      case s: ContinuousReadSupportProvider =>
-        var tempReadSupport: ContinuousReadSupport = null
-        val schema = try {
-          val tmpCheckpointPath = Utils.createTempDir(namePrefix = s"tempCP").getCanonicalPath
-          tempReadSupport = if (userSpecifiedSchema.isDefined) {
-            s.createContinuousReadSupport(userSpecifiedSchema.get, tmpCheckpointPath, options)
-          } else {
-            s.createContinuousReadSupport(tmpCheckpointPath, options)
-          }
-          tempReadSupport.fullSchema()
-        } finally {
-          // Stop tempReader to avoid side-effect thing
-          if (tempReadSupport != null) {
-            tempReadSupport.stop()
-            tempReadSupport = null
-          }
-        }
+      case s: ContinuousReadSupport =>
+        val tempReader = s.createContinuousReader(
+          Optional.ofNullable(userSpecifiedSchema.orNull),
+          Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
+          options)
         Dataset.ofRows(
           sparkSession,
           StreamingRelationV2(
             s, source, extraOptions.toMap,
-            schema.toAttributes, v1Relation)(sparkSession))
+            tempReader.readSchema().toAttributes, v1Relation)(sparkSession))
       case _ =>
         // Code path for data source v1.
         Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 7866e4f..3b9a56f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -21,7 +21,7 @@ import java.util.Locale
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.annotation.InterfaceStability
+import org.apache.spark.annotation.{InterfaceStability, Since}
 import org.apache.spark.api.java.function.VoidFunction2
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.execution.streaming.sources._
-import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider
+import org.apache.spark.sql.sources.v2.StreamWriteSupport
 
 /**
  * Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,
@@ -270,7 +270,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       query
     } else if (source == "foreach") {
       assertNotPartitioned("foreach")
-      val sink = ForeachWriteSupportProvider[T](foreachWriter, ds.exprEnc)
+      val sink = ForeachWriterProvider[T](foreachWriter, ds.exprEnc)
       df.sparkSession.sessionState.streamingQueryManager.startQuery(
         extraOptions.get("queryName"),
         extraOptions.get("checkpointLocation"),
@@ -299,8 +299,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
       val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
       val sink = ds.newInstance() match {
-        case w: StreamingWriteSupportProvider
-            if !disabledSources.contains(w.getClass.getCanonicalName) => w
+        case w: StreamWriteSupport if !disabledSources.contains(w.getClass.getCanonicalName) => w
         case _ =>
           val ds = DataSource(
             df.sparkSession,

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index cd52d99..25bb052 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution,
 import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS
-import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider
+import org.apache.spark.sql.sources.v2.StreamWriteSupport
 import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**
@@ -256,7 +256,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
     }
 
     (sink, trigger) match {
-      case (v2Sink: StreamingWriteSupportProvider, trigger: ContinuousTrigger) =>
+      case (v2Sink: StreamWriteSupport, trigger: ContinuousTrigger) =>
         if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
           UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
index 5602310..e4cead9 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
@@ -24,71 +24,29 @@ import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.sources.GreaterThan;
-import org.apache.spark.sql.sources.v2.BatchReadSupportProvider;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.ReadSupport;
 import org.apache.spark.sql.sources.v2.reader.*;
 import org.apache.spark.sql.types.StructType;
 
-public class JavaAdvancedDataSourceV2 implements DataSourceV2, BatchReadSupportProvider {
+public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport {
 
-  public class ReadSupport extends JavaSimpleReadSupport {
-    @Override
-    public ScanConfigBuilder newScanConfigBuilder() {
-      return new AdvancedScanConfigBuilder();
-    }
-
-    @Override
-    public InputPartition[] planInputPartitions(ScanConfig config) {
-      Filter[] filters = ((AdvancedScanConfigBuilder) config).filters;
-      List<InputPartition> res = new ArrayList<>();
-
-      Integer lowerBound = null;
-      for (Filter filter : filters) {
-        if (filter instanceof GreaterThan) {
-          GreaterThan f = (GreaterThan) filter;
-          if ("i".equals(f.attribute()) && f.value() instanceof Integer) {
-            lowerBound = (Integer) f.value();
-            break;
-          }
-        }
-      }
-
-      if (lowerBound == null) {
-        res.add(new JavaRangeInputPartition(0, 5));
-        res.add(new JavaRangeInputPartition(5, 10));
-      } else if (lowerBound < 4) {
-        res.add(new JavaRangeInputPartition(lowerBound + 1, 5));
-        res.add(new JavaRangeInputPartition(5, 10));
-      } else if (lowerBound < 9) {
-        res.add(new JavaRangeInputPartition(lowerBound + 1, 10));
-      }
-
-      return res.stream().toArray(InputPartition[]::new);
-    }
-
-    @Override
-    public PartitionReaderFactory createReaderFactory(ScanConfig config) {
-      StructType requiredSchema = ((AdvancedScanConfigBuilder) config).requiredSchema;
-      return new AdvancedReaderFactory(requiredSchema);
-    }
-  }
-
-  public static class AdvancedScanConfigBuilder implements ScanConfigBuilder, ScanConfig,
-    SupportsPushDownFilters, SupportsPushDownRequiredColumns {
+  public class Reader implements DataSourceReader, SupportsPushDownRequiredColumns,
+    SupportsPushDownFilters {
 
     // Exposed for testing.
     public StructType requiredSchema = new StructType().add("i", "int").add("j", "int");
     public Filter[] filters = new Filter[0];
 
     @Override
-    public void pruneColumns(StructType requiredSchema) {
-      this.requiredSchema = requiredSchema;
+    public StructType readSchema() {
+      return requiredSchema;
     }
 
     @Override
-    public StructType readSchema() {
-      return requiredSchema;
+    public void pruneColumns(StructType requiredSchema) {
+      this.requiredSchema = requiredSchema;
     }
 
     @Override
@@ -121,54 +79,79 @@ public class JavaAdvancedDataSourceV2 implements DataSourceV2, BatchReadSupportP
     }
 
     @Override
-    public ScanConfig build() {
-      return this;
+    public List<InputPartition<InternalRow>> planInputPartitions() {
+      List<InputPartition<InternalRow>> res = new ArrayList<>();
+
+      Integer lowerBound = null;
+      for (Filter filter : filters) {
+        if (filter instanceof GreaterThan) {
+          GreaterThan f = (GreaterThan) filter;
+          if ("i".equals(f.attribute()) && f.value() instanceof Integer) {
+            lowerBound = (Integer) f.value();
+            break;
+          }
+        }
+      }
+
+      if (lowerBound == null) {
+        res.add(new JavaAdvancedInputPartition(0, 5, requiredSchema));
+        res.add(new JavaAdvancedInputPartition(5, 10, requiredSchema));
+      } else if (lowerBound < 4) {
+        res.add(new JavaAdvancedInputPartition(lowerBound + 1, 5, requiredSchema));
+        res.add(new JavaAdvancedInputPartition(5, 10, requiredSchema));
+      } else if (lowerBound < 9) {
+        res.add(new JavaAdvancedInputPartition(lowerBound + 1, 10, requiredSchema));
+      }
+
+      return res;
     }
   }
 
-  static class AdvancedReaderFactory implements PartitionReaderFactory {
-    StructType requiredSchema;
+  static class JavaAdvancedInputPartition implements InputPartition<InternalRow>,
+      InputPartitionReader<InternalRow> {
+    private int start;
+    private int end;
+    private StructType requiredSchema;
 
-    AdvancedReaderFactory(StructType requiredSchema) {
+    JavaAdvancedInputPartition(int start, int end, StructType requiredSchema) {
+      this.start = start;
+      this.end = end;
       this.requiredSchema = requiredSchema;
     }
 
     @Override
-    public PartitionReader<InternalRow> createReader(InputPartition partition) {
-      JavaRangeInputPartition p = (JavaRangeInputPartition) partition;
-      return new PartitionReader<InternalRow>() {
-        private int current = p.start - 1;
-
-        @Override
-        public boolean next() throws IOException {
-          current += 1;
-          return current < p.end;
-        }
+    public InputPartitionReader<InternalRow> createPartitionReader() {
+      return new JavaAdvancedInputPartition(start - 1, end, requiredSchema);
+    }
 
-        @Override
-        public InternalRow get() {
-          Object[] values = new Object[requiredSchema.size()];
-          for (int i = 0; i < values.length; i++) {
-            if ("i".equals(requiredSchema.apply(i).name())) {
-              values[i] = current;
-            } else if ("j".equals(requiredSchema.apply(i).name())) {
-              values[i] = -current;
-            }
-          }
-          return new GenericInternalRow(values);
+    @Override
+    public boolean next() {
+      start += 1;
+      return start < end;
+    }
+
+    @Override
+    public InternalRow get() {
+      Object[] values = new Object[requiredSchema.size()];
+      for (int i = 0; i < values.length; i++) {
+        if ("i".equals(requiredSchema.apply(i).name())) {
+          values[i] = start;
+        } else if ("j".equals(requiredSchema.apply(i).name())) {
+          values[i] = -start;
         }
+      }
+      return new GenericInternalRow(values);
+    }
 
-        @Override
-        public void close() throws IOException {
+    @Override
+    public void close() throws IOException {
 
-        }
-      };
     }
   }
 
 
   @Override
-  public BatchReadSupport createBatchReadSupport(DataSourceOptions options) {
-    return new ReadSupport();
+  public DataSourceReader createReader(DataSourceOptions options) {
+    return new Reader();
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java
new file mode 100644
index 0000000..97d6176
--- /dev/null
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources.v2;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.reader.*;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+
+public class JavaBatchDataSourceV2 implements DataSourceV2, ReadSupport {
+
+  class Reader implements DataSourceReader, SupportsScanColumnarBatch {
+    private final StructType schema = new StructType().add("i", "int").add("j", "int");
+
+    @Override
+    public StructType readSchema() {
+      return schema;
+    }
+
+    @Override
+    public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
+      return java.util.Arrays.asList(
+               new JavaBatchInputPartition(0, 50), new JavaBatchInputPartition(50, 90));
+    }
+  }
+
+  static class JavaBatchInputPartition
+      implements InputPartition<ColumnarBatch>, InputPartitionReader<ColumnarBatch> {
+    private int start;
+    private int end;
+
+    private static final int BATCH_SIZE = 20;
+
+    private OnHeapColumnVector i;
+    private OnHeapColumnVector j;
+    private ColumnarBatch batch;
+
+    JavaBatchInputPartition(int start, int end) {
+      this.start = start;
+      this.end = end;
+    }
+
+    @Override
+    public InputPartitionReader<ColumnarBatch> createPartitionReader() {
+      this.i = new OnHeapColumnVector(BATCH_SIZE, DataTypes.IntegerType);
+      this.j = new OnHeapColumnVector(BATCH_SIZE, DataTypes.IntegerType);
+      ColumnVector[] vectors = new ColumnVector[2];
+      vectors[0] = i;
+      vectors[1] = j;
+      this.batch = new ColumnarBatch(vectors);
+      return this;
+    }
+
+    @Override
+    public boolean next() {
+      i.reset();
+      j.reset();
+      int count = 0;
+      while (start < end && count < BATCH_SIZE) {
+        i.putInt(count, start);
+        j.putInt(count, -start);
+        start += 1;
+        count += 1;
+      }
+
+      if (count == 0) {
+        return false;
+      } else {
+        batch.setNumRows(count);
+        return true;
+      }
+    }
+
+    @Override
+    public ColumnarBatch get() {
+      return batch;
+    }
+
+    @Override
+    public void close() throws IOException {
+      batch.close();
+    }
+  }
+
+
+  @Override
+  public DataSourceReader createReader(DataSourceOptions options) {
+    return new Reader();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java
deleted file mode 100644
index 28a9330..0000000
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package test.org.apache.spark.sql.sources.v2;
-
-import java.io.IOException;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
-import org.apache.spark.sql.sources.v2.BatchReadSupportProvider;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.reader.*;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.vectorized.ColumnVector;
-import org.apache.spark.sql.vectorized.ColumnarBatch;
-
-
-public class JavaColumnarDataSourceV2 implements DataSourceV2, BatchReadSupportProvider {
-
-  class ReadSupport extends JavaSimpleReadSupport {
-
-    @Override
-    public InputPartition[] planInputPartitions(ScanConfig config) {
-      InputPartition[] partitions = new InputPartition[2];
-      partitions[0] = new JavaRangeInputPartition(0, 50);
-      partitions[1] = new JavaRangeInputPartition(50, 90);
-      return partitions;
-    }
-
-    @Override
-    public PartitionReaderFactory createReaderFactory(ScanConfig config) {
-      return new ColumnarReaderFactory();
-    }
-  }
-
-  static class ColumnarReaderFactory implements PartitionReaderFactory {
-    private static final int BATCH_SIZE = 20;
-
-    @Override
-    public boolean supportColumnarReads(InputPartition partition) {
-      return true;
-    }
-
-    @Override
-    public PartitionReader<InternalRow> createReader(InputPartition partition) {
-      throw new UnsupportedOperationException("");
-    }
-
-    @Override
-    public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
-      JavaRangeInputPartition p = (JavaRangeInputPartition) partition;
-      OnHeapColumnVector i = new OnHeapColumnVector(BATCH_SIZE, DataTypes.IntegerType);
-      OnHeapColumnVector j = new OnHeapColumnVector(BATCH_SIZE, DataTypes.IntegerType);
-      ColumnVector[] vectors = new ColumnVector[2];
-      vectors[0] = i;
-      vectors[1] = j;
-      ColumnarBatch batch = new ColumnarBatch(vectors);
-
-      return new PartitionReader<ColumnarBatch>() {
-        private int current = p.start;
-
-        @Override
-        public boolean next() throws IOException {
-          i.reset();
-          j.reset();
-          int count = 0;
-          while (current < p.end && count < BATCH_SIZE) {
-            i.putInt(count, current);
-            j.putInt(count, -current);
-            current += 1;
-            count += 1;
-          }
-
-          if (count == 0) {
-            return false;
-          } else {
-            batch.setNumRows(count);
-            return true;
-          }
-        }
-
-        @Override
-        public ColumnarBatch get() {
-          return batch;
-        }
-
-        @Override
-        public void close() throws IOException {
-          batch.close();
-        }
-      };
-    }
-  }
-
-  @Override
-  public BatchReadSupport createBatchReadSupport(DataSourceOptions options) {
-    return new ReadSupport();
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
index 18a11dd..2d21324 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
@@ -19,34 +19,38 @@ package test.org.apache.spark.sql.sources.v2;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.sources.v2.*;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.ReadSupport;
 import org.apache.spark.sql.sources.v2.reader.*;
 import org.apache.spark.sql.sources.v2.reader.partitioning.ClusteredDistribution;
 import org.apache.spark.sql.sources.v2.reader.partitioning.Distribution;
 import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
+import org.apache.spark.sql.types.StructType;
 
-public class JavaPartitionAwareDataSource implements DataSourceV2, BatchReadSupportProvider {
+public class JavaPartitionAwareDataSource implements DataSourceV2, ReadSupport {
 
-  class ReadSupport extends JavaSimpleReadSupport implements SupportsReportPartitioning {
+  class Reader implements DataSourceReader, SupportsReportPartitioning {
+    private final StructType schema = new StructType().add("a", "int").add("b", "int");
 
     @Override
-    public InputPartition[] planInputPartitions(ScanConfig config) {
-      InputPartition[] partitions = new InputPartition[2];
-      partitions[0] = new SpecificInputPartition(new int[]{1, 1, 3}, new int[]{4, 4, 6});
-      partitions[1] = new SpecificInputPartition(new int[]{2, 4, 4}, new int[]{6, 2, 2});
-      return partitions;
+    public StructType readSchema() {
+      return schema;
     }
 
     @Override
-    public PartitionReaderFactory createReaderFactory(ScanConfig config) {
-      return new SpecificReaderFactory();
+    public List<InputPartition<InternalRow>> planInputPartitions() {
+      return java.util.Arrays.asList(
+        new SpecificInputPartition(new int[]{1, 1, 3}, new int[]{4, 4, 6}),
+        new SpecificInputPartition(new int[]{2, 4, 4}, new int[]{6, 2, 2}));
     }
 
     @Override
-    public Partitioning outputPartitioning(ScanConfig config) {
+    public Partitioning outputPartitioning() {
       return new MyPartitioning();
     }
   }
@@ -62,53 +66,50 @@ public class JavaPartitionAwareDataSource implements DataSourceV2, BatchReadSupp
     public boolean satisfy(Distribution distribution) {
       if (distribution instanceof ClusteredDistribution) {
         String[] clusteredCols = ((ClusteredDistribution) distribution).clusteredColumns;
-        return Arrays.asList(clusteredCols).contains("i");
+        return Arrays.asList(clusteredCols).contains("a");
       }
 
       return false;
     }
   }
 
-  static class SpecificInputPartition implements InputPartition {
-    int[] i;
-    int[] j;
+  static class SpecificInputPartition implements InputPartition<InternalRow>,
+    InputPartitionReader<InternalRow> {
+
+    private int[] i;
+    private int[] j;
+    private int current = -1;
 
     SpecificInputPartition(int[] i, int[] j) {
       assert i.length == j.length;
       this.i = i;
       this.j = j;
     }
-  }
 
-  static class SpecificReaderFactory implements PartitionReaderFactory {
+    @Override
+    public boolean next() throws IOException {
+      current += 1;
+      return current < i.length;
+    }
+
+    @Override
+    public InternalRow get() {
+      return new GenericInternalRow(new Object[] {i[current], j[current]});
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
 
     @Override
-    public PartitionReader<InternalRow> createReader(InputPartition partition) {
-      SpecificInputPartition p = (SpecificInputPartition) partition;
-      return new PartitionReader<InternalRow>() {
-        private int current = -1;
-
-        @Override
-        public boolean next() throws IOException {
-          current += 1;
-          return current < p.i.length;
-        }
-
-        @Override
-        public InternalRow get() {
-          return new GenericInternalRow(new Object[] {p.i[current], p.j[current]});
-        }
-
-        @Override
-        public void close() throws IOException {
-
-        }
-      };
+    public InputPartitionReader<InternalRow> createPartitionReader() {
+      return this;
     }
   }
 
   @Override
-  public BatchReadSupport createBatchReadSupport(DataSourceOptions options) {
-    return new ReadSupport();
+  public DataSourceReader createReader(DataSourceOptions options) {
+    return new Reader();
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
index cc9ac04..6fd6a44 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
@@ -17,39 +17,43 @@
 
 package test.org.apache.spark.sql.sources.v2;
 
-import org.apache.spark.sql.sources.v2.BatchReadSupportProvider;
+import java.util.List;
+
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.reader.*;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.types.StructType;
 
-public class JavaSchemaRequiredDataSource implements DataSourceV2, BatchReadSupportProvider {
+public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupport {
 
-  class ReadSupport extends JavaSimpleReadSupport {
+  class Reader implements DataSourceReader {
     private final StructType schema;
 
-    ReadSupport(StructType schema) {
+    Reader(StructType schema) {
       this.schema = schema;
     }
 
     @Override
-    public StructType fullSchema() {
+    public StructType readSchema() {
       return schema;
     }
 
     @Override
-    public InputPartition[] planInputPartitions(ScanConfig config) {
-      return new InputPartition[0];
+    public List<InputPartition<InternalRow>> planInputPartitions() {
+      return java.util.Collections.emptyList();
     }
   }
 
   @Override
-  public BatchReadSupport createBatchReadSupport(DataSourceOptions options) {
+  public DataSourceReader createReader(DataSourceOptions options) {
     throw new IllegalArgumentException("requires a user-supplied schema");
   }
 
   @Override
-  public BatchReadSupport createBatchReadSupport(StructType schema, DataSourceOptions options) {
-    return new ReadSupport(schema);
+  public DataSourceReader createReader(StructType schema, DataSourceOptions options) {
+    return new Reader(schema);
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
index 2cdbba8..274dc37 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
@@ -17,26 +17,72 @@
 
 package test.org.apache.spark.sql.sources.v2;
 
-import org.apache.spark.sql.sources.v2.BatchReadSupportProvider;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.reader.*;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.types.StructType;
+
+public class JavaSimpleDataSourceV2 implements DataSourceV2, ReadSupport {
+
+  class Reader implements DataSourceReader {
+    private final StructType schema = new StructType().add("i", "int").add("j", "int");
+
+    @Override
+    public StructType readSchema() {
+      return schema;
+    }
+
+    @Override
+    public List<InputPartition<InternalRow>> planInputPartitions() {
+      return java.util.Arrays.asList(
+        new JavaSimpleInputPartition(0, 5),
+        new JavaSimpleInputPartition(5, 10));
+    }
+  }
+
+  static class JavaSimpleInputPartition implements InputPartition<InternalRow>,
+    InputPartitionReader<InternalRow> {
 
-public class JavaSimpleDataSourceV2 implements DataSourceV2, BatchReadSupportProvider {
+    private int start;
+    private int end;
 
-  class ReadSupport extends JavaSimpleReadSupport {
+    JavaSimpleInputPartition(int start, int end) {
+      this.start = start;
+      this.end = end;
+    }
+
+    @Override
+    public InputPartitionReader<InternalRow> createPartitionReader() {
+      return new JavaSimpleInputPartition(start - 1, end);
+    }
 
     @Override
-    public InputPartition[] planInputPartitions(ScanConfig config) {
-      InputPartition[] partitions = new InputPartition[2];
-      partitions[0] = new JavaRangeInputPartition(0, 5);
-      partitions[1] = new JavaRangeInputPartition(5, 10);
-      return partitions;
+    public boolean next() {
+      start += 1;
+      return start < end;
+    }
+
+    @Override
+    public InternalRow get() {
+      return new GenericInternalRow(new Object[] {start, -start});
+    }
+
+    @Override
+    public void close() throws IOException {
+
     }
   }
 
   @Override
-  public BatchReadSupport createBatchReadSupport(DataSourceOptions options) {
-    return new ReadSupport();
+  public DataSourceReader createReader(DataSourceOptions options) {
+    return new Reader();
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReadSupport.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReadSupport.java
deleted file mode 100644
index 685f9b9..0000000
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReadSupport.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package test.org.apache.spark.sql.sources.v2;
-
-import java.io.IOException;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.sources.v2.reader.*;
-import org.apache.spark.sql.types.StructType;
-
-abstract class JavaSimpleReadSupport implements BatchReadSupport {
-
-  @Override
-  public StructType fullSchema() {
-    return new StructType().add("i", "int").add("j", "int");
-  }
-
-  @Override
-  public ScanConfigBuilder newScanConfigBuilder() {
-    return new JavaNoopScanConfigBuilder(fullSchema());
-  }
-
-  @Override
-  public PartitionReaderFactory createReaderFactory(ScanConfig config) {
-    return new JavaSimpleReaderFactory();
-  }
-}
-
-class JavaNoopScanConfigBuilder implements ScanConfigBuilder, ScanConfig {
-
-  private StructType schema;
-
-  JavaNoopScanConfigBuilder(StructType schema) {
-    this.schema = schema;
-  }
-
-  @Override
-  public ScanConfig build() {
-    return this;
-  }
-
-  @Override
-  public StructType readSchema() {
-    return schema;
-  }
-}
-
-class JavaSimpleReaderFactory implements PartitionReaderFactory {
-
-  @Override
-  public PartitionReader<InternalRow> createReader(InputPartition partition) {
-    JavaRangeInputPartition p = (JavaRangeInputPartition) partition;
-    return new PartitionReader<InternalRow>() {
-      private int current = p.start - 1;
-
-      @Override
-      public boolean next() throws IOException {
-        current += 1;
-        return current < p.end;
-      }
-
-      @Override
-      public InternalRow get() {
-        return new GenericInternalRow(new Object[] {current, -current});
-      }
-
-      @Override
-      public void close() throws IOException {
-
-      }
-    };
-  }
-}
-
-class JavaRangeInputPartition implements InputPartition {
-  int start;
-  int end;
-
-  JavaRangeInputPartition(int start, int end) {
-    this.start = start;
-    this.end = end;
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index a36b0cf..46b38be 100644
--- a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -9,6 +9,6 @@ org.apache.spark.sql.streaming.sources.FakeReadMicroBatchOnly
 org.apache.spark.sql.streaming.sources.FakeReadContinuousOnly
 org.apache.spark.sql.streaming.sources.FakeReadBothModes
 org.apache.spark.sql.streaming.sources.FakeReadNeitherMode
-org.apache.spark.sql.streaming.sources.FakeWriteSupportProvider
+org.apache.spark.sql.streaming.sources.FakeWrite
 org.apache.spark.sql.streaming.sources.FakeNoWrite
-org.apache.spark.sql.streaming.sources.FakeWriteSupportProviderV1Fallback
+org.apache.spark.sql.streaming.sources.FakeWriteV1Fallback

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
index 6185736..7bb2cf5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
@@ -43,7 +43,7 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
 
   test("streaming writer") {
     val sink = new MemorySinkV2
-    val writeSupport = new MemoryStreamingWriteSupport(
+    val writeSupport = new MemoryStreamWriter(
       sink, OutputMode.Append(), new StructType().add("i", "int"))
     writeSupport.commit(0,
       Array(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org