You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/05/10 04:48:59 UTC

[1/2] spark git commit: [SPARK-24073][SQL] Rename DataReaderFactory to InputPartition.

Repository: spark
Updated Branches:
  refs/heads/master 9341c951e -> 62d01391f


http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
index 048d078..80eeffd 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
@@ -24,7 +24,7 @@ import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.types.StructType;
 
 public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWithSchema {
@@ -42,7 +42,7 @@ public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWi
     }
 
     @Override
-    public List<DataReaderFactory<Row>> createDataReaderFactories() {
+    public List<InputPartition<Row>> planInputPartitions() {
       return java.util.Collections.emptyList();
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
index 96f55b8..8522a63 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
@@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.ReadSupport;
-import org.apache.spark.sql.sources.v2.reader.DataReader;
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.types.StructType;
 
@@ -41,25 +41,25 @@ public class JavaSimpleDataSourceV2 implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public List<DataReaderFactory<Row>> createDataReaderFactories() {
+    public List<InputPartition<Row>> planInputPartitions() {
       return java.util.Arrays.asList(
-        new JavaSimpleDataReaderFactory(0, 5),
-        new JavaSimpleDataReaderFactory(5, 10));
+        new JavaSimpleInputPartition(0, 5),
+        new JavaSimpleInputPartition(5, 10));
     }
   }
 
-  static class JavaSimpleDataReaderFactory implements DataReaderFactory<Row>, DataReader<Row> {
+  static class JavaSimpleInputPartition implements InputPartition<Row>, InputPartitionReader<Row> {
     private int start;
     private int end;
 
-    JavaSimpleDataReaderFactory(int start, int end) {
+    JavaSimpleInputPartition(int start, int end) {
       this.start = start;
       this.end = end;
     }
 
     @Override
-    public DataReader<Row> createDataReader() {
-      return new JavaSimpleDataReaderFactory(start - 1, end);
+    public InputPartitionReader<Row> createPartitionReader() {
+      return new JavaSimpleInputPartition(start - 1, end);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
index c3916e0..3ad8e7a 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
@@ -38,20 +38,20 @@ public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public List<DataReaderFactory<UnsafeRow>> createUnsafeRowReaderFactories() {
+    public List<InputPartition<UnsafeRow>> planUnsafeInputPartitions() {
       return java.util.Arrays.asList(
-        new JavaUnsafeRowDataReaderFactory(0, 5),
-        new JavaUnsafeRowDataReaderFactory(5, 10));
+        new JavaUnsafeRowInputPartition(0, 5),
+        new JavaUnsafeRowInputPartition(5, 10));
     }
   }
 
-  static class JavaUnsafeRowDataReaderFactory
-      implements DataReaderFactory<UnsafeRow>, DataReader<UnsafeRow> {
+  static class JavaUnsafeRowInputPartition
+      implements InputPartition<UnsafeRow>, InputPartitionReader<UnsafeRow> {
     private int start;
     private int end;
     private UnsafeRow row;
 
-    JavaUnsafeRowDataReaderFactory(int start, int end) {
+    JavaUnsafeRowInputPartition(int start, int end) {
       this.start = start;
       this.end = end;
       this.row = new UnsafeRow(2);
@@ -59,8 +59,8 @@ public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public DataReader<UnsafeRow> createDataReader() {
-      return new JavaUnsafeRowDataReaderFactory(start - 1, end);
+    public InputPartitionReader<UnsafeRow> createPartitionReader() {
+      return new JavaUnsafeRowInputPartition(start - 1, end);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index ff14ec3..39a010f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -142,9 +142,9 @@ class RateSourceSuite extends StreamTest {
     val startOffset = LongOffset(0L)
     val endOffset = LongOffset(1L)
     reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
-    val tasks = reader.createDataReaderFactories()
+    val tasks = reader.planInputPartitions()
     assert(tasks.size == 1)
-    val dataReader = tasks.get(0).createDataReader()
+    val dataReader = tasks.get(0).createPartitionReader()
     val data = ArrayBuffer[Row]()
     while (dataReader.next()) {
       data.append(dataReader.get())
@@ -159,11 +159,11 @@ class RateSourceSuite extends StreamTest {
     val startOffset = LongOffset(0L)
     val endOffset = LongOffset(1L)
     reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
-    val tasks = reader.createDataReaderFactories()
+    val tasks = reader.planInputPartitions()
     assert(tasks.size == 11)
 
     val readData = tasks.asScala
-      .map(_.createDataReader())
+      .map(_.createPartitionReader())
       .flatMap { reader =>
         val buf = scala.collection.mutable.ListBuffer[Row]()
         while (reader.next()) buf.append(reader.get())
@@ -304,7 +304,7 @@ class RateSourceSuite extends StreamTest {
     val reader = new RateStreamContinuousReader(
       new DataSourceOptions(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava))
     reader.setStartOffset(Optional.empty())
-    val tasks = reader.createDataReaderFactories()
+    val tasks = reader.planInputPartitions()
     assert(tasks.size == 2)
 
     val data = scala.collection.mutable.ListBuffer[Row]()
@@ -314,7 +314,7 @@ class RateSourceSuite extends StreamTest {
           .asInstanceOf[RateStreamOffset]
           .partitionToValueAndRunTimeMs(t.partitionIndex)
           .runTimeMs
-        val r = t.createDataReader().asInstanceOf[RateStreamContinuousDataReader]
+        val r = t.createPartitionReader().asInstanceOf[RateStreamContinuousInputPartitionReader]
         for (rowIndex <- 0 to 9) {
           r.next()
           data.append(r.get())

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index e0a5327..505a3f3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -346,8 +346,8 @@ class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport {
   class Reader extends DataSourceReader {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
-    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
-      java.util.Arrays.asList(new SimpleDataReaderFactory(0, 5))
+    override def planInputPartitions(): JList[InputPartition[Row]] = {
+      java.util.Arrays.asList(new SimpleInputPartition(0, 5))
     }
   }
 
@@ -359,20 +359,21 @@ class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport {
   class Reader extends DataSourceReader {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
-    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
-      java.util.Arrays.asList(new SimpleDataReaderFactory(0, 5), new SimpleDataReaderFactory(5, 10))
+    override def planInputPartitions(): JList[InputPartition[Row]] = {
+      java.util.Arrays.asList(new SimpleInputPartition(0, 5), new SimpleInputPartition(5, 10))
     }
   }
 
   override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
 }
 
-class SimpleDataReaderFactory(start: Int, end: Int)
-  extends DataReaderFactory[Row]
-  with DataReader[Row] {
+class SimpleInputPartition(start: Int, end: Int)
+  extends InputPartition[Row]
+  with InputPartitionReader[Row] {
   private var current = start - 1
 
-  override def createDataReader(): DataReader[Row] = new SimpleDataReaderFactory(start, end)
+  override def createPartitionReader(): InputPartitionReader[Row] =
+    new SimpleInputPartition(start, end)
 
   override def next(): Boolean = {
     current += 1
@@ -413,21 +414,21 @@ class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
       requiredSchema
     }
 
-    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
+    override def planInputPartitions(): JList[InputPartition[Row]] = {
       val lowerBound = filters.collect {
         case GreaterThan("i", v: Int) => v
       }.headOption
 
-      val res = new ArrayList[DataReaderFactory[Row]]
+      val res = new ArrayList[InputPartition[Row]]
 
       if (lowerBound.isEmpty) {
-        res.add(new AdvancedDataReaderFactory(0, 5, requiredSchema))
-        res.add(new AdvancedDataReaderFactory(5, 10, requiredSchema))
+        res.add(new AdvancedInputPartition(0, 5, requiredSchema))
+        res.add(new AdvancedInputPartition(5, 10, requiredSchema))
       } else if (lowerBound.get < 4) {
-        res.add(new AdvancedDataReaderFactory(lowerBound.get + 1, 5, requiredSchema))
-        res.add(new AdvancedDataReaderFactory(5, 10, requiredSchema))
+        res.add(new AdvancedInputPartition(lowerBound.get + 1, 5, requiredSchema))
+        res.add(new AdvancedInputPartition(5, 10, requiredSchema))
       } else if (lowerBound.get < 9) {
-        res.add(new AdvancedDataReaderFactory(lowerBound.get + 1, 10, requiredSchema))
+        res.add(new AdvancedInputPartition(lowerBound.get + 1, 10, requiredSchema))
       }
 
       res
@@ -437,13 +438,13 @@ class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
   override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
 }
 
-class AdvancedDataReaderFactory(start: Int, end: Int, requiredSchema: StructType)
-  extends DataReaderFactory[Row] with DataReader[Row] {
+class AdvancedInputPartition(start: Int, end: Int, requiredSchema: StructType)
+  extends InputPartition[Row] with InputPartitionReader[Row] {
 
   private var current = start - 1
 
-  override def createDataReader(): DataReader[Row] = {
-    new AdvancedDataReaderFactory(start, end, requiredSchema)
+  override def createPartitionReader(): InputPartitionReader[Row] = {
+    new AdvancedInputPartition(start, end, requiredSchema)
   }
 
   override def close(): Unit = {}
@@ -468,24 +469,24 @@ class UnsafeRowDataSourceV2 extends DataSourceV2 with ReadSupport {
   class Reader extends DataSourceReader with SupportsScanUnsafeRow {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
-    override def createUnsafeRowReaderFactories(): JList[DataReaderFactory[UnsafeRow]] = {
-      java.util.Arrays.asList(new UnsafeRowDataReaderFactory(0, 5),
-        new UnsafeRowDataReaderFactory(5, 10))
+    override def planUnsafeInputPartitions(): JList[InputPartition[UnsafeRow]] = {
+      java.util.Arrays.asList(new UnsafeRowInputPartitionReader(0, 5),
+        new UnsafeRowInputPartitionReader(5, 10))
     }
   }
 
   override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
 }
 
-class UnsafeRowDataReaderFactory(start: Int, end: Int)
-  extends DataReaderFactory[UnsafeRow] with DataReader[UnsafeRow] {
+class UnsafeRowInputPartitionReader(start: Int, end: Int)
+  extends InputPartition[UnsafeRow] with InputPartitionReader[UnsafeRow] {
 
   private val row = new UnsafeRow(2)
   row.pointTo(new Array[Byte](8 * 3), 8 * 3)
 
   private var current = start - 1
 
-  override def createDataReader(): DataReader[UnsafeRow] = this
+  override def createPartitionReader(): InputPartitionReader[UnsafeRow] = this
 
   override def next(): Boolean = {
     current += 1
@@ -503,7 +504,7 @@ class UnsafeRowDataReaderFactory(start: Int, end: Int)
 class SchemaRequiredDataSource extends DataSourceV2 with ReadSupportWithSchema {
 
   class Reader(val readSchema: StructType) extends DataSourceReader {
-    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] =
+    override def planInputPartitions(): JList[InputPartition[Row]] =
       java.util.Collections.emptyList()
   }
 
@@ -516,16 +517,17 @@ class BatchDataSourceV2 extends DataSourceV2 with ReadSupport {
   class Reader extends DataSourceReader with SupportsScanColumnarBatch {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
-    override def createBatchDataReaderFactories(): JList[DataReaderFactory[ColumnarBatch]] = {
-      java.util.Arrays.asList(new BatchDataReaderFactory(0, 50), new BatchDataReaderFactory(50, 90))
+    override def planBatchInputPartitions(): JList[InputPartition[ColumnarBatch]] = {
+      java.util.Arrays.asList(
+        new BatchInputPartitionReader(0, 50), new BatchInputPartitionReader(50, 90))
     }
   }
 
   override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
 }
 
-class BatchDataReaderFactory(start: Int, end: Int)
-  extends DataReaderFactory[ColumnarBatch] with DataReader[ColumnarBatch] {
+class BatchInputPartitionReader(start: Int, end: Int)
+  extends InputPartition[ColumnarBatch] with InputPartitionReader[ColumnarBatch] {
 
   private final val BATCH_SIZE = 20
   private lazy val i = new OnHeapColumnVector(BATCH_SIZE, IntegerType)
@@ -534,7 +536,7 @@ class BatchDataReaderFactory(start: Int, end: Int)
 
   private var current = start
 
-  override def createDataReader(): DataReader[ColumnarBatch] = this
+  override def createPartitionReader(): InputPartitionReader[ColumnarBatch] = this
 
   override def next(): Boolean = {
     i.reset()
@@ -568,11 +570,11 @@ class PartitionAwareDataSource extends DataSourceV2 with ReadSupport {
   class Reader extends DataSourceReader with SupportsReportPartitioning {
     override def readSchema(): StructType = new StructType().add("a", "int").add("b", "int")
 
-    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
+    override def planInputPartitions(): JList[InputPartition[Row]] = {
       // Note that we don't have same value of column `a` across partitions.
       java.util.Arrays.asList(
-        new SpecificDataReaderFactory(Array(1, 1, 3), Array(4, 4, 6)),
-        new SpecificDataReaderFactory(Array(2, 4, 4), Array(6, 2, 2)))
+        new SpecificInputPartitionReader(Array(1, 1, 3), Array(4, 4, 6)),
+        new SpecificInputPartitionReader(Array(2, 4, 4), Array(6, 2, 2)))
     }
 
     override def outputPartitioning(): Partitioning = new MyPartitioning
@@ -590,14 +592,14 @@ class PartitionAwareDataSource extends DataSourceV2 with ReadSupport {
   override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
 }
 
-class SpecificDataReaderFactory(i: Array[Int], j: Array[Int])
-  extends DataReaderFactory[Row]
-  with DataReader[Row] {
+class SpecificInputPartitionReader(i: Array[Int], j: Array[Int])
+  extends InputPartition[Row]
+  with InputPartitionReader[Row] {
   assert(i.length == j.length)
 
   private var current = -1
 
-  override def createDataReader(): DataReader[Row] = this
+  override def createPartitionReader(): InputPartitionReader[Row] = this
 
   override def next(): Boolean = {
     current += 1

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index a5007fa..694bb3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, DataSourceReader}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.SerializableConfiguration
@@ -45,7 +45,7 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
   class Reader(path: String, conf: Configuration) extends DataSourceReader {
     override def readSchema(): StructType = schema
 
-    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
+    override def planInputPartitions(): JList[InputPartition[Row]] = {
       val dataPath = new Path(path)
       val fs = dataPath.getFileSystem(conf)
       if (fs.exists(dataPath)) {
@@ -54,9 +54,9 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
           name.startsWith("_") || name.startsWith(".")
         }.map { f =>
           val serializableConf = new SerializableConfiguration(conf)
-          new SimpleCSVDataReaderFactory(
+          new SimpleCSVInputPartitionReader(
             f.getPath.toUri.toString,
-            serializableConf): DataReaderFactory[Row]
+            serializableConf): InputPartition[Row]
         }.toList.asJava
       } else {
         Collections.emptyList()
@@ -156,14 +156,14 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
   }
 }
 
-class SimpleCSVDataReaderFactory(path: String, conf: SerializableConfiguration)
-  extends DataReaderFactory[Row] with DataReader[Row] {
+class SimpleCSVInputPartitionReader(path: String, conf: SerializableConfiguration)
+  extends InputPartition[Row] with InputPartitionReader[Row] {
 
   @transient private var lines: Iterator[String] = _
   @transient private var currentLine: String = _
   @transient private var inputStream: FSDataInputStream = _
 
-  override def createDataReader(): DataReader[Row] = {
+  override def createPartitionReader(): InputPartitionReader[Row] = {
     val filePath = new Path(path)
     val fs = filePath.getFileSystem(conf.value)
     inputStream = fs.open(filePath)

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 5798699..dcf6cb5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.TestForeachWriter
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
+import org.apache.spark.sql.sources.v2.reader.InputPartition
 import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
 import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider, StreamManualClock}
 import org.apache.spark.sql.types.StructType
@@ -227,10 +227,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       }
 
       // getBatch should take 100 ms the first time it is called
-      override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
+      override def planUnsafeInputPartitions(): ju.List[InputPartition[UnsafeRow]] = {
         synchronized {
           clock.waitTillTime(1350)
-          super.createUnsafeRowReaderFactories()
+          super.planUnsafeInputPartitions()
         }
       }
     }
@@ -290,13 +290,14 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
 
       AdvanceManualClock(100), // time = 1150 to unblock getEndOffset
       AssertClockTime(1150),
-      AssertStreamExecThreadIsWaitingForTime(1350), // will block on createReadTasks that needs 1350
+      // will block on planInputPartitions that needs 1350
+      AssertStreamExecThreadIsWaitingForTime(1350),
       AssertOnQuery(_.status.isDataAvailable === true),
       AssertOnQuery(_.status.isTriggerActive === true),
       AssertOnQuery(_.status.message === "Processing new data"),
       AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),
 
-      AdvanceManualClock(200), // time = 1350 to unblock createReadTasks
+      AdvanceManualClock(200), // time = 1350 to unblock planInputPartitions
       AssertClockTime(1350),
       AssertStreamExecThreadIsWaitingForTime(1500), // will block on map task that needs 1500
       AssertOnQuery(_.status.isDataAvailable === true),

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
index e755625..f47d3ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
@@ -27,8 +27,8 @@ import org.apache.spark.{SparkEnv, SparkFunSuite, TaskContext}
 import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv}
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.InputPartition
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, PartitionOffset}
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.types.{DataType, IntegerType}
@@ -72,8 +72,8 @@ class ContinuousQueuedDataReaderSuite extends StreamTest with MockitoSugar {
    */
   private def setup(): (BlockingQueue[UnsafeRow], ContinuousQueuedDataReader) = {
     val queue = new ArrayBlockingQueue[UnsafeRow](1024)
-    val factory = new DataReaderFactory[UnsafeRow] {
-      override def createDataReader() = new ContinuousDataReader[UnsafeRow] {
+    val factory = new InputPartition[UnsafeRow] {
+      override def createPartitionReader() = new ContinuousInputPartitionReader[UnsafeRow] {
         var index = -1
         var curr: UnsafeRow = _
 

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index af4618b..c1a28b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
+import org.apache.spark.sql.sources.v2.reader.InputPartition
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset}
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger}
@@ -44,7 +44,7 @@ case class FakeReader() extends MicroBatchReader with ContinuousReader {
   def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
   def setStartOffset(start: Optional[Offset]): Unit = {}
 
-  def createDataReaderFactories(): java.util.ArrayList[DataReaderFactory[Row]] = {
+  def planInputPartitions(): java.util.ArrayList[InputPartition[Row]] = {
     throw new IllegalStateException("fake source - cannot actually read")
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/2] spark git commit: [SPARK-24073][SQL] Rename DataReaderFactory to InputPartition.

Posted by li...@apache.org.
[SPARK-24073][SQL] Rename DataReaderFactory to InputPartition.

## What changes were proposed in this pull request?

Renames:
* `DataReaderFactory` to `InputPartition`
* `DataReader` to `InputPartitionReader`
* `createDataReaderFactories` to `planInputPartitions`
* `createUnsafeRowReaderFactories` to `planUnsafeInputPartitions`
* `createBatchDataReaderFactories` to `planBatchInputPartitions`

This fixes the changes in SPARK-23219, which renamed ReadTask to
DataReaderFactory. The intent of that change was to make the read and
write API match (write side uses DataWriterFactory), but the underlying
problem is that the two classes are not equivalent.

ReadTask/DataReader function as Iterable/Iterator. One InputPartition is
a specific partition of the data to be read, in contrast to
DataWriterFactory where the same factory instance is used in all write
tasks. InputPartition's purpose is to manage the lifecycle of the
associated reader, which is now called InputPartitionReader, with an
explicit create operation to mirror the close operation. This was no
longer clear from the API because DataReaderFactory appeared to be more
generic than it is, and it was not clear why a set of them is produced
for a read.

## How was this patch tested?

Existing tests, which have been updated to use the new names.

Author: Ryan Blue <bl...@apache.org>

Closes #21145 from rdblue/SPARK-24073-revert-data-reader-factory-rename.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62d01391
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62d01391
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62d01391

Branch: refs/heads/master
Commit: 62d01391fee77eedd75b4e3f475ede8b9f0df0c2
Parents: 9341c95
Author: Ryan Blue <bl...@apache.org>
Authored: Wed May 9 21:48:54 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed May 9 21:48:54 2018 -0700

----------------------------------------------------------------------
 .../sql/kafka010/KafkaContinuousReader.scala    | 20 ++---
 .../sql/kafka010/KafkaMicroBatchReader.scala    | 21 +++---
 .../sql/kafka010/KafkaSourceProvider.scala      |  3 +-
 .../kafka010/KafkaMicroBatchSourceSuite.scala   |  2 +-
 .../sql/sources/v2/MicroBatchReadSupport.java   |  2 +-
 .../v2/reader/ContinuousDataReaderFactory.java  | 35 ---------
 .../v2/reader/ContinuousInputPartition.java     | 35 +++++++++
 .../spark/sql/sources/v2/reader/DataReader.java | 53 -------------
 .../sources/v2/reader/DataReaderFactory.java    | 61 ---------------
 .../sql/sources/v2/reader/DataSourceReader.java | 10 +--
 .../sql/sources/v2/reader/InputPartition.java   | 61 +++++++++++++++
 .../sources/v2/reader/InputPartitionReader.java | 53 +++++++++++++
 .../v2/reader/SupportsReportPartitioning.java   |  2 +-
 .../v2/reader/SupportsScanColumnarBatch.java    | 10 +--
 .../v2/reader/SupportsScanUnsafeRow.java        |  8 +-
 .../partitioning/ClusteredDistribution.java     |  4 +-
 .../v2/reader/partitioning/Distribution.java    |  6 +-
 .../v2/reader/partitioning/Partitioning.java    |  4 +-
 .../reader/streaming/ContinuousDataReader.java  | 36 ---------
 .../ContinuousInputPartitionReader.java         | 36 +++++++++
 .../v2/reader/streaming/ContinuousReader.java   | 10 +--
 .../v2/reader/streaming/MicroBatchReader.java   |  2 +-
 .../datasources/v2/DataSourceRDD.scala          | 11 +--
 .../datasources/v2/DataSourceV2ScanExec.scala   | 46 ++++++------
 .../continuous/ContinuousDataSourceRDD.scala    | 22 +++---
 .../continuous/ContinuousQueuedDataReader.scala | 13 ++--
 .../continuous/ContinuousRateStreamSource.scala | 20 ++---
 .../spark/sql/execution/streaming/memory.scala  | 12 +--
 .../sources/ContinuousMemoryStream.scala        | 18 ++---
 .../sources/RateStreamMicroBatchReader.scala    | 15 ++--
 .../execution/streaming/sources/socket.scala    | 29 ++++----
 .../sources/v2/JavaAdvancedDataSourceV2.java    | 22 +++---
 .../sql/sources/v2/JavaBatchDataSourceV2.java   | 12 +--
 .../v2/JavaPartitionAwareDataSource.java        | 12 +--
 .../v2/JavaSchemaRequiredDataSource.java        |  4 +-
 .../sql/sources/v2/JavaSimpleDataSourceV2.java  | 18 ++---
 .../sources/v2/JavaUnsafeRowDataSourceV2.java   | 16 ++--
 .../sources/RateStreamProviderSuite.scala       | 12 +--
 .../sql/sources/v2/DataSourceV2Suite.scala      | 78 ++++++++++----------
 .../sources/v2/SimpleWritableDataSource.scala   | 14 ++--
 .../sql/streaming/StreamingQuerySuite.scala     | 11 +--
 .../ContinuousQueuedDataReaderSuite.scala       |  8 +-
 .../sources/StreamingDataSourceV2Suite.scala    |  4 +-
 43 files changed, 440 insertions(+), 431 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index f26c134..88abf8a 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
 import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -86,7 +86,7 @@ class KafkaContinuousReader(
     KafkaSourceOffset(JsonUtils.partitionOffsets(json))
   }
 
-  override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
+  override def planUnsafeInputPartitions(): ju.List[InputPartition[UnsafeRow]] = {
     import scala.collection.JavaConverters._
 
     val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(offset)
@@ -108,7 +108,7 @@ class KafkaContinuousReader(
       case (topicPartition, start) =>
         KafkaContinuousDataReaderFactory(
           topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
-          .asInstanceOf[DataReaderFactory[UnsafeRow]]
+          .asInstanceOf[InputPartition[UnsafeRow]]
     }.asJava
   }
 
@@ -161,18 +161,18 @@ case class KafkaContinuousDataReaderFactory(
     startOffset: Long,
     kafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends ContinuousDataReaderFactory[UnsafeRow] {
+    failOnDataLoss: Boolean) extends ContinuousInputPartition[UnsafeRow] {
 
-  override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[UnsafeRow] = {
+  override def createContinuousReader(offset: PartitionOffset): InputPartitionReader[UnsafeRow] = {
     val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset]
     require(kafkaOffset.topicPartition == topicPartition,
       s"Expected topicPartition: $topicPartition, but got: ${kafkaOffset.topicPartition}")
-    new KafkaContinuousDataReader(
+    new KafkaContinuousInputPartitionReader(
       topicPartition, kafkaOffset.partitionOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
   }
 
-  override def createDataReader(): KafkaContinuousDataReader = {
-    new KafkaContinuousDataReader(
+  override def createPartitionReader(): KafkaContinuousInputPartitionReader = {
+    new KafkaContinuousInputPartitionReader(
       topicPartition, startOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
   }
 }
@@ -187,12 +187,12 @@ case class KafkaContinuousDataReaderFactory(
  * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
  *                       are skipped.
  */
-class KafkaContinuousDataReader(
+class KafkaContinuousInputPartitionReader(
     topicPartition: TopicPartition,
     startOffset: Long,
     kafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends ContinuousDataReader[UnsafeRow] {
+    failOnDataLoss: Boolean) extends ContinuousInputPartitionReader[UnsafeRow] {
   private val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = false)
   private val converter = new KafkaRecordToUnsafeRowConverter
 

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
index cbe655f..8a37773 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
 import org.apache.spark.sql.sources.v2.DataSourceOptions
-import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow}
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsScanUnsafeRow}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.UninterruptibleThread
@@ -101,7 +101,7 @@ private[kafka010] class KafkaMicroBatchReader(
         }
   }
 
-  override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
+  override def planUnsafeInputPartitions(): ju.List[InputPartition[UnsafeRow]] = {
     // Find the new partitions, and get their earliest offsets
     val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
     val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
@@ -146,7 +146,7 @@ private[kafka010] class KafkaMicroBatchReader(
       new KafkaMicroBatchDataReaderFactory(
         range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
     }
-    factories.map(_.asInstanceOf[DataReaderFactory[UnsafeRow]]).asJava
+    factories.map(_.asInstanceOf[InputPartition[UnsafeRow]]).asJava
   }
 
   override def getStartOffset: Offset = {
@@ -299,27 +299,28 @@ private[kafka010] class KafkaMicroBatchReader(
   }
 }
 
-/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
+/** An [[InputPartition]] for reading Kafka data in a micro-batch streaming query. */
 private[kafka010] case class KafkaMicroBatchDataReaderFactory(
     offsetRange: KafkaOffsetRange,
     executorKafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
     failOnDataLoss: Boolean,
-    reuseKafkaConsumer: Boolean) extends DataReaderFactory[UnsafeRow] {
+    reuseKafkaConsumer: Boolean) extends InputPartition[UnsafeRow] {
 
   override def preferredLocations(): Array[String] = offsetRange.preferredLoc.toArray
 
-  override def createDataReader(): DataReader[UnsafeRow] = new KafkaMicroBatchDataReader(
-    offsetRange, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
+  override def createPartitionReader(): InputPartitionReader[UnsafeRow] =
+    new KafkaMicroBatchInputPartitionReader(offsetRange, executorKafkaParams, pollTimeoutMs,
+      failOnDataLoss, reuseKafkaConsumer)
 }
 
-/** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
-private[kafka010] case class KafkaMicroBatchDataReader(
+/** An [[InputPartitionReader]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] case class KafkaMicroBatchInputPartitionReader(
     offsetRange: KafkaOffsetRange,
     executorKafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
     failOnDataLoss: Boolean,
-    reuseKafkaConsumer: Boolean) extends DataReader[UnsafeRow] with Logging {
+    reuseKafkaConsumer: Boolean) extends InputPartitionReader[UnsafeRow] with Logging {
 
   private val consumer = KafkaDataConsumer.acquire(
     offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 36b9f04..d225c1e 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSessio
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -149,7 +150,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }
 
   /**
-   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousDataReader]] to read
+   * Creates a [[ContinuousInputPartitionReader]] to read
    * Kafka data in a continuous streaming query.
    */
   override def createContinuousReader(

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index d2d04b6..871f970 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -678,7 +678,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
           Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 0L))),
           Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 100L)))
         )
-        val factories = reader.createUnsafeRowReaderFactories().asScala
+        val factories = reader.planUnsafeInputPartitions().asScala
           .map(_.asInstanceOf[KafkaMicroBatchDataReaderFactory])
         withClue(s"minPartitions = $minPartitions generated factories $factories\n\t") {
           assert(factories.size == numPartitionsGenerated)

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
index 209ffa7..7f4a2c9 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
@@ -34,7 +34,7 @@ public interface MicroBatchReadSupport extends DataSourceV2 {
    * streaming query.
    *
    * The execution engine will create a micro-batch reader at the start of a streaming query,
-   * alternate calls to setOffsetRange and createDataReaderFactories for each batch to process, and
+   * alternate calls to setOffsetRange and planInputPartitions for each batch to process, and
    * then call stop() when the execution is complete. Note that a single query may have multiple
    * executions due to restart or failure recovery.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java
deleted file mode 100644
index a616976..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
-
-/**
- * A mix-in interface for {@link DataReaderFactory}. Continuous data reader factories can
- * implement this interface to provide creating {@link DataReader} with particular offset.
- */
-@InterfaceStability.Evolving
-public interface ContinuousDataReaderFactory<T> extends DataReaderFactory<T> {
-  /**
-   * Create a DataReader with particular offset as its startOffset.
-   *
-   * @param offset offset want to set as the DataReader's startOffset.
-   */
-  DataReader<T> createDataReaderWithOffset(PartitionOffset offset);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java
new file mode 100644
index 0000000..c24f3b2
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
+
+/**
+ * A mix-in interface for {@link InputPartition}. Continuous input partitions can implement
+ * this interface to create an {@link InputPartitionReader} that starts at a particular offset.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousInputPartition<T> extends InputPartition<T> {
+  /**
+   * Creates an {@link InputPartitionReader} that starts reading from the given offset.
+   *
+   * @param offset the offset to use as the InputPartitionReader's start offset.
+   */
+  InputPartitionReader<T> createContinuousReader(PartitionOffset offset);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java
deleted file mode 100644
index bb9790a..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * A data reader returned by {@link DataReaderFactory#createDataReader()} and is responsible for
- * outputting data for a RDD partition.
- *
- * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal data
- * source readers, or {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} for data source
- * readers that mix in {@link SupportsScanUnsafeRow}.
- */
-@InterfaceStability.Evolving
-public interface DataReader<T> extends Closeable {
-
-  /**
-   * Proceed to next record, returns false if there is no more records.
-   *
-   * If this method fails (by throwing an exception), the corresponding Spark task would fail and
-   * get retried until hitting the maximum retry times.
-   *
-   * @throws IOException if failure happens during disk/network IO like reading files.
-   */
-  boolean next() throws IOException;
-
-  /**
-   * Return the current record. This method should return same value until `next` is called.
-   *
-   * If this method fails (by throwing an exception), the corresponding Spark task would fail and
-   * get retried until hitting the maximum retry times.
-   */
-  T get();
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java
deleted file mode 100644
index 32e98e8..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import java.io.Serializable;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * A reader factory returned by {@link DataSourceReader#createDataReaderFactories()} and is
- * responsible for creating the actual data reader. The relationship between
- * {@link DataReaderFactory} and {@link DataReader}
- * is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
- *
- * Note that, the reader factory will be serialized and sent to executors, then the data reader
- * will be created on executors and do the actual reading. So {@link DataReaderFactory} must be
- * serializable and {@link DataReader} doesn't need to be.
- */
-@InterfaceStability.Evolving
-public interface DataReaderFactory<T> extends Serializable {
-
-  /**
-   * The preferred locations where the data reader returned by this reader factory can run faster,
-   * but Spark does not guarantee to run the data reader on these locations.
-   * The implementations should make sure that it can be run on any location.
-   * The location is a string representing the host name.
-   *
-   * Note that if a host name cannot be recognized by Spark, it will be ignored as it was not in
-   * the returned locations. By default this method returns empty string array, which means this
-   * task has no location preference.
-   *
-   * If this method fails (by throwing an exception), the action would fail and no Spark job was
-   * submitted.
-   */
-  default String[] preferredLocations() {
-    return new String[0];
-  }
-
-  /**
-   * Returns a data reader to do the actual reading work.
-   *
-   * If this method fails (by throwing an exception), the corresponding Spark task would fail and
-   * get retried until hitting the maximum retry times.
-   */
-  DataReader<T> createDataReader();
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
index a470bcc..f898c29 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
@@ -31,8 +31,8 @@ import org.apache.spark.sql.types.StructType;
  * {@link ReadSupport#createReader(DataSourceOptions)} or
  * {@link ReadSupportWithSchema#createReader(StructType, DataSourceOptions)}.
  * It can mix in various query optimization interfaces to speed up the data scan. The actual scan
- * logic is delegated to {@link DataReaderFactory}s that are returned by
- * {@link #createDataReaderFactories()}.
+ * logic is delegated to {@link InputPartition}s that are returned by
+ * {@link #planInputPartitions()}.
  *
  * There are mainly 3 kinds of query optimizations:
  *   1. Operators push-down. E.g., filter push-down, required columns push-down(aka column
@@ -65,8 +65,8 @@ public interface DataSourceReader {
   StructType readSchema();
 
   /**
-   * Returns a list of reader factories. Each factory is responsible for creating a data reader to
-   * output data for one RDD partition. That means the number of factories returned here is same as
+   * Returns a list of input partitions. Each partition is responsible for creating a data reader
+   * to output data for one RDD partition. The number of partitions returned here is the same as
    * the number of RDD partitions this scan outputs.
    *
    * Note that, this may not be a full scan if the data source reader mixes in other optimization
@@ -76,5 +76,5 @@ public interface DataSourceReader {
    * If this method fails (by throwing an exception), the action would fail and no Spark job was
    * submitted.
    */
-  List<DataReaderFactory<Row>> createDataReaderFactories();
+  List<InputPartition<Row>> planInputPartitions();
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
new file mode 100644
index 0000000..c581e3b
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.io.Serializable;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An input partition returned by {@link DataSourceReader#planInputPartitions()} and is
+ * responsible for creating the actual data reader. The relationship between
+ * {@link InputPartition} and {@link InputPartitionReader}
+ * is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
+ *
+ * Note that input partitions will be serialized and sent to executors, then the partition reader
+ * will be created on executors and do the actual reading. So {@link InputPartition} must be
+ * serializable and {@link InputPartitionReader} doesn't need to be.
+ */
+@InterfaceStability.Evolving
+public interface InputPartition<T> extends Serializable {
+
+  /**
+   * The preferred locations where the data reader returned by this partition can run faster,
+   * but Spark does not guarantee to run the data reader on these locations.
+   * The implementations should make sure that it can be run on any location.
+   * The location is a string representing the host name.
+   *
+   * Note that if a host name cannot be recognized by Spark, it will be ignored as it was not in
+   * the returned locations. By default this method returns an empty string array, which means
+   * this input partition has no location preference.
+   *
+   * If this method fails (by throwing an exception), the action would fail and no Spark job was
+   * submitted.
+   */
+  default String[] preferredLocations() {
+    return new String[0];
+  }
+
+  /**
+   * Returns a data reader to do the actual reading work.
+   *
+   * If this method fails (by throwing an exception), the corresponding Spark task would fail and
+   * get retried until hitting the maximum retry times.
+   */
+  InputPartitionReader<T> createPartitionReader();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
new file mode 100644
index 0000000..1b7051f
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * A data reader returned by {@link InputPartition#createPartitionReader()} and is responsible for
+ * outputting data for a RDD partition.
+ *
+ * Note that currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal data
+ * source readers, or {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} for data source
+ * readers that mix in {@link SupportsScanUnsafeRow}.
+ */
+@InterfaceStability.Evolving
+public interface InputPartitionReader<T> extends Closeable {
+
+  /**
+   * Proceeds to the next record; returns false if there are no more records.
+   *
+   * If this method fails (by throwing an exception), the corresponding Spark task would fail and
+   * get retried until hitting the maximum retry times.
+   *
+   * @throws IOException if failure happens during disk/network IO like reading files.
+   */
+  boolean next() throws IOException;
+
+  /**
+   * Return the current record. This method should return same value until `next` is called.
+   *
+   * If this method fails (by throwing an exception), the corresponding Spark task would fail and
+   * get retried until hitting the maximum retry times.
+   */
+  T get();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
index 6076287..6b60da7 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
@@ -24,7 +24,7 @@ import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
  * A mix in interface for {@link DataSourceReader}. Data source readers can implement this
  * interface to report data partitioning and try to avoid shuffle at Spark side.
  *
- * Note that, when the reader creates exactly one {@link DataReaderFactory}, Spark may avoid
+ * Note that, when the reader creates exactly one {@link InputPartition}, Spark may avoid
  * adding a shuffle even if the reader does not implement this interface.
  */
 @InterfaceStability.Evolving

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
index 2e5cfa7..0faf81d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
@@ -30,22 +30,22 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
 @InterfaceStability.Evolving
 public interface SupportsScanColumnarBatch extends DataSourceReader {
   @Override
-  default List<DataReaderFactory<Row>> createDataReaderFactories() {
+  default List<InputPartition<Row>> planInputPartitions() {
     throw new IllegalStateException(
-      "createDataReaderFactories not supported by default within SupportsScanColumnarBatch.");
+      "planInputPartitions not supported by default within SupportsScanColumnarBatch.");
   }
 
   /**
-   * Similar to {@link DataSourceReader#createDataReaderFactories()}, but returns columnar data
+   * Similar to {@link DataSourceReader#planInputPartitions()}, but returns columnar data
    * in batches.
    */
-  List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories();
+  List<InputPartition<ColumnarBatch>> planBatchInputPartitions();
 
   /**
    * Returns true if the concrete data source reader can read data in batch according to the scan
    * properties like required columns, pushes filters, etc. It's possible that the implementation
    * can only support some certain columns with certain types. Users can overwrite this method and
-   * {@link #createDataReaderFactories()} to fallback to normal read path under some conditions.
+   * {@link #planInputPartitions()} to fallback to normal read path under some conditions.
    */
   default boolean enableBatchRead() {
     return true;

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
index 9cd749e..f2220f6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
@@ -33,14 +33,14 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 public interface SupportsScanUnsafeRow extends DataSourceReader {
 
   @Override
-  default List<DataReaderFactory<Row>> createDataReaderFactories() {
+  default List<InputPartition<Row>> planInputPartitions() {
     throw new IllegalStateException(
-      "createDataReaderFactories not supported by default within SupportsScanUnsafeRow");
+      "planInputPartitions not supported by default within SupportsScanUnsafeRow");
   }
 
   /**
-   * Similar to {@link DataSourceReader#createDataReaderFactories()},
+   * Similar to {@link DataSourceReader#planInputPartitions()},
    * but returns data in unsafe row format.
    */
-  List<DataReaderFactory<UnsafeRow>> createUnsafeRowReaderFactories();
+  List<InputPartition<UnsafeRow>> planUnsafeInputPartitions();
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java
index 2d0ee50..38ca5fc 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java
@@ -18,12 +18,12 @@
 package org.apache.spark.sql.sources.v2.reader.partitioning;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.DataReader;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 
 /**
  * A concrete implementation of {@link Distribution}. Represents a distribution where records that
  * share the same values for the {@link #clusteredColumns} will be produced by the same
- * {@link DataReader}.
+ * {@link InputPartitionReader}.
  */
 @InterfaceStability.Evolving
 public class ClusteredDistribution implements Distribution {

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java
index f6b111f..d2ee951 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java
@@ -18,13 +18,13 @@
 package org.apache.spark.sql.sources.v2.reader.partitioning;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.DataReader;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 
 /**
  * An interface to represent data distribution requirement, which specifies how the records should
- * be distributed among the data partitions(one {@link DataReader} outputs data for one partition).
+ * be distributed among the data partitions(one {@link InputPartitionReader} outputs data for one partition).
  * Note that this interface has nothing to do with the data ordering inside one
- * partition(the output records of a single {@link DataReader}).
+ * partition(the output records of a single {@link InputPartitionReader}).
  *
  * The instance of this interface is created and provided by Spark, then consumed by
  * {@link Partitioning#satisfy(Distribution)}. This means data source developers don't need to

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
index 309d9e5..f460f6b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.sources.v2.reader.partitioning;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning;
 
 /**
@@ -31,7 +31,7 @@ import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning;
 public interface Partitioning {
 
   /**
-   * Returns the number of partitions(i.e., {@link DataReaderFactory}s) the data source outputs.
+   * Returns the number of partitions(i.e., {@link InputPartition}s) the data source outputs.
    */
   int numPartitions();
 

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java
deleted file mode 100644
index 47d2644..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader.streaming;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.DataReader;
-
-/**
- * A variation on {@link DataReader} for use with streaming in continuous processing mode.
- */
-@InterfaceStability.Evolving
-public interface ContinuousDataReader<T> extends DataReader<T> {
-    /**
-     * Get the offset of the current record, or the start offset if no records have been read.
-     *
-     * The execution engine will call this method along with get() to keep track of the current
-     * offset. When an epoch ends, the offset of the previous record in each partition will be saved
-     * as a restart checkpoint.
-     */
-    PartitionOffset getOffset();
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousInputPartitionReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousInputPartitionReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousInputPartitionReader.java
new file mode 100644
index 0000000..7b0ba0b
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousInputPartitionReader.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+
+/**
+ * A variation on {@link InputPartitionReader} for use with streaming in continuous processing mode.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousInputPartitionReader<T> extends InputPartitionReader<T> {
+    /**
+     * Get the offset of the current record, or the start offset if no records have been read.
+     *
+     * The execution engine will call this method along with get() to keep track of the current
+     * offset. When an epoch ends, the offset of the previous record in each partition will be saved
+     * as a restart checkpoint.
+     */
+    PartitionOffset getOffset();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
index 7fe7f00..716c5c0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
@@ -27,7 +27,7 @@ import java.util.Optional;
  * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
  * interface to allow reading in a continuous processing mode stream.
  *
- * Implementations must ensure each reader factory output is a {@link ContinuousDataReader}.
+ * Implementations must ensure each partition reader is a {@link ContinuousInputPartitionReader}.
  *
  * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
  * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
@@ -35,7 +35,7 @@ import java.util.Optional;
 @InterfaceStability.Evolving
 public interface ContinuousReader extends BaseStreamingSource, DataSourceReader {
     /**
-     * Merge partitioned offsets coming from {@link ContinuousDataReader} instances for each
+     * Merge partitioned offsets coming from {@link ContinuousInputPartitionReader} instances for each
      * partition to a single global offset.
      */
     Offset mergeOffsets(PartitionOffset[] offsets);
@@ -47,7 +47,7 @@ public interface ContinuousReader extends BaseStreamingSource, DataSourceReader
     Offset deserializeOffset(String json);
 
     /**
-     * Set the desired start offset for reader factories created from this reader. The scan will
+     * Set the desired start offset for partitions created from this reader. The scan will
      * start from the first record after the provided offset, or from an implementation-defined
      * inferred starting point if no offset is provided.
      */
@@ -61,8 +61,8 @@ public interface ContinuousReader extends BaseStreamingSource, DataSourceReader
     Offset getStartOffset();
 
     /**
-     * The execution engine will call this method in every epoch to determine if new reader
-     * factories need to be generated, which may be required if for example the underlying
+     * The execution engine will call this method in every epoch to determine if new input
+     * partitions need to be generated, which may be required if for example the underlying
      * source system has had partitions added or removed.
      *
      * If true, the query will be shut down and restarted with a new reader.

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java
index 67ebde3..0159c73 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java
@@ -33,7 +33,7 @@ import java.util.Optional;
 @InterfaceStability.Evolving
 public interface MicroBatchReader extends DataSourceReader, BaseStreamingSource {
     /**
-     * Set the desired offset range for reader factories created from this reader. Reader factories
+     * Set the desired offset range for input partitions created from this reader. Partition readers
      * will generate only data within (`start`, `end`]; that is, from the first record after `start`
      * to the record with offset `end`.
      *

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
index f85971b..1a6b324 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
@@ -22,14 +22,14 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
+import org.apache.spark.sql.sources.v2.reader.InputPartition
 
-class DataSourceRDDPartition[T : ClassTag](val index: Int, val readerFactory: DataReaderFactory[T])
+class DataSourceRDDPartition[T : ClassTag](val index: Int, val inputPartition: InputPartition[T])
   extends Partition with Serializable
 
 class DataSourceRDD[T: ClassTag](
     sc: SparkContext,
-    @transient private val readerFactories: Seq[DataReaderFactory[T]])
+    @transient private val readerFactories: Seq[InputPartition[T]])
   extends RDD[T](sc, Nil) {
 
   override protected def getPartitions: Array[Partition] = {
@@ -39,7 +39,8 @@ class DataSourceRDD[T: ClassTag](
   }
 
   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
-    val reader = split.asInstanceOf[DataSourceRDDPartition[T]].readerFactory.createDataReader()
+    val reader = split.asInstanceOf[DataSourceRDDPartition[T]].inputPartition
+        .createPartitionReader()
     context.addTaskCompletionListener(_ => reader.close())
     val iter = new Iterator[T] {
       private[this] var valuePrepared = false
@@ -63,6 +64,6 @@ class DataSourceRDD[T: ClassTag](
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
-    split.asInstanceOf[DataSourceRDDPartition[T]].readerFactory.preferredLocations()
+    split.asInstanceOf[DataSourceRDDPartition[T]].inputPartition.preferredLocations()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index 77cb707..c6a7684 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -59,13 +59,13 @@ case class DataSourceV2ScanExec(
   }
 
   override def outputPartitioning: physical.Partitioning = reader match {
-    case r: SupportsScanColumnarBatch if r.enableBatchRead() && batchReaderFactories.size == 1 =>
+    case r: SupportsScanColumnarBatch if r.enableBatchRead() && batchPartitions.size == 1 =>
       SinglePartition
 
-    case r: SupportsScanColumnarBatch if !r.enableBatchRead() && readerFactories.size == 1 =>
+    case r: SupportsScanColumnarBatch if !r.enableBatchRead() && partitions.size == 1 =>
       SinglePartition
 
-    case r if !r.isInstanceOf[SupportsScanColumnarBatch] && readerFactories.size == 1 =>
+    case r if !r.isInstanceOf[SupportsScanColumnarBatch] && partitions.size == 1 =>
       SinglePartition
 
     case s: SupportsReportPartitioning =>
@@ -75,19 +75,19 @@ case class DataSourceV2ScanExec(
     case _ => super.outputPartitioning
   }
 
-  private lazy val readerFactories: Seq[DataReaderFactory[UnsafeRow]] = reader match {
-    case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories().asScala
+  private lazy val partitions: Seq[InputPartition[UnsafeRow]] = reader match {
+    case r: SupportsScanUnsafeRow => r.planUnsafeInputPartitions().asScala
     case _ =>
-      reader.createDataReaderFactories().asScala.map {
-        new RowToUnsafeRowDataReaderFactory(_, reader.readSchema()): DataReaderFactory[UnsafeRow]
+      reader.planInputPartitions().asScala.map {
+        new RowToUnsafeRowInputPartition(_, reader.readSchema()): InputPartition[UnsafeRow]
       }
   }
 
-  private lazy val batchReaderFactories: Seq[DataReaderFactory[ColumnarBatch]] = reader match {
+  private lazy val batchPartitions: Seq[InputPartition[ColumnarBatch]] = reader match {
     case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
       assert(!reader.isInstanceOf[ContinuousReader],
         "continuous stream reader does not support columnar read yet.")
-      r.createBatchDataReaderFactories().asScala
+      r.planBatchInputPartitions().asScala
   }
 
   private lazy val inputRDD: RDD[InternalRow] = reader match {
@@ -95,19 +95,18 @@ case class DataSourceV2ScanExec(
       EpochCoordinatorRef.get(
           sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
           sparkContext.env)
-        .askSync[Unit](SetReaderPartitions(readerFactories.size))
+        .askSync[Unit](SetReaderPartitions(partitions.size))
       new ContinuousDataSourceRDD(
         sparkContext,
         sqlContext.conf.continuousStreamingExecutorQueueSize,
         sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
-        readerFactories)
-        .asInstanceOf[RDD[InternalRow]]
+        partitions).asInstanceOf[RDD[InternalRow]]
 
     case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
-      new DataSourceRDD(sparkContext, batchReaderFactories).asInstanceOf[RDD[InternalRow]]
+      new DataSourceRDD(sparkContext, batchPartitions).asInstanceOf[RDD[InternalRow]]
 
     case _ =>
-      new DataSourceRDD(sparkContext, readerFactories).asInstanceOf[RDD[InternalRow]]
+      new DataSourceRDD(sparkContext, partitions).asInstanceOf[RDD[InternalRow]]
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
@@ -132,19 +131,22 @@ case class DataSourceV2ScanExec(
   }
 }
 
-class RowToUnsafeRowDataReaderFactory(rowReaderFactory: DataReaderFactory[Row], schema: StructType)
-  extends DataReaderFactory[UnsafeRow] {
+class RowToUnsafeRowInputPartition(partition: InputPartition[Row], schema: StructType)
+  extends InputPartition[UnsafeRow] {
 
-  override def preferredLocations: Array[String] = rowReaderFactory.preferredLocations
+  override def preferredLocations: Array[String] = partition.preferredLocations
 
-  override def createDataReader: DataReader[UnsafeRow] = {
-    new RowToUnsafeDataReader(
-      rowReaderFactory.createDataReader, RowEncoder.apply(schema).resolveAndBind())
+  override def createPartitionReader: InputPartitionReader[UnsafeRow] = {
+    new RowToUnsafeInputPartitionReader(
+      partition.createPartitionReader, RowEncoder.apply(schema).resolveAndBind())
   }
 }
 
-class RowToUnsafeDataReader(val rowReader: DataReader[Row], encoder: ExpressionEncoder[Row])
-  extends DataReader[UnsafeRow] {
+class RowToUnsafeInputPartitionReader(
+    val rowReader: InputPartitionReader[Row],
+    encoder: ExpressionEncoder[Row])
+
+  extends InputPartitionReader[UnsafeRow] {
 
   override def next: Boolean = rowReader.next
 

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
index 0a3b9dc..a7ccce1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
@@ -21,14 +21,14 @@ import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeInputPartitionReader}
 import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, PartitionOffset}
 import org.apache.spark.util.{NextIterator, ThreadUtils}
 
 class ContinuousDataSourceRDDPartition(
     val index: Int,
-    val readerFactory: DataReaderFactory[UnsafeRow])
+    val inputPartition: InputPartition[UnsafeRow])
   extends Partition with Serializable {
 
   // This is semantically a lazy val - it's initialized once the first time a call to
@@ -51,12 +51,12 @@ class ContinuousDataSourceRDD(
     sc: SparkContext,
     dataQueueSize: Int,
     epochPollIntervalMs: Long,
-    @transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]])
+    @transient private val readerFactories: Seq[InputPartition[UnsafeRow]])
   extends RDD[UnsafeRow](sc, Nil) {
 
   override protected def getPartitions: Array[Partition] = {
     readerFactories.zipWithIndex.map {
-      case (readerFactory, index) => new ContinuousDataSourceRDDPartition(index, readerFactory)
+      case (inputPartition, index) => new ContinuousDataSourceRDDPartition(index, inputPartition)
     }.toArray
   }
 
@@ -75,7 +75,7 @@ class ContinuousDataSourceRDD(
       if (partition.queueReader == null) {
         partition.queueReader =
           new ContinuousQueuedDataReader(
-            partition.readerFactory, context, dataQueueSize, epochPollIntervalMs)
+            partition.inputPartition, context, dataQueueSize, epochPollIntervalMs)
       }
 
       partition.queueReader
@@ -96,17 +96,17 @@ class ContinuousDataSourceRDD(
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
-    split.asInstanceOf[ContinuousDataSourceRDDPartition].readerFactory.preferredLocations()
+    split.asInstanceOf[ContinuousDataSourceRDDPartition].inputPartition.preferredLocations()
   }
 }
 
 object ContinuousDataSourceRDD {
   private[continuous] def getContinuousReader(
-      reader: DataReader[UnsafeRow]): ContinuousDataReader[_] = {
+      reader: InputPartitionReader[UnsafeRow]): ContinuousInputPartitionReader[_] = {
     reader match {
-      case r: ContinuousDataReader[UnsafeRow] => r
-      case wrapped: RowToUnsafeDataReader =>
-        wrapped.rowReader.asInstanceOf[ContinuousDataReader[Row]]
+      case r: ContinuousInputPartitionReader[UnsafeRow] => r
+      case wrapped: RowToUnsafeInputPartitionReader =>
+        wrapped.rowReader.asInstanceOf[ContinuousInputPartitionReader[Row]]
       case _ =>
         throw new IllegalStateException(s"Unknown continuous reader type ${reader.getClass}")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
index 01a999f..d864557 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
@@ -18,15 +18,14 @@
 package org.apache.spark.sql.execution.streaming.continuous
 
 import java.io.Closeable
-import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
 
 import scala.util.control.NonFatal
 
-import org.apache.spark.{Partition, SparkEnv, SparkException, TaskContext}
+import org.apache.spark.{SparkEnv, SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
 import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
 import org.apache.spark.util.ThreadUtils
 
@@ -38,11 +37,11 @@ import org.apache.spark.util.ThreadUtils
  * offsets across epochs. Each compute() should call the next() method here until null is returned.
  */
 class ContinuousQueuedDataReader(
-    factory: DataReaderFactory[UnsafeRow],
+    partition: InputPartition[UnsafeRow],
     context: TaskContext,
     dataQueueSize: Int,
     epochPollIntervalMs: Long) extends Closeable {
-  private val reader = factory.createDataReader()
+  private val reader = partition.createPartitionReader()
 
   // Important sequencing - we must get our starting point before the provider threads start running
   private var currentOffset: PartitionOffset =
@@ -132,7 +131,7 @@ class ContinuousQueuedDataReader(
 
   /**
    * The data component of [[ContinuousQueuedDataReader]]. Pushes (row, offset) to the queue when
-   * a new row arrives to the [[DataReader]].
+   * a new row arrives to the [[InputPartitionReader]].
    */
   class DataReaderThread extends Thread(
       s"continuous-reader--${context.partitionId()}--" +

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index 2f0de26..8d25d9c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeM
 import org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
 import org.apache.spark.sql.types.StructType
 
 case class RateStreamPartitionOffset(
@@ -67,7 +67,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)
 
   override def getStartOffset(): Offset = offset
 
-  override def createDataReaderFactories(): java.util.List[DataReaderFactory[Row]] = {
+  override def planInputPartitions(): java.util.List[InputPartition[Row]] = {
     val partitionStartMap = offset match {
       case off: RateStreamOffset => off.partitionToValueAndRunTimeMs
       case off =>
@@ -91,7 +91,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)
         i,
         numPartitions,
         perPartitionRate)
-        .asInstanceOf[DataReaderFactory[Row]]
+        .asInstanceOf[InputPartition[Row]]
     }.asJava
   }
 
@@ -119,13 +119,13 @@ case class RateStreamContinuousDataReaderFactory(
     partitionIndex: Int,
     increment: Long,
     rowsPerSecond: Double)
-  extends ContinuousDataReaderFactory[Row] {
+  extends ContinuousInputPartition[Row] {
 
-  override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[Row] = {
+  override def createContinuousReader(offset: PartitionOffset): InputPartitionReader[Row] = {
     val rateStreamOffset = offset.asInstanceOf[RateStreamPartitionOffset]
     require(rateStreamOffset.partition == partitionIndex,
       s"Expected partitionIndex: $partitionIndex, but got: ${rateStreamOffset.partition}")
-    new RateStreamContinuousDataReader(
+    new RateStreamContinuousInputPartitionReader(
       rateStreamOffset.currentValue,
       rateStreamOffset.currentTimeMs,
       partitionIndex,
@@ -133,18 +133,18 @@ case class RateStreamContinuousDataReaderFactory(
       rowsPerSecond)
   }
 
-  override def createDataReader(): DataReader[Row] =
-    new RateStreamContinuousDataReader(
+  override def createPartitionReader(): InputPartitionReader[Row] =
+    new RateStreamContinuousInputPartitionReader(
       startValue, startTimeMs, partitionIndex, increment, rowsPerSecond)
 }
 
-class RateStreamContinuousDataReader(
+class RateStreamContinuousInputPartitionReader(
     startValue: Long,
     startTimeMs: Long,
     partitionIndex: Int,
     increment: Long,
     rowsPerSecond: Double)
-  extends ContinuousDataReader[Row] {
+  extends ContinuousInputPartitionReader[Row] {
   private var nextReadTime: Long = startTimeMs
   private val readTimeIncrement: Long = (1000 / rowsPerSecond).toLong
 

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 6720cdd..daa2963 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
-import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow}
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsScanUnsafeRow}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -139,7 +139,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     if (endOffset.offset == -1) null else endOffset
   }
 
-  override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
+  override def planUnsafeInputPartitions(): ju.List[InputPartition[UnsafeRow]] = {
     synchronized {
       // Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal)
       val startOrdinal = startOffset.offset.toInt + 1
@@ -156,7 +156,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       logDebug(generateDebugString(newBlocks.flatten, startOrdinal, endOrdinal))
 
       newBlocks.map { block =>
-        new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory[UnsafeRow]]
+        new MemoryStreamDataReaderFactory(block).asInstanceOf[InputPartition[UnsafeRow]]
       }.asJava
     }
   }
@@ -202,9 +202,9 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
 
 
 class MemoryStreamDataReaderFactory(records: Array[UnsafeRow])
-  extends DataReaderFactory[UnsafeRow] {
-  override def createDataReader(): DataReader[UnsafeRow] = {
-    new DataReader[UnsafeRow] {
+  extends InputPartition[UnsafeRow] {
+  override def createPartitionReader(): InputPartitionReader[UnsafeRow] = {
+    new InputPartitionReader[UnsafeRow] {
       private var currentIndex = -1
 
       override def next(): Boolean = {

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
index a8fca3c..fef792e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
@@ -34,8 +34,8 @@ import org.apache.spark.sql.{Encoder, Row, SQLContext}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream.GetRecord
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions}
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.InputPartition
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.RpcUtils
 
@@ -99,7 +99,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     )
   }
 
-  override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
+  override def planInputPartitions(): ju.List[InputPartition[Row]] = {
     synchronized {
       val endpointName = s"ContinuousMemoryStreamRecordEndpoint-${java.util.UUID.randomUUID()}-$id"
       endpointRef =
@@ -108,7 +108,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       startOffset.partitionNums.map {
         case (part, index) =>
           new ContinuousMemoryStreamDataReaderFactory(
-            endpointName, part, index): DataReaderFactory[Row]
+            endpointName, part, index): InputPartition[Row]
       }.toList.asJava
     }
   }
@@ -160,9 +160,9 @@ object ContinuousMemoryStream {
 class ContinuousMemoryStreamDataReaderFactory(
     driverEndpointName: String,
     partition: Int,
-    startOffset: Int) extends DataReaderFactory[Row] {
-  override def createDataReader: ContinuousMemoryStreamDataReader =
-    new ContinuousMemoryStreamDataReader(driverEndpointName, partition, startOffset)
+    startOffset: Int) extends InputPartition[Row] {
+  override def createPartitionReader: ContinuousMemoryStreamInputPartitionReader =
+    new ContinuousMemoryStreamInputPartitionReader(driverEndpointName, partition, startOffset)
 }
 
 /**
@@ -170,10 +170,10 @@ class ContinuousMemoryStreamDataReaderFactory(
  *
  * Polls the driver endpoint for new records.
  */
-class ContinuousMemoryStreamDataReader(
+class ContinuousMemoryStreamInputPartitionReader(
     driverEndpointName: String,
     partition: Int,
-    startOffset: Int) extends ContinuousDataReader[Row] {
+    startOffset: Int) extends ContinuousInputPartitionReader[Row] {
   private val endpoint = RpcUtils.makeDriverRef(
     driverEndpointName,
     SparkEnv.get.conf,

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
index f54291b..723cc3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
@@ -134,7 +134,7 @@ class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation:
     LongOffset(json.toLong)
   }
 
-  override def createDataReaderFactories(): java.util.List[DataReaderFactory[Row]] = {
+  override def planInputPartitions(): java.util.List[InputPartition[Row]] = {
     val startSeconds = LongOffset.convert(start).map(_.offset).getOrElse(0L)
     val endSeconds = LongOffset.convert(end).map(_.offset).getOrElse(0L)
     assert(startSeconds <= endSeconds, s"startSeconds($startSeconds) > endSeconds($endSeconds)")
@@ -169,7 +169,7 @@ class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation:
     (0 until numPartitions).map { p =>
       new RateStreamMicroBatchDataReaderFactory(
         p, numPartitions, rangeStart, rangeEnd, localStartTimeMs, relativeMsPerValue)
-        : DataReaderFactory[Row]
+        : InputPartition[Row]
     }.toList.asJava
   }
 
@@ -188,19 +188,20 @@ class RateStreamMicroBatchDataReaderFactory(
     rangeStart: Long,
     rangeEnd: Long,
     localStartTimeMs: Long,
-    relativeMsPerValue: Double) extends DataReaderFactory[Row] {
+    relativeMsPerValue: Double) extends InputPartition[Row] {
 
-  override def createDataReader(): DataReader[Row] = new RateStreamMicroBatchDataReader(
-    partitionId, numPartitions, rangeStart, rangeEnd, localStartTimeMs, relativeMsPerValue)
+  override def createPartitionReader(): InputPartitionReader[Row] =
+    new RateStreamMicroBatchInputPartitionReader(partitionId, numPartitions, rangeStart, rangeEnd,
+      localStartTimeMs, relativeMsPerValue)
 }
 
-class RateStreamMicroBatchDataReader(
+class RateStreamMicroBatchInputPartitionReader(
     partitionId: Int,
     numPartitions: Int,
     rangeStart: Long,
     rangeEnd: Long,
     localStartTimeMs: Long,
-    relativeMsPerValue: Double) extends DataReader[Row] {
+    relativeMsPerValue: Double) extends InputPartitionReader[Row] {
   private var count = 0
 
   override def next(): Boolean = {

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
index 90f4a5b..8240e06 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming.LongOffset
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport}
-import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 
@@ -140,7 +140,7 @@ class TextSocketMicroBatchReader(options: DataSourceOptions) extends MicroBatchR
     }
   }
 
-  override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
+  override def planInputPartitions(): JList[InputPartition[Row]] = {
     assert(startOffset != null && endOffset != null,
       "start offset and end offset should already be set before create read tasks.")
 
@@ -165,21 +165,22 @@ class TextSocketMicroBatchReader(options: DataSourceOptions) extends MicroBatchR
 
     (0 until numPartitions).map { i =>
       val slice = slices(i)
-      new DataReaderFactory[Row] {
-        override def createDataReader(): DataReader[Row] = new DataReader[Row] {
-          private var currentIdx = -1
+      new InputPartition[Row] {
+        override def createPartitionReader(): InputPartitionReader[Row] =
+          new InputPartitionReader[Row] {
+            private var currentIdx = -1
+
+            override def next(): Boolean = {
+              currentIdx += 1
+              currentIdx < slice.size
+            }
 
-          override def next(): Boolean = {
-            currentIdx += 1
-            currentIdx < slice.size
-          }
+            override def get(): Row = {
+              Row(slice(currentIdx)._1, slice(currentIdx)._2)
+            }
 
-          override def get(): Row = {
-            Row(slice(currentIdx)._1, slice(currentIdx)._2)
+            override def close(): Unit = {}
           }
-
-          override def close(): Unit = {}
-        }
       }
     }.toList.asJava
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
index 172e5d5..714638e 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
@@ -79,8 +79,8 @@ public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public List<DataReaderFactory<Row>> createDataReaderFactories() {
-      List<DataReaderFactory<Row>> res = new ArrayList<>();
+    public List<InputPartition<Row>> planInputPartitions() {
+      List<InputPartition<Row>> res = new ArrayList<>();
 
       Integer lowerBound = null;
       for (Filter filter : filters) {
@@ -94,33 +94,33 @@ public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport {
       }
 
       if (lowerBound == null) {
-        res.add(new JavaAdvancedDataReaderFactory(0, 5, requiredSchema));
-        res.add(new JavaAdvancedDataReaderFactory(5, 10, requiredSchema));
+        res.add(new JavaAdvancedInputPartition(0, 5, requiredSchema));
+        res.add(new JavaAdvancedInputPartition(5, 10, requiredSchema));
       } else if (lowerBound < 4) {
-        res.add(new JavaAdvancedDataReaderFactory(lowerBound + 1, 5, requiredSchema));
-        res.add(new JavaAdvancedDataReaderFactory(5, 10, requiredSchema));
+        res.add(new JavaAdvancedInputPartition(lowerBound + 1, 5, requiredSchema));
+        res.add(new JavaAdvancedInputPartition(5, 10, requiredSchema));
       } else if (lowerBound < 9) {
-        res.add(new JavaAdvancedDataReaderFactory(lowerBound + 1, 10, requiredSchema));
+        res.add(new JavaAdvancedInputPartition(lowerBound + 1, 10, requiredSchema));
       }
 
       return res;
     }
   }
 
-  static class JavaAdvancedDataReaderFactory implements DataReaderFactory<Row>, DataReader<Row> {
+  static class JavaAdvancedInputPartition implements InputPartition<Row>, InputPartitionReader<Row> {
     private int start;
     private int end;
     private StructType requiredSchema;
 
-    JavaAdvancedDataReaderFactory(int start, int end, StructType requiredSchema) {
+    JavaAdvancedInputPartition(int start, int end, StructType requiredSchema) {
       this.start = start;
       this.end = end;
       this.requiredSchema = requiredSchema;
     }
 
     @Override
-    public DataReader<Row> createDataReader() {
-      return new JavaAdvancedDataReaderFactory(start - 1, end, requiredSchema);
+    public InputPartitionReader<Row> createPartitionReader() {
+      return new JavaAdvancedInputPartition(start - 1, end, requiredSchema);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java
index c550937..97d6176 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java
@@ -42,14 +42,14 @@ public class JavaBatchDataSourceV2 implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories() {
+    public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
       return java.util.Arrays.asList(
-               new JavaBatchDataReaderFactory(0, 50), new JavaBatchDataReaderFactory(50, 90));
+               new JavaBatchInputPartition(0, 50), new JavaBatchInputPartition(50, 90));
     }
   }
 
-  static class JavaBatchDataReaderFactory
-      implements DataReaderFactory<ColumnarBatch>, DataReader<ColumnarBatch> {
+  static class JavaBatchInputPartition
+      implements InputPartition<ColumnarBatch>, InputPartitionReader<ColumnarBatch> {
     private int start;
     private int end;
 
@@ -59,13 +59,13 @@ public class JavaBatchDataSourceV2 implements DataSourceV2, ReadSupport {
     private OnHeapColumnVector j;
     private ColumnarBatch batch;
 
-    JavaBatchDataReaderFactory(int start, int end) {
+    JavaBatchInputPartition(int start, int end) {
       this.start = start;
       this.end = end;
     }
 
     @Override
-    public DataReader<ColumnarBatch> createDataReader() {
+    public InputPartitionReader<ColumnarBatch> createPartitionReader() {
       this.i = new OnHeapColumnVector(BATCH_SIZE, DataTypes.IntegerType);
       this.j = new OnHeapColumnVector(BATCH_SIZE, DataTypes.IntegerType);
       ColumnVector[] vectors = new ColumnVector[2];

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
index 32fad59..e49c8cf 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
@@ -43,10 +43,10 @@ public class JavaPartitionAwareDataSource implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public List<DataReaderFactory<Row>> createDataReaderFactories() {
+    public List<InputPartition<Row>> planInputPartitions() {
       return java.util.Arrays.asList(
-        new SpecificDataReaderFactory(new int[]{1, 1, 3}, new int[]{4, 4, 6}),
-        new SpecificDataReaderFactory(new int[]{2, 4, 4}, new int[]{6, 2, 2}));
+        new SpecificInputPartition(new int[]{1, 1, 3}, new int[]{4, 4, 6}),
+        new SpecificInputPartition(new int[]{2, 4, 4}, new int[]{6, 2, 2}));
     }
 
     @Override
@@ -73,12 +73,12 @@ public class JavaPartitionAwareDataSource implements DataSourceV2, ReadSupport {
     }
   }
 
-  static class SpecificDataReaderFactory implements DataReaderFactory<Row>, DataReader<Row> {
+  static class SpecificInputPartition implements InputPartition<Row>, InputPartitionReader<Row> {
     private int[] i;
     private int[] j;
     private int current = -1;
 
-    SpecificDataReaderFactory(int[] i, int[] j) {
+    SpecificInputPartition(int[] i, int[] j) {
       assert i.length == j.length;
       this.i = i;
       this.j = j;
@@ -101,7 +101,7 @@ public class JavaPartitionAwareDataSource implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public DataReader<Row> createDataReader() {
+    public InputPartitionReader<Row> createPartitionReader() {
       return this;
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org