Posted to reviews@spark.apache.org by HyukjinKwon <gi...@git.apache.org> on 2018/10/09 07:55:45 UTC

[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

GitHub user HyukjinKwon opened a pull request:

    https://github.com/apache/spark/pull/22676

    [SPARK-25684][SQL] Organize header related codes in CSV datasource

    ## What changes were proposed in this pull request?
    
    1. Move `CSVDataSource.makeSafeHeader` to `CSVUtils.makeSafeHeader` (as is).
      Rationale:
        - Historically, and when I first did this refactoring, I intended to put all CSV-specific handling (options, filtering, header extraction, etc.) into `CSVUtils`.
        - See `JsonDataSource`. With this change, `CSVDataSource` is quite consistent with `JsonDataSource`. Since CSV's code path is quite complicated, we should match them as closely as we can.
    
    2. Move `CSVDataSource.checkHeaderColumnNames` to `CSVHeaderChecker.checkHeaderColumnNames` (as is).
        Rationale:
          - Similar reasons to 1 above.
    
    3. Put the `enforceSchema` logic into `CSVHeaderChecker`.
          - The header checking and column pruning were added (per https://github.com/apache/spark/pull/20894 and https://github.com/apache/spark/pull/21296), but some of the code, such as that from https://github.com/apache/spark/pull/21296, is duplicated.
          - Also, the header-checking code is scattered here and there, which is quite error-prone. We had better put it in a single place. See https://github.com/apache/spark/pull/22656.
    
    ## How was this patch tested?
    
    Existing tests should cover this.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark refactoring-csv

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22676.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22676
    
----
commit 56906680ab7d5d63be04bac2c3a19bb52baa3025
Author: hyukjinkwon <gu...@...>
Date:   2018-10-09T07:26:08Z

    Organize header related codes in CSV datasource

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3871/
    Test PASSed.


---



[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22676#discussion_r223593838
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -273,44 +274,47 @@ private[csv] object UnivocityParser {
           inputStream: InputStream,
           shouldDropHeader: Boolean,
           tokenizer: CsvParser): Iterator[Array[String]] = {
    -    convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens)
    +    val handleHeader: () => Unit =
    +      () => if (shouldDropHeader) tokenizer.parseNext
    +
    +    convertStream(inputStream, tokenizer, handleHeader)(tokens => tokens)
       }
     
       /**
        * Parses a stream that contains CSV strings and turns it into an iterator of rows.
        */
       def parseStream(
           inputStream: InputStream,
    -      shouldDropHeader: Boolean,
           parser: UnivocityParser,
    -      schema: StructType,
    -      checkHeader: Array[String] => Unit): Iterator[InternalRow] = {
    +      headerChecker: CSVHeaderChecker,
    +      schema: StructType): Iterator[InternalRow] = {
         val tokenizer = parser.tokenizer
         val safeParser = new FailureSafeParser[Array[String]](
           input => Seq(parser.convert(input)),
           parser.options.parseMode,
           schema,
           parser.options.columnNameOfCorruptRecord,
           parser.options.multiLine)
    -    convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens =>
    +
    +    val handleHeader: () => Unit =
    +      () => headerChecker.checkHeaderColumnNames(tokenizer)
    --- End diff --
    
    This aligns the code structure of `parseStream` and `parseIterator`, which are used in multiLine and non-multiLine modes respectively.
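
A minimal sketch of the shape discussed in this comment, not the actual Spark code: one `convertStream`-style helper accepts a `handleHeader: () => Unit` callback, so one caller can simply drop the header while another validates it. `FakeTokenizer` is a hypothetical stand-in for univocity's `CsvParser`, and `tokenize` and `parseChecked` are illustrative names that only mirror the two call sites in the diff.

```scala
object HandleHeaderSketch {
  // Hypothetical stand-in for a CSV tokenizer: returns the next record, or null at end of input.
  final class FakeTokenizer(lines: Seq[String]) {
    private val it = lines.iterator
    def parseNext(): Array[String] = if (it.hasNext) it.next().split(",") else null
  }

  // Shared helper: runs the header callback once, then exposes the remaining records.
  def convertStream[T](tokenizer: FakeTokenizer, handleHeader: () => Unit)(
      convert: Array[String] => T): Iterator[T] = new Iterator[T] {
    handleHeader()
    private var nextRecord = tokenizer.parseNext()

    override def hasNext: Boolean = nextRecord != null

    override def next(): T = {
      if (!hasNext) throw new NoSuchElementException("End of stream")
      val current = convert(nextRecord)
      nextRecord = tokenizer.parseNext()
      current
    }
  }

  // Caller 1: the callback only drops the header when asked to.
  def tokenize(lines: Seq[String], shouldDropHeader: Boolean): List[List[String]] = {
    val tokenizer = new FakeTokenizer(lines)
    val handleHeader: () => Unit = () => if (shouldDropHeader) tokenizer.parseNext()
    convertStream(tokenizer, handleHeader)(_.toList).toList
  }

  // Caller 2: the callback consumes the header and validates it against expected names.
  def parseChecked(lines: Seq[String], expected: Seq[String]): List[List[String]] = {
    val tokenizer = new FakeTokenizer(lines)
    val handleHeader: () => Unit = () => {
      val header = tokenizer.parseNext()
      require(header != null && header.toSeq == expected, "header does not match expected names")
    }
    convertStream(tokenizer, handleHeader)(_.toList).toList
  }
}
```

Both callers share the iteration logic and differ only in the callback, which is the symmetry the comment refers to.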


---



[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

Posted by MaxGekk <gi...@git.apache.org>.
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22676#discussion_r223698787
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
    @@ -505,20 +505,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         val actualSchema =
           StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
     
    -    val linesWithoutHeader = if (parsedOptions.headerFlag && maybeFirstLine.isDefined) {
    -      val firstLine = maybeFirstLine.get
    -      val parser = new CsvParser(parsedOptions.asParserSettings)
    -      val columnNames = parser.parseLine(firstLine)
    -      CSVDataSource.checkHeaderColumnNames(
    +    val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
    +      val headerChecker = new CSVHeaderChecker(
             actualSchema,
    -        columnNames,
    -        csvDataset.getClass.getCanonicalName,
    -        parsedOptions.enforceSchema,
    -        sparkSession.sessionState.conf.caseSensitiveAnalysis)
    +        parsedOptions,
    +        source = s"CSV source: ${csvDataset.getClass.getCanonicalName}")
    --- End diff --
    
    Would it be better to output more concrete info about the dataset? For example, `toString` outputs at least the field names. I think it would help in log analysis.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    **[Test build #97147 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97147/testReport)** for PR 22676 at commit [`5690668`](https://github.com/apache/spark/commit/56906680ab7d5d63be04bac2c3a19bb52baa3025).


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97235/
    Test FAILed.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    retest this please


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    retest this please


---



[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

Posted by MaxGekk <gi...@git.apache.org>.
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22676#discussion_r223730392
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---
    @@ -139,14 +138,15 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
             StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
             StructType(requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
             parsedOptions)
    +      val schema = if (columnPruning) requiredSchema else dataSchema
    +      val headerChecker = new CSVHeaderChecker(
    +        schema, parsedOptions, source = s"CSV file: ${file.filePath}", file.start == 0)
    --- End diff --
    
    `isStartOfFile = file.start == 0`
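
A small sketch of what this suggestion buys, under simplified assumptions: `HeaderCheckerConfig` is a hypothetical two-field stand-in for the constructor in the diff, kept only to show how a named argument documents a trailing Boolean at the call site.

```scala
object NamedArgSketch {
  // Hypothetical, reduced stand-in for the constructor shape in the diff.
  final case class HeaderCheckerConfig(
      source: String,
      isStartOfFile: Boolean = false)

  // Naming the Boolean makes the intent of `start == 0` readable without opening the class.
  def forFile(path: String, start: Long): HeaderCheckerConfig =
    HeaderCheckerConfig(source = s"CSV file: $path", isStartOfFile = start == 0)
}
```

With a positional `file.start == 0`, a reader has to look up the parameter list to learn what the flag means; the named form carries that information inline.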


---



[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22676#discussion_r223594011
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.csv
    +
    +import com.univocity.parsers.csv.CsvParser
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * Checks that column names in a CSV header and field names in the schema are the same
    + * by taking into account case sensitivity.
    + *
    + * @param schema provided (or inferred) schema to which CSV must conform.
    + * @param options parsed CSV options.
    + * @param source name of CSV source that are currently checked. It is used in error messages.
    + * @param isStartOfFile indicates if the currently processing partition is the start of the file.
    + *                      if unknown or not applicable (for instance when the input is a dataset),
    + *                      can be omitted.
    + */
    +class CSVHeaderChecker(
    +    schema: StructType,
    +    options: CSVOptions,
    +    source: String,
    +    isStartOfFile: Boolean = false) extends Logging {
    +
    +  // Indicates if it is set to `false`, comparison of column names and schema field
    +  // names is not case sensitive.
    +  private val caseSensitive = SQLConf.get.caseSensitiveAnalysis
    +
    +  // Indicates if it is `true`, column names are ignored otherwise the CSV column
    +  // names are checked for conformance to the schema. In the case if
    +  // the column name don't conform to the schema, an exception is thrown.
    +  private val enforceSchema = options.enforceSchema
    +
    +  /**
    +   * Checks that column names in a CSV header and field names in the schema are the same
    +   * by taking into account case sensitivity.
    +   *
    +   * @param columnNames names of CSV columns that must be checked against to the schema.
    +   */
    +  private def checkHeaderColumnNames(columnNames: Array[String]): Unit = {
    --- End diff --
    
    It's moved as is, except for the parameters in its signature.
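
For readers without the full file at hand, here is a rough sketch of the kind of check the Scaladoc in this diff describes: header column names are compared with schema field names, optionally ignoring case. It is an illustration under assumptions, not the moved method itself. `findMismatch` and `check` are hypothetical names, plain `Seq[String]` replaces `StructType`, and the sketch simply skips the comparison when `enforceSchema` is true, following the comment in the diff.

```scala
object HeaderCheckSketch {
  // Returns None when the header conforms, or Some(message) describing the first mismatch.
  def findMismatch(
      fieldNames: Seq[String],
      columnNames: Seq[String],
      caseSensitive: Boolean): Option[String] = {
    def normalize(name: String): String = if (caseSensitive) name else name.toLowerCase
    if (fieldNames.length != columnNames.length) {
      Some(s"Header has ${columnNames.length} columns but the schema has ${fieldNames.length} fields")
    } else {
      fieldNames.zip(columnNames).zipWithIndex.collectFirst {
        case ((field, column), i) if normalize(field) != normalize(column) =>
          s"Column $i: header has '$column' but the schema expects '$field'"
      }
    }
  }

  // Assumption for this sketch: with enforceSchema = true the header names are ignored,
  // otherwise a mismatch raises an exception.
  def check(
      fieldNames: Seq[String],
      columnNames: Seq[String],
      caseSensitive: Boolean,
      enforceSchema: Boolean): Unit = {
    if (!enforceSchema) {
      findMismatch(fieldNames, columnNames, caseSensitive)
        .foreach(msg => throw new IllegalArgumentException(msg))
    }
  }
}
```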


---



[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

Posted by MaxGekk <gi...@git.apache.org>.
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22676#discussion_r223741059
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -330,7 +333,10 @@ private[csv] object UnivocityParser {
       def parseIterator(
           lines: Iterator[String],
           parser: UnivocityParser,
    +      headerChecker: CSVHeaderChecker,
           schema: StructType): Iterator[InternalRow] = {
    +    headerChecker.checkHeaderColumnNames(lines, parser.tokenizer)
    --- End diff --
    
    The same question here. I would prefer to consume the input iterator lazily. This is one of the advantages of iterators: unlike collections, for example, they perform work only when you explicitly call `hasNext` or `next`.
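
One possible way to get the lazy behaviour requested here, sketched with plain Scala iterators rather than the Spark classes: the header is read and checked only when the consumer first calls `hasNext` or `next`. `withLazyHeaderCheck` and `demo` are hypothetical helpers, and `checkHeader` stands in for whatever validation the real checker performs.

```scala
object LazyHeaderSketch {
  // Wraps `lines` so the header is consumed and checked on the first hasNext/next call,
  // not when the wrapper is constructed.
  def withLazyHeaderCheck(lines: Iterator[String])(
      checkHeader: String => Unit): Iterator[String] = new Iterator[String] {
    private lazy val body: Iterator[String] = {
      if (lines.hasNext) checkHeader(lines.next())
      lines
    }
    override def hasNext: Boolean = body.hasNext
    override def next(): String = body.next()
  }

  // Returns (checks before any read, checks after reading everything, data rows).
  def demo(): (Int, Int, List[String]) = {
    var checks = 0
    val rows = withLazyHeaderCheck(Iterator("a,b", "1,2", "3,4"))(_ => checks += 1)
    val before = checks
    val all = rows.toList
    (before, checks, all)
  }
}
```

The trade-off is that a malformed header surfaces at first read rather than at construction time, which may or may not be what the caller wants.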


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    **[Test build #97235 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97235/testReport)** for PR 22676 at commit [`c504356`](https://github.com/apache/spark/commit/c504356b847e183f571a09ce5f808d4a7f229255).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Thank you @cloud-fan and @MaxGekk for reviewing this.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    **[Test build #97235 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97235/testReport)** for PR 22676 at commit [`c504356`](https://github.com/apache/spark/commit/c504356b847e183f571a09ce5f808d4a7f229255).


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97149/
    Test FAILed.


---



[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22676#discussion_r223744025
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
    @@ -505,20 +505,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         val actualSchema =
           StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
     
    -    val linesWithoutHeader = if (parsedOptions.headerFlag && maybeFirstLine.isDefined) {
    -      val firstLine = maybeFirstLine.get
    -      val parser = new CsvParser(parsedOptions.asParserSettings)
    -      val columnNames = parser.parseLine(firstLine)
    -      CSVDataSource.checkHeaderColumnNames(
    +    val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
    +      val headerChecker = new CSVHeaderChecker(
             actualSchema,
    -        columnNames,
    -        csvDataset.getClass.getCanonicalName,
    -        parsedOptions.enforceSchema,
    -        sparkSession.sessionState.conf.caseSensitiveAnalysis)
    +        parsedOptions,
    +        source = s"CSV source: ${csvDataset.getClass.getCanonicalName}")
    --- End diff --
    
    Makes sense. If that's just `toString`, of course I can fix it here since the change is small, although it's orthogonal.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    **[Test build #97240 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97240/testReport)** for PR 22676 at commit [`c504356`](https://github.com/apache/spark/commit/c504356b847e183f571a09ce5f808d4a7f229255).


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    **[Test build #97147 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97147/testReport)** for PR 22676 at commit [`5690668`](https://github.com/apache/spark/commit/56906680ab7d5d63be04bac2c3a19bb52baa3025).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class CSVHeaderChecker(`


---



[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

Posted by MaxGekk <gi...@git.apache.org>.
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22676#discussion_r223737261
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -273,44 +273,47 @@ private[csv] object UnivocityParser {
           inputStream: InputStream,
           shouldDropHeader: Boolean,
           tokenizer: CsvParser): Iterator[Array[String]] = {
    -    convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens)
    +    val handleHeader: () => Unit =
    +      () => if (shouldDropHeader) tokenizer.parseNext
    +
    +    convertStream(inputStream, tokenizer, handleHeader)(tokens => tokens)
       }
     
       /**
        * Parses a stream that contains CSV strings and turns it into an iterator of rows.
        */
       def parseStream(
           inputStream: InputStream,
    -      shouldDropHeader: Boolean,
           parser: UnivocityParser,
    -      schema: StructType,
    -      checkHeader: Array[String] => Unit): Iterator[InternalRow] = {
    +      headerChecker: CSVHeaderChecker,
    +      schema: StructType): Iterator[InternalRow] = {
         val tokenizer = parser.tokenizer
         val safeParser = new FailureSafeParser[Array[String]](
           input => Seq(parser.convert(input)),
           parser.options.parseMode,
           schema,
           parser.options.columnNameOfCorruptRecord,
           parser.options.multiLine)
    -    convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens =>
    +
    +    val handleHeader: () => Unit =
    +      () => headerChecker.checkHeaderColumnNames(tokenizer)
    +
    +    convertStream(inputStream, tokenizer, handleHeader) { tokens =>
           safeParser.parse(tokens)
         }.flatten
       }
     
       private def convertStream[T](
           inputStream: InputStream,
    -      shouldDropHeader: Boolean,
           tokenizer: CsvParser,
    -      checkHeader: Array[String] => Unit = _ => ())(
    +      handleHeader: () => Unit)(
           convert: Array[String] => T) = new Iterator[T] {
         tokenizer.beginParsing(inputStream)
    -    private var nextRecord = {
    -      if (shouldDropHeader) {
    -        val firstRecord = tokenizer.parseNext()
    -        checkHeader(firstRecord)
    -      }
    -      tokenizer.parseNext()
    -    }
    +
    +    // We can handle header here since here the stream is open.
    +    handleHeader()
    --- End diff --
    
    It looks slightly strange that we consume data from the input before the upper layer starts reading it.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3816/
    Test PASSed.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    **[Test build #97162 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97162/testReport)** for PR 22676 at commit [`c504356`](https://github.com/apache/spark/commit/c504356b847e183f571a09ce5f808d4a7f229255).


---



[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22676#discussion_r223728765
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.csv
    +
    +import com.univocity.parsers.csv.CsvParser
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * Checks that column names in a CSV header and field names in the schema are the same
    + * by taking into account case sensitivity.
    + *
    + * @param schema provided (or inferred) schema to which CSV must conform.
    + * @param options parsed CSV options.
    + * @param source name of CSV source that are currently checked. It is used in error messages.
    + * @param isStartOfFile indicates if the currently processing partition is the start of the file.
    + *                      if unknown or not applicable (for instance when the input is a dataset),
    + *                      can be omitted.
    + */
    +class CSVHeaderChecker(
    --- End diff --
    
    It's under the `execution` package, which is meant to be private. Since it's accessed in `DataFrameReader`, it would need to be `private[sql]`, and those modifiers were removed in SPARK-16964 for this reason.
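
For context on the modifier mentioned here, a small illustration of qualified access in Scala. To keep it runnable as a single snippet, nested objects named `sql` and `execution` play the role of the packages; this layout, `HeaderChecker`, and `Reader` are assumptions for illustration, not Spark's code.

```scala
object sql {
  object execution {
    // Accessible anywhere inside the enclosing `sql` scope, but not from outside it.
    private[sql] class HeaderChecker(val source: String) {
      def describe: String = s"checking $source"
    }
  }

  // Lives inside `sql`, so it may construct the checker even though it is not in `execution`.
  object Reader {
    def describeSource(): String = new execution.HeaderChecker("CSV source").describe
  }
}
```

Code outside `sql` could call `sql.Reader.describeSource()` but could not instantiate `sql.execution.HeaderChecker` directly.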


---



[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22676#discussion_r223722902
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.csv
    +
    +import com.univocity.parsers.csv.CsvParser
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * Checks that column names in a CSV header and field names in the schema are the same
    + * by taking into account case sensitivity.
    + *
    + * @param schema provided (or inferred) schema to which CSV must conform.
    + * @param options parsed CSV options.
    + * @param source name of CSV source that are currently checked. It is used in error messages.
    + * @param isStartOfFile indicates if the currently processing partition is the start of the file.
    + *                      if unknown or not applicable (for instance when the input is a dataset),
    + *                      can be omitted.
    + */
    +class CSVHeaderChecker(
    --- End diff --
    
    Can this be private to the csv or spark packages? Or is this now part of a public API?


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    **[Test build #97149 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97149/testReport)** for PR 22676 at commit [`89f7911`](https://github.com/apache/spark/commit/89f79113e6dc5da1903fed9dc04b1c7b5769d8f2).


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    **[Test build #97162 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97162/testReport)** for PR 22676 at commit [`c504356`](https://github.com/apache/spark/commit/c504356b847e183f571a09ce5f808d4a7f229255).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22676#discussion_r223749425
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.csv
    +
    +import com.univocity.parsers.csv.CsvParser
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * Checks that column names in a CSV header and field names in the schema are the same
    + * by taking into account case sensitivity.
    + *
    + * @param schema provided (or inferred) schema to which CSV must conform.
    + * @param options parsed CSV options.
    + * @param source name of CSV source that are currently checked. It is used in error messages.
    + * @param isStartOfFile indicates if the currently processing partition is the start of the file.
    + *                      if unknown or not applicable (for instance when the input is a dataset),
    + *                      can be omitted.
    + */
    +class CSVHeaderChecker(
    --- End diff --
    
    Let's leave it as is. It follows the existing naming convention within each datasource.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3814/
    Test PASSed.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97162/
    Test PASSed.


---



[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22676#discussion_r223751038
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -330,7 +333,10 @@ private[csv] object UnivocityParser {
       def parseIterator(
           lines: Iterator[String],
           parser: UnivocityParser,
    +      headerChecker: CSVHeaderChecker,
           schema: StructType): Iterator[InternalRow] = {
    +    headerChecker.checkHeaderColumnNames(lines, parser.tokenizer)
    --- End diff --
    
    Ditto. It was already done this way. Let's keep the original code path as is, since this PR only aims to organize the code.


---



[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

Posted by MaxGekk <gi...@git.apache.org>.
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22676#discussion_r223741951
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala ---
    @@ -251,7 +125,7 @@ object TextInputCSVDataSource extends CSVDataSource {
         maybeFirstLine.map(csvParser.parseLine(_)) match {
           case Some(firstRow) if firstRow != null =>
             val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
    -        val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
    +        val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, parsedOptions)
    --- End diff --
    
    How about importing it from `CSVUtils`? What is the reason for keeping the prefix here?


---



[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22676#discussion_r223593894
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala ---
    @@ -90,6 +89,49 @@ object CSVUtils {
           None
         }
       }
    +
    +  /**
    +   * Generates a header from the given row which is null-safe and duplicate-safe.
    +   */
    +  def makeSafeHeader(
    --- End diff --
    
    It was moved as is.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97147/
    Test PASSed.


---



[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22676#discussion_r223749938
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -273,44 +273,47 @@ private[csv] object UnivocityParser {
           inputStream: InputStream,
           shouldDropHeader: Boolean,
           tokenizer: CsvParser): Iterator[Array[String]] = {
    -    convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens)
    +    val handleHeader: () => Unit =
    +      () => if (shouldDropHeader) tokenizer.parseNext
    +
    +    convertStream(inputStream, tokenizer, handleHeader)(tokens => tokens)
       }
     
       /**
        * Parses a stream that contains CSV strings and turns it into an iterator of rows.
        */
       def parseStream(
           inputStream: InputStream,
    -      shouldDropHeader: Boolean,
           parser: UnivocityParser,
    -      schema: StructType,
    -      checkHeader: Array[String] => Unit): Iterator[InternalRow] = {
    +      headerChecker: CSVHeaderChecker,
    +      schema: StructType): Iterator[InternalRow] = {
         val tokenizer = parser.tokenizer
         val safeParser = new FailureSafeParser[Array[String]](
           input => Seq(parser.convert(input)),
           parser.options.parseMode,
           schema,
           parser.options.columnNameOfCorruptRecord,
           parser.options.multiLine)
    -    convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens =>
    +
    +    val handleHeader: () => Unit =
    +      () => headerChecker.checkHeaderColumnNames(tokenizer)
    +
    +    convertStream(inputStream, tokenizer, handleHeader) { tokens =>
           safeParser.parse(tokens)
         }.flatten
       }
     
       private def convertStream[T](
           inputStream: InputStream,
    -      shouldDropHeader: Boolean,
           tokenizer: CsvParser,
    -      checkHeader: Array[String] => Unit = _ => ())(
    +      handleHeader: () => Unit)(
           convert: Array[String] => T) = new Iterator[T] {
         tokenizer.beginParsing(inputStream)
    -    private var nextRecord = {
    -      if (shouldDropHeader) {
    -        val firstRecord = tokenizer.parseNext()
    -        checkHeader(firstRecord)
    -      }
    -      tokenizer.parseNext()
    -    }
    +
    +    // We can handle header here since here the stream is open.
    +    handleHeader()
    --- End diff --
    
    It is, but I believe it was already done this way.


---



[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22676#discussion_r223748041
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
    @@ -505,20 +505,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         val actualSchema =
           StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
     
    -    val linesWithoutHeader = if (parsedOptions.headerFlag && maybeFirstLine.isDefined) {
    -      val firstLine = maybeFirstLine.get
    -      val parser = new CsvParser(parsedOptions.asParserSettings)
    -      val columnNames = parser.parseLine(firstLine)
    -      CSVDataSource.checkHeaderColumnNames(
    +    val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
    +      val headerChecker = new CSVHeaderChecker(
             actualSchema,
    -        columnNames,
    -        csvDataset.getClass.getCanonicalName,
    -        parsedOptions.enforceSchema,
    -        sparkSession.sessionState.conf.caseSensitiveAnalysis)
    +        parsedOptions,
    +        source = s"CSV source: ${csvDataset.getClass.getCanonicalName}")
    +      headerChecker.checkHeaderColumnNames(firstLine)
           filteredLines.rdd.mapPartitions(CSVUtils.filterHeaderLine(_, firstLine, parsedOptions))
    -    } else {
    -      filteredLines.rdd
    -    }
    +    }.getOrElse(filteredLines.rdd)
    --- End diff --
    
    I don't remember exactly. It looks like we can change it to `Dataset`.


---



[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22676


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97240/
    Test PASSed.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    **[Test build #97240 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97240/testReport)** for PR 22676 at commit [`c504356`](https://github.com/apache/spark/commit/c504356b847e183f571a09ce5f808d4a7f229255).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    cc @cloud-fan and @MaxGekk 


---



[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22676#discussion_r223594430
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.csv
    +
    +import com.univocity.parsers.csv.CsvParser
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * Checks that column names in a CSV header and field names in the schema are the same
    + * by taking into account case sensitivity.
    + *
    + * @param schema provided (or inferred) schema to which CSV must conform.
    + * @param options parsed CSV options.
    + * @param source name of CSV source that are currently checked. It is used in error messages.
    + * @param isStartOfFile indicates if the currently processing partition is the start of the file.
    + *                      if unknown or not applicable (for instance when the input is a dataset),
    + *                      can be omitted.
    + */
    +class CSVHeaderChecker(
    +    schema: StructType,
    +    options: CSVOptions,
    +    source: String,
    +    isStartOfFile: Boolean = false) extends Logging {
    +
    +  // Indicates if it is set to `false`, comparison of column names and schema field
    +  // names is not case sensitive.
    +  private val caseSensitive = SQLConf.get.caseSensitiveAnalysis
    +
    +  // Indicates if it is `true`, column names are ignored otherwise the CSV column
    +  // names are checked for conformance to the schema. In the case if
    +  // the column name don't conform to the schema, an exception is thrown.
    +  private val enforceSchema = options.enforceSchema
    +
    +  /**
    +   * Checks that column names in a CSV header and field names in the schema are the same
    +   * by taking into account case sensitivity.
    +   *
    +   * @param columnNames names of CSV columns that must be checked against to the schema.
    +   */
    +  private def checkHeaderColumnNames(columnNames: Array[String]): Unit = {
    +    if (columnNames != null) {
    +      val fieldNames = schema.map(_.name).toIndexedSeq
    +      val (headerLen, schemaSize) = (columnNames.size, fieldNames.length)
    +      var errorMessage: Option[String] = None
    +
    +      if (headerLen == schemaSize) {
    +        var i = 0
    +        while (errorMessage.isEmpty && i < headerLen) {
    +          var (nameInSchema, nameInHeader) = (fieldNames(i), columnNames(i))
    +          if (!caseSensitive) {
    +            // scalastyle:off caselocale
    +            nameInSchema = nameInSchema.toLowerCase
    +            nameInHeader = nameInHeader.toLowerCase
    +            // scalastyle:on caselocale
    +          }
    +          if (nameInHeader != nameInSchema) {
    +            errorMessage = Some(
    +              s"""|CSV header does not conform to the schema.
    +                  | Header: ${columnNames.mkString(", ")}
    +                  | Schema: ${fieldNames.mkString(", ")}
    +                  |Expected: ${fieldNames(i)} but found: ${columnNames(i)}
    +                  |$source""".stripMargin)
    --- End diff --
    
    This is the only change in this method.
    
    Previously it was 
    
    ```
     |CSV file: $fileName""".stripMargin)
    ```
    
    which, for the `DataFrameReader` path, ended up printing the class name of the source rather than a file name. See (https://github.com/apache/spark/pull/22676/files#diff-f70bda59304588cc3abfa3a9840653f4R512)
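
For readers following the thread, the equal-length branch of the check quoted above can be sketched in plain Scala without Spark. This is only an illustration under stated assumptions: the object `HeaderCheckSketch` and the method `firstMismatch` are hypothetical names, the length-mismatch branch (not visible in the quoted diff) is left out, and `Locale.ROOT` is used for lowercasing where the patch calls `toLowerCase` with scalastyle disabled.

```scala
import java.util.Locale

// Hypothetical, Spark-free sketch of the equal-length branch of the header check.
object HeaderCheckSketch {
  // Returns an error message for the first header/schema mismatch, if any.
  def firstMismatch(
      fieldNames: Seq[String],
      columnNames: Seq[String],
      caseSensitive: Boolean,
      source: String): Option[String] = {
    // Only the equal-length case is sketched here (assumption of this example).
    require(fieldNames.length == columnNames.length, "sketch covers equal lengths only")

    // Assumption: lowercase with Locale.ROOT when the comparison is case insensitive.
    def normalize(name: String): String =
      if (caseSensitive) name else name.toLowerCase(Locale.ROOT)

    fieldNames.zip(columnNames).collectFirst {
      case (inSchema, inHeader) if normalize(inSchema) != normalize(inHeader) =>
        s"""|CSV header does not conform to the schema.
            | Header: ${columnNames.mkString(", ")}
            | Schema: ${fieldNames.mkString(", ")}
            |Expected: $inSchema but found: $inHeader
            |$source""".stripMargin
    }
  }
}
```

Passing the whole `source` string, for example `"CSV source: ..."` or `"CSV file: ..."`, lets each caller decide how the origin is described, which is the point of the one-line change discussed above.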


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    **[Test build #97149 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97149/testReport)** for PR 22676 at commit [`89f7911`](https://github.com/apache/spark/commit/89f79113e6dc5da1903fed9dc04b1c7b5769d8f2).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22676#discussion_r223743548
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala ---
    @@ -251,7 +125,7 @@ object TextInputCSVDataSource extends CSVDataSource {
         maybeFirstLine.map(csvParser.parseLine(_)) match {
           case Some(firstRow) if firstRow != null =>
             val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
    -        val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
    +        val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, parsedOptions)
    --- End diff --
    
    Because most of this code uses the `CSVUtils...` prefix. I just followed that.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3825/
    Test PASSed.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Merged to master.


---



[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22676#discussion_r223593701
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -273,44 +274,47 @@ private[csv] object UnivocityParser {
           inputStream: InputStream,
           shouldDropHeader: Boolean,
           tokenizer: CsvParser): Iterator[Array[String]] = {
    -    convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens)
    +    val handleHeader: () => Unit =
    +      () => if (shouldDropHeader) tokenizer.parseNext
    --- End diff --
    
    This is used in the schema inference path, where we don't check the header. Here it only drops the header.
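
The callback shape described in this comment can be sketched without univocity or Spark. This is a minimal illustration, not the patch itself: `SimpleTokenizer`, `HeaderHandling`, and `convert` are hypothetical stand-ins for `CsvParser` and `convertStream`, and the comma splitting is deliberately naive.

```scala
// Hypothetical stand-in for a tokenizer: returns one parsed record per call, null at the end.
final class SimpleTokenizer(lines: Iterator[String]) {
  def parseNext(): Array[String] =
    if (lines.hasNext) lines.next().split(",", -1) else null
}

object HeaderHandling {
  // Runs `handleHeader` once up front, then converts the remaining records lazily.
  def convert[T](tokenizer: SimpleTokenizer, handleHeader: () => Unit)(
      f: Array[String] => T): Iterator[T] = {
    handleHeader()
    Iterator.continually(tokenizer.parseNext()).takeWhile(_ != null).map(f)
  }

  // Schema inference style path: the header is dropped but never checked.
  def tokenize(lines: Iterator[String], shouldDropHeader: Boolean): Iterator[Array[String]] = {
    val tokenizer = new SimpleTokenizer(lines)
    val handleHeader: () => Unit = () => {
      if (shouldDropHeader) tokenizer.parseNext()
      ()
    }
    convert(tokenizer, handleHeader)(identity)
  }
}
```

A parsing path would pass a different callback that consumes the first record and validates it, so the shared conversion routine does not need to know which behaviour applies.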


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22676
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3874/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

Posted by MaxGekk <gi...@git.apache.org>.
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22676#discussion_r223729530
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.csv
    +
    +import com.univocity.parsers.csv.CsvParser
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * Checks that column names in a CSV header and field names in the schema are the same
    + * by taking into account case sensitivity.
    + *
    + * @param schema provided (or inferred) schema to which CSV must conform.
    + * @param options parsed CSV options.
    + * @param source name of CSV source that are currently checked. It is used in error messages.
    + * @param isStartOfFile indicates if the currently processing partition is the start of the file.
    + *                      if unknown or not applicable (for instance when the input is a dataset),
    + *                      can be omitted.
    + */
    +class CSVHeaderChecker(
    --- End diff --
    
    Is the `CSV` prefix of `CSVHeaderChecker` necessary? The class is already in the `csv` package, so it should be clear that it checks CSV headers.


---



[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

Posted by MaxGekk <gi...@git.apache.org>.
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22676#discussion_r223727251
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
    @@ -505,20 +505,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         val actualSchema =
           StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
     
    -    val linesWithoutHeader = if (parsedOptions.headerFlag && maybeFirstLine.isDefined) {
    -      val firstLine = maybeFirstLine.get
    -      val parser = new CsvParser(parsedOptions.asParserSettings)
    -      val columnNames = parser.parseLine(firstLine)
    -      CSVDataSource.checkHeaderColumnNames(
    +    val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
    +      val headerChecker = new CSVHeaderChecker(
             actualSchema,
    -        columnNames,
    -        csvDataset.getClass.getCanonicalName,
    -        parsedOptions.enforceSchema,
    -        sparkSession.sessionState.conf.caseSensitiveAnalysis)
    +        parsedOptions,
    +        source = s"CSV source: ${csvDataset.getClass.getCanonicalName}")
    +      headerChecker.checkHeaderColumnNames(firstLine)
           filteredLines.rdd.mapPartitions(CSVUtils.filterHeaderLine(_, firstLine, parsedOptions))
    -    } else {
    -      filteredLines.rdd
    -    }
    +    }.getOrElse(filteredLines.rdd)
    --- End diff --
    
    This is not directly related to your changes, but out of curiosity, why do we convert the `Dataset` to an `RDD` here?


---
