Posted to commits@spark.apache.org by we...@apache.org on 2019/01/17 15:33:52 UTC

[spark] branch master updated: [SPARK-23817][SQL] Create file source V2 framework and migrate ORC read path

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c0632ce  [SPARK-23817][SQL] Create file source V2 framework and migrate ORC read path
c0632ce is described below

commit c0632cec04e5b0f3fb3c3f27c21a2d3f3fbb4f7e
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Thu Jan 17 23:33:29 2019 +0800

    [SPARK-23817][SQL] Create file source V2 framework and migrate ORC read path
    
    ## What changes were proposed in this pull request?
    Create a framework for file source V2 based on the data source V2 API.
    As a good example for demonstrating the framework, this PR also migrates the ORC source, because the ORC file source supports both row scan and columnar scan, and its implementation is simpler compared with Parquet.
    
    Note: Currently only the read path of the V2 API is implemented, so this framework and the ORC migration cover the read path only.
    Supports the following scans (see the usage sketch after these lists):
    - Scan ColumnarBatch
    - Scan UnsafeRow
    - Push down filters
    - Push down required columns
    
    Not supported (due to limitations of the data source V2 API):
    - Stats metrics
    - Catalog table
    - Writes
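
    A minimal usage sketch of the new read path (not part of this patch; `spark` is an active SparkSession, and the path, columns and filter are hypothetical):

    ```scala
    // With the default native ORC implementation, reads route through OrcDataSourceV2;
    // the selected columns and the filter below are pushed down to the new ORC reader.
    val df = spark.read.format("orc").load("/tmp/data/orc")
    df.select("id", "name").where("id > 10").show()

    // To disable the V2 read path and fall back to the V1 OrcFileFormat, list the
    // source in the configuration added by this patch:
    spark.conf.set("spark.sql.sources.read.useV1SourceList", "orc")
    ```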
    
    ## How was this patch tested?
    
    Unit test
    
    Closes #23383 from gengliangwang/latest_orcV2.
    
    Authored-by: Gengliang Wang <ge...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  11 +-
 .../spark/sql/sources/v2/DataSourceOptions.java    |  10 ++
 .../org/apache/spark/sql/DataFrameReader.scala     |  18 ++-
 .../org/apache/spark/sql/DataFrameWriter.scala     |   3 +-
 .../spark/sql/execution/DataSourceScanExec.scala   | 107 +++----------
 .../spark/sql/execution/PartitionedFileUtil.scala  |  94 ++++++++++++
 .../spark/sql/execution/command/tables.scala       |   5 +-
 .../sql/execution/datasources/DataSource.scala     |  97 +++++++-----
 .../datasources/FallbackOrcDataSourceV2.scala      |  42 +++++
 .../sql/execution/datasources/FilePartition.scala  |  97 ++++++++++++
 .../sql/execution/datasources/FileScanRDD.scala    |  24 +--
 .../execution/datasources/HadoopFsRelation.scala   |  22 +--
 .../execution/datasources/PartitioningUtils.scala  |  36 ++++-
 .../sql/execution/datasources/orc/OrcFilters.scala |  20 +--
 .../datasources/v2/EmptyPartitionReader.scala      |  34 +++++
 .../datasources/v2/FileDataSourceV2.scala          |  54 +++++++
 .../datasources/v2/FilePartitionReader.scala       |  77 ++++++++++
 .../v2/FilePartitionReaderFactory.scala            |  61 ++++++++
 .../sql/execution/datasources/v2/FileScan.scala    |  61 ++++++++
 .../execution/datasources/v2/FileScanBuilder.scala |  31 ++++
 .../sql/execution/datasources/v2/FileTable.scala   |  51 +++++++
 .../datasources/v2/PartitionRecordReader.scala     |  41 +++++
 .../datasources/v2/orc/OrcDataSourceV2.scala       |  46 ++++++
 .../v2/orc/OrcPartitionReaderFactory.scala         | 169 +++++++++++++++++++++
 .../sql/execution/datasources/v2/orc/OrcScan.scala |  43 ++++++
 .../datasources/v2/orc/OrcScanBuilder.scala        |  62 ++++++++
 .../execution/datasources/v2/orc/OrcTable.scala    |  39 +++++
 .../sql/internal/BaseSessionStateBuilder.scala     |   1 +
 .../scala/org/apache/spark/sql/QueryTest.scala     |   5 +-
 .../execution/datasources/orc/OrcFilterSuite.scala |  85 ++++++-----
 .../orc/OrcPartitionDiscoverySuite.scala           |   7 +
 .../execution/datasources/orc/OrcQuerySuite.scala  |   9 +-
 .../datasources/orc/OrcV1FilterSuite.scala         | 104 +++++++++++++
 .../sources/v2/FileDataSourceV2FallBackSuite.scala |  97 ++++++++++++
 .../spark/sql/hive/HiveSessionStateBuilder.scala   |   1 +
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala |   2 +-
 36 files changed, 1436 insertions(+), 230 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c1b885a..ebc8c37 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1419,8 +1419,15 @@ object SQLConf {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(100)
 
-  val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
+  val USE_V1_SOURCE_READER_LIST = buildConf("spark.sql.sources.read.useV1SourceList")
     .internal()
+    .doc("A comma-separated list of data source short names or fully qualified data source" +
+      " register class names for which data source V2 read paths are disabled. Reads from these" +
+      " sources will fall back to the V1 sources.")
+    .stringConf
+    .createWithDefault("")
+
+  val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
     .doc("A comma-separated list of fully qualified data source register class names for which" +
       " StreamWriteSupport is disabled. Writes to these sources will fall back to the V1 Sinks.")
     .stringConf
@@ -2002,6 +2009,8 @@ class SQLConf extends Serializable with Logging {
   def continuousStreamingExecutorPollIntervalMs: Long =
     getConf(CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS)
 
+  def userV1SourceReaderList: String = getConf(USE_V1_SOURCE_READER_LIST)
+
   def disabledV2StreamingWriters: String = getConf(DISABLED_V2_STREAMING_WRITERS)
 
   def disabledV2StreamingMicroBatchReaders: String =
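
A hedged sketch of how the new configuration is consumed (it mirrors the DataFrameReader change later in this diff): entries are lowercased and matched against either the short name or the fully qualified register class name of a FileDataSourceV2. The example value is hypothetical.

```scala
import java.util.Locale

// Hypothetical value of spark.sql.sources.read.useV1SourceList.
val confValue = "orc,com.example.MyFileSourceV2"
val useV1Sources = confValue.toLowerCase(Locale.ROOT).split(",")

// An ORC read falls back to the V1 OrcFileFormat because its short name is listed.
val fallsBackForOrc = useV1Sources.contains("orc")
```
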
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java
index 1c5e3a0..00af0bf 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java
@@ -164,6 +164,11 @@ public class DataSourceOptions {
   public static final String DATABASE_KEY = "database";
 
   /**
+   * The option key for whether to check existence of files for a table.
+   */
+  public static final String CHECK_FILES_EXIST_KEY = "check_files_exist";
+
+  /**
    * Returns all the paths specified by both the singular path option and the multiple
    * paths option.
    */
@@ -197,4 +202,9 @@ public class DataSourceOptions {
   public Optional<String> databaseName() {
     return get(DATABASE_KEY);
   }
+
+  public Boolean checkFilesExist() {
+    Optional<String> result = get(CHECK_FILES_EXIST_KEY);
+    return result.isPresent() && result.get().equals("true");
+  }
 }
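
For illustration, a small sketch of the new option (the values are made up): checkFilesExist() returns true only when the "check_files_exist" option is present and set to "true", which is how DataFrameReader and DataFrameWriter toggle the existence check in the changes below.

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.v2.DataSourceOptions

val options = new DataSourceOptions(Map(
  DataSourceOptions.PATHS_KEY -> """["/tmp/data/orc"]""",
  DataSourceOptions.CHECK_FILES_EXIST_KEY -> "true").asJava)

assert(options.checkFilesExist())
```
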
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index af369a5..2b15217 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -37,8 +37,7 @@ import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.csv._
 import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2, FileTable}
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
@@ -193,7 +192,16 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         "read files of Hive data source directly.")
     }
 
-    val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
+    val useV1Sources =
+      sparkSession.sessionState.conf.userV1SourceReaderList.toLowerCase(Locale.ROOT).split(",")
+    val lookupCls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
+    val cls = lookupCls.newInstance() match {
+      case f: FileDataSourceV2 if useV1Sources.contains(f.shortName()) ||
+        useV1Sources.contains(lookupCls.getCanonicalName.toLowerCase(Locale.ROOT)) =>
+        f.fallBackFileFormat
+      case _ => lookupCls
+    }
+
     if (classOf[TableProvider].isAssignableFrom(cls)) {
       val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
@@ -202,7 +210,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         val objectMapper = new ObjectMapper()
         DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray)
       }
-      val finalOptions = sessionOptions ++ extraOptions.toMap + pathsOption
+      val checkFilesExistsOption = DataSourceOptions.CHECK_FILES_EXIST_KEY -> "true"
+      val finalOptions = sessionOptions ++ extraOptions.toMap + pathsOption + checkFilesExistsOption
       val dsOptions = new DataSourceOptions(finalOptions.asJava)
       val table = userSpecifiedSchema match {
         case Some(schema) => provider.getTable(dsOptions, schema)
@@ -211,6 +220,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       table match {
         case _: SupportsBatchRead =>
           Dataset.ofRows(sparkSession, DataSourceV2Relation.create(table, finalOptions))
+
         case _ => loadV1Source(paths: _*)
       }
     } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 228dcb9..d9404cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -248,7 +248,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
         provider, session.sessionState.conf)
-      val options = sessionOptions ++ extraOptions
+      val checkFilesExistsOption = DataSourceOptions.CHECK_FILES_EXIST_KEY -> "false"
+      val options = sessionOptions ++ extraOptions + checkFilesExistsOption
       val dsOptions = new DataSourceOptions(options.asJava)
       provider.getTable(dsOptions) match {
         case table: SupportsBatchWrite =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 8b84eda..f6f3fb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -373,8 +373,7 @@ case class FileSourceScanExec(
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
         p.files.filter(_.getLen > 0).map { f =>
-          val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
-          PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
+          PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)
         }
       }.groupBy { f =>
         BucketingUtils
@@ -410,107 +409,35 @@ case class FileSourceScanExec(
       readFile: (PartitionedFile) => Iterator[InternalRow],
       selectedPartitions: Seq[PartitionDirectory],
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
-    val defaultMaxSplitBytes =
-      fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
     val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
-    val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
-    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
-    val bytesPerCore = totalBytes / defaultParallelism
-
-    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    val maxSplitBytes =
+      FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
     val splitFiles = selectedPartitions.flatMap { partition =>
       partition.files.filter(_.getLen > 0).flatMap { file =>
-        val blockLocations = getBlockLocations(file)
-        if (fsRelation.fileFormat.isSplitable(
-            fsRelation.sparkSession, fsRelation.options, file.getPath)) {
-          (0L until file.getLen by maxSplitBytes).map { offset =>
-            val remaining = file.getLen - offset
-            val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
-            val hosts = getBlockHosts(blockLocations, offset, size)
-            PartitionedFile(
-              partition.values, file.getPath.toUri.toString, offset, size, hosts)
-          }
-        } else {
-          val hosts = getBlockHosts(blockLocations, 0, file.getLen)
-          Seq(PartitionedFile(
-            partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
-        }
+        // getPath() is very expensive so we only want to call it once in this block:
+        val filePath = file.getPath
+        val isSplitable = relation.fileFormat.isSplitable(
+          relation.sparkSession, relation.options, filePath)
+        PartitionedFileUtil.splitFiles(
+          sparkSession = relation.sparkSession,
+          file = file,
+          filePath = filePath,
+          isSplitable = isSplitable,
+          maxSplitBytes = maxSplitBytes,
+          partitionValues = partition.values
+        )
       }
     }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
 
-    val partitions = new ArrayBuffer[FilePartition]
-    val currentFiles = new ArrayBuffer[PartitionedFile]
-    var currentSize = 0L
-
-    /** Close the current partition and move to the next. */
-    def closePartition(): Unit = {
-      if (currentFiles.nonEmpty) {
-        val newPartition =
-          FilePartition(
-            partitions.size,
-            currentFiles.toArray.toSeq) // Copy to a new Array.
-        partitions += newPartition
-      }
-      currentFiles.clear()
-      currentSize = 0
-    }
-
-    // Assign files to partitions using "Next Fit Decreasing"
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
-        closePartition()
-      }
-      // Add the given file to the current partition.
-      currentSize += file.length + openCostInBytes
-      currentFiles += file
-    }
-    closePartition()
+    val partitions =
+      FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)
 
     new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
   }
 
-  private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
-    case f: LocatedFileStatus => f.getBlockLocations
-    case f => Array.empty[BlockLocation]
-  }
-
-  // Given locations of all blocks of a single file, `blockLocations`, and an `(offset, length)`
-  // pair that represents a segment of the same file, find out the block that contains the largest
-  // fraction the segment, and returns location hosts of that block. If no such block can be found,
-  // returns an empty array.
-  private def getBlockHosts(
-      blockLocations: Array[BlockLocation], offset: Long, length: Long): Array[String] = {
-    val candidates = blockLocations.map {
-      // The fragment starts from a position within this block
-      case b if b.getOffset <= offset && offset < b.getOffset + b.getLength =>
-        b.getHosts -> (b.getOffset + b.getLength - offset).min(length)
-
-      // The fragment ends at a position within this block
-      case b if offset <= b.getOffset && offset + length < b.getLength =>
-        b.getHosts -> (offset + length - b.getOffset).min(length)
-
-      // The fragment fully contains this block
-      case b if offset <= b.getOffset && b.getOffset + b.getLength <= offset + length =>
-        b.getHosts -> b.getLength
-
-      // The fragment doesn't intersect with this block
-      case b =>
-        b.getHosts -> 0L
-    }.filter { case (hosts, size) =>
-      size > 0L
-    }
-
-    if (candidates.isEmpty) {
-      Array.empty[String]
-    } else {
-      val (hosts, _) = candidates.maxBy { case (_, size) => size }
-      hosts
-    }
-  }
-
   override def doCanonicalize(): FileSourceScanExec = {
     FileSourceScanExec(
       relation,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala
new file mode 100644
index 0000000..3196624
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources._
+
+object PartitionedFileUtil {
+  def splitFiles(
+      sparkSession: SparkSession,
+      file: FileStatus,
+      filePath: Path,
+      isSplitable: Boolean,
+      maxSplitBytes: Long,
+      partitionValues: InternalRow): Seq[PartitionedFile] = {
+    if (isSplitable) {
+      (0L until file.getLen by maxSplitBytes).map { offset =>
+        val remaining = file.getLen - offset
+        val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
+        val hosts = getBlockHosts(getBlockLocations(file), offset, size)
+        PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts)
+      }
+    } else {
+      Seq(getPartitionedFile(file, filePath, partitionValues))
+    }
+  }
+
+  def getPartitionedFile(
+      file: FileStatus,
+      filePath: Path,
+      partitionValues: InternalRow): PartitionedFile = {
+    val hosts = getBlockHosts(getBlockLocations(file), 0, file.getLen)
+    PartitionedFile(partitionValues, filePath.toUri.toString, 0, file.getLen, hosts)
+  }
+
+  private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
+    case f: LocatedFileStatus => f.getBlockLocations
+    case f => Array.empty[BlockLocation]
+  }
+  // Given the locations of all blocks of a single file, `blockLocations`, and an `(offset, length)`
+  // pair that represents a segment of the same file, finds the block that contains the largest
+  // fraction of the segment and returns the location hosts of that block. If no such block can be
+  // found, returns an empty array.
+  private def getBlockHosts(
+      blockLocations: Array[BlockLocation],
+      offset: Long,
+      length: Long): Array[String] = {
+    val candidates = blockLocations.map {
+      // The fragment starts from a position within this block
+      case b if b.getOffset <= offset && offset < b.getOffset + b.getLength =>
+        b.getHosts -> (b.getOffset + b.getLength - offset).min(length)
+
+      // The fragment ends at a position within this block
+      case b if offset <= b.getOffset && offset + length < b.getLength =>
+        b.getHosts -> (offset + length - b.getOffset).min(length)
+
+      // The fragment fully contains this block
+      case b if offset <= b.getOffset && b.getOffset + b.getLength <= offset + length =>
+        b.getHosts -> b.getLength
+
+      // The fragment doesn't intersect with this block
+      case b =>
+        b.getHosts -> 0L
+    }.filter { case (hosts, size) =>
+      size > 0L
+    }
+
+    if (candidates.isEmpty) {
+      Array.empty[String]
+    } else {
+      val (hosts, _) = candidates.maxBy { case (_, size) => size }
+      hosts
+    }
+  }
+}
+
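
A standalone illustration (not part of the patch) of the offset/size arithmetic in splitFiles above: a splittable 10 MB file with maxSplitBytes = 4 MB is cut into splits of 4 MB, 4 MB and 2 MB.

```scala
val fileLen = 10L * 1024 * 1024        // length of a hypothetical splittable file
val maxSplitBytes = 4L * 1024 * 1024   // target split size

val splits = (0L until fileLen by maxSplitBytes).map { offset =>
  val remaining = fileLen - offset
  (offset, math.min(remaining, maxSplitBytes))
}
// splits: Vector((0,4194304), (4194304,4194304), (8388608,2097152))
```
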
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index e2cd409..d24e66e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
@@ -214,7 +215,7 @@ case class AlterTableAddColumnsCommand(
   /**
    * ALTER TABLE ADD COLUMNS command does not support temporary view/table,
    * view, or datasource table with text, orc formats or external provider.
-   * For datasource table, it currently only supports parquet, json, csv.
+   * For datasource table, it currently only supports parquet, json, csv, orc.
    */
   private def verifyAlterTableAddColumn(
       conf: SQLConf,
@@ -237,7 +238,7 @@ case class AlterTableAddColumnsCommand(
         // TextFileFormat only default to one column "value"
         // Hive type is already considered as hive serde table, so the logic will not
         // come in here.
-        case _: JsonFileFormat | _: CSVFileFormat | _: ParquetFileFormat =>
+        case _: JsonFileFormat | _: CSVFileFormat | _: ParquetFileFormat | _: OrcDataSourceV2 =>
         case s if s.getClass.getCanonicalName.endsWith("OrcFileFormat") =>
         case s =>
           throw new AnalysisException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5dad784..d48261e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.language.{existentials, implicitConversions}
 import scala.util.{Failure, Success, Try}
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -40,6 +41,8 @@ import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, TextSocketSourceProvider}
 import org.apache.spark.sql.internal.SQLConf
@@ -90,8 +93,19 @@ case class DataSource(
 
   case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
 
-  lazy val providingClass: Class[_] =
-    DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
+  lazy val providingClass: Class[_] = {
+    val cls = DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
+    // `providingClass` is used for resolving data source relation for catalog tables.
+    // As catalog support for data source V2 is still under development, here we fall back all
+    // [[FileDataSourceV2]] implementations to [[FileFormat]] to guarantee the current catalog works.
+    // [[FileDataSourceV2]] will still be used if we call the load()/save() method in
+    // [[DataFrameReader]]/[[DataFrameWriter]], since they use method `lookupDataSource`
+    // instead of `providingClass`.
+    cls.newInstance() match {
+      case f: FileDataSourceV2 => f.fallBackFileFormat
+      case _ => cls
+    }
+  }
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
@@ -543,40 +557,9 @@ case class DataSource(
       checkFilesExist: Boolean): Seq[Path] = {
     val allPaths = caseInsensitiveOptions.get("path") ++ paths
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
-    val allGlobPath = allPaths.flatMap { path =>
-      val hdfsPath = new Path(path)
-      val fs = hdfsPath.getFileSystem(hadoopConf)
-      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-      val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
-
-      if (checkEmptyGlobPath && globPath.isEmpty) {
-        throw new AnalysisException(s"Path does not exist: $qualified")
-      }
-
-      // Sufficient to check head of the globPath seq for non-glob scenario
-      // Don't need to check once again if files exist in streaming mode
-      if (checkFilesExist && !fs.exists(globPath.head)) {
-        throw new AnalysisException(s"Path does not exist: ${globPath.head}")
-      }
-      globPath
-    }.toSeq
 
-    if (checkFilesExist) {
-      val (filteredOut, filteredIn) = allGlobPath.partition { path =>
-        InMemoryFileIndex.shouldFilterOut(path.getName)
-      }
-      if (filteredOut.nonEmpty) {
-        if (filteredIn.isEmpty) {
-          logWarning(
-            s"All paths were ignored:\n  ${filteredOut.mkString("\n  ")}")
-        } else {
-          logDebug(
-            s"Some paths were ignored:\n  ${filteredOut.mkString("\n  ")}")
-        }
-      }
-    }
-
-    allGlobPath
+    DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, hadoopConf,
+      checkEmptyGlobPath, checkFilesExist)
   }
 }
 
@@ -632,7 +615,7 @@ object DataSource extends Logging {
     val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
       case name if name.equalsIgnoreCase("orc") &&
           conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
-        classOf[OrcFileFormat].getCanonicalName
+        classOf[OrcDataSourceV2].getCanonicalName
       case name if name.equalsIgnoreCase("orc") &&
           conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
         "org.apache.spark.sql.hive.orc.OrcFileFormat"
@@ -722,6 +705,48 @@ object DataSource extends Logging {
   }
 
   /**
+   * Checks and returns files in all the paths.
+   */
+  private[sql] def checkAndGlobPathIfNecessary(
+      paths: Seq[String],
+      hadoopConf: Configuration,
+      checkEmptyGlobPath: Boolean,
+      checkFilesExist: Boolean): Seq[Path] = {
+    val allGlobPath = paths.flatMap { path =>
+      val hdfsPath = new Path(path)
+      val fs = hdfsPath.getFileSystem(hadoopConf)
+      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+      val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+
+      if (checkEmptyGlobPath && globPath.isEmpty) {
+        throw new AnalysisException(s"Path does not exist: $qualified")
+      }
+
+      // Sufficient to check head of the globPath seq for non-glob scenario
+      // Don't need to check once again if files exist in streaming mode
+      if (checkFilesExist && !fs.exists(globPath.head)) {
+        throw new AnalysisException(s"Path does not exist: ${globPath.head}")
+      }
+      globPath
+    }
+
+    if (checkFilesExist) {
+      val (filteredOut, filteredIn) = allGlobPath.partition { path =>
+        InMemoryFileIndex.shouldFilterOut(path.getName)
+      }
+      if (filteredIn.isEmpty) {
+        logWarning(
+          s"All paths were ignored:\n  ${filteredOut.mkString("\n  ")}")
+      } else {
+        logDebug(
+          s"Some paths were ignored:\n  ${filteredOut.mkString("\n  ")}")
+      }
+    }
+
+    allGlobPath
+  }
+
+  /**
    * When creating a data source table, the `path` option has a special meaning: the table location.
    * This method extracts the `path` option and treat it as table location to build a
    * [[CatalogStorageFormat]]. Note that, the `path` option is removed from options after this.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
new file mode 100644
index 0000000..cefdfd1
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileDataSourceV2}
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
+
+/**
+ * Replace the ORC V2 data source of a table in [[InsertIntoTable]] with the V1 [[FileFormat]].
+ * E.g., with a temporary view `t` using [[FileDataSourceV2]], inserting into view `t` fails
+ * since there is no corresponding physical plan.
+ * SPARK-23817: This is a temporary hack for making current data source V2 work. It should be
+ * removed when write path of file data source v2 is finished.
+ */
+class FallbackOrcDataSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case i @ InsertIntoTable(d @DataSourceV2Relation(table: OrcTable, _, _), _, _, _, _) =>
+      val v1FileFormat = new OrcFileFormat
+      val relation = HadoopFsRelation(table.getFileIndex, table.getFileIndex.partitionSchema,
+        table.schema(), None, v1FileFormat, d.options)(sparkSession)
+      i.copy(table = LogicalRelation(relation))
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
new file mode 100644
index 0000000..4b1ade8e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.Partition
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.sources.v2.reader.InputPartition
+
+/**
+ * A collection of file blocks that should be read as a single task
+ * (possibly from multiple partitioned directories).
+ */
+case class FilePartition(index: Int, files: Seq[PartitionedFile])
+  extends Partition with InputPartition {
+  override def preferredLocations(): Array[String] = {
+    // Computes the total number of bytes that can be retrieved from each host.
+    val hostToNumBytes = mutable.HashMap.empty[String, Long]
+    files.foreach { file =>
+      file.locations.filter(_ != "localhost").foreach { host =>
+        hostToNumBytes(host) = hostToNumBytes.getOrElse(host, 0L) + file.length
+      }
+    }
+
+    // Takes the first 3 hosts with the most data to be retrieved
+    hostToNumBytes.toSeq.sortBy {
+      case (host, numBytes) => numBytes
+    }.reverse.take(3).map {
+      case (host, numBytes) => host
+    }.toArray
+  }
+}
+
+object FilePartition extends Logging {
+
+  def getFilePartitions(
+      sparkSession: SparkSession,
+      partitionedFiles: Seq[PartitionedFile],
+      maxSplitBytes: Long): Seq[FilePartition] = {
+    val partitions = new ArrayBuffer[FilePartition]
+    val currentFiles = new ArrayBuffer[PartitionedFile]
+    var currentSize = 0L
+
+    /** Close the current partition and move to the next. */
+    def closePartition(): Unit = {
+      if (currentFiles.nonEmpty) {
+        // Copy to a new Array.
+        val newPartition = FilePartition(partitions.size, currentFiles.toArray.toSeq)
+        partitions += newPartition
+      }
+      currentFiles.clear()
+      currentSize = 0
+    }
+
+    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
+    // Assign files to partitions using "Next Fit Decreasing"
+    partitionedFiles.foreach { file =>
+      if (currentSize + file.length > maxSplitBytes) {
+        closePartition()
+      }
+      // Add the given file to the current partition.
+      currentSize += file.length + openCostInBytes
+      currentFiles += file
+    }
+    closePartition()
+    partitions
+  }
+
+  def maxSplitBytes(
+      sparkSession: SparkSession,
+      selectedPartitions: Seq[PartitionDirectory]): Long = {
+    val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
+    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
+    val defaultParallelism = sparkSession.sparkContext.defaultParallelism
+    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
+    val bytesPerCore = totalBytes / defaultParallelism
+
+    Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+  }
+}
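
A standalone walk-through (hypothetical numbers, close to the defaults of spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes) of the maxSplitBytes formula above: for 1 GB of input on 200 cores, bytesPerCore wins and each split is about 5 MB.

```scala
val defaultMaxSplitBytes = 128L * 1024 * 1024  // spark.sql.files.maxPartitionBytes
val openCostInBytes = 4L * 1024 * 1024         // spark.sql.files.openCostInBytes
val defaultParallelism = 200
val totalBytes = 1024L * 1024 * 1024           // sum of file lengths plus open costs

val bytesPerCore = totalBytes / defaultParallelism
val maxSplitBytes = math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
// maxSplitBytes == 5368709 (~5 MB), spreading the input over roughly one split per core
```
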
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index ffea33c..d92ea2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -29,6 +29,7 @@ import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.sql.sources.v2.reader.InputPartition
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.NextIterator
 
@@ -54,12 +55,6 @@ case class PartitionedFile(
 }
 
 /**
- * A collection of file blocks that should be read as a single task
- * (possibly from multiple partitioned directories).
- */
-case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends RDDPartition
-
-/**
  * An RDD that scans a list of file partitions.
  */
 class FileScanRDD(
@@ -216,21 +211,6 @@ class FileScanRDD(
   override protected def getPartitions: Array[RDDPartition] = filePartitions.toArray
 
   override protected def getPreferredLocations(split: RDDPartition): Seq[String] = {
-    val files = split.asInstanceOf[FilePartition].files
-
-    // Computes total number of bytes can be retrieved from each host.
-    val hostToNumBytes = mutable.HashMap.empty[String, Long]
-    files.foreach { file =>
-      file.locations.filter(_ != "localhost").foreach { host =>
-        hostToNumBytes(host) = hostToNumBytes.getOrElse(host, 0L) + file.length
-      }
-    }
-
-    // Takes the first 3 hosts with the most data to be retrieved
-    hostToNumBytes.toSeq.sortBy {
-      case (host, numBytes) => numBytes
-    }.reverse.take(3).map {
-      case (host, numBytes) => host
-    }
+    split.asInstanceOf[FilePartition].preferredLocations()
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index b2f73b7..d278802 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -52,28 +52,12 @@ case class HadoopFsRelation(
 
   override def sqlContext: SQLContext = sparkSession.sqlContext
 
-  private def getColName(f: StructField): String = {
-    if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
-      f.name
-    } else {
-      f.name.toLowerCase(Locale.ROOT)
-    }
-  }
-
-  val overlappedPartCols = mutable.Map.empty[String, StructField]
-  partitionSchema.foreach { partitionField =>
-    if (dataSchema.exists(getColName(_) == getColName(partitionField))) {
-      overlappedPartCols += getColName(partitionField) -> partitionField
-    }
-  }
-
   // When data and partition schemas have overlapping columns, the output
   // schema respects the order of the data schema for the overlapping columns, and it
   // respects the data types of the partition schema.
-  val schema: StructType = {
-    StructType(dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f), f)) ++
-      partitionSchema.filterNot(f => overlappedPartCols.contains(getColName(f))))
-  }
+  val (schema: StructType, overlappedPartCols: Map[String, StructField]) =
+    PartitioningUtils.mergeDataAndPartitionSchema(dataSchema,
+      partitionSchema, sparkSession.sessionState.conf.caseSensitiveAnalysis)
 
   def partitionSchemaOption: Option[StructType] =
     if (partitionSchema.isEmpty) None else Some(partitionSchema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index ee77042..a2e0818 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -21,6 +21,7 @@ import java.lang.{Double => JDouble, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
 import java.util.{Locale, TimeZone}
 
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 
@@ -30,7 +31,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
@@ -65,9 +66,7 @@ object PartitioningUtils {
     require(columnNames.size == literals.size)
   }
 
-  import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.DEFAULT_PARTITION_NAME
-  import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
-  import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.unescapePathName
+  import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.{escapePathName, unescapePathName, DEFAULT_PARTITION_NAME}
 
   /**
    * Given a group of qualified paths, tries to parse them and returns a partition specification.
@@ -558,6 +557,35 @@ object PartitioningUtils {
     }).asNullable
   }
 
+  def mergeDataAndPartitionSchema(
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      caseSensitive: Boolean): (StructType, Map[String, StructField]) = {
+    val overlappedPartCols = mutable.Map.empty[String, StructField]
+    partitionSchema.foreach { partitionField =>
+      val partitionFieldName = getColName(partitionField, caseSensitive)
+      if (dataSchema.exists(getColName(_, caseSensitive) == partitionFieldName)) {
+        overlappedPartCols += partitionFieldName -> partitionField
+      }
+    }
+
+    // When data and partition schemas have overlapping columns, the output
+    // schema respects the order of the data schema for the overlapping columns, and it
+    // respects the data types of the partition schema.
+    val fullSchema =
+    StructType(dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f, caseSensitive), f)) ++
+      partitionSchema.filterNot(f => overlappedPartCols.contains(getColName(f, caseSensitive))))
+    (fullSchema, overlappedPartCols.toMap)
+  }
+
+  def getColName(f: StructField, caseSensitive: Boolean): String = {
+    if (caseSensitive) {
+      f.name
+    } else {
+      f.name.toLowerCase(Locale.ROOT)
+    }
+  }
+
   private def columnNameEquality(caseSensitive: Boolean): (String, String) => Boolean = {
     if (caseSensitive) {
       org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
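
A hedged sketch of the extracted mergeDataAndPartitionSchema helper (the schemas are hypothetical, and the snippet assumes the internal PartitioningUtils object is reachable from the caller's package): the overlapping column keeps the data schema's position but takes the partition schema's type.

```scala
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.types._

val dataSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("year", StringType)))
val partitionSchema = StructType(Seq(StructField("year", IntegerType)))

val (mergedSchema, overlappedPartCols) =
  PartitioningUtils.mergeDataAndPartitionSchema(dataSchema, partitionSchema, caseSensitive = false)
// mergedSchema: id LONG, year INT
// overlappedPartCols: Map("year" -> StructField("year", IntegerType))
```
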
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 0a64981..cd2a68a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -82,22 +82,24 @@ private[sql] object OrcFilters {
    */
   def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = {
     val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
-
-    // First, tries to convert each filter individually to see whether it's convertible, and then
-    // collect all convertible ones to build the final `SearchArgument`.
-    val convertibleFilters = for {
-      filter <- filters
-      _ <- buildSearchArgument(dataTypeMap, filter, newBuilder)
-    } yield filter
-
     for {
       // Combines all convertible filters using `And` to produce a single conjunction
-      conjunction <- buildTree(convertibleFilters)
+      conjunction <- buildTree(convertibleFilters(schema, dataTypeMap, filters))
       // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
       builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
     } yield builder.build()
   }
 
+  def convertibleFilters(
+      schema: StructType,
+      dataTypeMap: Map[String, DataType],
+      filters: Seq[Filter]): Seq[Filter] = {
+    for {
+      filter <- filters
+      _ <- buildSearchArgument(dataTypeMap, filter, newBuilder())
+    } yield filter
+  }
+
   /**
    * Return true if this is a searchable type in ORC.
    * Both CharType and VarcharType are cleaned at AstBuilder.
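
A hedged usage sketch of createFilter (OrcFilters is private[sql], so the snippet is placed in a Spark-internal package; the schema and filters are made up): only filters that can be converted to an ORC SearchArgument are combined and pushed down.

```scala
package org.apache.spark.sql.execution.datasources.orc

import org.apache.spark.sql.sources.{GreaterThan, IsNotNull}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object OrcFilterSketch {
  val schema = StructType(Seq(StructField("id", IntegerType)))

  // Both filters are convertible, so they are ANDed into a single SearchArgument.
  val searchArg = OrcFilters.createFilter(schema, Seq(IsNotNull("id"), GreaterThan("id", 10)))
}
```
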
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/EmptyPartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/EmptyPartitionReader.scala
new file mode 100644
index 0000000..b177d15
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/EmptyPartitionReader.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import java.io.IOException
+
+import org.apache.spark.sql.sources.v2.reader.PartitionReader
+
+/**
+ * A [[PartitionReader]] with empty output.
+ */
+class EmptyPartitionReader[T] extends PartitionReader[T] {
+  override def next(): Boolean = false
+
+  override def get(): T =
+    throw new IOException("No records should be returned from EmptyDataReader")
+
+  override def close(): Unit = {}
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
new file mode 100644
index 0000000..a0c932c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, SupportsBatchRead, TableProvider}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A base interface for data source v2 implementations of the built-in file-based data sources.
+ */
+trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
+  /**
+   * Returns a V1 [[FileFormat]] class of the same file data source.
+   * This is a solution for the following cases:
+   * 1. File datasource V2 implementations cause regression. Users can disable the problematic data
+   *    source via SQL configuration and fall back to FileFormat.
+   * 2. Catalog support is required, which is still under development for data source V2.
+   */
+  def fallBackFileFormat: Class[_ <: FileFormat]
+
+  lazy val sparkSession = SparkSession.active
+
+  def getFileIndex(
+      options: DataSourceOptions,
+      userSpecifiedSchema: Option[StructType]): PartitioningAwareFileIndex = {
+    val filePaths = options.paths()
+    val hadoopConf =
+      sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap)
+    val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(filePaths, hadoopConf,
+      checkEmptyGlobPath = true, checkFilesExist = options.checkFilesExist())
+    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+    new InMemoryFileIndex(sparkSession, rootPathsSpecified,
+      options.asMap().asScala.toMap, userSpecifiedSchema, fileStatusCache)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
new file mode 100644
index 0000000..d76d69d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import java.io.{FileNotFoundException, IOException}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.reader.PartitionReader
+
+class FilePartitionReader[T](readers: Iterator[PartitionedFileReader[T]])
+  extends PartitionReader[T] with Logging {
+  private var currentReader: PartitionedFileReader[T] = null
+
+  private val sqlConf = SQLConf.get
+  private def ignoreMissingFiles = sqlConf.ignoreMissingFiles
+  private def ignoreCorruptFiles = sqlConf.ignoreCorruptFiles
+
+  override def next(): Boolean = {
+    if (currentReader == null) {
+      if (readers.hasNext) {
+        if (ignoreMissingFiles || ignoreCorruptFiles) {
+          try {
+            currentReader = readers.next()
+            logInfo(s"Reading file $currentReader")
+          } catch {
+            case e: FileNotFoundException if ignoreMissingFiles =>
+              logWarning(s"Skipped missing file: $currentReader", e)
+              currentReader = null
+              return false
+            // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+            case e: FileNotFoundException if !ignoreMissingFiles => throw e
+            case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
+              logWarning(
+                s"Skipped the rest of the content in the corrupted file: $currentReader", e)
+              currentReader = null
+              return false
+          }
+        } else {
+          currentReader = readers.next()
+          logInfo(s"Reading file $currentReader")
+        }
+      } else {
+        return false
+      }
+    }
+    if (currentReader.next()) {
+      true
+    } else {
+      close()
+      currentReader = null
+      next()
+    }
+  }
+
+  override def get(): T = currentReader.get()
+
+  override def close(): Unit = {
+    if (currentReader != null) {
+      currentReader.close()
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala
new file mode 100644
index 0000000..101a70e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReader, PartitionReaderFactory}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+abstract class FilePartitionReaderFactory extends PartitionReaderFactory {
+  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
+    assert(partition.isInstanceOf[FilePartition])
+    val filePartition = partition.asInstanceOf[FilePartition]
+    val iter = filePartition.files.toIterator.map { file =>
+      new PartitionedFileReader(file, buildReader(file))
+    }
+    new FilePartitionReader[InternalRow](iter)
+  }
+
+  override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] = {
+    assert(partition.isInstanceOf[FilePartition])
+    val filePartition = partition.asInstanceOf[FilePartition]
+    val iter = filePartition.files.toIterator.map { file =>
+      new PartitionedFileReader(file, buildColumnarReader(file))
+    }
+    new FilePartitionReader[ColumnarBatch](iter)
+  }
+
+  def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow]
+
+  def buildColumnarReader(partitionedFile: PartitionedFile): PartitionReader[ColumnarBatch] = {
+    throw new UnsupportedOperationException("Cannot create columnar reader.")
+  }
+}
+
+// A compound class for combining file and its corresponding reader.
+private[v2] class PartitionedFileReader[T](
+    file: PartitionedFile,
+    reader: PartitionReader[T]) extends PartitionReader[T] {
+  override def next(): Boolean = reader.next()
+
+  override def get(): T = reader.get()
+
+  override def close(): Unit = reader.close()
+
+  override def toString: String = file.toString
+}
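
A hypothetical minimal subclass, to show the contract: buildReader is called once per PartitionedFile, and FilePartitionReader chains the per-file readers together. This do-nothing sketch reuses EmptyPartitionReader from this commit; real factories such as OrcPartitionReaderFactory return readers that actually decode the file.

```scala
package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.sources.v2.reader.PartitionReader

case class NoopPartitionReaderFactory() extends FilePartitionReaderFactory {
  // Produces no rows for any file; a real implementation opens and decodes the file here.
  override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] =
    new EmptyPartitionReader[InternalRow]
}
```
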
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
new file mode 100644
index 0000000..3615b15
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.PartitionedFileUtil
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, Scan}
+import org.apache.spark.sql.types.StructType
+
+abstract class FileScan(
+    sparkSession: SparkSession,
+    fileIndex: PartitioningAwareFileIndex) extends Scan with Batch {
+  /**
+   * Returns whether a file with the given `path` can be split.
+   */
+  def isSplitable(path: Path): Boolean = {
+    false
+  }
+
+  protected def partitions: Seq[FilePartition] = {
+    val selectedPartitions = fileIndex.listFiles(Seq.empty, Seq.empty)
+    val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions)
+    val splitFiles = selectedPartitions.flatMap { partition =>
+      partition.files.flatMap { file =>
+        val filePath = file.getPath
+        PartitionedFileUtil.splitFiles(
+          sparkSession = sparkSession,
+          file = file,
+          filePath = filePath,
+          isSplitable = isSplitable(filePath),
+          maxSplitBytes = maxSplitBytes,
+          partitionValues = partition.values
+        )
+      }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+    }
+    FilePartition.getFilePartitions(sparkSession, splitFiles, maxSplitBytes)
+  }
+
+  override def planInputPartitions(): Array[InputPartition] = {
+    partitions.toArray
+  }
+
+  override def toBatch: Batch = this
+}
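
Continuing the illustration, a sketch of the smallest concrete `FileScan` (again hypothetical, reusing the `FilePathReaderFactory` sketched above): it keeps the default `isSplitable = false`, so `partitions` packs whole files into `FilePartition`s, and it only has to supply the read schema and the reader factory.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
import org.apache.spark.sql.types.{StringType, StructType}

// Hypothetical scan: one "path" column per file, files are never split.
case class FilePathScan(
    sparkSession: SparkSession,
    fileIndex: PartitioningAwareFileIndex) extends FileScan(sparkSession, fileIndex) {
  override def readSchema(): StructType = new StructType().add("path", StringType)
  override def createReaderFactory(): PartitionReaderFactory = FilePathReaderFactory()
}
```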
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
new file mode 100644
index 0000000..5dd343b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.sources.v2.reader.{ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
+import org.apache.spark.sql.types.StructType
+
+abstract class FileScanBuilder(schema: StructType)
+  extends ScanBuilder
+  with SupportsPushDownRequiredColumns
+  with SupportsPushDownFilters {
+  protected var readSchema = schema
+
+  override def pruneColumns(requiredSchema: StructType): Unit = {
+    this.readSchema = requiredSchema
+  }
+}
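
A concrete builder only needs `build()` plus the two filter-pushdown methods; column pruning is already handled through the `readSchema` var. Below is a minimal sketch for a hypothetical format with no filter support (illustrative only, not part of the patch):

```scala
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.Scan
import org.apache.spark.sql.types.StructType

case class NoPushdownScanBuilder(schema: StructType) extends FileScanBuilder(schema) {
  // Nothing is pushed down: Spark keeps evaluating every filter after the scan.
  override def pushFilters(filters: Array[Filter]): Array[Filter] = filters
  override def pushedFilters(): Array[Filter] = Array.empty

  override def build(): Scan = {
    val pruned = readSchema  // reflects any pruneColumns() call made by the optimizer
    new Scan {
      override def readSchema(): StructType = pruned
    }
  }
}
```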
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
new file mode 100644
index 0000000..b178654
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.hadoop.fs.FileStatus
+
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.sources.v2.{SupportsBatchRead, Table}
+import org.apache.spark.sql.types.StructType
+
+abstract class FileTable(
+    sparkSession: SparkSession,
+    fileIndex: PartitioningAwareFileIndex,
+    userSpecifiedSchema: Option[StructType]) extends Table with SupportsBatchRead {
+  def getFileIndex: PartitioningAwareFileIndex = this.fileIndex
+
+  lazy val dataSchema: StructType = userSpecifiedSchema.orElse {
+    inferSchema(fileIndex.allFiles())
+  }.getOrElse {
+    throw new AnalysisException(
+      s"Unable to infer schema for $name. It must be specified manually.")
+  }.asNullable
+
+  override def schema(): StructType = {
+    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    PartitioningUtils.mergeDataAndPartitionSchema(dataSchema,
+      fileIndex.partitionSchema, caseSensitive)._1
+  }
+
+  /**
+   * When possible, this method should return the schema of the given `files`. When the format
+   * does not support inference, or no valid files are given, it should return None. In these
+   * cases Spark will require that the user specify the schema manually.
+   */
+  def inferSchema(files: Seq[FileStatus]): Option[StructType]
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PartitionRecordReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PartitionRecordReader.scala
new file mode 100644
index 0000000..ff78ef3
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PartitionRecordReader.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.hadoop.mapreduce.RecordReader
+
+import org.apache.spark.sql.sources.v2.reader.PartitionReader
+
+class PartitionRecordReader[T](
+    private[this] var rowReader: RecordReader[_, T]) extends PartitionReader[T] {
+  override def next(): Boolean = rowReader.nextKeyValue()
+
+  override def get(): T = rowReader.getCurrentValue
+
+  override def close(): Unit = rowReader.close()
+}
+
+class PartitionRecordReaderWithProject[X, T](
+    private[this] var rowReader: RecordReader[_, X],
+    project: X => T) extends PartitionReader[T] {
+  override def next(): Boolean = rowReader.nextKeyValue()
+
+  override def get(): T = project(rowReader.getCurrentValue)
+
+  override def close(): Unit = rowReader.close()
+}
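
The two wrappers above adapt any Hadoop `RecordReader` to the DSv2 `PartitionReader` contract. A self-contained sketch showing `PartitionRecordReaderWithProject` projecting values while iterating; the in-memory reader is purely illustrative (a real reader would be created and initialized by a `FilePartitionReaderFactory`):

```scala
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}

// Toy RecordReader over an in-memory sequence of strings.
class SeqRecordReader(values: Seq[String]) extends RecordReader[Void, String] {
  private val it = values.iterator
  private var current: String = _
  override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = ()
  override def nextKeyValue(): Boolean = {
    if (it.hasNext) { current = it.next(); true } else false
  }
  override def getCurrentKey(): Void = null
  override def getCurrentValue(): String = current
  override def getProgress(): Float = 0.0f
  override def close(): Unit = ()
}

// Project each string to its length while iterating.
val reader = new PartitionRecordReaderWithProject[String, Int](
  new SeqRecordReader(Seq("a", "bb", "ccc")), (s: String) => s.length)
while (reader.next()) {
  println(reader.get())  // prints 1, 2, 3
}
reader.close()
```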
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
new file mode 100644
index 0000000..db1f2f7
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.orc
+
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
+import org.apache.spark.sql.execution.datasources.v2._
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, Table}
+import org.apache.spark.sql.types.StructType
+
+class OrcDataSourceV2 extends FileDataSourceV2 {
+
+  override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[OrcFileFormat]
+
+  override def shortName(): String = "orc"
+
+  private def getTableName(options: DataSourceOptions): String = {
+    shortName() + ":" + options.paths().mkString(";")
+  }
+
+  override def getTable(options: DataSourceOptions): Table = {
+    val tableName = getTableName(options)
+    val fileIndex = getFileIndex(options, None)
+    OrcTable(tableName, sparkSession, fileIndex, None)
+  }
+
+  override def getTable(options: DataSourceOptions, schema: StructType): Table = {
+    val tableName = getTableName(options)
+    val fileIndex = getFileIndex(options, Some(schema))
+    OrcTable(tableName, sparkSession, fileIndex, Some(schema))
+  }
+}
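
For context, callers exercise the new source through the unchanged public API (the path and local-mode session below are assumptions). Reads resolve to `OrcDataSourceV2` unless "orc" is listed in `SQLConf.USE_V1_SOURCE_READER_LIST`, which forces the v1 `OrcFileFormat` path, as the new test suites do.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

val spark = SparkSession.builder().master("local[*]").appName("orc-v2-read").getOrCreate()

// Reads now go through OrcDataSourceV2 / OrcTable.
val df = spark.read.format("orc").load("/tmp/example/orc")  // hypothetical path
df.printSchema()
df.show(5)

// Forcing the v1 read path instead:
spark.conf.set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "orc")
```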
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
new file mode 100644
index 0000000..f6fc0ca
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.orc
+
+import java.net.URI
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.orc.{OrcConf, OrcFile}
+import org.apache.orc.mapred.OrcStruct
+import org.apache.orc.mapreduce.OrcInputFormat
+
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources.{PartitionedFile, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcUtils}
+import org.apache.spark.sql.execution.datasources.v2._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReader}
+import org.apache.spark.sql.types.{AtomicType, StructType}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A factory used to create ORC readers.
+ *
+ * @param sqlConf SQL configuration.
+ * @param broadcastedConf Broadcasted serializable Hadoop Configuration.
+ * @param dataSchema Schema of ORC files.
+ * @param partitionSchema Schema of partition columns.
+ * @param readSchema Required schema in the batch scan.
+ */
+case class OrcPartitionReaderFactory(
+    sqlConf: SQLConf,
+    broadcastedConf: Broadcast[SerializableConfiguration],
+    dataSchema: StructType,
+    partitionSchema: StructType,
+    readSchema: StructType) extends FilePartitionReaderFactory {
+  private val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+  private val capacity = sqlConf.orcVectorizedReaderBatchSize
+
+  override def supportColumnarReads(partition: InputPartition): Boolean = {
+    sqlConf.orcVectorizedReaderEnabled && sqlConf.wholeStageEnabled &&
+      readSchema.length <= sqlConf.wholeStageMaxNumFields &&
+      readSchema.forall(_.dataType.isInstanceOf[AtomicType])
+  }
+
+  override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = {
+    val conf = broadcastedConf.value.value
+
+    val filePath = new Path(new URI(file.filePath))
+
+    val fs = filePath.getFileSystem(conf)
+    val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
+    val reader = OrcFile.createReader(filePath, readerOptions)
+
+    val requestedColIdsOrEmptyFile = OrcUtils.requestedColumnIds(
+      isCaseSensitive, dataSchema, readSchema, reader, conf)
+
+    if (requestedColIdsOrEmptyFile.isEmpty) {
+      new EmptyPartitionReader[InternalRow]
+    } else {
+      val requestedColIds = requestedColIdsOrEmptyFile.get
+      assert(requestedColIds.length == readSchema.length,
+        "[BUG] requested column IDs do not match required schema")
+      val taskConf = new Configuration(conf)
+      taskConf.set(OrcConf.INCLUDE_COLUMNS.getAttribute,
+        requestedColIds.filter(_ != -1).sorted.mkString(","))
+
+      val fileSplit = new FileSplit(filePath, file.start, file.length, Array.empty)
+      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+      val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)
+
+      val requiredDataSchema = subtractSchema(readSchema, partitionSchema)
+      val orcRecordReader = new OrcInputFormat[OrcStruct]
+        .createRecordReader(fileSplit, taskAttemptContext)
+
+      val fullSchema = requiredDataSchema.toAttributes ++ partitionSchema.toAttributes
+      val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+      val deserializer = new OrcDeserializer(dataSchema, requiredDataSchema, requestedColIds)
+
+      val projection = if (partitionSchema.length == 0) {
+        (value: OrcStruct) => unsafeProjection(deserializer.deserialize(value))
+      } else {
+        val joinedRow = new JoinedRow()
+        (value: OrcStruct) =>
+          unsafeProjection(joinedRow(deserializer.deserialize(value), file.partitionValues))
+      }
+      new PartitionRecordReaderWithProject(orcRecordReader, projection)
+    }
+  }
+
+  override def buildColumnarReader(file: PartitionedFile): PartitionReader[ColumnarBatch] = {
+    val conf = broadcastedConf.value.value
+
+    val filePath = new Path(new URI(file.filePath))
+
+    val fs = filePath.getFileSystem(conf)
+    val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
+    val reader = OrcFile.createReader(filePath, readerOptions)
+
+    val requestedColIdsOrEmptyFile = OrcUtils.requestedColumnIds(
+      isCaseSensitive, dataSchema, readSchema, reader, conf)
+
+    if (requestedColIdsOrEmptyFile.isEmpty) {
+      new EmptyPartitionReader
+    } else {
+      val requestedColIds = requestedColIdsOrEmptyFile.get
+      assert(requestedColIds.length == readSchema.length,
+        "[BUG] requested column IDs do not match required schema")
+      val taskConf = new Configuration(conf)
+      taskConf.set(OrcConf.INCLUDE_COLUMNS.getAttribute,
+        requestedColIds.filter(_ != -1).sorted.mkString(","))
+
+      val fileSplit = new FileSplit(filePath, file.start, file.length, Array.empty)
+      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+      val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)
+
+      val batchReader = new OrcColumnarBatchReader(capacity)
+      batchReader.initialize(fileSplit, taskAttemptContext)
+      val columnNameMap = partitionSchema.fields.map(
+        PartitioningUtils.getColName(_, isCaseSensitive)).zipWithIndex.toMap
+      val requestedPartitionColIds = readSchema.fields.map { field =>
+        columnNameMap.getOrElse(PartitioningUtils.getColName(field, isCaseSensitive), -1)
+      }
+
+      batchReader.initBatch(
+        reader.getSchema,
+        readSchema.fields,
+        requestedColIds,
+        requestedPartitionColIds,
+        file.partitionValues)
+      new PartitionRecordReader(batchReader)
+    }
+  }
+
+  /**
+   * Returns a new StructType that is a copy of the original StructType, with any fields that
+   * also appear in the other StructType removed. The field order of the original is preserved.
+   */
+  private def subtractSchema(original: StructType, other: StructType): StructType = {
+    val otherNameSet = other.fields.map(PartitioningUtils.getColName(_, isCaseSensitive)).toSet
+    val fields = original.fields.filterNot { field =>
+      otherNameSet.contains(PartitioningUtils.getColName(field, isCaseSensitive))
+    }
+
+    StructType(fields)
+  }
+
+}
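
A standalone sketch of what `subtractSchema` produces for the row-based path above, with made-up column names and case-sensitive analysis (the real helper also normalizes names through `PartitioningUtils.getColName` when analysis is case-insensitive):

```scala
import org.apache.spark.sql.types._

val readSchema = new StructType()
  .add("id", LongType)
  .add("value", StringType)
  .add("dt", StringType)            // "dt" is also a partition column
val partitionSchema = new StructType().add("dt", StringType)

val partitionNames = partitionSchema.fieldNames.toSet
val requiredDataSchema = StructType(
  readSchema.fields.filterNot(f => partitionNames.contains(f.name)))
// requiredDataSchema now contains only "id" and "value"; this is the schema the
// OrcDeserializer is built against, while partition values are appended via JoinedRow.
```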
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
new file mode 100644
index 0000000..a792ad3
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.orc
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.execution.datasources.v2.FileScan
+import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+case class OrcScan(
+    sparkSession: SparkSession,
+    hadoopConf: Configuration,
+    fileIndex: PartitioningAwareFileIndex,
+    dataSchema: StructType,
+    readSchema: StructType) extends FileScan(sparkSession, fileIndex) {
+  override def isSplitable(path: Path): Boolean = true
+
+  override def createReaderFactory(): PartitionReaderFactory = {
+    val broadcastedConf = sparkSession.sparkContext.broadcast(
+      new SerializableConfiguration(hadoopConf))
+    OrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
+      dataSchema, fileIndex.partitionSchema, readSchema)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
new file mode 100644
index 0000000..eb27bbd
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2.orc
+
+import scala.collection.JavaConverters._
+
+import org.apache.orc.mapreduce.OrcInputFormat
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.execution.datasources.orc.OrcFilters
+import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.Scan
+import org.apache.spark.sql.types.StructType
+
+case class OrcScanBuilder(
+    sparkSession: SparkSession,
+    fileIndex: PartitioningAwareFileIndex,
+    schema: StructType,
+    dataSchema: StructType,
+    options: DataSourceOptions) extends FileScanBuilder(schema) {
+  lazy val hadoopConf =
+    sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap)
+
+  override def build(): Scan = {
+    OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema)
+  }
+
+  private var _pushedFilters: Array[Filter] = Array.empty
+
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    if (sparkSession.sessionState.conf.orcFilterPushDown) {
+      OrcFilters.createFilter(schema, filters).foreach { f =>
+        // The pushed filters are serialized into `hadoopConf`, so executors can simply
+        // read the modified `hadoopConf` when building the ORC readers.
+        OrcInputFormat.setSearchArgument(hadoopConf, f, schema.fieldNames)
+      }
+      val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
+      _pushedFilters = OrcFilters.convertibleFilters(schema, dataTypeMap, filters).toArray
+    }
+    filters
+  }
+
+  override def pushedFilters(): Array[Filter] = _pushedFilters
+}
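
A sketch of how calling code drives this builder (mirroring the updated `OrcFilterSuite` below); the helper signature, the `id` column, and the options map are assumptions:

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
import org.apache.spark.sql.sources.{Filter, GreaterThan}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader.Scan
import org.apache.spark.sql.types.{LongType, StructType}

// Prunes the scan to a single column and pushes one predicate down to ORC.
def buildPrunedOrcScan(orcTable: OrcTable, options: Map[String, String]): (Scan, Array[Filter]) = {
  val builder = orcTable.newScanBuilder(new DataSourceOptions(options.asJava))
  builder.pruneColumns(new StructType().add("id", LongType))  // column pruning
  builder.pushFilters(Array(GreaterThan("id", 5L)))           // convertible filters land in the SearchArgument
  (builder.build(), builder.pushedFilters())                  // OrcScan plus the filters ORC will evaluate
}
```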
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
new file mode 100644
index 0000000..719e757
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.orc
+
+import org.apache.hadoop.fs.FileStatus
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.execution.datasources.orc.OrcUtils
+import org.apache.spark.sql.execution.datasources.v2.FileTable
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.types.StructType
+
+case class OrcTable(
+    name: String,
+    sparkSession: SparkSession,
+    fileIndex: PartitioningAwareFileIndex,
+    userSpecifiedSchema: Option[StructType])
+  extends FileTable(sparkSession, fileIndex, userSpecifiedSchema) {
+  override def newScanBuilder(options: DataSourceOptions): OrcScanBuilder =
+    new OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+
+  override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
+    OrcUtils.readSchema(sparkSession, files)
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 319c264..a605dc6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -159,6 +159,7 @@ abstract class BaseSessionStateBuilder(
     override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
       new FindDataSourceTable(session) +:
         new ResolveSQLOnFile(session) +:
+        new FallbackOrcDataSourceV2(session) +:
         customResolutionRules
 
     override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index cf25f1c..d83deb1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 
 
@@ -234,7 +235,9 @@ object QueryTest {
       checkToRDD: Boolean = true): Option[String] = {
     val isSorted = df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
     if (checkToRDD) {
-      df.rdd.count()  // Also attempt to deserialize as an RDD [SPARK-15791]
+      SQLExecution.withSQLConfPropagated(df.sparkSession) {
+        df.rdd.count() // Also attempt to deserialize as an RDD [SPARK-15791]
+      }
     }
 
     val sparkAnswer = try df.collect().toSeq catch {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index ee12f30..cccd8e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -24,11 +24,15 @@ import scala.collection.JavaConverters._
 
 import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
 
-import org.apache.spark.sql.{Column, DataFrame}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
@@ -41,7 +45,7 @@ import org.apache.spark.sql.types._
  */
 class OrcFilterSuite extends OrcTest with SharedSQLContext {
 
-  private def checkFilterPredicate(
+  protected def checkFilterPredicate(
       df: DataFrame,
       predicate: Predicate,
       checker: (SearchArgument) => Unit): Unit = {
@@ -50,24 +54,24 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
       .select(output.map(e => Column(e)): _*)
       .where(Column(predicate))
 
-    var maybeRelation: Option[HadoopFsRelation] = None
-    val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
-      case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _, _)) =>
-        maybeRelation = Some(orcRelation)
-        filters
-    }.flatten.reduceLeftOption(_ && _)
-    assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
-
-    val (_, selectedFilters, _) =
-      DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
-    assert(selectedFilters.nonEmpty, "No filter is pushed down")
-
-    val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters)
-    assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $selectedFilters")
-    checker(maybeFilter.get)
+    query.queryExecution.optimizedPlan match {
+      case PhysicalOperation(_, filters,
+        DataSourceV2Relation(orcTable: OrcTable, _, options)) =>
+        assert(filters.nonEmpty, "No filter is analyzed from the given query")
+        val scanBuilder = orcTable.newScanBuilder(new DataSourceOptions(options.asJava))
+        scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray)
+        val pushedFilters = scanBuilder.pushedFilters()
+        assert(pushedFilters.nonEmpty, "No filter is pushed down")
+        val maybeFilter = OrcFilters.createFilter(query.schema, pushedFilters)
+        assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pushedFilters")
+        checker(maybeFilter.get)
+
+      case _ =>
+        throw new AnalysisException("Can not match OrcTable in the query.")
+    }
   }
 
-  private def checkFilterPredicate
+  protected def checkFilterPredicate
       (predicate: Predicate, filterOperator: PredicateLeaf.Operator)
       (implicit df: DataFrame): Unit = {
     def checkComparisonOperator(filter: SearchArgument) = {
@@ -77,7 +81,7 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
     checkFilterPredicate(df, predicate, checkComparisonOperator)
   }
 
-  private def checkFilterPredicate
+  protected def checkFilterPredicate
       (predicate: Predicate, stringExpr: String)
       (implicit df: DataFrame): Unit = {
     def checkLogicalOperator(filter: SearchArgument) = {
@@ -86,28 +90,32 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
     checkFilterPredicate(df, predicate, checkLogicalOperator)
   }
 
-  private def checkNoFilterPredicate
-      (predicate: Predicate)
+  protected def checkNoFilterPredicate
+      (predicate: Predicate, noneSupported: Boolean = false)
       (implicit df: DataFrame): Unit = {
     val output = predicate.collect { case a: Attribute => a }.distinct
     val query = df
       .select(output.map(e => Column(e)): _*)
       .where(Column(predicate))
 
-    var maybeRelation: Option[HadoopFsRelation] = None
-    val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
-      case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _, _)) =>
-        maybeRelation = Some(orcRelation)
-        filters
-    }.flatten.reduceLeftOption(_ && _)
-    assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
-
-    val (_, selectedFilters, _) =
-      DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
-    assert(selectedFilters.nonEmpty, "No filter is pushed down")
-
-    val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters)
-    assert(maybeFilter.isEmpty, s"Could generate filter predicate for $selectedFilters")
+    query.queryExecution.optimizedPlan match {
+      case PhysicalOperation(_, filters,
+        DataSourceV2Relation(orcTable: OrcTable, _, options)) =>
+        assert(filters.nonEmpty, "No filter is analyzed from the given query")
+        val scanBuilder = orcTable.newScanBuilder(new DataSourceOptions(options.asJava))
+        scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray)
+        val pushedFilters = scanBuilder.pushedFilters()
+        if (noneSupported) {
+          assert(pushedFilters.isEmpty, "Unsupported filters should not show in pushed filters")
+        } else {
+          assert(pushedFilters.nonEmpty, "No filter is pushed down")
+          val maybeFilter = OrcFilters.createFilter(query.schema, pushedFilters)
+          assert(maybeFilter.isEmpty, s"Filter predicate should not be generated for $pushedFilters")
+        }
+
+      case _ =>
+        throw new AnalysisException("Can not match OrcTable in the query.")
+    }
   }
 
   test("filter pushdown - integer") {
@@ -346,15 +354,15 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
     }
     // ArrayType
     withOrcDataFrame((1 to 4).map(i => Tuple1(Array(i)))) { implicit df =>
-      checkNoFilterPredicate('_1.isNull)
+      checkNoFilterPredicate('_1.isNull, noneSupported = true)
     }
     // BinaryType
     withOrcDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df =>
-      checkNoFilterPredicate('_1 <=> 1.b)
+      checkNoFilterPredicate('_1 <=> 1.b, noneSupported = true)
     }
     // MapType
     withOrcDataFrame((1 to 4).map(i => Tuple1(Map(i -> i)))) { implicit df =>
-      checkNoFilterPredicate('_1.isNotNull)
+      checkNoFilterPredicate('_1.isNotNull, noneSupported = true)
     }
   }
 
@@ -419,3 +427,4 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
     }
   }
 }
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
index d1911ea..4a695ac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.execution.datasources.orc
 
 import java.io.File
 
+import org.apache.spark.SparkConf
 import org.apache.spark.sql._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 
 // The data where the partitioning key exists only in the directory structure.
@@ -227,3 +229,8 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
 }
 
 class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext
+
+class OrcV1PartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext {
+  override protected def sparkConf: SparkConf =
+    super.sparkConf.set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index 918dbcd..d0b386b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -31,7 +31,7 @@ import org.apache.orc.OrcConf.COMPRESS
 import org.apache.orc.mapred.OrcStruct
 import org.apache.orc.mapreduce.OrcInputFormat
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator}
@@ -574,7 +574,7 @@ abstract class OrcQueryTest extends OrcTest {
       val m1 = intercept[AnalysisException] {
         testAllCorruptFiles()
       }.getMessage
-      assert(m1.contains("Unable to infer schema for ORC"))
+      assert(m1.contains("Unable to infer schema"))
       testAllCorruptFilesWithoutSchemaInfer()
     }
 
@@ -681,3 +681,8 @@ class OrcQuerySuite extends OrcQueryTest with SharedSQLContext {
     }
   }
 }
+
+class OrcV1QuerySuite extends OrcQuerySuite {
+  override protected def sparkConf: SparkConf =
+    super.sparkConf.set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1FilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1FilterSuite.scala
new file mode 100644
index 0000000..cf5bbb3
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1FilterSuite.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.orc
+
+import scala.collection.JavaConverters._
+
+import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{Column, DataFrame}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Predicate}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+class OrcV1FilterSuite extends OrcFilterSuite {
+
+  override protected def sparkConf: SparkConf =
+    super.sparkConf.set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
+
+  override def checkFilterPredicate(
+      df: DataFrame,
+      predicate: Predicate,
+      checker: (SearchArgument) => Unit): Unit = {
+    val output = predicate.collect { case a: Attribute => a }.distinct
+    val query = df
+      .select(output.map(e => Column(e)): _*)
+      .where(Column(predicate))
+
+    var maybeRelation: Option[HadoopFsRelation] = None
+    val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
+      case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _, _)) =>
+        maybeRelation = Some(orcRelation)
+        filters
+    }.flatten.reduceLeftOption(_ && _)
+    assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
+
+    val (_, selectedFilters, _) =
+      DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
+    assert(selectedFilters.nonEmpty, "No filter is pushed down")
+
+    val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters)
+    assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $selectedFilters")
+    checker(maybeFilter.get)
+  }
+
+  override def checkFilterPredicate
+      (predicate: Predicate, filterOperator: PredicateLeaf.Operator)
+      (implicit df: DataFrame): Unit = {
+    def checkComparisonOperator(filter: SearchArgument) = {
+      val operator = filter.getLeaves.asScala
+      assert(operator.map(_.getOperator).contains(filterOperator))
+    }
+    checkFilterPredicate(df, predicate, checkComparisonOperator)
+  }
+
+  override def checkFilterPredicate
+      (predicate: Predicate, stringExpr: String)
+      (implicit df: DataFrame): Unit = {
+    def checkLogicalOperator(filter: SearchArgument) = {
+      assert(filter.toString == stringExpr)
+    }
+    checkFilterPredicate(df, predicate, checkLogicalOperator)
+  }
+
+  override def checkNoFilterPredicate
+      (predicate: Predicate, noneSupported: Boolean = false)
+      (implicit df: DataFrame): Unit = {
+    val output = predicate.collect { case a: Attribute => a }.distinct
+    val query = df
+      .select(output.map(e => Column(e)): _*)
+      .where(Column(predicate))
+
+    var maybeRelation: Option[HadoopFsRelation] = None
+    val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
+      case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _, _)) =>
+        maybeRelation = Some(orcRelation)
+        filters
+    }.flatten.reduceLeftOption(_ && _)
+    assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
+
+    val (_, selectedFilters, _) =
+      DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
+    assert(selectedFilters.nonEmpty, "No filter is pushed down")
+
+    val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters)
+    assert(maybeFilter.isEmpty, s"Filter predicate should not be generated for $selectedFilters")
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
new file mode 100644
index 0000000..f57c581
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources.v2
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetTest}
+import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.reader.ScanBuilder
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
+
+class DummyReadOnlyFileDataSourceV2 extends FileDataSourceV2 {
+
+  override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
+
+  override def shortName(): String = "parquet"
+
+  override def getTable(options: DataSourceOptions): Table = {
+    new DummyReadOnlyFileTable
+  }
+}
+
+class DummyReadOnlyFileTable extends Table with SupportsBatchRead {
+  override def name(): String = "dummy"
+
+  override def schema(): StructType = StructType(Nil)
+
+  override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+    throw new AnalysisException("Dummy file reader")
+  }
+}
+
+class FileDataSourceV2FallBackSuite extends QueryTest with ParquetTest with SharedSQLContext {
+
+  private val dummyParquetReaderV2 = classOf[DummyReadOnlyFileDataSourceV2].getName
+
+  test("Fall back to v1 when writing to file with read only FileDataSourceV2") {
+    val df = spark.range(10).toDF()
+    withTempPath { file =>
+      val path = file.getCanonicalPath
+      // Writing file should fall back to v1 and succeed.
+      df.write.format(dummyParquetReaderV2).save(path)
+
+      // Validate write result with [[ParquetFileFormat]].
+      checkAnswer(spark.read.parquet(path), df)
+
+      // The dummy file reader should fail as expected.
+      val exception = intercept[AnalysisException] {
+        spark.read.format(dummyParquetReaderV2).load(path).collect()
+      }
+      assert(exception.message.equals("Dummy file reader"))
+    }
+  }
+
+  test("Fall back read path to v1 with configuration USE_V1_SOURCE_READER_LIST") {
+    val df = spark.range(10).toDF()
+    withTempPath { file =>
+      val path = file.getCanonicalPath
+      df.write.parquet(path)
+      Seq(
+        "foo,parquet,bar",
+        "ParQuet,bar,foo",
+        s"foobar,$dummyParquetReaderV2"
+      ).foreach { fallbackReaders =>
+        withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> fallbackReaders) {
+          // Reading file should fall back to v1 and succeed.
+          checkAnswer(spark.read.format(dummyParquetReaderV2).load(path), df)
+          checkAnswer(sql(s"SELECT * FROM parquet.`$path`"), df)
+        }
+      }
+
+      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "foo,bar") {
+        // The dummy V2 reader should fail because its short name is not in
+        // USE_V1_SOURCE_READER_LIST, so no fallback to v1 happens.
+        val exception = intercept[AnalysisException] {
+          spark.read.format(dummyParquetReaderV2).load(path).collect()
+        }
+        assert(exception.message.equals("Dummy file reader"))
+      }
+    }
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 4f39147..132b0e4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -71,6 +71,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
       new ResolveHiveSerdeTable(session) +:
         new FindDataSourceTable(session) +:
         new ResolveSQLOnFile(session) +:
+        new FallbackOrcDataSourceV2(session) +:
         customResolutionRules
 
     override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index d93215f..3402ed2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -908,7 +908,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       }
       assert(e.getMessage.contains(
         "The format of the existing table default.appendOrcToParquet is `ParquetFileFormat`. " +
-          "It doesn't match the specified format `OrcFileFormat`"))
+          "It doesn't match the specified format"))
     }
 
     withTable("appendParquetToJson") {

