Posted to commits@spark.apache.org by li...@apache.org on 2018/05/25 04:38:08 UTC

spark git commit: [SPARK-24244][SPARK-24368][SQL] Passing only required columns to the CSV parser

Repository: spark
Updated Branches:
  refs/heads/master 3b20b34ab -> 64fad0b51


[SPARK-24244][SPARK-24368][SQL] Passing only required columns to the CSV parser

## What changes were proposed in this pull request?

The uniVocity parser allows specifying only the required column names or indexes for [parsing](https://www.univocity.com/pages/parsers-tutorial), for example:

```
// Here we select only the columns by their indexes.
// The parser just skips the values in other columns
parserSettings.selectIndexes(4, 0, 1);
CsvParser parser = new CsvParser(parserSettings);
```
In this PR, I propose to extract the indexes of the required schema's columns and pass them to the CSV parser. Benchmarks show the following speedups when parsing rows with 1000 columns:

```
Select 100 columns out of 1000: x1.76
Select 1 column out of 1000: x2
```
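
As a rough illustration of the index extraction described above, the following self-contained Java sketch maps each required column to its position in the full schema and then keeps only those tokens. It does not use uniVocity or Spark: the class `ColumnPruningSketch` and the helpers `requiredIndexes` and `selectTokens` are hypothetical names introduced for this example, and `selectTokens` is only a stand-in for what the parser does after `selectIndexes` is called.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of Spark or uniVocity.
class ColumnPruningSketch {

    // Map each required column name to its position in the full data schema.
    static int[] requiredIndexes(List<String> dataSchema, List<String> requiredSchema) {
        int[] indexes = new int[requiredSchema.size()];
        for (int i = 0; i < indexes.length; i++) {
            int idx = dataSchema.indexOf(requiredSchema.get(i));
            if (idx < 0) {
                throw new IllegalArgumentException(
                    "requiredSchema should be the subset of schema.");
            }
            indexes[i] = idx;
        }
        return indexes;
    }

    // Stand-in for the parser's index selection: keep only the selected
    // tokens, in the order the indexes were requested.
    static String[] selectTokens(String[] tokens, int[] indexes) {
        String[] selected = new String[indexes.length];
        for (int i = 0; i < indexes.length; i++) {
            selected[i] = tokens[indexes[i]];
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> dataSchema = Arrays.asList("f1", "f2", "f3", "f4", "f5");
        List<String> requiredSchema = Arrays.asList("f5", "f1", "f2");

        int[] indexes = requiredIndexes(dataSchema, requiredSchema);
        String[] tokens = "1,2,3,4,5".split(",");

        System.out.println(Arrays.toString(indexes));                       // [4, 0, 1]
        System.out.println(Arrays.toString(selectTokens(tokens, indexes))); // [5, 1, 2]
    }
}
```

In this sketch the selected tokens already arrive in required-schema order, so a consumer can convert token `i` with the converter for required column `i` without a separate index lookup.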

**Note**: Compared to the current implementation, these changes can return different results for malformed rows in the `DROPMALFORMED` and `FAILFAST` modes if only a subset of all columns is requested. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
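
The difference in the note above can be pictured with a toy model. The Java sketch below is not Spark's implementation: the class `DropMalformedSketch`, its methods, and the rule "a row is malformed when a checked column has no token" are all assumptions made for illustration. It mimics a `DROPMALFORMED`-style read of the first column from a two-column file, once checking only the requested column and once checking every column.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model; it does not reproduce Spark's actual parsing code.
class DropMalformedSketch {

    // Toy rule: a row is well-formed for the checked columns
    // if every checked index has a token in the row.
    static boolean wellFormed(String[] tokens, int[] checkedIndexes) {
        for (int idx : checkedIndexes) {
            if (idx >= tokens.length) {
                return false;
            }
        }
        return true;
    }

    // Select the first column, dropping rows that are malformed under the toy rule.
    // With columnPruning, only the requested column is checked;
    // without it, all columnCount columns are checked.
    static List<String> selectFirstColumn(
            List<String> lines, int columnCount, boolean columnPruning) {
        int[] all = new int[columnCount];
        for (int i = 0; i < columnCount; i++) {
            all[i] = i;
        }
        int[] checked = columnPruning ? new int[]{0} : all;

        List<String> result = new ArrayList<>();
        for (String line : lines) {
            String[] tokens = line.split(",");
            if (wellFormed(tokens, checked)) {
                result.add(tokens[0]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Data rows of a file with the header "id,name"; the first row lacks a name.
        List<String> lines = Arrays.asList("1234", "1,alice");

        System.out.println(selectFirstColumn(lines, 2, true));  // [1234, 1]
        System.out.println(selectFirstColumn(lines, 2, false)); // [1]
    }
}
```

Under these assumptions, the short row `1234` survives when only `id` is checked and is dropped when both columns are checked, which is the kind of result change the note warns about.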

## How was this patch tested?

It was tested by a new test that selects 3 columns out of 15, by existing tests, and by new benchmarks.

Author: Maxim Gekk <ma...@databricks.com>
Author: Maxim Gekk <ma...@gmail.com>

Closes #21415 from MaxGekk/csv-column-pruning2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64fad0b5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64fad0b5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64fad0b5

Branch: refs/heads/master
Commit: 64fad0b519cf35b8c0a0dec18dd3df9488a5ed25
Parents: 3b20b34
Author: Maxim Gekk <ma...@databricks.com>
Authored: Thu May 24 21:38:04 2018 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Thu May 24 21:38:04 2018 -0700

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |  1 +
 .../org/apache/spark/sql/internal/SQLConf.scala |  9 ++++
 .../org/apache/spark/sql/DataFrameReader.scala  |  1 +
 .../datasources/csv/CSVFileFormat.scala         | 18 +++++---
 .../execution/datasources/csv/CSVOptions.scala  |  3 ++
 .../datasources/csv/UnivocityParser.scala       | 26 ++++++-----
 .../datasources/csv/CSVBenchmarks.scala         | 46 ++++++++++++++++++++
 .../datasources/csv/CSVInferSchemaSuite.scala   | 22 +++++-----
 .../execution/datasources/csv/CSVSuite.scala    | 41 ++++++++++++++---
 .../datasources/csv/UnivocityParserSuite.scala  | 37 ++++++++--------
 10 files changed, 152 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/64fad0b5/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index f1ed316..fc26562 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1825,6 +1825,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
   - In version 2.3 and earlier, `to_utc_timestamp` and `from_utc_timestamp` respect the timezone in the input timestamp string, which breaks the assumption that the input timestamp is in a specific timezone. Therefore, these 2 functions can return unexpected results. In version 2.4 and later, this problem has been fixed. `to_utc_timestamp` and `from_utc_timestamp` will return null if the input timestamp string contains timezone. As an example, `from_utc_timestamp('2000-10-10 00:00:00', 'GMT+1')` will return `2000-10-10 01:00:00` in both Spark 2.3 and 2.4. However, `from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')`, assuming a local timezone of GMT+8, will return `2000-10-10 09:00:00` in Spark 2.3 but `null` in 2.4. For people who don't care about this problem and want to retain the previous behaivor to keep their query unchanged, you can set `spark.sql.function.rejectTimezoneInString` to false. This option will be removed in Spark 3.0 and should only be used as a temporary workaround.
   - In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files.
   - Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
+  - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
 
 ## Upgrading From Spark SQL 2.2 to 2.3
 

http://git-wip-us.apache.org/repos/asf/spark/blob/64fad0b5/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 93d356f..8d2320d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1307,6 +1307,13 @@ object SQLConf {
   object Replaced {
     val MAPREDUCE_JOB_REDUCES = "mapreduce.job.reduces"
   }
+
+  val CSV_PARSER_COLUMN_PRUNING = buildConf("spark.sql.csv.parser.columnPruning.enabled")
+    .internal()
+    .doc("If it is set to true, column names of the requested schema are passed to CSV parser. " +
+      "Other column values can be ignored during parsing even if they are malformed.")
+    .booleanConf
+    .createWithDefault(true)
 }
 
 /**
@@ -1664,6 +1671,8 @@ class SQLConf extends Serializable with Logging {
   def partitionOverwriteMode: PartitionOverwriteMode.Value =
     PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))
 
+  def csvColumnPruning: Boolean = getConf(SQLConf.CSV_PARSER_COLUMN_PRUNING)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */

http://git-wip-us.apache.org/repos/asf/spark/blob/64fad0b5/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 917f0cb..ac4580a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -480,6 +480,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   def csv(csvDataset: Dataset[String]): DataFrame = {
     val parsedOptions: CSVOptions = new CSVOptions(
       extraOptions.toMap,
+      sparkSession.sessionState.conf.csvColumnPruning,
       sparkSession.sessionState.conf.sessionLocalTimeZone)
     val filteredLines: Dataset[String] =
       CSVUtils.filterCommentAndEmpty(csvDataset, parsedOptions)

http://git-wip-us.apache.org/repos/asf/spark/blob/64fad0b5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index e20977a..21279d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -41,8 +41,10 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       sparkSession: SparkSession,
       options: Map[String, String],
       path: Path): Boolean = {
-    val parsedOptions =
-      new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
+    val parsedOptions = new CSVOptions(
+      options,
+      columnPruning = sparkSession.sessionState.conf.csvColumnPruning,
+      sparkSession.sessionState.conf.sessionLocalTimeZone)
     val csvDataSource = CSVDataSource(parsedOptions)
     csvDataSource.isSplitable && super.isSplitable(sparkSession, options, path)
   }
@@ -51,8 +53,10 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
-    val parsedOptions =
-      new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
+    val parsedOptions = new CSVOptions(
+      options,
+      columnPruning = sparkSession.sessionState.conf.csvColumnPruning,
+      sparkSession.sessionState.conf.sessionLocalTimeZone)
 
     CSVDataSource(parsedOptions).inferSchema(sparkSession, files, parsedOptions)
   }
@@ -64,7 +68,10 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       dataSchema: StructType): OutputWriterFactory = {
     CSVUtils.verifySchema(dataSchema)
     val conf = job.getConfiguration
-    val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
+    val csvOptions = new CSVOptions(
+      options,
+      columnPruning = sparkSession.sessionState.conf.csvColumnPruning,
+      sparkSession.sessionState.conf.sessionLocalTimeZone)
     csvOptions.compressionCodec.foreach { codec =>
       CompressionCodecs.setCodecConfiguration(conf, codec)
     }
@@ -97,6 +104,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
 
     val parsedOptions = new CSVOptions(
       options,
+      sparkSession.sessionState.conf.csvColumnPruning,
       sparkSession.sessionState.conf.sessionLocalTimeZone,
       sparkSession.sessionState.conf.columnNameOfCorruptRecord)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/64fad0b5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 1066d15..7119189 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -28,16 +28,19 @@ import org.apache.spark.sql.catalyst.util._
 
 class CSVOptions(
     @transient val parameters: CaseInsensitiveMap[String],
+    val columnPruning: Boolean,
     defaultTimeZoneId: String,
     defaultColumnNameOfCorruptRecord: String)
   extends Logging with Serializable {
 
   def this(
     parameters: Map[String, String],
+    columnPruning: Boolean,
     defaultTimeZoneId: String,
     defaultColumnNameOfCorruptRecord: String = "") = {
       this(
         CaseInsensitiveMap(parameters),
+        columnPruning,
         defaultTimeZoneId,
         defaultColumnNameOfCorruptRecord)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/64fad0b5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 99557a1..4f00cc5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -34,10 +34,10 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 class UnivocityParser(
-    schema: StructType,
+    dataSchema: StructType,
     requiredSchema: StructType,
     val options: CSVOptions) extends Logging {
-  require(requiredSchema.toSet.subsetOf(schema.toSet),
+  require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
     "requiredSchema should be the subset of schema.")
 
   def this(schema: StructType, options: CSVOptions) = this(schema, schema, options)
@@ -45,9 +45,17 @@ class UnivocityParser(
   // A `ValueConverter` is responsible for converting the given value to a desired type.
   private type ValueConverter = String => Any
 
-  private val tokenizer = new CsvParser(options.asParserSettings)
+  private val tokenizer = {
+    val parserSetting = options.asParserSettings
+    if (options.columnPruning && requiredSchema.length < dataSchema.length) {
+      val tokenIndexArr = requiredSchema.map(f => java.lang.Integer.valueOf(dataSchema.indexOf(f)))
+      parserSetting.selectIndexes(tokenIndexArr: _*)
+    }
+    new CsvParser(parserSetting)
+  }
+  private val schema = if (options.columnPruning) requiredSchema else dataSchema
 
-  private val row = new GenericInternalRow(requiredSchema.length)
+  private val row = new GenericInternalRow(schema.length)
 
   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
@@ -73,11 +81,8 @@ class UnivocityParser(
   // Each input token is placed in each output row's position by mapping these. In this case,
   //
   //   output row - ["A", 2]
-  private val valueConverters: Array[ValueConverter] =
+  private val valueConverters: Array[ValueConverter] = {
     schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
-
-  private val tokenIndexArr: Array[Int] = {
-    requiredSchema.map(f => schema.indexOf(f)).toArray
   }
 
   /**
@@ -210,9 +215,8 @@ class UnivocityParser(
     } else {
       try {
         var i = 0
-        while (i < requiredSchema.length) {
-          val from = tokenIndexArr(i)
-          row(i) = valueConverters(from).apply(tokens(from))
+        while (i < schema.length) {
+          row(i) = valueConverters(i).apply(tokens(i))
           i += 1
         }
         row

http://git-wip-us.apache.org/repos/asf/spark/blob/64fad0b5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala
index d442ba7..1a3dacb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala
@@ -74,7 +74,53 @@ object CSVBenchmarks {
     }
   }
 
+  def multiColumnsBenchmark(rowsNum: Int): Unit = {
+    val colsNum = 1000
+    val benchmark = new Benchmark(s"Wide rows with $colsNum columns", rowsNum)
+
+    withTempPath { path =>
+      val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
+      val schema = StructType(fields)
+      val values = (0 until colsNum).map(i => i.toString).mkString(",")
+      val columnNames = schema.fieldNames
+
+      spark.range(rowsNum)
+        .select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*)
+        .write.option("header", true)
+        .csv(path.getAbsolutePath)
+
+      val ds = spark.read.schema(schema).csv(path.getAbsolutePath)
+
+      benchmark.addCase(s"Select $colsNum columns", 3) { _ =>
+        ds.select("*").filter((row: Row) => true).count()
+      }
+      val cols100 = columnNames.take(100).map(Column(_))
+      benchmark.addCase(s"Select 100 columns", 3) { _ =>
+        ds.select(cols100: _*).filter((row: Row) => true).count()
+      }
+      benchmark.addCase(s"Select one column", 3) { _ =>
+        ds.select($"col1").filter((row: Row) => true).count()
+      }
+      benchmark.addCase(s"count()", 3) { _ =>
+        ds.count()
+      }
+
+      /*
+      Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
+
+      Wide rows with 1000 columns:         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+      --------------------------------------------------------------------------------------------
+      Select 1000 columns                     81091 / 81692          0.0       81090.7       1.0X
+      Select 100 columns                      30003 / 34448          0.0       30003.0       2.7X
+      Select one column                       24792 / 24855          0.0       24792.0       3.3X
+      count()                                 24344 / 24642          0.0       24343.8       3.3X
+      */
+      benchmark.run()
+    }
+  }
+
   def main(args: Array[String]): Unit = {
     quotedValuesBenchmark(rowsNum = 50 * 1000, numIters = 3)
+    multiColumnsBenchmark(rowsNum = 1000 * 1000)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/64fad0b5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
index 6617420..842251b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.types._
 class CSVInferSchemaSuite extends SparkFunSuite {
 
   test("String fields types are inferred correctly from null types") {
-    val options = new CSVOptions(Map.empty[String, String], "GMT")
+    val options = new CSVOptions(Map.empty[String, String], false, "GMT")
     assert(CSVInferSchema.inferField(NullType, "", options) == NullType)
     assert(CSVInferSchema.inferField(NullType, null, options) == NullType)
     assert(CSVInferSchema.inferField(NullType, "100000000000", options) == LongType)
@@ -41,7 +41,7 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   }
 
   test("String fields types are inferred correctly from other types") {
-    val options = new CSVOptions(Map.empty[String, String], "GMT")
+    val options = new CSVOptions(Map.empty[String, String], false, "GMT")
     assert(CSVInferSchema.inferField(LongType, "1.0", options) == DoubleType)
     assert(CSVInferSchema.inferField(LongType, "test", options) == StringType)
     assert(CSVInferSchema.inferField(IntegerType, "1.0", options) == DoubleType)
@@ -60,21 +60,21 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   }
 
   test("Timestamp field types are inferred correctly via custom data format") {
-    var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"), "GMT")
+    var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"), false, "GMT")
     assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
-    options = new CSVOptions(Map("timestampFormat" -> "yyyy"), "GMT")
+    options = new CSVOptions(Map("timestampFormat" -> "yyyy"), false, "GMT")
     assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType)
   }
 
   test("Timestamp field types are inferred correctly from other types") {
-    val options = new CSVOptions(Map.empty[String, String], "GMT")
+    val options = new CSVOptions(Map.empty[String, String], false, "GMT")
     assert(CSVInferSchema.inferField(IntegerType, "2015-08-20 14", options) == StringType)
     assert(CSVInferSchema.inferField(DoubleType, "2015-08-20 14:10", options) == StringType)
     assert(CSVInferSchema.inferField(LongType, "2015-08 14:49:00", options) == StringType)
   }
 
   test("Boolean fields types are inferred correctly from other types") {
-    val options = new CSVOptions(Map.empty[String, String], "GMT")
+    val options = new CSVOptions(Map.empty[String, String], false, "GMT")
     assert(CSVInferSchema.inferField(LongType, "Fale", options) == StringType)
     assert(CSVInferSchema.inferField(DoubleType, "TRUEe", options) == StringType)
   }
@@ -92,12 +92,12 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   }
 
   test("Null fields are handled properly when a nullValue is specified") {
-    var options = new CSVOptions(Map("nullValue" -> "null"), "GMT")
+    var options = new CSVOptions(Map("nullValue" -> "null"), false, "GMT")
     assert(CSVInferSchema.inferField(NullType, "null", options) == NullType)
     assert(CSVInferSchema.inferField(StringType, "null", options) == StringType)
     assert(CSVInferSchema.inferField(LongType, "null", options) == LongType)
 
-    options = new CSVOptions(Map("nullValue" -> "\\N"), "GMT")
+    options = new CSVOptions(Map("nullValue" -> "\\N"), false, "GMT")
     assert(CSVInferSchema.inferField(IntegerType, "\\N", options) == IntegerType)
     assert(CSVInferSchema.inferField(DoubleType, "\\N", options) == DoubleType)
     assert(CSVInferSchema.inferField(TimestampType, "\\N", options) == TimestampType)
@@ -111,12 +111,12 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   }
 
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
-    val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"), "GMT")
+    val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"), false, "GMT")
     assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
   }
 
   test("SPARK-18877: `inferField` on DecimalType should find a common type with `typeSoFar`") {
-    val options = new CSVOptions(Map.empty[String, String], "GMT")
+    val options = new CSVOptions(Map.empty[String, String], false, "GMT")
 
     // 9.03E+12 is Decimal(3, -10) and 1.19E+11 is Decimal(3, -9).
     assert(CSVInferSchema.inferField(DecimalType(3, -10), "1.19E+11", options) ==
@@ -134,7 +134,7 @@ class CSVInferSchemaSuite extends SparkFunSuite {
 
   test("DoubleType should be infered when user defined nan/inf are provided") {
     val options = new CSVOptions(Map("nanValue" -> "nan", "negativeInf" -> "-inf",
-      "positiveInf" -> "inf"), "GMT")
+      "positiveInf" -> "inf"), false, "GMT")
     assert(CSVInferSchema.inferField(NullType, "nan", options) == DoubleType)
     assert(CSVInferSchema.inferField(NullType, "inf", options) == DoubleType)
     assert(CSVInferSchema.inferField(NullType, "-inf", options) == DoubleType)

http://git-wip-us.apache.org/repos/asf/spark/blob/64fad0b5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 2bac1a3..afe10bd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -260,14 +260,16 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
   }
 
   test("test for DROPMALFORMED parsing mode") {
-    Seq(false, true).foreach { multiLine =>
-      val cars = spark.read
-        .format("csv")
-        .option("multiLine", multiLine)
-        .options(Map("header" -> "true", "mode" -> "dropmalformed"))
-        .load(testFile(carsFile))
+    withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false") {
+      Seq(false, true).foreach { multiLine =>
+        val cars = spark.read
+          .format("csv")
+          .option("multiLine", multiLine)
+          .options(Map("header" -> "true", "mode" -> "dropmalformed"))
+          .load(testFile(carsFile))
 
-      assert(cars.select("year").collect().size === 2)
+        assert(cars.select("year").collect().size === 2)
+      }
     }
   }
 
@@ -1383,4 +1385,29 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
 
     checkAnswer(ds, Seq(Row(""" "a" """)))
   }
+
+  test("SPARK-24244: Select a subset of all columns") {
+    withTempPath { path =>
+      import collection.JavaConverters._
+      val schema = new StructType()
+        .add("f1", IntegerType).add("f2", IntegerType).add("f3", IntegerType)
+        .add("f4", IntegerType).add("f5", IntegerType).add("f6", IntegerType)
+        .add("f7", IntegerType).add("f8", IntegerType).add("f9", IntegerType)
+        .add("f10", IntegerType).add("f11", IntegerType).add("f12", IntegerType)
+        .add("f13", IntegerType).add("f14", IntegerType).add("f15", IntegerType)
+
+      val odf = spark.createDataFrame(List(
+        Row(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15),
+        Row(-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15)
+      ).asJava, schema)
+      odf.write.csv(path.getCanonicalPath)
+      val idf = spark.read
+        .schema(schema)
+        .csv(path.getCanonicalPath)
+        .select('f15, 'f10, 'f5)
+
+      assert(idf.count() == 2)
+      checkAnswer(idf, List(Row(15, 10, 5), Row(-15, -10, -5)))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/64fad0b5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
index efbf735..458edb2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
@@ -26,8 +26,9 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 class UnivocityParserSuite extends SparkFunSuite {
-  private val parser =
-    new UnivocityParser(StructType(Seq.empty), new CSVOptions(Map.empty[String, String], "GMT"))
+  private val parser = new UnivocityParser(
+    StructType(Seq.empty),
+    new CSVOptions(Map.empty[String, String], false, "GMT"))
 
   private def assertNull(v: Any) = assert(v == null)
 
@@ -38,7 +39,7 @@ class UnivocityParserSuite extends SparkFunSuite {
 
     stringValues.zip(decimalValues).foreach { case (strVal, decimalVal) =>
       val decimalValue = new BigDecimal(decimalVal.toString)
-      val options = new CSVOptions(Map.empty[String, String], "GMT")
+      val options = new CSVOptions(Map.empty[String, String], false, "GMT")
       assert(parser.makeConverter("_1", decimalType, options = options).apply(strVal) ===
         Decimal(decimalValue, decimalType.precision, decimalType.scale))
     }
@@ -51,21 +52,21 @@ class UnivocityParserSuite extends SparkFunSuite {
     // Nullable field with nullValue option.
     types.foreach { t =>
       // Tests that a custom nullValue.
-      val nullValueOptions = new CSVOptions(Map("nullValue" -> "-"), "GMT")
+      val nullValueOptions = new CSVOptions(Map("nullValue" -> "-"), false, "GMT")
       val converter =
         parser.makeConverter("_1", t, nullable = true, options = nullValueOptions)
       assertNull(converter.apply("-"))
       assertNull(converter.apply(null))
 
       // Tests that the default nullValue is empty string.
-      val options = new CSVOptions(Map.empty[String, String], "GMT")
+      val options = new CSVOptions(Map.empty[String, String], false, "GMT")
       assertNull(parser.makeConverter("_1", t, nullable = true, options = options).apply(""))
     }
 
     // Not nullable field with nullValue option.
     types.foreach { t =>
       // Casts a null to not nullable field should throw an exception.
-      val options = new CSVOptions(Map("nullValue" -> "-"), "GMT")
+      val options = new CSVOptions(Map("nullValue" -> "-"), false, "GMT")
       val converter =
         parser.makeConverter("_1", t, nullable = false, options = options)
       var message = intercept[RuntimeException] {
@@ -81,7 +82,7 @@ class UnivocityParserSuite extends SparkFunSuite {
     // If nullValue is different with empty string, then, empty string should not be casted into
     // null.
     Seq(true, false).foreach { b =>
-      val options = new CSVOptions(Map("nullValue" -> "null"), "GMT")
+      val options = new CSVOptions(Map("nullValue" -> "null"), false, "GMT")
       val converter =
         parser.makeConverter("_1", StringType, nullable = b, options = options)
       assert(converter.apply("") == UTF8String.fromString(""))
@@ -89,7 +90,7 @@ class UnivocityParserSuite extends SparkFunSuite {
   }
 
   test("Throws exception for empty string with non null type") {
-      val options = new CSVOptions(Map.empty[String, String], "GMT")
+      val options = new CSVOptions(Map.empty[String, String], false, "GMT")
     val exception = intercept[RuntimeException]{
       parser.makeConverter("_1", IntegerType, nullable = false, options = options).apply("")
     }
@@ -97,7 +98,7 @@ class UnivocityParserSuite extends SparkFunSuite {
   }
 
   test("Types are cast correctly") {
-    val options = new CSVOptions(Map.empty[String, String], "GMT")
+    val options = new CSVOptions(Map.empty[String, String], false, "GMT")
     assert(parser.makeConverter("_1", ByteType, options = options).apply("10") == 10)
     assert(parser.makeConverter("_1", ShortType, options = options).apply("10") == 10)
     assert(parser.makeConverter("_1", IntegerType, options = options).apply("10") == 10)
@@ -107,7 +108,7 @@ class UnivocityParserSuite extends SparkFunSuite {
     assert(parser.makeConverter("_1", BooleanType, options = options).apply("true") == true)
 
     val timestampsOptions =
-      new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm"), "GMT")
+      new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm"), false, "GMT")
     val customTimestamp = "31/01/2015 00:00"
     val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime
     val castedTimestamp =
@@ -116,7 +117,7 @@ class UnivocityParserSuite extends SparkFunSuite {
     assert(castedTimestamp == expectedTime * 1000L)
 
     val customDate = "31/01/2015"
-    val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), "GMT")
+    val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), false, "GMT")
     val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
     val castedDate =
       parser.makeConverter("_1", DateType, nullable = true, options = dateOptions)
@@ -131,7 +132,7 @@ class UnivocityParserSuite extends SparkFunSuite {
   }
 
   test("Throws exception for casting an invalid string to Float and Double Types") {
-    val options = new CSVOptions(Map.empty[String, String], "GMT")
+    val options = new CSVOptions(Map.empty[String, String], false, "GMT")
     val types = Seq(DoubleType, FloatType)
     val input = Seq("10u000", "abc", "1 2/3")
     types.foreach { dt =>
@@ -145,7 +146,7 @@ class UnivocityParserSuite extends SparkFunSuite {
   }
 
   test("Float NaN values are parsed correctly") {
-    val options = new CSVOptions(Map("nanValue" -> "nn"), "GMT")
+    val options = new CSVOptions(Map("nanValue" -> "nn"), false, "GMT")
     val floatVal: Float = parser.makeConverter(
       "_1", FloatType, nullable = true, options = options
     ).apply("nn").asInstanceOf[Float]
@@ -156,7 +157,7 @@ class UnivocityParserSuite extends SparkFunSuite {
   }
 
   test("Double NaN values are parsed correctly") {
-    val options = new CSVOptions(Map("nanValue" -> "-"), "GMT")
+    val options = new CSVOptions(Map("nanValue" -> "-"), false, "GMT")
     val doubleVal: Double = parser.makeConverter(
       "_1", DoubleType, nullable = true, options = options
     ).apply("-").asInstanceOf[Double]
@@ -165,14 +166,14 @@ class UnivocityParserSuite extends SparkFunSuite {
   }
 
   test("Float infinite values can be parsed") {
-    val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), "GMT")
+    val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), false, "GMT")
     val floatVal1 = parser.makeConverter(
       "_1", FloatType, nullable = true, options = negativeInfOptions
     ).apply("max").asInstanceOf[Float]
 
     assert(floatVal1 == Float.NegativeInfinity)
 
-    val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), "GMT")
+    val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), false, "GMT")
     val floatVal2 = parser.makeConverter(
       "_1", FloatType, nullable = true, options = positiveInfOptions
     ).apply("max").asInstanceOf[Float]
@@ -181,14 +182,14 @@ class UnivocityParserSuite extends SparkFunSuite {
   }
 
   test("Double infinite values can be parsed") {
-    val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), "GMT")
+    val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), false, "GMT")
     val doubleVal1 = parser.makeConverter(
       "_1", DoubleType, nullable = true, options = negativeInfOptions
     ).apply("max").asInstanceOf[Double]
 
     assert(doubleVal1 == Double.NegativeInfinity)
 
-    val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), "GMT")
+    val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), false, "GMT")
     val doubleVal2 = parser.makeConverter(
       "_1", DoubleType, nullable = true, options = positiveInfOptions
     ).apply("max").asInstanceOf[Double]

