Posted to commits@spark.apache.org by li...@apache.org on 2018/07/24 17:46:41 UTC

spark git commit: [SPARK-23325] Use InternalRow when reading with DataSourceV2.

Repository: spark
Updated Branches:
  refs/heads/master 3d5c61e5f -> 9d27541a8


[SPARK-23325] Use InternalRow when reading with DataSourceV2.

## What changes were proposed in this pull request?

This updates the DataSourceV2 API to use InternalRow instead of Row for the default case with no scan mix-ins.

Support for readers that produce Row is added through SupportsDeprecatedScanRow, which matches the previous API. Readers that used Row now implement this interface and should be migrated to InternalRow.
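
For illustration, a minimal sketch (not part of this patch) of a Row-based reader that keeps the old behavior by mixing in the new interface. `MyRowReader` and its schema are hypothetical; the mix-in and method names are the ones added by this commit.

```scala
import java.util.{List => JList}

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, SupportsDeprecatedScanRow}
import org.apache.spark.sql.types.StructType

// Hypothetical reader kept on the deprecated Row path.
class MyRowReader extends DataSourceReader with SupportsDeprecatedScanRow {
  override def readSchema(): StructType = new StructType().add("value", "string")

  // The body of the old planInputPartitions(): JList[InputPartition[Row]] moves here
  // unchanged; the default planInputPartitions() inherited from the mix-in throws.
  override def planRowInputPartitions(): JList[InputPartition[Row]] =
    java.util.Collections.emptyList()  // return real partitions in practice
}
```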

Readers that previously implemented SupportsScanUnsafeRow have been migrated to drop the SupportsScan mix-in entirely and produce InternalRow.
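
For illustration, a minimal sketch (not part of this patch) of a reader after that migration. The names are hypothetical, but the shape mirrors the migrated readers in this diff: the mix-in is dropped, planInputPartitions() returns InputPartition[InternalRow], and the partitions can still hand back UnsafeRow values because UnsafeRow is an InternalRow.

```scala
import java.util.{Arrays => JArrays, List => JList}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
import org.apache.spark.sql.types.StructType

// Hypothetical reader with no SupportsScan* mix-in.
class MyMigratedReader extends DataSourceReader {
  override def readSchema(): StructType = new StructType().add("i", "int")

  // Previously: planUnsafeInputPartitions(): JList[InputPartition[UnsafeRow]]
  override def planInputPartitions(): JList[InputPartition[InternalRow]] =
    JArrays.asList(new MyUnsafePartition(0, 5), new MyUnsafePartition(5, 10))
}

// The partition is typed as InternalRow but still produces UnsafeRow values.
class MyUnsafePartition(start: Int, end: Int)
  extends InputPartition[InternalRow] with InputPartitionReader[InternalRow] {

  private val row = new UnsafeRow(1)          // one int column
  row.pointTo(new Array[Byte](8 * 2), 8 * 2)  // 8-byte null bitset + one 8-byte field
  private var current = start - 1

  override def createPartitionReader(): InputPartitionReader[InternalRow] = this

  override def next(): Boolean = { current += 1; current < end }
  override def get(): InternalRow = { row.setInt(0, current); row }
  override def close(): Unit = {}
}
```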

## How was this patch tested?

This uses existing tests.

Author: Ryan Blue <bl...@apache.org>

Closes #21118 from rdblue/SPARK-23325-datasource-v2-internal-row.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9d27541a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9d27541a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9d27541a

Branch: refs/heads/master
Commit: 9d27541a856d95635386cbc98f2bb1f1f2f30c13
Parents: 3d5c61e
Author: Ryan Blue <bl...@apache.org>
Authored: Tue Jul 24 10:46:36 2018 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Tue Jul 24 10:46:36 2018 -0700

----------------------------------------------------------------------
 .../sql/kafka010/KafkaContinuousReader.scala    | 16 ++++---
 .../sql/kafka010/KafkaMicroBatchReader.scala    | 21 ++++-----
 .../kafka010/KafkaMicroBatchSourceSuite.scala   |  2 +-
 .../sql/sources/v2/reader/DataSourceReader.java |  6 +--
 .../sources/v2/reader/InputPartitionReader.java |  7 +--
 .../v2/reader/SupportsDeprecatedScanRow.java    | 39 +++++++++++++++++
 .../v2/reader/SupportsScanColumnarBatch.java    |  4 +-
 .../v2/reader/SupportsScanUnsafeRow.java        | 46 --------------------
 .../datasources/v2/DataSourceRDD.scala          |  1 -
 .../datasources/v2/DataSourceV2ScanExec.scala   | 17 ++++----
 .../datasources/v2/DataSourceV2Strategy.scala   | 13 +++---
 .../continuous/ContinuousDataSourceRDD.scala    | 26 +++++------
 .../continuous/ContinuousQueuedDataReader.scala |  8 ++--
 .../continuous/ContinuousRateStreamSource.scala |  4 +-
 .../spark/sql/execution/streaming/memory.scala  | 16 +++----
 .../sources/ContinuousMemoryStream.scala        |  7 +--
 .../sources/RateStreamMicroBatchReader.scala    |  4 +-
 .../execution/streaming/sources/socket.scala    |  7 +--
 .../sources/v2/JavaAdvancedDataSourceV2.java    |  4 +-
 .../v2/JavaPartitionAwareDataSource.java        |  4 +-
 .../v2/JavaSchemaRequiredDataSource.java        |  5 ++-
 .../sql/sources/v2/JavaSimpleDataSourceV2.java  |  5 ++-
 .../sources/v2/JavaUnsafeRowDataSourceV2.java   |  9 ++--
 .../sources/RateStreamProviderSuite.scala       |  6 +--
 .../sql/sources/v2/DataSourceV2Suite.scala      | 30 +++++++------
 .../sources/v2/SimpleWritableDataSource.scala   |  7 +--
 .../sql/streaming/StreamingQuerySuite.scala     |  6 +--
 .../ContinuousQueuedDataReaderSuite.scala       |  6 +--
 .../sources/StreamingDataSourceV2Suite.scala    |  7 +--
 29 files changed, 168 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index badaa69..48b91df 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
 import org.apache.spark.sql.sources.v2.reader._
@@ -53,7 +54,7 @@ class KafkaContinuousReader(
     metadataPath: String,
     initialOffsets: KafkaOffsetRangeLimit,
     failOnDataLoss: Boolean)
-  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+  extends ContinuousReader with Logging {
 
   private lazy val session = SparkSession.getActiveSession.get
   private lazy val sc = session.sparkContext
@@ -86,7 +87,7 @@ class KafkaContinuousReader(
     KafkaSourceOffset(JsonUtils.partitionOffsets(json))
   }
 
-  override def planUnsafeInputPartitions(): ju.List[InputPartition[UnsafeRow]] = {
+  override def planInputPartitions(): ju.List[InputPartition[InternalRow]] = {
     import scala.collection.JavaConverters._
 
     val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(offset)
@@ -107,8 +108,8 @@ class KafkaContinuousReader(
     startOffsets.toSeq.map {
       case (topicPartition, start) =>
         KafkaContinuousInputPartition(
-          topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
-          .asInstanceOf[InputPartition[UnsafeRow]]
+          topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss
+        ): InputPartition[InternalRow]
     }.asJava
   }
 
@@ -161,9 +162,10 @@ case class KafkaContinuousInputPartition(
     startOffset: Long,
     kafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends ContinuousInputPartition[UnsafeRow] {
+    failOnDataLoss: Boolean) extends ContinuousInputPartition[InternalRow] {
 
-  override def createContinuousReader(offset: PartitionOffset): InputPartitionReader[UnsafeRow] = {
+  override def createContinuousReader(
+      offset: PartitionOffset): InputPartitionReader[InternalRow] = {
     val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset]
     require(kafkaOffset.topicPartition == topicPartition,
       s"Expected topicPartition: $topicPartition, but got: ${kafkaOffset.topicPartition}")
@@ -192,7 +194,7 @@ class KafkaContinuousInputPartitionReader(
     startOffset: Long,
     kafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends ContinuousInputPartitionReader[UnsafeRow] {
+    failOnDataLoss: Boolean) extends ContinuousInputPartitionReader[InternalRow] {
   private val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = false)
   private val converter = new KafkaRecordToUnsafeRowConverter
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
index 737da2e..6c95b2b 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -29,11 +29,12 @@ import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
 import org.apache.spark.sql.sources.v2.DataSourceOptions
-import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsScanUnsafeRow}
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.UninterruptibleThread
@@ -61,7 +62,7 @@ private[kafka010] class KafkaMicroBatchReader(
     metadataPath: String,
     startingOffsets: KafkaOffsetRangeLimit,
     failOnDataLoss: Boolean)
-  extends MicroBatchReader with SupportsScanUnsafeRow with Logging {
+  extends MicroBatchReader with Logging {
 
   private var startPartitionOffsets: PartitionOffsetMap = _
   private var endPartitionOffsets: PartitionOffsetMap = _
@@ -101,7 +102,7 @@ private[kafka010] class KafkaMicroBatchReader(
         }
   }
 
-  override def planUnsafeInputPartitions(): ju.List[InputPartition[UnsafeRow]] = {
+  override def planInputPartitions(): ju.List[InputPartition[InternalRow]] = {
     // Find the new partitions, and get their earliest offsets
     val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
     val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
@@ -142,11 +143,11 @@ private[kafka010] class KafkaMicroBatchReader(
     val reuseKafkaConsumer = offsetRanges.map(_.topicPartition).toSet.size == offsetRanges.size
 
     // Generate factories based on the offset ranges
-    val factories = offsetRanges.map { range =>
+    offsetRanges.map { range =>
       new KafkaMicroBatchInputPartition(
-        range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
-    }
-    factories.map(_.asInstanceOf[InputPartition[UnsafeRow]]).asJava
+        range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer
+      ): InputPartition[InternalRow]
+    }.asJava
   }
 
   override def getStartOffset: Offset = {
@@ -305,11 +306,11 @@ private[kafka010] case class KafkaMicroBatchInputPartition(
     executorKafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
     failOnDataLoss: Boolean,
-    reuseKafkaConsumer: Boolean) extends InputPartition[UnsafeRow] {
+    reuseKafkaConsumer: Boolean) extends InputPartition[InternalRow] {
 
   override def preferredLocations(): Array[String] = offsetRange.preferredLoc.toArray
 
-  override def createPartitionReader(): InputPartitionReader[UnsafeRow] =
+  override def createPartitionReader(): InputPartitionReader[InternalRow] =
     new KafkaMicroBatchInputPartitionReader(offsetRange, executorKafkaParams, pollTimeoutMs,
       failOnDataLoss, reuseKafkaConsumer)
 }
@@ -320,7 +321,7 @@ private[kafka010] case class KafkaMicroBatchInputPartitionReader(
     executorKafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
     failOnDataLoss: Boolean,
-    reuseKafkaConsumer: Boolean) extends InputPartitionReader[UnsafeRow] with Logging {
+    reuseKafkaConsumer: Boolean) extends InputPartitionReader[InternalRow] with Logging {
 
   private val consumer = KafkaDataConsumer.acquire(
     offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index c6412ea..5d5e573 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -678,7 +678,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
           Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 0L))),
           Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 100L)))
         )
-        val factories = reader.planUnsafeInputPartitions().asScala
+        val factories = reader.planInputPartitions().asScala
           .map(_.asInstanceOf[KafkaMicroBatchInputPartition])
         withClue(s"minPartitions = $minPartitions generated factories $factories\n\t") {
           assert(factories.size == numPartitionsGenerated)

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
index 36a3e54..ad9c838 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
@@ -20,7 +20,7 @@ package org.apache.spark.sql.sources.v2.reader;
 import java.util.List;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.ReadSupport;
 import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
@@ -43,7 +43,7 @@ import org.apache.spark.sql.types.StructType;
  *      Names of these interfaces start with `SupportsScan`. Note that a reader should only
  *      implement at most one of the special scans, if more than one special scans are implemented,
  *      only one of them would be respected, according to the priority list from high to low:
- *      {@link SupportsScanColumnarBatch}, {@link SupportsScanUnsafeRow}.
+ *      {@link SupportsScanColumnarBatch}, {@link SupportsDeprecatedScanRow}.
  *
  * If an exception was throw when applying any of these query optimizations, the action will fail
  * and no Spark job will be submitted.
@@ -76,5 +76,5 @@ public interface DataSourceReader {
    * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    * submitted.
    */
-  List<InputPartition<Row>> planInputPartitions();
+  List<InputPartition<InternalRow>> planInputPartitions();
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
index 33fa7be..7cf382e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
@@ -26,9 +26,10 @@ import org.apache.spark.annotation.InterfaceStability;
  * An input partition reader returned by {@link InputPartition#createPartitionReader()} and is
  * responsible for outputting data for a RDD partition.
  *
- * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal input
- * partition readers, or {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} for input
- * partition readers that mix in {@link SupportsScanUnsafeRow}.
+ * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.catalyst.InternalRow}
+ * for normal data source readers, {@link org.apache.spark.sql.vectorized.ColumnarBatch} for data
+ * source readers that mix in {@link SupportsScanColumnarBatch}, or {@link org.apache.spark.sql.Row}
+ * for data source readers that mix in {@link SupportsDeprecatedScanRow}.
  */
 @InterfaceStability.Evolving
 public interface InputPartitionReader<T> extends Closeable {

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsDeprecatedScanRow.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsDeprecatedScanRow.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsDeprecatedScanRow.java
new file mode 100644
index 0000000..595943c
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsDeprecatedScanRow.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+import java.util.List;
+
+/**
+ * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
+ * interface to output {@link Row} instead of {@link InternalRow}.
+ * This is an experimental and unstable interface.
+ */
+@InterfaceStability.Unstable
+public interface SupportsDeprecatedScanRow extends DataSourceReader {
+  default List<InputPartition<InternalRow>> planInputPartitions() {
+    throw new IllegalStateException(
+        "planInputPartitions not supported by default within SupportsDeprecatedScanRow");
+  }
+
+  List<InputPartition<Row>> planRowInputPartitions();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
index 0faf81d..f4da686 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
@@ -20,7 +20,7 @@ package org.apache.spark.sql.sources.v2.reader;
 import java.util.List;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 /**
@@ -30,7 +30,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
 @InterfaceStability.Evolving
 public interface SupportsScanColumnarBatch extends DataSourceReader {
   @Override
-  default List<InputPartition<Row>> planInputPartitions() {
+  default List<InputPartition<InternalRow>> planInputPartitions() {
     throw new IllegalStateException(
       "planInputPartitions not supported by default within SupportsScanColumnarBatch.");
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
deleted file mode 100644
index f2220f6..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import java.util.List;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
-
-/**
- * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
- * interface to output {@link UnsafeRow} directly and avoid the row copy at Spark side.
- * This is an experimental and unstable interface, as {@link UnsafeRow} is not public and may get
- * changed in the future Spark versions.
- */
-@InterfaceStability.Unstable
-public interface SupportsScanUnsafeRow extends DataSourceReader {
-
-  @Override
-  default List<InputPartition<Row>> planInputPartitions() {
-    throw new IllegalStateException(
-      "planInputPartitions not supported by default within SupportsScanUnsafeRow");
-  }
-
-  /**
-   * Similar to {@link DataSourceReader#planInputPartitions()},
-   * but returns data in unsafe row format.
-   */
-  List<InputPartition<UnsafeRow>> planUnsafeInputPartitions();
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
index 8d6fb38..7ea5342 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index c6a7684..b030b9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -75,12 +75,13 @@ case class DataSourceV2ScanExec(
     case _ => super.outputPartitioning
   }
 
-  private lazy val partitions: Seq[InputPartition[UnsafeRow]] = reader match {
-    case r: SupportsScanUnsafeRow => r.planUnsafeInputPartitions().asScala
-    case _ =>
-      reader.planInputPartitions().asScala.map {
-        new RowToUnsafeRowInputPartition(_, reader.readSchema()): InputPartition[UnsafeRow]
+  private lazy val partitions: Seq[InputPartition[InternalRow]] = reader match {
+    case r: SupportsDeprecatedScanRow =>
+      r.planRowInputPartitions().asScala.map {
+        new RowToUnsafeRowInputPartition(_, reader.readSchema()): InputPartition[InternalRow]
       }
+    case _ =>
+      reader.planInputPartitions().asScala
   }
 
   private lazy val batchPartitions: Seq[InputPartition[ColumnarBatch]] = reader match {
@@ -132,11 +133,11 @@ case class DataSourceV2ScanExec(
 }
 
 class RowToUnsafeRowInputPartition(partition: InputPartition[Row], schema: StructType)
-  extends InputPartition[UnsafeRow] {
+  extends InputPartition[InternalRow] {
 
   override def preferredLocations: Array[String] = partition.preferredLocations
 
-  override def createPartitionReader: InputPartitionReader[UnsafeRow] = {
+  override def createPartitionReader: InputPartitionReader[InternalRow] = {
     new RowToUnsafeInputPartitionReader(
       partition.createPartitionReader, RowEncoder.apply(schema).resolveAndBind())
   }
@@ -146,7 +147,7 @@ class RowToUnsafeInputPartitionReader(
     val rowReader: InputPartitionReader[Row],
     encoder: ExpressionEncoder[Row])
 
-  extends InputPartitionReader[UnsafeRow] {
+  extends InputPartitionReader[InternalRow] {
 
   override def next: Boolean = rowReader.next
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 2a7f1de..9414e68 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -125,16 +125,13 @@ object DataSourceV2Strategy extends Strategy {
       val filterCondition = postScanFilters.reduceLeftOption(And)
       val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
 
-      val withProjection = if (withFilter.output != project) {
-        ProjectExec(project, withFilter)
-      } else {
-        withFilter
-      }
-
-      withProjection :: Nil
+      // always add the projection, which will produce unsafe rows required by some operators
+      ProjectExec(project, withFilter) :: Nil
 
     case r: StreamingDataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+      // ensure there is a projection, which will produce unsafe rows required by some operators
+      ProjectExec(r.output,
+        DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader)) :: Nil
 
     case WriteToDataSourceV2(writer, query) =>
       WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
index 73868d5..1ffa1d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
@@ -19,16 +19,16 @@ package org.apache.spark.sql.execution.streaming.continuous
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SQLContext}
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeInputPartitionReader}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.v2.RowToUnsafeInputPartitionReader
 import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, PartitionOffset}
-import org.apache.spark.util.{NextIterator, ThreadUtils}
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader
+import org.apache.spark.util.NextIterator
 
 class ContinuousDataSourceRDDPartition(
     val index: Int,
-    val inputPartition: InputPartition[UnsafeRow])
+    val inputPartition: InputPartition[InternalRow])
   extends Partition with Serializable {
 
   // This is semantically a lazy val - it's initialized once the first time a call to
@@ -51,8 +51,8 @@ class ContinuousDataSourceRDD(
     sc: SparkContext,
     dataQueueSize: Int,
     epochPollIntervalMs: Long,
-    private val readerInputPartitions: Seq[InputPartition[UnsafeRow]])
-  extends RDD[UnsafeRow](sc, Nil) {
+    private val readerInputPartitions: Seq[InputPartition[InternalRow]])
+  extends RDD[InternalRow](sc, Nil) {
 
   override protected def getPartitions: Array[Partition] = {
     readerInputPartitions.zipWithIndex.map {
@@ -64,7 +64,7 @@ class ContinuousDataSourceRDD(
    * Initialize the shared reader for this partition if needed, then read rows from it until
    * it returns null to signal the end of the epoch.
    */
-  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     // If attempt number isn't 0, this is a task retry, which we don't support.
     if (context.attemptNumber() != 0) {
       throw new ContinuousTaskRetryException()
@@ -80,8 +80,8 @@ class ContinuousDataSourceRDD(
       partition.queueReader
     }
 
-    new NextIterator[UnsafeRow] {
-      override def getNext(): UnsafeRow = {
+    new NextIterator[InternalRow] {
+      override def getNext(): InternalRow = {
         readerForPartition.next() match {
           case null =>
             finished = true
@@ -101,9 +101,9 @@ class ContinuousDataSourceRDD(
 
 object ContinuousDataSourceRDD {
   private[continuous] def getContinuousReader(
-      reader: InputPartitionReader[UnsafeRow]): ContinuousInputPartitionReader[_] = {
+      reader: InputPartitionReader[InternalRow]): ContinuousInputPartitionReader[_] = {
     reader match {
-      case r: ContinuousInputPartitionReader[UnsafeRow] => r
+      case r: ContinuousInputPartitionReader[InternalRow] => r
       case wrapped: RowToUnsafeInputPartitionReader =>
         wrapped.rowReader.asInstanceOf[ContinuousInputPartitionReader[Row]]
       case _ =>

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
index 8c74b82..bfb8705 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
@@ -24,7 +24,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
 import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
 import org.apache.spark.util.ThreadUtils
@@ -52,7 +52,7 @@ class ContinuousQueuedDataReader(
    */
   sealed trait ContinuousRecord
   case object EpochMarker extends ContinuousRecord
-  case class ContinuousRow(row: UnsafeRow, offset: PartitionOffset) extends ContinuousRecord
+  case class ContinuousRow(row: InternalRow, offset: PartitionOffset) extends ContinuousRecord
 
   private val queue = new ArrayBlockingQueue[ContinuousRecord](dataQueueSize)
 
@@ -79,12 +79,12 @@ class ContinuousQueuedDataReader(
   }
 
   /**
-   * Return the next UnsafeRow to be read in the current epoch, or null if the epoch is done.
+   * Return the next row to be read in the current epoch, or null if the epoch is done.
    *
    * After returning null, the [[ContinuousDataSourceRDD]] compute() for the following epoch
    * will call next() again to start getting rows.
    */
-  def next(): UnsafeRow = {
+  def next(): InternalRow = {
     val POLL_TIMEOUT_MS = 1000
     var currentEntry: ContinuousRecord = null
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index 516a563..55ce3ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -35,7 +35,7 @@ case class RateStreamPartitionOffset(
    partition: Int, currentValue: Long, currentTimeMs: Long) extends PartitionOffset
 
 class RateStreamContinuousReader(options: DataSourceOptions)
-  extends ContinuousReader {
+  extends ContinuousReader with SupportsDeprecatedScanRow {
   implicit val defaultFormats: DefaultFormats = DefaultFormats
 
   val creationTime = System.currentTimeMillis()
@@ -67,7 +67,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)
 
   override def getStartOffset(): Offset = offset
 
-  override def planInputPartitions(): java.util.List[InputPartition[Row]] = {
+  override def planRowInputPartitions(): java.util.List[InputPartition[Row]] = {
     val partitionStartMap = offset match {
       case off: RateStreamOffset => off.partitionToValueAndRunTimeMs
       case off =>

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index b137f98..f81abdc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -28,12 +28,13 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.encoderFor
 import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
-import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsScanUnsafeRow}
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -79,8 +80,7 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Bas
  * available.
  */
 case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
-    extends MemoryStreamBase[A](sqlContext)
-      with MicroBatchReader with SupportsScanUnsafeRow with Logging {
+    extends MemoryStreamBase[A](sqlContext) with MicroBatchReader with Logging {
 
   protected val logicalPlan: LogicalPlan =
     StreamingExecutionRelation(this, attributes)(sqlContext.sparkSession)
@@ -139,7 +139,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     if (endOffset.offset == -1) null else endOffset
   }
 
-  override def planUnsafeInputPartitions(): ju.List[InputPartition[UnsafeRow]] = {
+  override def planInputPartitions(): ju.List[InputPartition[InternalRow]] = {
     synchronized {
       // Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal)
       val startOrdinal = startOffset.offset.toInt + 1
@@ -156,7 +156,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       logDebug(generateDebugString(newBlocks.flatten, startOrdinal, endOrdinal))
 
       newBlocks.map { block =>
-        new MemoryStreamInputPartition(block).asInstanceOf[InputPartition[UnsafeRow]]
+        new MemoryStreamInputPartition(block): InputPartition[InternalRow]
       }.asJava
     }
   }
@@ -202,9 +202,9 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
 
 
 class MemoryStreamInputPartition(records: Array[UnsafeRow])
-  extends InputPartition[UnsafeRow] {
-  override def createPartitionReader(): InputPartitionReader[UnsafeRow] = {
-    new InputPartitionReader[UnsafeRow] {
+  extends InputPartition[InternalRow] {
+  override def createPartitionReader(): InputPartitionReader[InternalRow] = {
+    new InputPartitionReader[InternalRow] {
       private var currentIndex = -1
 
       override def next(): Boolean = {

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
index 0bf90b8..e776ebc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.{Encoder, Row, SQLContext}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream.GetRecord
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions}
-import org.apache.spark.sql.sources.v2.reader.InputPartition
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, SupportsDeprecatedScanRow}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.RpcUtils
@@ -49,7 +49,8 @@ import org.apache.spark.util.RpcUtils
  *    the specified offset within the list, or null if that offset doesn't yet have a record.
  */
 class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPartitions: Int = 2)
-  extends MemoryStreamBase[A](sqlContext) with ContinuousReader with ContinuousReadSupport {
+  extends MemoryStreamBase[A](sqlContext) with ContinuousReader with ContinuousReadSupport
+      with SupportsDeprecatedScanRow {
   private implicit val formats = Serialization.formats(NoTypeHints)
 
   protected val logicalPlan =
@@ -99,7 +100,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa
     )
   }
 
-  override def planInputPartitions(): ju.List[InputPartition[Row]] = {
+  override def planRowInputPartitions(): ju.List[InputPartition[Row]] = {
     synchronized {
       val endpointName = s"ContinuousMemoryStreamRecordEndpoint-${java.util.UUID.randomUUID()}-$id"
       endpointRef =

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
index b393c48..7a3452a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{ManualClock, SystemClock}
 
 class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation: String)
-  extends MicroBatchReader with Logging {
+  extends MicroBatchReader with SupportsDeprecatedScanRow with Logging {
   import RateStreamProvider._
 
   private[sources] val clock = {
@@ -134,7 +134,7 @@ class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation:
     LongOffset(json.toLong)
   }
 
-  override def planInputPartitions(): java.util.List[InputPartition[Row]] = {
+  override def planRowInputPartitions(): java.util.List[InputPartition[Row]] = {
     val startSeconds = LongOffset.convert(start).map(_.offset).getOrElse(0L)
     val endSeconds = LongOffset.convert(end).map(_.offset).getOrElse(0L)
     assert(startSeconds <= endSeconds, s"startSeconds($startSeconds) > endSeconds($endSeconds)")

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
index 91e3b71..e3a2c00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming.LongOffset
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport}
-import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 
@@ -50,7 +50,8 @@ object TextSocketMicroBatchReader {
  * debugging. This MicroBatchReader will *not* work in production applications due to multiple
  * reasons, including no support for fault recovery.
  */
-class TextSocketMicroBatchReader(options: DataSourceOptions) extends MicroBatchReader with Logging {
+class TextSocketMicroBatchReader(options: DataSourceOptions) extends MicroBatchReader
+    with SupportsDeprecatedScanRow with Logging {
 
   private var startOffset: Offset = _
   private var endOffset: Offset = _
@@ -141,7 +142,7 @@ class TextSocketMicroBatchReader(options: DataSourceOptions) extends MicroBatchR
     }
   }
 
-  override def planInputPartitions(): JList[InputPartition[Row]] = {
+  override def planRowInputPartitions(): JList[InputPartition[Row]] = {
     assert(startOffset != null && endOffset != null,
       "start offset and end offset should already be set before create read tasks.")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
index 445cb29..c130b5f 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
@@ -33,7 +33,7 @@ import org.apache.spark.sql.types.StructType;
 public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport {
 
   public class Reader implements DataSourceReader, SupportsPushDownRequiredColumns,
-      SupportsPushDownFilters {
+      SupportsPushDownFilters, SupportsDeprecatedScanRow {
 
     // Exposed for testing.
     public StructType requiredSchema = new StructType().add("i", "int").add("j", "int");
@@ -79,7 +79,7 @@ public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public List<InputPartition<Row>> planInputPartitions() {
+    public List<InputPartition<Row>> planRowInputPartitions() {
       List<InputPartition<Row>> res = new ArrayList<>();
 
       Integer lowerBound = null;

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
index e49c8cf..35aafb5 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
@@ -34,7 +34,7 @@ import org.apache.spark.sql.types.StructType;
 
 public class JavaPartitionAwareDataSource implements DataSourceV2, ReadSupport {
 
-  class Reader implements DataSourceReader, SupportsReportPartitioning {
+  class Reader implements DataSourceReader, SupportsReportPartitioning, SupportsDeprecatedScanRow {
     private final StructType schema = new StructType().add("a", "int").add("b", "int");
 
     @Override
@@ -43,7 +43,7 @@ public class JavaPartitionAwareDataSource implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public List<InputPartition<Row>> planInputPartitions() {
+    public List<InputPartition<Row>> planRowInputPartitions() {
       return java.util.Arrays.asList(
         new SpecificInputPartition(new int[]{1, 1, 3}, new int[]{4, 4, 6}),
         new SpecificInputPartition(new int[]{2, 4, 4}, new int[]{6, 2, 2}));

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
index 80eeffd..6dee94c 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
@@ -25,11 +25,12 @@ import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.SupportsDeprecatedScanRow;
 import org.apache.spark.sql.types.StructType;
 
 public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWithSchema {
 
-  class Reader implements DataSourceReader {
+  class Reader implements DataSourceReader, SupportsDeprecatedScanRow {
     private final StructType schema;
 
     Reader(StructType schema) {
@@ -42,7 +43,7 @@ public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWi
     }
 
     @Override
-    public List<InputPartition<Row>> planInputPartitions() {
+    public List<InputPartition<Row>> planRowInputPartitions() {
       return java.util.Collections.emptyList();
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
index 8522a63..5c2f351 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
@@ -28,11 +28,12 @@ import org.apache.spark.sql.sources.v2.ReadSupport;
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.sources.v2.reader.SupportsDeprecatedScanRow;
 import org.apache.spark.sql.types.StructType;
 
 public class JavaSimpleDataSourceV2 implements DataSourceV2, ReadSupport {
 
-  class Reader implements DataSourceReader {
+  class Reader implements DataSourceReader, SupportsDeprecatedScanRow {
     private final StructType schema = new StructType().add("i", "int").add("j", "int");
 
     @Override
@@ -41,7 +42,7 @@ public class JavaSimpleDataSourceV2 implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public List<InputPartition<Row>> planInputPartitions() {
+    public List<InputPartition<Row>> planRowInputPartitions() {
       return java.util.Arrays.asList(
         new JavaSimpleInputPartition(0, 5),
         new JavaSimpleInputPartition(5, 10));

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
index 3ad8e7a..25b89c7 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
@@ -20,6 +20,7 @@ package test.org.apache.spark.sql.sources.v2;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
@@ -29,7 +30,7 @@ import org.apache.spark.sql.types.StructType;
 
 public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport {
 
-  class Reader implements DataSourceReader, SupportsScanUnsafeRow {
+  class Reader implements DataSourceReader {
     private final StructType schema = new StructType().add("i", "int").add("j", "int");
 
     @Override
@@ -38,7 +39,7 @@ public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public List<InputPartition<UnsafeRow>> planUnsafeInputPartitions() {
+    public List<InputPartition<InternalRow>> planInputPartitions() {
       return java.util.Arrays.asList(
         new JavaUnsafeRowInputPartition(0, 5),
         new JavaUnsafeRowInputPartition(5, 10));
@@ -46,7 +47,7 @@ public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport {
   }
 
   static class JavaUnsafeRowInputPartition
-      implements InputPartition<UnsafeRow>, InputPartitionReader<UnsafeRow> {
+      implements InputPartition<InternalRow>, InputPartitionReader<InternalRow> {
     private int start;
     private int end;
     private UnsafeRow row;
@@ -59,7 +60,7 @@ public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public InputPartitionReader<UnsafeRow> createPartitionReader() {
+    public InputPartitionReader<InternalRow> createPartitionReader() {
       return new JavaUnsafeRowInputPartition(start - 1, end);
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index 9115a38..260a037 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -146,7 +146,7 @@ class RateSourceSuite extends StreamTest {
       val startOffset = LongOffset(0L)
       val endOffset = LongOffset(1L)
       reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
-      val tasks = reader.planInputPartitions()
+      val tasks = reader.planRowInputPartitions()
       assert(tasks.size == 1)
       val dataReader = tasks.get(0).createPartitionReader()
       val data = ArrayBuffer[Row]()
@@ -165,7 +165,7 @@ class RateSourceSuite extends StreamTest {
       val startOffset = LongOffset(0L)
       val endOffset = LongOffset(1L)
       reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
-      val tasks = reader.planInputPartitions()
+      val tasks = reader.planRowInputPartitions()
       assert(tasks.size == 11)
 
       val readData = tasks.asScala
@@ -311,7 +311,7 @@ class RateSourceSuite extends StreamTest {
     val reader = new RateStreamContinuousReader(
       new DataSourceOptions(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava))
     reader.setStartOffset(Optional.empty())
-    val tasks = reader.planInputPartitions()
+    val tasks = reader.planRowInputPartitions()
     assert(tasks.size == 2)
 
     val data = scala.collection.mutable.ListBuffer[Row]()

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index e96cd45..d73eebb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -23,6 +23,7 @@ import test.org.apache.spark.sql.sources.v2._
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanExec}
 import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
@@ -344,10 +345,10 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
 
 class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport {
 
-  class Reader extends DataSourceReader {
+  class Reader extends DataSourceReader with SupportsDeprecatedScanRow {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
-    override def planInputPartitions(): JList[InputPartition[Row]] = {
+    override def planRowInputPartitions(): JList[InputPartition[Row]] = {
       java.util.Arrays.asList(new SimpleInputPartition(0, 5))
     }
   }
@@ -357,10 +358,10 @@ class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport {
 
 class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport {
 
-  class Reader extends DataSourceReader {
+  class Reader extends DataSourceReader with SupportsDeprecatedScanRow {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
-    override def planInputPartitions(): JList[InputPartition[Row]] = {
+    override def planRowInputPartitions(): JList[InputPartition[Row]] = {
       java.util.Arrays.asList(new SimpleInputPartition(0, 5), new SimpleInputPartition(5, 10))
     }
   }
@@ -390,7 +391,7 @@ class SimpleInputPartition(start: Int, end: Int)
 
 class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
 
-  class Reader extends DataSourceReader
+  class Reader extends DataSourceReader with SupportsDeprecatedScanRow
     with SupportsPushDownRequiredColumns with SupportsPushDownFilters {
 
     var requiredSchema = new StructType().add("i", "int").add("j", "int")
@@ -415,7 +416,7 @@ class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
       requiredSchema
     }
 
-    override def planInputPartitions(): JList[InputPartition[Row]] = {
+    override def planRowInputPartitions(): JList[InputPartition[Row]] = {
       val lowerBound = filters.collect {
         case GreaterThan("i", v: Int) => v
       }.headOption
@@ -467,10 +468,10 @@ class AdvancedInputPartition(start: Int, end: Int, requiredSchema: StructType)
 
 class UnsafeRowDataSourceV2 extends DataSourceV2 with ReadSupport {
 
-  class Reader extends DataSourceReader with SupportsScanUnsafeRow {
+  class Reader extends DataSourceReader {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
-    override def planUnsafeInputPartitions(): JList[InputPartition[UnsafeRow]] = {
+    override def planInputPartitions(): JList[InputPartition[InternalRow]] = {
       java.util.Arrays.asList(new UnsafeRowInputPartitionReader(0, 5),
         new UnsafeRowInputPartitionReader(5, 10))
     }
@@ -480,14 +481,14 @@ class UnsafeRowDataSourceV2 extends DataSourceV2 with ReadSupport {
 }
 
 class UnsafeRowInputPartitionReader(start: Int, end: Int)
-  extends InputPartition[UnsafeRow] with InputPartitionReader[UnsafeRow] {
+  extends InputPartition[InternalRow] with InputPartitionReader[InternalRow] {
 
   private val row = new UnsafeRow(2)
   row.pointTo(new Array[Byte](8 * 3), 8 * 3)
 
   private var current = start - 1
 
-  override def createPartitionReader(): InputPartitionReader[UnsafeRow] = this
+  override def createPartitionReader(): InputPartitionReader[InternalRow] = this
 
   override def next(): Boolean = {
     current += 1
@@ -504,8 +505,8 @@ class UnsafeRowInputPartitionReader(start: Int, end: Int)
 
 class SchemaRequiredDataSource extends DataSourceV2 with ReadSupportWithSchema {
 
-  class Reader(val readSchema: StructType) extends DataSourceReader {
-    override def planInputPartitions(): JList[InputPartition[Row]] =
+  class Reader(val readSchema: StructType) extends DataSourceReader with SupportsDeprecatedScanRow {
+    override def planRowInputPartitions(): JList[InputPartition[Row]] =
       java.util.Collections.emptyList()
   }
 
@@ -568,10 +569,11 @@ class BatchInputPartitionReader(start: Int, end: Int)
 
 class PartitionAwareDataSource extends DataSourceV2 with ReadSupport {
 
-  class Reader extends DataSourceReader with SupportsReportPartitioning {
+  class Reader extends DataSourceReader with SupportsReportPartitioning
+      with SupportsDeprecatedScanRow {
     override def readSchema(): StructType = new StructType().add("a", "int").add("b", "int")
 
-    override def planInputPartitions(): JList[InputPartition[Row]] = {
+    override def planRowInputPartitions(): JList[InputPartition[Row]] = {
       // Note that we don't have same value of column `a` across partitions.
       java.util.Arrays.asList(
         new SpecificInputPartitionReader(Array(1, 1, 3), Array(4, 4, 6)),

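The UnsafeRowDataSourceV2 reader above keeps building UnsafeRow, which still works because UnsafeRow is an InternalRow. Readers on the default path are not required to do that; any InternalRow will do. A short sketch with illustrative names (string columns would need UTF8String rather than java.lang.String):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}

    // Illustrative only: emits (i, -i) for i in [start, end) as InternalRow.
    class ExamplePartition(start: Int, end: Int)
      extends InputPartition[InternalRow] with InputPartitionReader[InternalRow] {

      private var current = start - 1

      override def createPartitionReader(): InputPartitionReader[InternalRow] = this

      override def next(): Boolean = {
        current += 1
        current < end
      }

      // InternalRow(...) wraps the values in a GenericInternalRow.
      override def get(): InternalRow = InternalRow(current, -current)

      override def close(): Unit = {}
    }
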
http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index 1334cf7..98d7eed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader, SupportsDeprecatedScanRow}
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.SerializableConfiguration
@@ -42,10 +42,11 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
 
   private val schema = new StructType().add("i", "long").add("j", "long")
 
-  class Reader(path: String, conf: Configuration) extends DataSourceReader {
+  class Reader(path: String, conf: Configuration) extends DataSourceReader
+      with SupportsDeprecatedScanRow {
     override def readSchema(): StructType = schema
 
-    override def planInputPartitions(): JList[InputPartition[Row]] = {
+    override def planRowInputPartitions(): JList[InputPartition[Row]] = {
       val dataPath = new Path(path)
       val fs = dataPath.getFileSystem(conf)
       if (fs.exists(dataPath)) {

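If this writable source later drops SupportsDeprecatedScanRow, its Row-producing read path could be bridged to InternalRow rather than rewritten. A hedged sketch, assuming the 2.x RowEncoder / ExpressionEncoder.toRow API (none of this is in the patch):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    import org.apache.spark.sql.types.StructType

    val schema = new StructType().add("i", "long").add("j", "long")

    // One encoder per partition reader; toRow converts an external Row into
    // the InternalRow the default scan path expects. The returned row may be
    // reused across calls, so copy() it before buffering.
    val toInternal = RowEncoder(schema).resolveAndBind()

    def convert(row: Row): InternalRow = toInternal.toRow(row)

For a two-column long schema like this one, building InternalRow(row.getLong(0), row.getLong(1)) directly would be just as simple.
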
http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 936a076..78199b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -30,7 +30,7 @@ import org.scalatest.mockito.MockitoSugar
 import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset}
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.TestForeachWriter
 import org.apache.spark.sql.functions._
@@ -227,10 +227,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       }
 
       // getBatch should take 100 ms the first time it is called
-      override def planUnsafeInputPartitions(): ju.List[InputPartition[UnsafeRow]] = {
+      override def planInputPartitions(): ju.List[InputPartition[InternalRow]] = {
         synchronized {
           clock.waitTillTime(1350)
-          super.planUnsafeInputPartitions()
+          super.planInputPartitions()
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
index 0e7e6fe..4f19881 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
@@ -19,12 +19,12 @@ package org.apache.spark.sql.streaming.continuous
 
 import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
 
-import org.mockito.{ArgumentCaptor, Matchers}
 import org.mockito.Mockito._
 import org.scalatest.mockito.MockitoSugar
 
 import org.apache.spark.{SparkEnv, SparkFunSuite, TaskContext}
 import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.sources.v2.reader.InputPartition
@@ -73,8 +73,8 @@ class ContinuousQueuedDataReaderSuite extends StreamTest with MockitoSugar {
    */
   private def setup(): (BlockingQueue[UnsafeRow], ContinuousQueuedDataReader) = {
     val queue = new ArrayBlockingQueue[UnsafeRow](1024)
-    val factory = new InputPartition[UnsafeRow] {
-      override def createPartitionReader() = new ContinuousInputPartitionReader[UnsafeRow] {
+    val factory = new InputPartition[InternalRow] {
+      override def createPartitionReader() = new ContinuousInputPartitionReader[InternalRow] {
         var index = -1
         var curr: UnsafeRow = _
 

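The mocked ContinuousInputPartitionReader above only changes its element type to InternalRow; a real continuous partition also reports its position through getOffset. A minimal sketch (ExampleOffset and the class names are hypothetical, not from this patch):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.sources.v2.reader.InputPartition
    import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, PartitionOffset}

    // Hypothetical offset type for the sketch.
    case class ExampleOffset(index: Long) extends PartitionOffset

    class ExampleContinuousPartition extends InputPartition[InternalRow] {
      override def createPartitionReader(): ContinuousInputPartitionReader[InternalRow] =
        new ContinuousInputPartitionReader[InternalRow] {
          private var index = -1L

          // A continuous source never signals end-of-data.
          override def next(): Boolean = { index += 1; true }
          override def get(): InternalRow = InternalRow(index)
          override def getOffset(): PartitionOffset = ExampleOffset(index)
          override def close(): Unit = {}
        }
    }
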
http://git-wip-us.apache.org/repos/asf/spark/blob/9d27541a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index c1a28b9..7c01215 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -26,14 +26,15 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
-import org.apache.spark.sql.sources.v2.reader.InputPartition
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, SupportsDeprecatedScanRow}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset}
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
-case class FakeReader() extends MicroBatchReader with ContinuousReader {
+case class FakeReader() extends MicroBatchReader with ContinuousReader
+    with SupportsDeprecatedScanRow {
   def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {}
   def getStartOffset: Offset = RateStreamOffset(Map())
   def getEndOffset: Offset = RateStreamOffset(Map())
@@ -44,7 +45,7 @@ case class FakeReader() extends MicroBatchReader with ContinuousReader {
   def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
   def setStartOffset(start: Optional[Offset]): Unit = {}
 
-  def planInputPartitions(): java.util.ArrayList[InputPartition[Row]] = {
+  def planRowInputPartitions(): java.util.ArrayList[InputPartition[Row]] = {
     throw new IllegalStateException("fake source - cannot actually read")
   }
 }

