Posted to commits@spark.apache.org by yh...@apache.org on 2016/03/31 03:21:11 UTC

spark git commit: [SPARK-14206][SQL] buildReader() implementation for CSV

Repository: spark
Updated Branches:
  refs/heads/master da54abfd8 -> 26445c2e4


[SPARK-14206][SQL] buildReader() implementation for CSV

## What changes were proposed in this pull request?

Major changes:

1. Implement `FileFormat.buildReader()` for the CSV data source.
2. Add an extra argument, `dataSchema`, to `FileFormat.buildReader()`: the physical schema of the underlying files, i.e. the result of `FileFormat.inferSchema` or the user-specified schema.

   This argument is necessary because the CSV data source needs to know all of the columns in the underlying files in order to read them. The updated signature is sketched below.
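
For reference, a minimal sketch of the updated signature as it now appears in the `FileFormat` trait (the default implementation is omitted; see the `interfaces.scala` hunk further down):

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

trait FileFormat {
  // ...
  def buildReader(
      sqlContext: SQLContext,
      dataSchema: StructType,      // full file schema: inferred, or user-specified
      partitionSchema: StructType, // partition columns, appended to each produced row
      requiredSchema: StructType,  // columns that must actually be output (after pruning)
      filters: Seq[Filter],
      options: Map[String, String]): PartitionedFile => Iterator[InternalRow]
}
```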

## How was this patch tested?

Existing tests should cover this change.

Author: Cheng Lian <li...@databricks.com>

Closes #12002 from liancheng/spark-14206-csv-build-reader.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26445c2e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26445c2e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26445c2e

Branch: refs/heads/master
Commit: 26445c2e472bad137fd350e4089dd0ff43a42039
Parents: da54abf
Author: Cheng Lian <li...@databricks.com>
Authored: Wed Mar 30 18:21:06 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Mar 30 18:21:06 2016 -0700

----------------------------------------------------------------------
 .../datasources/FileSourceStrategy.scala        | 16 ++----
 .../execution/datasources/csv/CSVRelation.scala | 41 +++++++++++++---
 .../datasources/csv/DefaultSource.scala         | 51 ++++++++++++++++++--
 .../datasources/json/JSONRelation.scala         |  7 +--
 .../datasources/parquet/ParquetRelation.scala   | 26 +++-------
 .../datasources/text/DefaultSource.scala        |  3 +-
 .../apache/spark/sql/sources/interfaces.scala   | 18 ++++---
 .../datasources/FileSourceStrategySuite.scala   |  5 +-
 .../apache/spark/sql/hive/orc/OrcRelation.scala | 15 +++---
 9 files changed, 119 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/26445c2e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index d653408..5542987 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -59,8 +59,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       if (files.fileFormat.toString == "TestFileFormat" ||
          files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
          files.fileFormat.toString == "ORC" ||
-         files.fileFormat.isInstanceOf[json.DefaultSource] ||
-         files.fileFormat.isInstanceOf[text.DefaultSource]) &&
+         files.fileFormat.isInstanceOf[csv.DefaultSource] ||
+         files.fileFormat.isInstanceOf[text.DefaultSource] ||
+         files.fileFormat.isInstanceOf[json.DefaultSource]) &&
          files.sqlContext.conf.useFileScan =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
       // reading unneeded data:
@@ -80,14 +81,6 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       val dataColumns =
         l.resolve(files.dataSchema, files.sqlContext.sessionState.analyzer.resolver)
 
-      val bucketColumns =
-        AttributeSet(
-          files.bucketSpec
-            .map(_.bucketColumnNames)
-            .getOrElse(Nil)
-            .map(l.resolveQuoted(_, files.sqlContext.conf.resolver)
-              .getOrElse(sys.error(""))))
-
       // Partition keys are not available in the statistics of the files.
       val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty)
 
@@ -113,8 +106,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
 
       val readFile = files.fileFormat.buildReader(
         sqlContext = files.sqlContext,
+        dataSchema = files.dataSchema,
         partitionSchema = files.partitionSchema,
-        dataSchema = prunedDataSchema,
+        requiredSchema = prunedDataSchema,
         filters = pushedDownFilters,
         options = files.options)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/26445c2e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 5501015..b47328a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.csv
 
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{NullWritable, Text}
 import org.apache.hadoop.mapreduce.RecordWriter
 import org.apache.hadoop.mapreduce.TaskAttemptContext
@@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 
@@ -49,14 +50,10 @@ object CSVRelation extends Logging {
     }, true)
   }
 
-  def parseCsv(
-      tokenizedRDD: RDD[Array[String]],
+  def csvParser(
       schema: StructType,
       requiredColumns: Array[String],
-      inputs: Seq[FileStatus],
-      sqlContext: SQLContext,
-      params: CSVOptions): RDD[InternalRow] = {
-
+      params: CSVOptions): Array[String] => Option[InternalRow] = {
     val schemaFields = schema.fields
     val requiredFields = StructType(requiredColumns.map(schema(_))).fields
     val safeRequiredFields = if (params.dropMalformed) {
@@ -74,7 +71,8 @@ object CSVRelation extends Logging {
     }
     val requiredSize = requiredFields.length
     val row = new GenericMutableRow(requiredSize)
-    tokenizedRDD.flatMap { tokens =>
+
+    (tokens: Array[String]) => {
       if (params.dropMalformed && schemaFields.length != tokens.length) {
         logWarning(s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}")
         None
@@ -118,6 +116,33 @@ object CSVRelation extends Logging {
       }
     }
   }
+
+  def parseCsv(
+      tokenizedRDD: RDD[Array[String]],
+      schema: StructType,
+      requiredColumns: Array[String],
+      options: CSVOptions): RDD[InternalRow] = {
+    val parser = csvParser(schema, requiredColumns, options)
+    tokenizedRDD.flatMap(parser(_).toSeq)
+  }
+
+  // Skips the header line of each file if the `header` option is set to true.
+  def dropHeaderLine(
+      file: PartitionedFile, lines: Iterator[String], csvOptions: CSVOptions): Unit = {
+    // TODO What if the first partitioned file consists of only comments and empty lines?
+    if (csvOptions.headerFlag && file.start == 0) {
+      val nonEmptyLines = if (csvOptions.isCommentSet) {
+        val commentPrefix = csvOptions.comment.toString
+        lines.dropWhile { line =>
+          line.trim.isEmpty || line.trim.startsWith(commentPrefix)
+        }
+      } else {
+        lines.dropWhile(_.trim.isEmpty)
+      }
+
+      if (nonEmptyLines.hasNext) nonEmptyLines.drop(1)
+    }
+  }
 }
 
 private[sql] class CSVOutputWriterFactory(params: CSVOptions) extends OutputWriterFactory {

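The refactoring above splits the row-parsing logic out of `parseCsv` into a standalone `csvParser` closure of type `Array[String] => Option[InternalRow]`, so the same logic can serve both the existing RDD-based path and the new per-file reader. A rough usage sketch (placed in the same package to avoid visibility issues with these internal helpers; the schema, required columns, and token values are illustrative, not taken from the patch):

```scala
package org.apache.spark.sql.execution.datasources.csv

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._

object CsvParserSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative schema and CSV options (all defaults).
    val schema = StructType(Seq(
      StructField("id", StringType),
      StructField("name", StringType)))
    val options = new CSVOptions(Map.empty[String, String])

    // csvParser returns a plain function, so it is created once and applied per line.
    // Arguments: schema, requiredColumns, options.
    val parse: Array[String] => Option[InternalRow] =
      CSVRelation.csvParser(schema, Array("name"), options)

    // Well-formed token arrays yield Some(row); malformed ones yield None and are
    // simply dropped, which is how parseCsv composes it:
    //   tokenizedRDD.flatMap(parser(_).toSeq)
    val rows = Seq(Array("1", "alice"), Array("2", "bob")).flatMap(parse(_).toSeq)
    println(rows.length) // 2
  }
}
```
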
http://git-wip-us.apache.org/repos/asf/spark/blob/26445c2e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index 54e4c1a..6b6add4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -19,17 +19,19 @@ package org.apache.spark.sql.execution.datasources.csv
 
 import java.nio.charset.{Charset, StandardCharsets}
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapred.TextInputFormat
-import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce._
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
-import org.apache.spark.sql.execution.datasources.CompressionCodecs
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources.{CompressionCodecs, HadoopFileLinesReader, PartitionedFile}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
@@ -91,6 +93,46 @@ class DefaultSource extends FileFormat with DataSourceRegister {
     new CSVOutputWriterFactory(csvOptions)
   }
 
+  override def buildReader(
+      sqlContext: SQLContext,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
+    val csvOptions = new CSVOptions(options)
+    val headers = requiredSchema.fields.map(_.name)
+
+    val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val broadcastedConf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
+
+    (file: PartitionedFile) => {
+      val lineIterator = {
+        val conf = broadcastedConf.value.value
+        new HadoopFileLinesReader(file, conf).map { line =>
+          new String(line.getBytes, 0, line.getLength, csvOptions.charset)
+        }
+      }
+
+      CSVRelation.dropHeaderLine(file, lineIterator, csvOptions)
+
+      val unsafeRowIterator = {
+        val tokenizedIterator = new BulkCsvReader(lineIterator, csvOptions, headers)
+        val parser = CSVRelation.csvParser(dataSchema, requiredSchema.fieldNames, csvOptions)
+        tokenizedIterator.flatMap(parser(_).toSeq)
+      }
+
+      // Appends partition values
+      val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+      val joinedRow = new JoinedRow()
+      val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)
+
+      unsafeRowIterator.map { dataRow =>
+        appendPartitionColumns(joinedRow(dataRow, file.partitionValues))
+      }
+    }
+  }
+
   /**
    * This supports to eliminate unneeded columns before producing an RDD
    * containing all of its tuples as Row objects. This reads all the tokens of each line
@@ -113,8 +155,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
     val pathsString = csvFiles.map(_.getPath.toUri.toString)
     val header = dataSchema.fields.map(_.name)
     val tokenizedRdd = tokenRdd(sqlContext, csvOptions, header, pathsString)
-    val rows = CSVRelation.parseCsv(
-      tokenizedRdd, dataSchema, requiredColumns, csvFiles, sqlContext, csvOptions)
+    val rows = CSVRelation.parseCsv(tokenizedRdd, dataSchema, requiredColumns, csvOptions)
 
     val requiredDataSchema = StructType(requiredColumns.map(c => dataSchema.find(_.name == c).get))
     rows.mapPartitions { iterator =>

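The last step of the new CSV reader above, appending partition values to each data row, follows the same JoinedRow + GenerateUnsafeProjection pattern used by the JSON, Parquet, and ORC readers in this patch. A self-contained sketch of that pattern; the wrapper object and helper name are illustrative, not from the patch:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.types.StructType

object PartitionAppendSketch {
  // Joins each data row with the file's partition values and projects the pair into a
  // single UnsafeRow laid out as requiredSchema ++ partitionSchema.
  def appendPartitionValues(
      rows: Iterator[InternalRow],
      partitionValues: InternalRow,
      requiredSchema: StructType,
      partitionSchema: StructType): Iterator[InternalRow] = {
    val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
    val joinedRow = new JoinedRow()
    val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)
    rows.map(dataRow => appendPartitionColumns(joinedRow(dataRow, partitionValues)))
  }
}
```
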
http://git-wip-us.apache.org/repos/asf/spark/blob/26445c2e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 21fc122..42cd25a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -124,8 +124,9 @@ class DefaultSource extends FileFormat with DataSourceRegister {
 
   override def buildReader(
       sqlContext: SQLContext,
-      partitionSchema: StructType,
       dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
     val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
@@ -136,7 +137,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
     val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
       .getOrElse(sqlContext.conf.columnNameOfCorruptRecord)
 
-    val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes
+    val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
     val joinedRow = new JoinedRow()
 
     file => {
@@ -144,7 +145,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
 
       val rows = JacksonParser.parseJson(
         lines,
-        dataSchema,
+        requiredSchema,
         columnNameOfCorruptRecord,
         parsedOptions)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/26445c2e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index d6b84be..5b58fa1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -276,38 +276,26 @@ private[sql] class DefaultSource
         file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
 
-  /**
-   * Returns a function that can be used to read a single file in as an Iterator of InternalRow.
-   *
-   * @param partitionSchema The schema of the partition column row that will be present in each
-   *                        PartitionedFile.  These columns should be prepended to the rows that
-   *                        are produced by the iterator.
-   * @param dataSchema The schema of the data that should be output for each row.  This may be a
-   *                   subset of the columns that are present in the file if  column pruning has
-   *                   occurred.
-   * @param filters A set of filters than can optionally be used to reduce the number of rows output
-   * @param options A set of string -> string configuration options.
-   * @return
-   */
   override def buildReader(
       sqlContext: SQLContext,
-      partitionSchema: StructType,
       dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
     val parquetConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
     parquetConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
     parquetConf.set(
       CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-      CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+      CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
     parquetConf.set(
       CatalystWriteSupport.SPARK_ROW_SCHEMA,
-      CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+      CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
 
     // We want to clear this temporary metadata from saving into Parquet file.
     // This metadata is only useful for detecting optional columns when pushdowning filters.
     val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
-      dataSchema).asInstanceOf[StructType]
+      requiredSchema).asInstanceOf[StructType]
     CatalystWriteSupport.setSchema(dataSchemaToWrite, parquetConf)
 
     // Sets flags for `CatalystSchemaConverter`
@@ -324,7 +312,7 @@ private[sql] class DefaultSource
         // Collects all converted Parquet filter predicates. Notice that not all predicates can be
         // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
         // is used here.
-        .flatMap(ParquetFilters.createFilter(dataSchema, _))
+        .flatMap(ParquetFilters.createFilter(requiredSchema, _))
         .reduceOption(FilterApi.and)
     } else {
       None
@@ -394,7 +382,7 @@ private[sql] class DefaultSource
           enableVectorizedParquetReader) {
         iter.asInstanceOf[Iterator[InternalRow]]
       } else {
-        val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes
+        val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
         val joinedRow = new JoinedRow()
         val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/26445c2e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index d6ab5fc..99459ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -129,8 +129,9 @@ class DefaultSource extends FileFormat with DataSourceRegister {
 
   override def buildReader(
       sqlContext: SQLContext,
-      partitionSchema: StructType,
       dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
     val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)

http://git-wip-us.apache.org/repos/asf/spark/blob/26445c2e/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 1e02354..6b95a3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -385,9 +385,9 @@ abstract class OutputWriter {
  *
  * @param location A [[FileCatalog]] that can enumerate the locations of all the files that comprise
  *                 this relation.
- * @param partitionSchema The schmea of the columns (if any) that are used to partition the relation
+ * @param partitionSchema The schema of the columns (if any) that are used to partition the relation
  * @param dataSchema The schema of any remaining columns.  Note that if any partition columns are
- *                   present in the actual data files as well, they are removed.
+ *                   present in the actual data files as well, they are preserved.
  * @param bucketSpec Describes the bucketing (hash-partitioning of the files by some column values).
  * @param fileFormat A file format that can be used to read and write the data in files.
  * @param options Configuration used when reading / writing data.
@@ -462,20 +462,24 @@ trait FileFormat {
   /**
    * Returns a function that can be used to read a single file in as an Iterator of InternalRow.
    *
+   * @param dataSchema The global data schema. It can be either specified by the user, or
+   *                   reconciled/merged from all underlying data files. If any partition columns
+   *                   are contained in the files, they are preserved in this schema.
    * @param partitionSchema The schema of the partition column row that will be present in each
-   *                        PartitionedFile.  These columns should be prepended to the rows that
+   *                        PartitionedFile. These columns should be appended to the rows that
    *                        are produced by the iterator.
-   * @param dataSchema The schema of the data that should be output for each row.  This may be a
-   *                   subset of the columns that are present in the file if  column pruning has
-   *                   occurred.
+   * @param requiredSchema The schema of the data that should be output for each row.  This may be a
+   *                       subset of the columns that are present in the file if column pruning has
+   *                       occurred.
    * @param filters A set of filters than can optionally be used to reduce the number of rows output
    * @param options A set of string -> string configuration options.
    * @return
    */
   def buildReader(
       sqlContext: SQLContext,
-      partitionSchema: StructType,
       dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
     // TODO: Remove this default implementation when the other formats have been ported

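To make the three schema arguments above concrete: for a table whose files contain data columns `a`, `b`, `c` and which is partitioned by `p`, a query selecting only `a` would lead the planner to pass schemas along these lines (a REPL-style snippet; values are illustrative, not from the patch):

```scala
import org.apache.spark.sql.types._

// Columns as they appear in the data files (inferred or user-specified).
val dataSchema = StructType(Seq(
  StructField("a", IntegerType),
  StructField("b", StringType),
  StructField("c", DoubleType)))

// Partition columns: never read from the files, appended to every produced row.
val partitionSchema = StructType(Seq(StructField("p", StringType)))

// Columns that actually need to be produced after pruning: only `a` here.
val requiredSchema = StructType(Seq(StructField("a", IntegerType)))

// The reader returned by buildReader(...) then yields rows laid out as (a, p).
```
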
http://git-wip-us.apache.org/repos/asf/spark/blob/26445c2e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 45620bc..717a3a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -376,14 +376,15 @@ class TestFileFormat extends FileFormat {
 
   override def buildReader(
       sqlContext: SQLContext,
-      partitionSchema: StructType,
       dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
 
     // Record the arguments so they can be checked in the test case.
     LastArguments.partitionSchema = partitionSchema
-    LastArguments.dataSchema = dataSchema
+    LastArguments.dataSchema = requiredSchema
     LastArguments.filters = filters
     LastArguments.options = options
 

http://git-wip-us.apache.org/repos/asf/spark/blob/26445c2e/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 7c4a0a0..43f445e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -126,8 +126,9 @@ private[sql] class DefaultSource
 
   override def buildReader(
       sqlContext: SQLContext,
-      partitionSchema: StructType,
       dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
     val orcConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
@@ -145,15 +146,15 @@ private[sql] class DefaultSource
     (file: PartitionedFile) => {
       val conf = broadcastedConf.value.value
 
-      // SPARK-8501: Empty ORC files always have an empty schema stored in their footer.  In this
-      // case, `OrcFileOperator.readSchema` returns `None`, and we can simply return an empty
-      // iterator.
+      // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
+      // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
+      // using the given physical schema. Instead, we simply return an empty iterator.
       val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf))
       if (maybePhysicalSchema.isEmpty) {
         Iterator.empty
       } else {
         val physicalSchema = maybePhysicalSchema.get
-        OrcRelation.setRequiredColumns(conf, physicalSchema, dataSchema)
+        OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)
 
         val orcRecordReader = {
           val job = Job.getInstance(conf)
@@ -171,11 +172,11 @@ private[sql] class DefaultSource
 
         // Unwraps `OrcStruct`s to `UnsafeRow`s
         val unsafeRowIterator = OrcRelation.unwrapOrcStructs(
-          file.filePath, conf, dataSchema, new RecordReaderIterator[OrcStruct](orcRecordReader)
+          file.filePath, conf, requiredSchema, new RecordReaderIterator[OrcStruct](orcRecordReader)
         )
 
         // Appends partition values
-        val fullOutput = dataSchema.toAttributes ++ partitionSchema.toAttributes
+        val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
         val joinedRow = new JoinedRow()
         val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org