Posted to commits@spark.apache.org by gu...@apache.org on 2018/05/10 16:28:50 UTC

spark git commit: [SPARK-24068][BACKPORT-2.3] Propagating DataFrameReader's options to Text datasource on schema inferring

Repository: spark
Updated Branches:
  refs/heads/branch-2.3 8889d7864 -> eab10f994


[SPARK-24068][BACKPORT-2.3] Propagating DataFrameReader's options to Text datasource on schema inferring

## What changes were proposed in this pull request?

When reading CSV or JSON files, DataFrameReader's options are converted to Hadoop parameters, for example here:
https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L302

but the options are not propagated to the Text datasource during schema inference, for instance:
https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala#L184-L188

This PR propagates the user's options to the Text datasource during schema inference, in the same way that the options are converted to Hadoop parameters when a schema is specified.
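
For illustration, here is a minimal standalone sketch of what `newHadoopConfWithOptions` (used in the diff below) conceptually does; this is not the actual Spark internals, only an assumption that reader options are copied into a fresh Hadoop `Configuration` so that settings such as `io.compression.codecs` also take effect while the schema is being inferred:

```scala
import org.apache.hadoop.conf.Configuration

// Hedged sketch: build a Hadoop Configuration that also carries the
// DataFrameReader options, so the Text datasource used for schema
// inference sees the same settings (e.g. the LZO codec) as the main read.
def newHadoopConfWithOptions(
    base: Configuration,
    options: Map[String, String]): Configuration = {
  val conf = new Configuration(base)
  // Copy every user-supplied reader option into the Hadoop configuration.
  options.foreach { case (key, value) => conf.set(key, value) }
  conf
}
```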

## How was this patch tested?
The changes were tested manually using https://github.com/twitter/hadoop-lzo:

```shell
hadoop-lzo> mvn clean package
hadoop-lzo> ln -s ./target/hadoop-lzo-0.4.21-SNAPSHOT.jar ./hadoop-lzo.jar
```
Create two test files, one in CSV and one in JSON format, and compress them:
```shell
$ cat test.csv
col1|col2
a|1
$ lzop test.csv
$ cat test.json
{"col1":"a","col2":1}
$ lzop test.json
```
Run `spark-shell` with hadoop-lzo:
```shell
bin/spark-shell --jars ~/hadoop-lzo/hadoop-lzo.jar
```
Read the compressed CSV and JSON files without specifying a schema:
```scala
spark.read.option("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec").option("inferSchema",true).option("header",true).option("sep","|").csv("test.csv.lzo").show()
+----+----+
|col1|col2|
+----+----+
|   a|   1|
+----+----+
```
```scala
spark.read.option("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec").option("multiLine", true).json("test.json.lzo").printSchema
root
 |-- col1: string (nullable = true)
 |-- col2: long (nullable = true)
```

Author: Maxim Gekk <ma...@databricks.com>
Author: Maxim Gekk <ma...@gmail.com>

Closes #21292 from MaxGekk/text-options-backport-v2.3.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eab10f99
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eab10f99
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eab10f99

Branch: refs/heads/branch-2.3
Commit: eab10f9945c1d01daa45c233a39dedfd184f543c
Parents: 8889d78
Author: Maxim Gekk <ma...@databricks.com>
Authored: Fri May 11 00:28:43 2018 +0800
Committer: hyukjinkwon <gu...@apache.org>
Committed: Fri May 11 00:28:43 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/json/JSONOptions.scala       |  2 +-
 .../execution/datasources/csv/CSVDataSource.scala   |  6 ++++--
 .../sql/execution/datasources/csv/CSVOptions.scala  |  2 +-
 .../execution/datasources/csv/UnivocityParser.scala |  2 --
 .../execution/datasources/json/JsonDataSource.scala | 16 ++++++++++------
 5 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/eab10f99/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 652412b..190fcc6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.util._
  * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
  */
 private[sql] class JSONOptions(
-    @transient private val parameters: CaseInsensitiveMap[String],
+    @transient val parameters: CaseInsensitiveMap[String],
     defaultTimeZoneId: String,
     defaultColumnNameOfCorruptRecord: String)
   extends Logging with Serializable  {

http://git-wip-us.apache.org/repos/asf/spark/blob/eab10f99/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 4870d75..fffad17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -184,7 +184,8 @@ object TextInputCSVDataSource extends CSVDataSource {
         DataSource.apply(
           sparkSession,
           paths = paths,
-          className = classOf[TextFileFormat].getName
+          className = classOf[TextFileFormat].getName,
+          options = options.parameters
         ).resolveRelation(checkFilesExist = false))
         .select("value").as[String](Encoders.STRING)
     } else {
@@ -248,7 +249,8 @@ object MultiLineCSVDataSource extends CSVDataSource {
       options: CSVOptions): RDD[PortableDataStream] = {
     val paths = inputPaths.map(_.getPath)
     val name = paths.mkString(",")
-    val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
+    val job = Job.getInstance(sparkSession.sessionState.newHadoopConfWithOptions(
+      options.parameters))
     FileInputFormat.setInputPaths(job, paths: _*)
     val conf = job.getConfiguration
 

http://git-wip-us.apache.org/repos/asf/spark/blob/eab10f99/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index c167906..6347af6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -27,7 +27,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util._
 
 class CSVOptions(
-    @transient private val parameters: CaseInsensitiveMap[String],
+    @transient val parameters: CaseInsensitiveMap[String],
     defaultTimeZoneId: String,
     defaultColumnNameOfCorruptRecord: String)
   extends Logging with Serializable {

http://git-wip-us.apache.org/repos/asf/spark/blob/eab10f99/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 3d6cc30..99557a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.datasources.csv
 
 import java.io.InputStream
 import java.math.BigDecimal
-import java.text.NumberFormat
-import java.util.Locale
 
 import scala.util.Try
 import scala.util.control.NonFatal

http://git-wip-us.apache.org/repos/asf/spark/blob/eab10f99/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 77e7edc..8a0fe53 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -92,7 +92,7 @@ object TextInputJsonDataSource extends JsonDataSource {
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): StructType = {
-    val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths)
+    val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions)
     inferFromDataset(json, parsedOptions)
   }
 
@@ -104,13 +104,15 @@ object TextInputJsonDataSource extends JsonDataSource {
 
   private def createBaseDataset(
       sparkSession: SparkSession,
-      inputPaths: Seq[FileStatus]): Dataset[String] = {
+      inputPaths: Seq[FileStatus],
+      parsedOptions: JSONOptions): Dataset[String] = {
     val paths = inputPaths.map(_.getPath.toString)
     sparkSession.baseRelationToDataFrame(
       DataSource.apply(
         sparkSession,
         paths = paths,
-        className = classOf[TextFileFormat].getName
+        className = classOf[TextFileFormat].getName,
+        options = parsedOptions.parameters
       ).resolveRelation(checkFilesExist = false))
       .select("value").as(Encoders.STRING)
   }
@@ -144,16 +146,18 @@ object MultiLineJsonDataSource extends JsonDataSource {
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): StructType = {
-    val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths)
+    val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths, parsedOptions)
     val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, parsedOptions)
     JsonInferSchema.infer(sampled, parsedOptions, createParser)
   }
 
   private def createBaseRdd(
       sparkSession: SparkSession,
-      inputPaths: Seq[FileStatus]): RDD[PortableDataStream] = {
+      inputPaths: Seq[FileStatus],
+      parsedOptions: JSONOptions): RDD[PortableDataStream] = {
     val paths = inputPaths.map(_.getPath)
-    val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
+    val job = Job.getInstance(sparkSession.sessionState.newHadoopConfWithOptions(
+      parsedOptions.parameters))
     val conf = job.getConfiguration
     val name = paths.mkString(",")
     FileInputFormat.setInputPaths(job, paths: _*)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org