You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2016/03/31 03:21:11 UTC
spark git commit: [SPARK-14206][SQL] buildReader() implementation for CSV
Repository: spark
Updated Branches:
refs/heads/master da54abfd8 -> 26445c2e4
[SPARK-14206][SQL] buildReader() implementation for CSV
## What changes were proposed in this pull request?
Major changes:
1. Implement `FileFormat.buildReader()` for the CSV data source.
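2. Add an extra argument to `FileFormat.buildReader()`, `physicalSchema`, which is basically the result of `FileFormat.inferSchema` or a user-specified schema. (In the signature shown in the diff below, this full schema is passed as `dataSchema`, and the pruned schema that used to be passed under that name is now `requiredSchema`.)
This argument is necessary because the CSV data source needs to know all the columns of the underlying files in order to read them.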
## How was this patch tested?
Existing tests should do the work.
Author: Cheng Lian <li...@databricks.com>
Closes #12002 from liancheng/spark-14206-csv-build-reader.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26445c2e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26445c2e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26445c2e
Branch: refs/heads/master
Commit: 26445c2e472bad137fd350e4089dd0ff43a42039
Parents: da54abf
Author: Cheng Lian <li...@databricks.com>
Authored: Wed Mar 30 18:21:06 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Mar 30 18:21:06 2016 -0700
----------------------------------------------------------------------
.../datasources/FileSourceStrategy.scala | 16 ++----
.../execution/datasources/csv/CSVRelation.scala | 41 +++++++++++++---
.../datasources/csv/DefaultSource.scala | 51 ++++++++++++++++++--
.../datasources/json/JSONRelation.scala | 7 +--
.../datasources/parquet/ParquetRelation.scala | 26 +++-------
.../datasources/text/DefaultSource.scala | 3 +-
.../apache/spark/sql/sources/interfaces.scala | 18 ++++---
.../datasources/FileSourceStrategySuite.scala | 5 +-
.../apache/spark/sql/hive/orc/OrcRelation.scala | 15 +++---
9 files changed, 119 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/26445c2e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index d653408..5542987 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -59,8 +59,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
if (files.fileFormat.toString == "TestFileFormat" ||
files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
files.fileFormat.toString == "ORC" ||
- files.fileFormat.isInstanceOf[json.DefaultSource] ||
- files.fileFormat.isInstanceOf[text.DefaultSource]) &&
+ files.fileFormat.isInstanceOf[csv.DefaultSource] ||
+ files.fileFormat.isInstanceOf[text.DefaultSource] ||
+ files.fileFormat.isInstanceOf[json.DefaultSource]) &&
files.sqlContext.conf.useFileScan =>
// Filters on this relation fall into four categories based on where we can use them to avoid
// reading unneeded data:
@@ -80,14 +81,6 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val dataColumns =
l.resolve(files.dataSchema, files.sqlContext.sessionState.analyzer.resolver)
- val bucketColumns =
- AttributeSet(
- files.bucketSpec
- .map(_.bucketColumnNames)
- .getOrElse(Nil)
- .map(l.resolveQuoted(_, files.sqlContext.conf.resolver)
- .getOrElse(sys.error(""))))
-
// Partition keys are not available in the statistics of the files.
val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty)
@@ -113,8 +106,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val readFile = files.fileFormat.buildReader(
sqlContext = files.sqlContext,
+ dataSchema = files.dataSchema,
partitionSchema = files.partitionSchema,
- dataSchema = prunedDataSchema,
+ requiredSchema = prunedDataSchema,
filters = pushedDownFilters,
options = files.options)
http://git-wip-us.apache.org/repos/asf/spark/blob/26445c2e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 5501015..b47328a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.csv
import scala.util.control.NonFatal
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.RecordWriter
import org.apache.hadoop.mapreduce.TaskAttemptContext
@@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -49,14 +50,10 @@ object CSVRelation extends Logging {
}, true)
}
- def parseCsv(
- tokenizedRDD: RDD[Array[String]],
+ def csvParser(
schema: StructType,
requiredColumns: Array[String],
- inputs: Seq[FileStatus],
- sqlContext: SQLContext,
- params: CSVOptions): RDD[InternalRow] = {
-
+ params: CSVOptions): Array[String] => Option[InternalRow] = {
val schemaFields = schema.fields
val requiredFields = StructType(requiredColumns.map(schema(_))).fields
val safeRequiredFields = if (params.dropMalformed) {
@@ -74,7 +71,8 @@ object CSVRelation extends Logging {
}
val requiredSize = requiredFields.length
val row = new GenericMutableRow(requiredSize)
- tokenizedRDD.flatMap { tokens =>
+
+ (tokens: Array[String]) => {
if (params.dropMalformed && schemaFields.length != tokens.length) {
logWarning(s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}")
None
@@ -118,6 +116,33 @@ object CSVRelation extends Logging {
}
}
}
+
+ /**
+  * Converts an RDD of tokenized CSV lines into an RDD of rows.
+  *
+  * A single row parser is obtained from `csvParser` and applied to every token array. Token
+  * arrays for which the parser yields `None` contribute no row to the result.
+  *
+  * @param tokenizedRDD one `Array[String]` of tokens per CSV record
+  * @param schema schema handed to `csvParser` as the schema of the tokens
+  * @param requiredColumns names of the columns to keep, handed to `csvParser`
+  * @param options CSV options handed to `csvParser`
+  * @return the rows produced by the parser
+  */
+ def parseCsv(
+     tokenizedRDD: RDD[Array[String]],
+     schema: StructType,
+     requiredColumns: Array[String],
+     options: CSVOptions): RDD[InternalRow] = {
+   val toRow = csvParser(schema, requiredColumns, options)
+   tokenizedRDD.flatMap { tokens =>
+     toRow(tokens).toSeq
+   }
+ }
+
+ /**
+  * Skips the header line of a file if the `header` option is set to true.
+  *
+  * Works by advancing `lines` in place. When `csvOptions.headerFlag` is set and `file` starts
+  * at offset 0, leading blank lines (and, if a comment marker is configured, lines starting
+  * with that marker) are consumed, followed by the first remaining line, which is taken to be
+  * the header. In every other case `lines` is left untouched.
+  *
+  * @param file the file split being read; only a split with `start == 0` is considered
+  * @param lines the line iterator to advance past the header
+  * @param csvOptions supplies `headerFlag`, `isCommentSet` and `comment`
+  */
+ def dropHeaderLine(
+     file: PartitionedFile, lines: Iterator[String], csvOptions: CSVOptions): Unit = {
+   // TODO What if the first partitioned file consists of only comments and empty lines?
+   if (csvOptions.headerFlag && file.start == 0) {
+     val commentPrefix =
+       if (csvOptions.isCommentSet) Some(csvOptions.comment.toString) else None
+
+     // A line is skipped ahead of the header when it is blank or starts with the comment marker.
+     def isSkippable(line: String): Boolean = {
+       val trimmed = line.trim
+       trimmed.isEmpty || commentPrefix.exists(prefix => trimmed.startsWith(prefix))
+     }
+
+     // Consume directly from `lines` instead of going through `dropWhile`/`drop`: this method
+     // returns Unit, so the effect must be visible on `lines` itself, and Scala only specifies
+     // the state of the iterator *returned* by such calls, not of the one they were called on.
+     // The loop stops right after consuming the first line that is neither blank nor a
+     // comment, i.e. the header.
+     var headerConsumed = false
+     while (!headerConsumed && lines.hasNext) {
+       headerConsumed = !isSkippable(lines.next())
+     }
+   }
+ }
}
private[sql] class CSVOutputWriterFactory(params: CSVOptions) extends OutputWriterFactory {
http://git-wip-us.apache.org/repos/asf/spark/blob/26445c2e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index 54e4c1a..6b6add4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -19,17 +19,19 @@ package org.apache.spark.sql.execution.datasources.csv
import java.nio.charset.{Charset, StandardCharsets}
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
-import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
-import org.apache.spark.sql.execution.datasources.CompressionCodecs
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources.{CompressionCodecs, HadoopFileLinesReader, PartitionedFile}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration
@@ -91,6 +93,46 @@ class DefaultSource extends FileFormat with DataSourceRegister {
new CSVOutputWriterFactory(csvOptions)
}
+ /**
+  * Returns a function that reads one [[PartitionedFile]] of CSV data as an iterator of rows.
+  *
+  * For each file the returned function decodes the lines with the configured charset, lets
+  * `CSVRelation.dropHeaderLine` skip the header, tokenizes the remaining lines, converts the
+  * tokens to rows of `requiredSchema` and finally appends the file's partition values.
+  *
+  * @param sqlContext used to obtain and broadcast the Hadoop configuration
+  * @param dataSchema schema passed to `CSVRelation.csvParser` as the schema of the tokens
+  * @param partitionSchema schema of the partition values appended to every row
+  * @param requiredSchema schema of the data columns that are output for each row
+  * @param filters not used by this implementation
+  * @param options CSV options as string key/value pairs
+  */
+ override def buildReader(
+     sqlContext: SQLContext,
+     dataSchema: StructType,
+     partitionSchema: StructType,
+     requiredSchema: StructType,
+     filters: Seq[Filter],
+     options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
+   val parsedOptions = new CSVOptions(options)
+   val requiredNames = requiredSchema.fields.map(_.name)
+
+   // Copy the Hadoop configuration and broadcast it once; the per-file function below only
+   // refers to the broadcast handle.
+   val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+   val confBroadcast =
+     sqlContext.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+   (file: PartitionedFile) => {
+     // Decode each line with the configured charset.
+     val lines = new HadoopFileLinesReader(file, confBroadcast.value.value).map { text =>
+       new String(text.getBytes, 0, text.getLength, parsedOptions.charset)
+     }
+
+     // Must run before the tokenizer is created so that the header is never tokenized.
+     CSVRelation.dropHeaderLine(file, lines, parsedOptions)
+
+     val tokens = new BulkCsvReader(lines, parsedOptions, requiredNames)
+     val toRow = CSVRelation.csvParser(dataSchema, requiredSchema.fieldNames, parsedOptions)
+     val dataRows = tokens.flatMap { record =>
+       toRow(record).toSeq
+     }
+
+     // Appends partition values
+     val outputAttributes = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+     val joined = new JoinedRow()
+     val toUnsafe = GenerateUnsafeProjection.generate(outputAttributes, outputAttributes)
+
+     dataRows.map(row => toUnsafe(joined(row, file.partitionValues)))
+   }
+ }
+
/**
* This supports to eliminate unneeded columns before producing an RDD
* containing all of its tuples as Row objects. This reads all the tokens of each line
@@ -113,8 +155,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
val pathsString = csvFiles.map(_.getPath.toUri.toString)
val header = dataSchema.fields.map(_.name)
val tokenizedRdd = tokenRdd(sqlContext, csvOptions, header, pathsString)
- val rows = CSVRelation.parseCsv(
- tokenizedRdd, dataSchema, requiredColumns, csvFiles, sqlContext, csvOptions)
+ val rows = CSVRelation.parseCsv(tokenizedRdd, dataSchema, requiredColumns, csvOptions)
val requiredDataSchema = StructType(requiredColumns.map(c => dataSchema.find(_.name == c).get))
rows.mapPartitions { iterator =>
http://git-wip-us.apache.org/repos/asf/spark/blob/26445c2e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 21fc122..42cd25a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -124,8 +124,9 @@ class DefaultSource extends FileFormat with DataSourceRegister {
override def buildReader(
sqlContext: SQLContext,
- partitionSchema: StructType,
dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
@@ -136,7 +137,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
.getOrElse(sqlContext.conf.columnNameOfCorruptRecord)
- val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes
+ val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val joinedRow = new JoinedRow()
file => {
@@ -144,7 +145,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
val rows = JacksonParser.parseJson(
lines,
- dataSchema,
+ requiredSchema,
columnNameOfCorruptRecord,
parsedOptions)
http://git-wip-us.apache.org/repos/asf/spark/blob/26445c2e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index d6b84be..5b58fa1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -276,38 +276,26 @@ private[sql] class DefaultSource
file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
}
- /**
- * Returns a function that can be used to read a single file in as an Iterator of InternalRow.
- *
- * @param partitionSchema The schema of the partition column row that will be present in each
- * PartitionedFile. These columns should be prepended to the rows that
- * are produced by the iterator.
- * @param dataSchema The schema of the data that should be output for each row. This may be a
- * subset of the columns that are present in the file if column pruning has
- * occurred.
- * @param filters A set of filters than can optionally be used to reduce the number of rows output
- * @param options A set of string -> string configuration options.
- * @return
- */
override def buildReader(
sqlContext: SQLContext,
- partitionSchema: StructType,
dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
val parquetConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
parquetConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
parquetConf.set(
CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
- CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+ CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
parquetConf.set(
CatalystWriteSupport.SPARK_ROW_SCHEMA,
- CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+ CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
// We want to clear this temporary metadata from saving into Parquet file.
// This metadata is only useful for detecting optional columns when pushdowning filters.
val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
- dataSchema).asInstanceOf[StructType]
+ requiredSchema).asInstanceOf[StructType]
CatalystWriteSupport.setSchema(dataSchemaToWrite, parquetConf)
// Sets flags for `CatalystSchemaConverter`
@@ -324,7 +312,7 @@ private[sql] class DefaultSource
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
- .flatMap(ParquetFilters.createFilter(dataSchema, _))
+ .flatMap(ParquetFilters.createFilter(requiredSchema, _))
.reduceOption(FilterApi.and)
} else {
None
@@ -394,7 +382,7 @@ private[sql] class DefaultSource
enableVectorizedParquetReader) {
iter.asInstanceOf[Iterator[InternalRow]]
} else {
- val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes
+ val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val joinedRow = new JoinedRow()
val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
http://git-wip-us.apache.org/repos/asf/spark/blob/26445c2e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index d6ab5fc..99459ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -129,8 +129,9 @@ class DefaultSource extends FileFormat with DataSourceRegister {
override def buildReader(
sqlContext: SQLContext,
- partitionSchema: StructType,
dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
http://git-wip-us.apache.org/repos/asf/spark/blob/26445c2e/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 1e02354..6b95a3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -385,9 +385,9 @@ abstract class OutputWriter {
*
* @param location A [[FileCatalog]] that can enumerate the locations of all the files that comprise
* this relation.
- * @param partitionSchema The schmea of the columns (if any) that are used to partition the relation
+ * @param partitionSchema The schema of the columns (if any) that are used to partition the relation
* @param dataSchema The schema of any remaining columns. Note that if any partition columns are
- * present in the actual data files as well, they are removed.
+ * present in the actual data files as well, they are preserved.
* @param bucketSpec Describes the bucketing (hash-partitioning of the files by some column values).
* @param fileFormat A file format that can be used to read and write the data in files.
* @param options Configuration used when reading / writing data.
@@ -462,20 +462,24 @@ trait FileFormat {
/**
* Returns a function that can be used to read a single file in as an Iterator of InternalRow.
*
+ * @param dataSchema The global data schema. It can be either specified by the user, or
+ * reconciled/merged from all underlying data files. If any partition columns
+ * are contained in the files, they are preserved in this schema.
* @param partitionSchema The schema of the partition column row that will be present in each
- * PartitionedFile. These columns should be prepended to the rows that
+ * PartitionedFile. These columns should be appended to the rows that
* are produced by the iterator.
- * @param dataSchema The schema of the data that should be output for each row. This may be a
- * subset of the columns that are present in the file if column pruning has
- * occurred.
+ * @param requiredSchema The schema of the data that should be output for each row. This may be a
+ * subset of the columns that are present in the file if column pruning has
+ * occurred.
* @param filters A set of filters than can optionally be used to reduce the number of rows output
* @param options A set of string -> string configuration options.
* @return
*/
def buildReader(
sqlContext: SQLContext,
- partitionSchema: StructType,
dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
// TODO: Remove this default implementation when the other formats have been ported
http://git-wip-us.apache.org/repos/asf/spark/blob/26445c2e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 45620bc..717a3a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -376,14 +376,15 @@ class TestFileFormat extends FileFormat {
override def buildReader(
sqlContext: SQLContext,
- partitionSchema: StructType,
dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
// Record the arguments so they can be checked in the test case.
LastArguments.partitionSchema = partitionSchema
- LastArguments.dataSchema = dataSchema
+ LastArguments.dataSchema = requiredSchema
LastArguments.filters = filters
LastArguments.options = options
http://git-wip-us.apache.org/repos/asf/spark/blob/26445c2e/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 7c4a0a0..43f445e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -126,8 +126,9 @@ private[sql] class DefaultSource
override def buildReader(
sqlContext: SQLContext,
- partitionSchema: StructType,
dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
val orcConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
@@ -145,15 +146,15 @@ private[sql] class DefaultSource
(file: PartitionedFile) => {
val conf = broadcastedConf.value.value
- // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
- // case, `OrcFileOperator.readSchema` returns `None`, and we can simply return an empty
- // iterator.
+ // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
+ // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
+ // using the given physical schema. Instead, we simply return an empty iterator.
val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf))
if (maybePhysicalSchema.isEmpty) {
Iterator.empty
} else {
val physicalSchema = maybePhysicalSchema.get
- OrcRelation.setRequiredColumns(conf, physicalSchema, dataSchema)
+ OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)
val orcRecordReader = {
val job = Job.getInstance(conf)
@@ -171,11 +172,11 @@ private[sql] class DefaultSource
// Unwraps `OrcStruct`s to `UnsafeRow`s
val unsafeRowIterator = OrcRelation.unwrapOrcStructs(
- file.filePath, conf, dataSchema, new RecordReaderIterator[OrcStruct](orcRecordReader)
+ file.filePath, conf, requiredSchema, new RecordReaderIterator[OrcStruct](orcRecordReader)
)
// Appends partition values
- val fullOutput = dataSchema.toAttributes ++ partitionSchema.toAttributes
+ val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val joinedRow = new JoinedRow()
val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org