You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2018/05/10 16:28:50 UTC
spark git commit: [SPARK-24068][BACKPORT-2.3] Propagating
DataFrameReader's options to Text datasource on schema inferring
Repository: spark
Updated Branches:
refs/heads/branch-2.3 8889d7864 -> eab10f994
[SPARK-24068][BACKPORT-2.3] Propagating DataFrameReader's options to Text datasource on schema inferring
## What changes were proposed in this pull request?
While reading CSV or JSON files, DataFrameReader's options are converted to Hadoop parameters, for example here:
https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L302
but the options are not propagated to the Text datasource during schema inference, for instance:
https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala#L184-L188
The PR proposes propagating the user's options to the Text datasource during schema inference, in the same way that the user's options are converted to Hadoop parameters when a schema is specified.
## How was this patch tested?
The changes were tested manually by using https://github.com/twitter/hadoop-lzo:
```
hadoop-lzo> mvn clean package
hadoop-lzo> ln -s ./target/hadoop-lzo-0.4.21-SNAPSHOT.jar ./hadoop-lzo.jar
```
Create 2 test files in JSON and CSV format and compress them:
```shell
$ cat test.csv
col1|col2
a|1
$ lzop test.csv
$ cat test.json
{"col1":"a","col2":1}
$ lzop test.json
```
Run `spark-shell` with hadoop-lzo:
```
bin/spark-shell --jars ~/hadoop-lzo/hadoop-lzo.jar
```
Read the compressed CSV and JSON files without specifying a schema:
```scala
spark.read.option("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec").option("inferSchema",true).option("header",true).option("sep","|").csv("test.csv.lzo").show()
+----+----+
|col1|col2|
+----+----+
| a| 1|
+----+----+
```
```scala
spark.read.option("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec").option("multiLine", true).json("test.json.lzo").printSchema
root
|-- col1: string (nullable = true)
|-- col2: long (nullable = true)
```
Author: Maxim Gekk <ma...@databricks.com>
Author: Maxim Gekk <ma...@gmail.com>
Closes #21292 from MaxGekk/text-options-backport-v2.3.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eab10f99
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eab10f99
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eab10f99
Branch: refs/heads/branch-2.3
Commit: eab10f9945c1d01daa45c233a39dedfd184f543c
Parents: 8889d78
Author: Maxim Gekk <ma...@databricks.com>
Authored: Fri May 11 00:28:43 2018 +0800
Committer: hyukjinkwon <gu...@apache.org>
Committed: Fri May 11 00:28:43 2018 +0800
----------------------------------------------------------------------
.../spark/sql/catalyst/json/JSONOptions.scala | 2 +-
.../execution/datasources/csv/CSVDataSource.scala | 6 ++++--
.../sql/execution/datasources/csv/CSVOptions.scala | 2 +-
.../execution/datasources/csv/UnivocityParser.scala | 2 --
.../execution/datasources/json/JsonDataSource.scala | 16 ++++++++++------
5 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/eab10f99/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 652412b..190fcc6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.util._
* Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
*/
private[sql] class JSONOptions(
- @transient private val parameters: CaseInsensitiveMap[String],
+ @transient val parameters: CaseInsensitiveMap[String],
defaultTimeZoneId: String,
defaultColumnNameOfCorruptRecord: String)
extends Logging with Serializable {
http://git-wip-us.apache.org/repos/asf/spark/blob/eab10f99/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 4870d75..fffad17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -184,7 +184,8 @@ object TextInputCSVDataSource extends CSVDataSource {
DataSource.apply(
sparkSession,
paths = paths,
- className = classOf[TextFileFormat].getName
+ className = classOf[TextFileFormat].getName,
+ options = options.parameters
).resolveRelation(checkFilesExist = false))
.select("value").as[String](Encoders.STRING)
} else {
@@ -248,7 +249,8 @@ object MultiLineCSVDataSource extends CSVDataSource {
options: CSVOptions): RDD[PortableDataStream] = {
val paths = inputPaths.map(_.getPath)
val name = paths.mkString(",")
- val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
+ val job = Job.getInstance(sparkSession.sessionState.newHadoopConfWithOptions(
+ options.parameters))
FileInputFormat.setInputPaths(job, paths: _*)
val conf = job.getConfiguration
http://git-wip-us.apache.org/repos/asf/spark/blob/eab10f99/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index c167906..6347af6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -27,7 +27,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util._
class CSVOptions(
- @transient private val parameters: CaseInsensitiveMap[String],
+ @transient val parameters: CaseInsensitiveMap[String],
defaultTimeZoneId: String,
defaultColumnNameOfCorruptRecord: String)
extends Logging with Serializable {
http://git-wip-us.apache.org/repos/asf/spark/blob/eab10f99/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 3d6cc30..99557a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.datasources.csv
import java.io.InputStream
import java.math.BigDecimal
-import java.text.NumberFormat
-import java.util.Locale
import scala.util.Try
import scala.util.control.NonFatal
http://git-wip-us.apache.org/repos/asf/spark/blob/eab10f99/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 77e7edc..8a0fe53 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -92,7 +92,7 @@ object TextInputJsonDataSource extends JsonDataSource {
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: JSONOptions): StructType = {
- val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths)
+ val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions)
inferFromDataset(json, parsedOptions)
}
@@ -104,13 +104,15 @@ object TextInputJsonDataSource extends JsonDataSource {
private def createBaseDataset(
sparkSession: SparkSession,
- inputPaths: Seq[FileStatus]): Dataset[String] = {
+ inputPaths: Seq[FileStatus],
+ parsedOptions: JSONOptions): Dataset[String] = {
val paths = inputPaths.map(_.getPath.toString)
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
paths = paths,
- className = classOf[TextFileFormat].getName
+ className = classOf[TextFileFormat].getName,
+ options = parsedOptions.parameters
).resolveRelation(checkFilesExist = false))
.select("value").as(Encoders.STRING)
}
@@ -144,16 +146,18 @@ object MultiLineJsonDataSource extends JsonDataSource {
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: JSONOptions): StructType = {
- val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths)
+ val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths, parsedOptions)
val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, parsedOptions)
JsonInferSchema.infer(sampled, parsedOptions, createParser)
}
private def createBaseRdd(
sparkSession: SparkSession,
- inputPaths: Seq[FileStatus]): RDD[PortableDataStream] = {
+ inputPaths: Seq[FileStatus],
+ parsedOptions: JSONOptions): RDD[PortableDataStream] = {
val paths = inputPaths.map(_.getPath)
- val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
+ val job = Job.getInstance(sparkSession.sessionState.newHadoopConfWithOptions(
+ parsedOptions.parameters))
val conf = job.getConfiguration
val name = paths.mkString(",")
FileInputFormat.setInputPaths(job, paths: _*)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org