Posted to reviews@spark.apache.org by HyukjinKwon <gi...@git.apache.org> on 2018/10/09 07:55:45 UTC
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
GitHub user HyukjinKwon opened a pull request:
https://github.com/apache/spark/pull/22676
[SPARK-25684][SQL] Organize header related codes in CSV datasource
## What changes were proposed in this pull request?
1. Move `CSVDataSource.makeSafeHeader` to `CSVUtils.makeSafeHeader` (as is).
Rationale:
- Historically, and from the first refactoring (which I did), I intended to put all CSV-specific handling (like options), filtering, header extraction, etc. in `CSVUtils`.
- See `JsonDataSource`: `CSVDataSource` is now quite consistent with `JsonDataSource`. Since CSV's code path is quite complicated, we had better keep the two matched as closely as possible.
2. Move `CSVDataSource.checkHeaderColumnNames` to `CSVHeaderChecker.checkHeaderColumnNames` (as is).
Rationale:
- Same reasons as in 1.
3. Put the `enforceSchema` logic into `CSVHeaderChecker`.
- The header checking and column pruning logic was added (per https://github.com/apache/spark/pull/20894 and https://github.com/apache/spark/pull/21296), but some of the code, such as that from https://github.com/apache/spark/pull/21296, is duplicated.
- Also, the header-checking code is scattered here and there, which is quite error-prone, so we had better put it in a single place. See https://github.com/apache/spark/pull/22656.
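The consolidation in item 3 can be pictured with a small, self-contained sketch. `HeaderCheckerSketch` is a hypothetical name, not Spark's `CSVHeaderChecker`. It assumes, following the doc comments of the new class, that a mismatch throws when `enforceSchema` is off; it also assumes that with `enforceSchema` on, a mismatch is only reported through a warning callback.

```scala
// Hypothetical, simplified stand-in for a single place that owns header checking.
// Not Spark's CSVHeaderChecker; only an illustration of the consolidation idea.
final class HeaderCheckerSketch(
    fieldNames: Seq[String],
    enforceSchema: Boolean,
    caseSensitive: Boolean,
    source: String) {

  // Case sensitivity is decided once, here, instead of at every call site.
  private def normalize(name: String): String =
    if (caseSensitive) name else name.toLowerCase(java.util.Locale.ROOT)

  /** Returns a description of each mismatch; empty when header and schema agree. */
  def mismatches(header: Seq[String]): Seq[String] =
    if (header.length != fieldNames.length) {
      Seq(s"header has ${header.length} columns but the schema has ${fieldNames.length} fields")
    } else {
      header.zip(fieldNames).zipWithIndex.collect {
        case ((column, field), i) if normalize(column) != normalize(field) =>
          s"column $i: header '$column' vs schema '$field'"
      }
    }

  /** Throws when enforceSchema is false; otherwise (assumption) only warns. */
  def check(header: Seq[String], warn: String => Unit = _ => ()): Unit = {
    val problems = mismatches(header)
    if (problems.nonEmpty) {
      val msg = s"CSV header does not conform to the schema in $source: " +
        problems.mkString("; ")
      if (enforceSchema) warn(msg) else throw new IllegalArgumentException(msg)
    }
  }
}
```

In this shape, the case handling, the `enforceSchema` decision and the `source` label used in error messages all live in one class, so every reader path reports mismatches the same way.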
## How was this patch tested?
Existing tests should cover this.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/HyukjinKwon/spark refactoring-csv
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/22676.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #22676
----
commit 56906680ab7d5d63be04bac2c3a19bb52baa3025
Author: hyukjinkwon <gu...@...>
Date: 2018-10-09T07:26:08Z
Organize header related codes in CSV datasource
----
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22676
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3871/
---
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/22676#discussion_r223593838
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -273,44 +274,47 @@ private[csv] object UnivocityParser {
inputStream: InputStream,
shouldDropHeader: Boolean,
tokenizer: CsvParser): Iterator[Array[String]] = {
- convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens)
+ val handleHeader: () => Unit =
+ () => if (shouldDropHeader) tokenizer.parseNext
+
+ convertStream(inputStream, tokenizer, handleHeader)(tokens => tokens)
}
/**
* Parses a stream that contains CSV strings and turns it into an iterator of rows.
*/
def parseStream(
inputStream: InputStream,
- shouldDropHeader: Boolean,
parser: UnivocityParser,
- schema: StructType,
- checkHeader: Array[String] => Unit): Iterator[InternalRow] = {
+ headerChecker: CSVHeaderChecker,
+ schema: StructType): Iterator[InternalRow] = {
val tokenizer = parser.tokenizer
val safeParser = new FailureSafeParser[Array[String]](
input => Seq(parser.convert(input)),
parser.options.parseMode,
schema,
parser.options.columnNameOfCorruptRecord,
parser.options.multiLine)
- convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens =>
+
+ val handleHeader: () => Unit =
+ () => headerChecker.checkHeaderColumnNames(tokenizer)
--- End diff --
This matches the code structure of `parseStream` and `parseIterator`, which are used in multiLine and non-multiLine mode, respectively.
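The `handleHeader: () => Unit` shape in the diff can be sketched without Spark or univocity. `SimpleTokenizer` and `ConvertStreamSketch` are hypothetical names; the tokenizer assumes comma-separated lines with no quoting and, like `parseNext` in the diff, returns `null` at end of input. The caller decides what handling the header means, while the shared iterator decides when it happens.

```scala
// Hypothetical stand-in for univocity's CsvParser: splits on commas, no quoting.
final class SimpleTokenizer(lines: Iterator[String]) {
  /** Returns the next record, or null once the input is exhausted. */
  def parseNext(): Array[String] =
    if (lines.hasNext) lines.next().split(",", -1) else null
}

object ConvertStreamSketch {
  def convertStream[T](tokenizer: SimpleTokenizer, handleHeader: () => Unit)(
      convert: Array[String] => T): Iterator[T] = new Iterator[T] {
    // The input is already open here, so the header callback runs immediately,
    // before the caller requests any record (mirroring the diff).
    handleHeader()
    private var nextRecord: Array[String] = tokenizer.parseNext()

    override def hasNext: Boolean = nextRecord != null

    override def next(): T = {
      if (!hasNext) throw new NoSuchElementException("End of stream")
      val current = nextRecord
      nextRecord = tokenizer.parseNext()
      convert(current)
    }
  }
}
```

Passing a closure that just calls `tokenizer.parseNext()` corresponds to the `shouldDropHeader` branch, while a closure that parses the first record and compares it with the schema corresponds to the checking branch.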
---
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Posted by MaxGekk <gi...@git.apache.org>.
Github user MaxGekk commented on a diff in the pull request:
https://github.com/apache/spark/pull/22676#discussion_r223698787
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -505,20 +505,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
val actualSchema =
StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
- val linesWithoutHeader = if (parsedOptions.headerFlag && maybeFirstLine.isDefined) {
- val firstLine = maybeFirstLine.get
- val parser = new CsvParser(parsedOptions.asParserSettings)
- val columnNames = parser.parseLine(firstLine)
- CSVDataSource.checkHeaderColumnNames(
+ val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
+ val headerChecker = new CSVHeaderChecker(
actualSchema,
- columnNames,
- csvDataset.getClass.getCanonicalName,
- parsedOptions.enforceSchema,
- sparkSession.sessionState.conf.caseSensitiveAnalysis)
+ parsedOptions,
+ source = s"CSV source: ${csvDataset.getClass.getCanonicalName}")
--- End diff --
Would it be better to output more concrete info about the dataset? For example, `toString` outputs at least the field names. I think it would help with log analysis.
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22676
**[Test build #97147 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97147/testReport)** for PR 22676 at commit [`5690668`](https://github.com/apache/spark/commit/56906680ab7d5d63be04bac2c3a19bb52baa3025).
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22676
Merged build finished. Test PASSed.
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22676
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97235/
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:
https://github.com/apache/spark/pull/22676
retest this please
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:
https://github.com/apache/spark/pull/22676
retest this please
---
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Posted by MaxGekk <gi...@git.apache.org>.
Github user MaxGekk commented on a diff in the pull request:
https://github.com/apache/spark/pull/22676#discussion_r223730392
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---
@@ -139,14 +138,15 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
StructType(requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
parsedOptions)
+ val schema = if (columnPruning) requiredSchema else dataSchema
+ val headerChecker = new CSVHeaderChecker(
+ schema, parsedOptions, source = s"CSV file: ${file.filePath}", file.start == 0)
--- End diff --
`isStartOfFile = file.start == 0`
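A minimal sketch of why the suggested named argument reads well, assuming (per the class doc comment) that only the split beginning at offset 0 of a file can contain the header, and that the default `false` covers inputs such as a dataset of lines. `FileSplit`, `SplitHeaderPolicy` and `policyFor` are hypothetical names, not Spark classes.

```scala
// Hypothetical description of one split of a file (path, byte offset, length).
final case class FileSplit(filePath: String, start: Long, length: Long)

// Only a split that starts the file can see the header line; the default of
// `false` covers inputs where "start of file" does not apply.
final class SplitHeaderPolicy(headerFlag: Boolean, isStartOfFile: Boolean = false) {
  def shouldCheckHeader: Boolean = headerFlag && isStartOfFile
}

object SplitHeaderPolicyExample {
  def policyFor(file: FileSplit, headerFlag: Boolean): SplitHeaderPolicy =
    // Naming the Boolean argument makes the call site self-describing.
    new SplitHeaderPolicy(headerFlag, isStartOfFile = file.start == 0)
}
```

With a positional `file.start == 0`, a reader has to look up the constructor to know what the Boolean means; the named form documents it in place.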
---
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/22676#discussion_r223594011
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import com.univocity.parsers.csv.CsvParser
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Checks that column names in a CSV header and field names in the schema are the same
+ * by taking into account case sensitivity.
+ *
+ * @param schema provided (or inferred) schema to which CSV must conform.
+ * @param options parsed CSV options.
+ * @param source name of the CSV source that is currently being checked. It is used in error messages.
+ * @param isStartOfFile indicates whether the partition currently being processed is the start of the file.
+ * If unknown or not applicable (for instance, when the input is a dataset),
+ * it can be omitted.
+ */
+class CSVHeaderChecker(
+ schema: StructType,
+ options: CSVOptions,
+ source: String,
+ isStartOfFile: Boolean = false) extends Logging {
+
+ // If set to `false`, the comparison of column names and schema field
+ // names is case-insensitive.
+ private val caseSensitive = SQLConf.get.caseSensitiveAnalysis
+
+ // If `true`, column names are ignored; otherwise, the CSV column
+ // names are checked for conformance to the schema, and an exception
+ // is thrown if they don't conform.
+ private val enforceSchema = options.enforceSchema
+
+ /**
+ * Checks that column names in a CSV header and field names in the schema are the same
+ * by taking into account case sensitivity.
+ *
+ * @param columnNames names of CSV columns that must be checked against the schema.
+ */
+ private def checkHeaderColumnNames(columnNames: Array[String]): Unit = {
--- End diff --
It's moved as is, except for the parameters in its signature.
---
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Posted by MaxGekk <gi...@git.apache.org>.
Github user MaxGekk commented on a diff in the pull request:
https://github.com/apache/spark/pull/22676#discussion_r223741059
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -330,7 +333,10 @@ private[csv] object UnivocityParser {
def parseIterator(
lines: Iterator[String],
parser: UnivocityParser,
+ headerChecker: CSVHeaderChecker,
schema: StructType): Iterator[InternalRow] = {
+ headerChecker.checkHeaderColumnNames(lines, parser.tokenizer)
--- End diff --
Same question here. I would prefer to consume the input iterator lazily. That is one of the advantages of iterators over, for example, collections: they perform an action only when you explicitly call `hasNext` or `next`.
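A minimal sketch of the lazy alternative suggested here, assuming the header is simply the first element of the line iterator. `LazyHeaderSketch` and `withLazyHeaderCheck` are hypothetical names. A `lazy val` defers pulling and checking the header until the caller first invokes `hasNext` or `next`, so building the iterator consumes nothing.

```scala
object LazyHeaderSketch {
  /** Wraps `lines` so the header is read and checked on first use, not at construction. */
  def withLazyHeaderCheck(lines: Iterator[String], checkHeader: String => Unit): Iterator[String] =
    new Iterator[String] {
      // A lazy val's initializer runs at most once, on first access.
      private lazy val body: Iterator[String] = {
        if (lines.hasNext) checkHeader(lines.next())
        lines
      }

      override def hasNext: Boolean = body.hasNext
      override def next(): String = body.next()
    }
}
```

The trade-off is that a malformed header is reported on the first read rather than when the iterator is created, which matches how the rest of the input is consumed.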
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22676
**[Test build #97235 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97235/testReport)** for PR 22676 at commit [`c504356`](https://github.com/apache/spark/commit/c504356b847e183f571a09ce5f808d4a7f229255).
* This patch **fails due to an unknown error code, -9**.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:
https://github.com/apache/spark/pull/22676
Thank you @cloud-fan and @MaxGekk for reviewing this.
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22676
**[Test build #97235 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97235/testReport)** for PR 22676 at commit [`c504356`](https://github.com/apache/spark/commit/c504356b847e183f571a09ce5f808d4a7f229255).
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22676
Merged build finished. Test PASSed.
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22676
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97149/
---
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/22676#discussion_r223744025
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -505,20 +505,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
+ source = s"CSV source: ${csvDataset.getClass.getCanonicalName}")
--- End diff --
Makes sense. If it's just a matter of `toString`, I can of course fix it here, since the change is small, although it's orthogonal.
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22676
Merged build finished. Test PASSed.
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22676
**[Test build #97240 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97240/testReport)** for PR 22676 at commit [`c504356`](https://github.com/apache/spark/commit/c504356b847e183f571a09ce5f808d4a7f229255).
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22676
**[Test build #97147 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97147/testReport)** for PR 22676 at commit [`5690668`](https://github.com/apache/spark/commit/56906680ab7d5d63be04bac2c3a19bb52baa3025).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
* `class CSVHeaderChecker(`
---
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Posted by MaxGekk <gi...@git.apache.org>.
Github user MaxGekk commented on a diff in the pull request:
https://github.com/apache/spark/pull/22676#discussion_r223737261
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -273,44 +273,47 @@ private[csv] object UnivocityParser {
inputStream: InputStream,
shouldDropHeader: Boolean,
tokenizer: CsvParser): Iterator[Array[String]] = {
- convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens)
+ val handleHeader: () => Unit =
+ () => if (shouldDropHeader) tokenizer.parseNext
+
+ convertStream(inputStream, tokenizer, handleHeader)(tokens => tokens)
}
/**
* Parses a stream that contains CSV strings and turns it into an iterator of rows.
*/
def parseStream(
inputStream: InputStream,
- shouldDropHeader: Boolean,
parser: UnivocityParser,
- schema: StructType,
- checkHeader: Array[String] => Unit): Iterator[InternalRow] = {
+ headerChecker: CSVHeaderChecker,
+ schema: StructType): Iterator[InternalRow] = {
val tokenizer = parser.tokenizer
val safeParser = new FailureSafeParser[Array[String]](
input => Seq(parser.convert(input)),
parser.options.parseMode,
schema,
parser.options.columnNameOfCorruptRecord,
parser.options.multiLine)
- convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens =>
+
+ val handleHeader: () => Unit =
+ () => headerChecker.checkHeaderColumnNames(tokenizer)
+
+ convertStream(inputStream, tokenizer, handleHeader) { tokens =>
safeParser.parse(tokens)
}.flatten
}
private def convertStream[T](
inputStream: InputStream,
- shouldDropHeader: Boolean,
tokenizer: CsvParser,
- checkHeader: Array[String] => Unit = _ => ())(
+ handleHeader: () => Unit)(
convert: Array[String] => T) = new Iterator[T] {
tokenizer.beginParsing(inputStream)
- private var nextRecord = {
- if (shouldDropHeader) {
- val firstRecord = tokenizer.parseNext()
- checkHeader(firstRecord)
- }
- tokenizer.parseNext()
- }
+
+ // We can handle header here since here the stream is open.
+ handleHeader()
--- End diff --
It looks slightly strange that we consume data from the input before the upper layer starts reading it.
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22676
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3816/
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22676
**[Test build #97162 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97162/testReport)** for PR 22676 at commit [`c504356`](https://github.com/apache/spark/commit/c504356b847e183f571a09ce5f808d4a7f229255).
---
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/22676#discussion_r223728765
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
@@ -0,0 +1,131 @@
+class CSVHeaderChecker(
--- End diff --
It's under the `execution` package, which is meant to be private. Since it's accessed in `DataFrameReader`, it would otherwise need to be `private[sql]`, and such modifiers were removed in SPARK-16964 for this reason.
---
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/22676#discussion_r223722902
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
@@ -0,0 +1,131 @@
+class CSVHeaderChecker(
--- End diff --
Can this be private to the csv or spark packages, or is it now part of a public API?
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22676
**[Test build #97149 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97149/testReport)** for PR 22676 at commit [`89f7911`](https://github.com/apache/spark/commit/89f79113e6dc5da1903fed9dc04b1c7b5769d8f2).
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22676
**[Test build #97162 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97162/testReport)** for PR 22676 at commit [`c504356`](https://github.com/apache/spark/commit/c504356b847e183f571a09ce5f808d4a7f229255).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/22676#discussion_r223749425
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
@@ -0,0 +1,131 @@
+class CSVHeaderChecker(
--- End diff --
Let's leave it as is. It follows the existing naming convention within each datasource.
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22676
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3814/
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22676
Merged build finished. Test PASSed.
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22676
Merged build finished. Test FAILed.
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22676
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97162/
---
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/22676#discussion_r223751038
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -330,7 +333,10 @@ private[csv] object UnivocityParser {
def parseIterator(
lines: Iterator[String],
parser: UnivocityParser,
+ headerChecker: CSVHeaderChecker,
schema: StructType): Iterator[InternalRow] = {
+ headerChecker.checkHeaderColumnNames(lines, parser.tokenizer)
--- End diff --
Ditto. It was already done this way. Let's keep the original code path as is, since this PR only aims to organize the code.
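For illustration, a minimal, self-contained sketch of the idea behind checking the header on a partition's line iterator: only the partition that starts the file carries a header, so the check (and the drop) happens only there. `HeaderOnFirstLine`, `checkAndDrop`, and the naive comma split are hypothetical simplifications, not Spark's code; a real tokenizer such as univocity's `CsvParser` also handles quoting.

```scala
// Hypothetical sketch: check (and drop) the header of one partition's lines.
// Names and the naive comma split are illustrative only.
object HeaderOnFirstLine {
  def checkAndDrop(
      lines: Iterator[String],
      headerFlag: Boolean,
      isStartOfFile: Boolean,
      check: Array[String] => Unit): Iterator[String] = {
    if (headerFlag && isStartOfFile) {
      // Skip leading blank lines; the first non-blank line is treated as the header.
      val rest = lines.dropWhile(_.trim.isEmpty)
      if (rest.hasNext) check(rest.next().split(",", -1))
      // Only the remaining (data) lines are returned.
      rest
    } else {
      // Partitions that do not start the file carry no header.
      lines
    }
  }
}
```

Returning the remaining iterator, rather than reusing `lines`, matters because `dropWhile` on an `Iterator` invalidates the original iterator.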
---
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Posted by MaxGekk <gi...@git.apache.org>.
Github user MaxGekk commented on a diff in the pull request:
https://github.com/apache/spark/pull/22676#discussion_r223741951
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala ---
@@ -251,7 +125,7 @@ object TextInputCSVDataSource extends CSVDataSource {
maybeFirstLine.map(csvParser.parseLine(_)) match {
case Some(firstRow) if firstRow != null =>
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
- val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
+ val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, parsedOptions)
--- End diff --
What about importing it from `CSVUtils`? What is the reason for the prefix here?
---
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/22676#discussion_r223593894
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala ---
@@ -90,6 +89,49 @@ object CSVUtils {
None
}
}
+
+ /**
+ * Generates a header from the given row which is null-safe and duplicate-safe.
+ */
+ def makeSafeHeader(
--- End diff --
It's moved as is.
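To make "null-safe and duplicate-safe" concrete, a simplified sketch. It assumes that empty, null, or `nullValue` names become `_c<index>`, that duplicate names (compared case-insensitively unless `caseSensitive` is set) get their position appended, and that a disabled header yields positional names. The standalone signature, with `headerFlag` and `nullValue` in place of `CSVOptions`, is hypothetical.

```scala
import java.util.Locale

// Hypothetical standalone sketch of null-safe and duplicate-safe header generation.
object SafeHeader {
  def makeSafeHeader(
      row: Array[String],
      caseSensitive: Boolean,
      headerFlag: Boolean,
      nullValue: String = ""): Array[String] = {
    if (!headerFlag) {
      // No header: use positional names _c0, _c1, ...
      row.indices.map(i => s"_c$i").toArray
    } else {
      def norm(name: String): String =
        if (caseSensitive) name else name.toLowerCase(Locale.ROOT)
      // Normalized names that appear more than once in the header.
      val names = row.toSeq.filter(_ != null).map(norm)
      val duplicates = names.diff(names.distinct).toSet
      row.zipWithIndex.map { case (value, index) =>
        if (value == null || value.isEmpty || value == nullValue) {
          s"_c$index" // missing or null-valued name: fall back to the position
        } else if (duplicates.contains(norm(value))) {
          s"$value$index" // duplicate name: append the position to disambiguate
        } else {
          value
        }
      }
    }
  }
}
```

For example, the header row `a, null, A, b` read case-insensitively yields `a0, _c1, A2, b` in this sketch.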
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22676
Merged build finished. Test PASSed.
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22676
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97147/
---
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/22676#discussion_r223749938
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -273,44 +273,47 @@ private[csv] object UnivocityParser {
inputStream: InputStream,
shouldDropHeader: Boolean,
tokenizer: CsvParser): Iterator[Array[String]] = {
- convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens)
+ val handleHeader: () => Unit =
+ () => if (shouldDropHeader) tokenizer.parseNext
+
+ convertStream(inputStream, tokenizer, handleHeader)(tokens => tokens)
}
/**
* Parses a stream that contains CSV strings and turns it into an iterator of rows.
*/
def parseStream(
inputStream: InputStream,
- shouldDropHeader: Boolean,
parser: UnivocityParser,
- schema: StructType,
- checkHeader: Array[String] => Unit): Iterator[InternalRow] = {
+ headerChecker: CSVHeaderChecker,
+ schema: StructType): Iterator[InternalRow] = {
val tokenizer = parser.tokenizer
val safeParser = new FailureSafeParser[Array[String]](
input => Seq(parser.convert(input)),
parser.options.parseMode,
schema,
parser.options.columnNameOfCorruptRecord,
parser.options.multiLine)
- convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens =>
+
+ val handleHeader: () => Unit =
+ () => headerChecker.checkHeaderColumnNames(tokenizer)
+
+ convertStream(inputStream, tokenizer, handleHeader) { tokens =>
safeParser.parse(tokens)
}.flatten
}
private def convertStream[T](
inputStream: InputStream,
- shouldDropHeader: Boolean,
tokenizer: CsvParser,
- checkHeader: Array[String] => Unit = _ => ())(
+ handleHeader: () => Unit)(
convert: Array[String] => T) = new Iterator[T] {
tokenizer.beginParsing(inputStream)
- private var nextRecord = {
- if (shouldDropHeader) {
- val firstRecord = tokenizer.parseNext()
- checkHeader(firstRecord)
- }
- tokenizer.parseNext()
- }
+
+ // We can handle header here since here the stream is open.
+ handleHeader()
--- End diff --
It is, but I guess it was already done this way.
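A self-contained sketch of the `convertStream` shape in the diff: the header callback runs once, in the iterator's constructor while the stream is open, before the first data record is fetched. `RecordSource` and `fromRows` are hypothetical stand-ins for univocity's `CsvParser` (whose `parseNext()` likewise returns `null` at the end of input), so the example runs without the library.

```scala
object ConvertStreamSketch {
  // Hypothetical stand-in for a tokenizer: returns the next record, or null at the end.
  trait RecordSource {
    def parseNext(): Array[String]
  }

  def convertStream[T](source: RecordSource, handleHeader: () => Unit)(
      convert: Array[String] => T): Iterator[T] = new Iterator[T] {
    // Runs once, when the iterator is created, before any data record is read.
    handleHeader()

    private var nextRecord: Array[String] = source.parseNext()

    override def hasNext: Boolean = nextRecord != null

    override def next(): T = {
      if (!hasNext) throw new NoSuchElementException("End of stream")
      val current = convert(nextRecord)
      nextRecord = source.parseNext()
      current
    }
  }

  // Example helper: a RecordSource backed by in-memory rows.
  def fromRows(rows: Seq[Array[String]]): RecordSource = new RecordSource {
    private val it = rows.iterator
    def parseNext(): Array[String] = if (it.hasNext) it.next() else null
  }
}
```

Passing the header handling as a `() => Unit` keeps `convertStream` agnostic of whether the caller wants to drop, check, or ignore the first record.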
---
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/22676#discussion_r223748041
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -505,20 +505,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
val actualSchema =
StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
- val linesWithoutHeader = if (parsedOptions.headerFlag && maybeFirstLine.isDefined) {
- val firstLine = maybeFirstLine.get
- val parser = new CsvParser(parsedOptions.asParserSettings)
- val columnNames = parser.parseLine(firstLine)
- CSVDataSource.checkHeaderColumnNames(
+ val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
+ val headerChecker = new CSVHeaderChecker(
actualSchema,
- columnNames,
- csvDataset.getClass.getCanonicalName,
- parsedOptions.enforceSchema,
- sparkSession.sessionState.conf.caseSensitiveAnalysis)
+ parsedOptions,
+ source = s"CSV source: ${csvDataset.getClass.getCanonicalName}")
+ headerChecker.checkHeaderColumnNames(firstLine)
filteredLines.rdd.mapPartitions(CSVUtils.filterHeaderLine(_, firstLine, parsedOptions))
- } else {
- filteredLines.rdd
- }
+ }.getOrElse(filteredLines.rdd)
--- End diff --
I don't remember exactly. It looks like we can change it to `Dataset`.
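A minimal sketch of the refactoring in the diff, on plain Scala collections instead of an `RDD` or `Dataset`: the `if`/`else` on `maybeFirstLine.isDefined` becomes `maybeFirstLine.map { ... }.getOrElse(...)`. It assumes, as the surrounding reader code appears to, that `maybeFirstLine` is only defined when the header option is on, which is why the explicit `headerFlag` test can go. `HeaderFiltering` and its parameters are illustrative names.

```scala
object HeaderFiltering {
  // Drops every line equal to the header (illustrative helper).
  def filterHeaderLine(lines: Iterator[String], firstLine: String): Iterator[String] =
    lines.filterNot(_ == firstLine)

  // `partitions` stands in for the partitions of an RDD/Dataset of lines.
  def linesWithoutHeader(
      partitions: Seq[Seq[String]],
      maybeFirstLine: Option[String],
      checkHeader: String => Unit): Seq[Seq[String]] =
    maybeFirstLine.map { firstLine =>
      // Validate the header once, before filtering each partition.
      checkHeader(firstLine)
      partitions.map(p => filterHeaderLine(p.iterator, firstLine).toList)
    }.getOrElse(partitions)
}
```

Because the header is removed by equality, any data line identical to the header is dropped as well; the sketch shares that trade-off with any filter-based approach.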
---
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/22676
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22676
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97240/
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22676
**[Test build #97240 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97240/testReport)** for PR 22676 at commit [`c504356`](https://github.com/apache/spark/commit/c504356b847e183f571a09ce5f808d4a7f229255).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:
https://github.com/apache/spark/pull/22676
cc @cloud-fan and @MaxGekk
---
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/22676#discussion_r223594430
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import com.univocity.parsers.csv.CsvParser
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Checks that column names in a CSV header and field names in the schema are the same
+ * by taking into account case sensitivity.
+ *
+ * @param schema provided (or inferred) schema to which CSV must conform.
+ * @param options parsed CSV options.
+ * @param source name of CSV source that are currently checked. It is used in error messages.
+ * @param isStartOfFile indicates if the currently processing partition is the start of the file.
+ * if unknown or not applicable (for instance when the input is a dataset),
+ * can be omitted.
+ */
+class CSVHeaderChecker(
+ schema: StructType,
+ options: CSVOptions,
+ source: String,
+ isStartOfFile: Boolean = false) extends Logging {
+
+ // Indicates if it is set to `false`, comparison of column names and schema field
+ // names is not case sensitive.
+ private val caseSensitive = SQLConf.get.caseSensitiveAnalysis
+
+ // Indicates if it is `true`, column names are ignored otherwise the CSV column
+ // names are checked for conformance to the schema. In the case if
+ // the column name don't conform to the schema, an exception is thrown.
+ private val enforceSchema = options.enforceSchema
+
+ /**
+ * Checks that column names in a CSV header and field names in the schema are the same
+ * by taking into account case sensitivity.
+ *
+ * @param columnNames names of CSV columns that must be checked against to the schema.
+ */
+ private def checkHeaderColumnNames(columnNames: Array[String]): Unit = {
+ if (columnNames != null) {
+ val fieldNames = schema.map(_.name).toIndexedSeq
+ val (headerLen, schemaSize) = (columnNames.size, fieldNames.length)
+ var errorMessage: Option[String] = None
+
+ if (headerLen == schemaSize) {
+ var i = 0
+ while (errorMessage.isEmpty && i < headerLen) {
+ var (nameInSchema, nameInHeader) = (fieldNames(i), columnNames(i))
+ if (!caseSensitive) {
+ // scalastyle:off caselocale
+ nameInSchema = nameInSchema.toLowerCase
+ nameInHeader = nameInHeader.toLowerCase
+ // scalastyle:on caselocale
+ }
+ if (nameInHeader != nameInSchema) {
+ errorMessage = Some(
+ s"""|CSV header does not conform to the schema.
+ | Header: ${columnNames.mkString(", ")}
+ | Schema: ${fieldNames.mkString(", ")}
+ |Expected: ${fieldNames(i)} but found: ${columnNames(i)}
+ |$source""".stripMargin)
--- End diff --
This is the only difference.
Previously it was
```
|CSV file: $fileName""".stripMargin)
```
which ended up putting the source's class name (`csvDataset.getClass.getCanonicalName`) into the message as if it were a file name. See (https://github.com/apache/spark/pull/22676/files#diff-f70bda59304588cc3abfa3a9840653f4R512)
This is the only difference in this method.
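For reference, a standalone sketch of the header-versus-schema comparison quoted above, including how `enforceSchema` chooses between a warning and a failure. It takes field names as a plain `Seq` instead of a `StructType`, uses a `warn` callback in place of `logWarning`, and throws `IllegalArgumentException` on mismatch; these choices, and the wording of the length-mismatch message, are assumptions of the sketch.

```scala
import java.util.Locale

// Hypothetical sketch of the header/schema check; not Spark's signatures.
object HeaderCheckSketch {
  def mismatch(
      columnNames: Seq[String],
      fieldNames: Seq[String],
      caseSensitive: Boolean,
      source: String): Option[String] = {
    def norm(s: String): String = if (caseSensitive) s else s.toLowerCase(Locale.ROOT)
    if (columnNames.length != fieldNames.length) {
      Some(
        s"""|CSV header has ${columnNames.length} columns but the schema has ${fieldNames.length} fields.
            |$source""".stripMargin)
    } else {
      // Report the first position where header and schema names differ.
      columnNames.zip(fieldNames).collectFirst {
        case (h, f) if norm(h) != norm(f) =>
          s"""|CSV header does not conform to the schema.
              | Header: ${columnNames.mkString(", ")}
              | Schema: ${fieldNames.mkString(", ")}
              |Expected: $f but found: $h
              |$source""".stripMargin
      }
    }
  }

  def check(
      columnNames: Seq[String],
      fieldNames: Seq[String],
      caseSensitive: Boolean,
      source: String,
      enforceSchema: Boolean,
      warn: String => Unit): Unit =
    mismatch(columnNames, fieldNames, caseSensitive, source).foreach { msg =>
      if (enforceSchema) warn(msg) // header names are ignored; only warn
      else throw new IllegalArgumentException(msg)
    }
}
```

Passing `source` through to the message is what lets callers say "CSV file: ..." or "CSV source: ..." as appropriate.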
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22676
**[Test build #97149 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97149/testReport)** for PR 22676 at commit [`89f7911`](https://github.com/apache/spark/commit/89f79113e6dc5da1903fed9dc04b1c7b5769d8f2).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22676
Merged build finished. Test PASSed.
---
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/22676#discussion_r223743548
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala ---
@@ -251,7 +125,7 @@ object TextInputCSVDataSource extends CSVDataSource {
maybeFirstLine.map(csvParser.parseLine(_)) match {
case Some(firstRow) if firstRow != null =>
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
- val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
+ val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, parsedOptions)
--- End diff --
Because this code mostly uses the `CSVUtils.`-prefixed form, I just followed that.
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22676
Merged build finished. Test FAILed.
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22676
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3825/
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:
https://github.com/apache/spark/pull/22676
Merged to master.
---
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/22676#discussion_r223593701
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -273,44 +274,47 @@ private[csv] object UnivocityParser {
inputStream: InputStream,
shouldDropHeader: Boolean,
tokenizer: CsvParser): Iterator[Array[String]] = {
- convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens)
+ val handleHeader: () => Unit =
+ () => if (shouldDropHeader) tokenizer.parseNext
--- End diff --
This is used in the schema inference path, where we don't check the header. Here it only drops the header.
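To contrast the two header callbacks in this diff, a hypothetical sketch: the schema-inference callback only consumes the first record, while the reading callback consumes it and hands it to a checker. `Tokens`, `dropOnly`, and `dropAndCheck` are illustrative stand-ins, not Spark APIs.

```scala
object HeaderCallbacks {
  // Hypothetical tokenizer stand-in: yields records in order, then null.
  final class Tokens(rows: Seq[Array[String]]) {
    private val it = rows.iterator
    def parseNext(): Array[String] = if (it.hasNext) it.next() else null
  }

  // Schema inference path: skip the header record without inspecting it.
  def dropOnly(tokens: Tokens, shouldDropHeader: Boolean): () => Unit =
    () => { if (shouldDropHeader) tokens.parseNext(); () }

  // Reading path: consume the header record and validate it.
  def dropAndCheck(tokens: Tokens, checkColumns: Array[String] => Unit): () => Unit =
    () => checkColumns(tokens.parseNext())
}
```

Both callbacks advance the tokenizer by exactly one record, so the data records seen afterwards are the same in either path.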
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22676
Merged build finished. Test PASSed.
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22676
Merged build finished. Test PASSed.
---
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22676
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3874/
---
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Posted by MaxGekk <gi...@git.apache.org>.
Github user MaxGekk commented on a diff in the pull request:
https://github.com/apache/spark/pull/22676#discussion_r223729530
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import com.univocity.parsers.csv.CsvParser
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Checks that column names in a CSV header and field names in the schema are the same
+ * by taking into account case sensitivity.
+ *
+ * @param schema provided (or inferred) schema to which CSV must conform.
+ * @param options parsed CSV options.
+ * @param source name of CSV source that are currently checked. It is used in error messages.
+ * @param isStartOfFile indicates if the currently processing partition is the start of the file.
+ * if unknown or not applicable (for instance when the input is a dataset),
+ * can be omitted.
+ */
+class CSVHeaderChecker(
--- End diff --
Is the `CSV` prefix in `CSVHeaderChecker` necessary? The class is already in the `csv` package, so it should be clear that it checks CSV headers.
---
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Posted by MaxGekk <gi...@git.apache.org>.
Github user MaxGekk commented on a diff in the pull request:
https://github.com/apache/spark/pull/22676#discussion_r223727251
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -505,20 +505,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
val actualSchema =
StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
- val linesWithoutHeader = if (parsedOptions.headerFlag && maybeFirstLine.isDefined) {
- val firstLine = maybeFirstLine.get
- val parser = new CsvParser(parsedOptions.asParserSettings)
- val columnNames = parser.parseLine(firstLine)
- CSVDataSource.checkHeaderColumnNames(
+ val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
+ val headerChecker = new CSVHeaderChecker(
actualSchema,
- columnNames,
- csvDataset.getClass.getCanonicalName,
- parsedOptions.enforceSchema,
- sparkSession.sessionState.conf.caseSensitiveAnalysis)
+ parsedOptions,
+ source = s"CSV source: ${csvDataset.getClass.getCanonicalName}")
+ headerChecker.checkHeaderColumnNames(firstLine)
filteredLines.rdd.mapPartitions(CSVUtils.filterHeaderLine(_, firstLine, parsedOptions))
- } else {
- filteredLines.rdd
- }
+ }.getOrElse(filteredLines.rdd)
--- End diff --
This is not directly related to your changes, but just out of curiosity: why do we convert the `Dataset` to an `RDD` here?
---