You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/02/14 10:37:14 UTC
[spark] branch branch-3.0 updated: [SPARK-30810][SQL] Parses and
convert a CSV Dataset having different column from 'value' in csv(dataset)
API
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 79ce792 [SPARK-30810][SQL] Parses and convert a CSV Dataset having different column from 'value' in csv(dataset) API
79ce792 is described below
commit 79ce79234f02092e22fdd79e859d83f5a174ef95
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Fri Feb 14 18:20:18 2020 +0800
[SPARK-30810][SQL] Parses and convert a CSV Dataset having different column from 'value' in csv(dataset) API
### What changes were proposed in this pull request?
This PR fixes the `DataFrameReader.csv(dataset: Dataset[String])` API so it accepts a `Dataset[String]` originating from a column whose name is different from `value`. This is a long-standing bug that has existed since the API was first introduced.
`CSVUtils.filterCommentAndEmpty` assumed the `Dataset[String]` originated from a column named `value`. This PR changes it to rename the input's single column to `value` (via `toDF("value")`) before filtering, so the filter works regardless of the original column name.
### Why are the changes needed?
For `DataFrameReader.csv(dataset: Dataset[String])` to support any `Dataset[String]` as the signature indicates.
### Does this PR introduce any user-facing change?
Yes,
```scala
val ds = spark.range(2).selectExpr("concat('a,b,', id) AS text").as[String]
spark.read.option("header", true).option("inferSchema", true).csv(ds).show()
```
Before:
```
org.apache.spark.sql.AnalysisException: cannot resolve '`value`' given input columns: [text];;
'Filter (length(trim('value, None)) > 0)
+- Project [concat(a,b,, cast(id#0L as string)) AS text#2]
+- Range (0, 2, step=1, splits=Some(2))
```
After:
```
+---+---+---+
| a| b| 0|
+---+---+---+
| a| b| 1|
+---+---+---+
```
### How was this patch tested?
A unit test was added.
Closes #27561 from HyukjinKwon/SPARK-30810.
Authored-by: HyukjinKwon <gu...@apache.org>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 2a270a731a3b1da9a0fb036d648dd522e5c4d5ad)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala | 7 ++++---
.../org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 7 +++++++
2 files changed, 11 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
index 21fabac..d8b52c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
@@ -33,11 +33,12 @@ object CSVUtils {
// with the one below, `filterCommentAndEmpty` but execution path is different. One of them
// might have to be removed in the near future if possible.
import lines.sqlContext.implicits._
- val nonEmptyLines = lines.filter(length(trim($"value")) > 0)
+ val aliased = lines.toDF("value")
+ val nonEmptyLines = aliased.filter(length(trim($"value")) > 0)
if (options.isCommentSet) {
- nonEmptyLines.filter(!$"value".startsWith(options.comment.toString))
+ nonEmptyLines.filter(!$"value".startsWith(options.comment.toString)).as[String]
} else {
- nonEmptyLines
+ nonEmptyLines.as[String]
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index b1105b4..0be0e1e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -2294,6 +2294,13 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa
}
}
}
+
+ test("SPARK-30810: parses and convert a CSV Dataset having different column from 'value'") {
+ val ds = spark.range(2).selectExpr("concat('a,b,', id) AS `a.text`").as[String]
+ val csv = spark.read.option("header", true).option("inferSchema", true).csv(ds)
+ assert(csv.schema.fieldNames === Seq("a", "b", "0"))
+ checkAnswer(csv, Row("a", "b", 1))
+ }
}
class CSVv1Suite extends CSVSuite {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org