Posted to reviews@spark.apache.org by "dtenedor (via GitHub)" <gi...@apache.org> on 2024/01/29 22:15:12 UTC

[PR] [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning [spark]

dtenedor opened a new pull request, #44939:
URL: https://github.com/apache/spark/pull/44939

   ### What changes were proposed in this pull request?
   
   This PR fixes a CSV parsing bug with existence default values and column pruning (https://issues.apache.org/jira/browse/SPARK-46890).
   
   The fix disables column pruning specifically when checking the CSV header schema against the required schema expected by Catalyst. This makes the expected schema match what the CSV parser actually provides, since later we also instruct the CSV parser to disable column pruning and read each entire row, in order to correctly assign the default value(s) during execution.
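   
   For illustration only, here is a minimal, hypothetical Scala sketch of the idea (not the actual patch): the schema used for the header check falls back to the full data schema whenever any field carries an existence default value, mirroring how the parser itself is told to read whole rows in that case. The helper name and the `EXISTS_DEFAULT` metadata key are assumptions made for this sketch.
   
   ```scala
   import org.apache.spark.sql.types.StructType
   
   // Hypothetical helper mirroring the fix: use the pruned (required) schema for the
   // header check only when column pruning is enabled and no field carries an existence
   // default; otherwise check against the full data schema, since the parser will read
   // whole rows anyway. The "EXISTS_DEFAULT" metadata key is an assumption here.
   def schemaForHeaderCheck(
       columnPruningEnabled: Boolean,
       requiredSchema: StructType,
       dataSchema: StructType): StructType = {
     val hasExistenceDefaults = dataSchema.fields.exists(_.metadata.contains("EXISTS_DEFAULT"))
     if (columnPruningEnabled && !hasExistenceDefaults) requiredSchema else dataSchema
   }
   ```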
   
   ### Why are the changes needed?
   
   Before this change, queries that selected a subset of the columns of a CSV table whose `CREATE TABLE` statement contained default values failed with an internal exception. For example:
   
   ```
   CREATE TABLE IF NOT EXISTS products (
     product_id INT,
     name STRING,
     price FLOAT default 0.0,
     quantity INT default 0
   )
   USING CSV
   OPTIONS (
     header 'true',
     inferSchema 'false',
     enforceSchema 'false',
     path '/Users/maximgekk/tmp/products.csv'
   );
   ```
   
   The CSV file products.csv:
   
   ```
   product_id,name,price,quantity
   1,Apple,0.50,100
   2,Banana,0.25,200
   3,Orange,0.75,50
   ```
   
   The query fails:
   
   ```
   spark-sql (default)> SELECT price FROM products;
   24/01/28 11:43:09 ERROR Executor: Exception in task 0.0 in stage 8.0 (TID 6)
   java.lang.IllegalArgumentException: Number of column in CSV header is not equal to number of fields in the schema:
    Header length: 4, schema size: 1
   CSV file: file:///Users/maximgekk/tmp/products.csv
   ```
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   This PR adds test coverage.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.




Re: [PR] [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning [spark]

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk closed pull request #44939: [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning
URL: https://github.com/apache/spark/pull/44939




Re: [PR] [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on PR #44939:
URL: https://github.com/apache/spark/pull/44939#issuecomment-1924577476

   Thanks again @MaxGekk for your reviews!




Re: [PR] [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #44939:
URL: https://github.com/apache/spark/pull/44939#discussion_r1475177031


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala:
##########
@@ -105,8 +105,6 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       sparkSession.sessionState.conf.csvColumnPruning,
       sparkSession.sessionState.conf.sessionLocalTimeZone,
       sparkSession.sessionState.conf.columnNameOfCorruptRecord)
-    val isColumnPruningEnabled = parsedOptions.isColumnPruningEnabled

Review Comment:
   👍 I restored this to compute a separate `val` here, like before.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala:
##########
@@ -278,13 +280,28 @@ class CSVOptions(
     .getOrElse(UNESCAPED_QUOTE_HANDLING, "STOP_AT_DELIMITER").toUpperCase(Locale.ROOT))
 
   /**
+   * Returns true if column pruning is enabled and there are no existence column default values in
+   * the [[schema]].
+   *
    * The column pruning feature can be enabled either via the CSV option `columnPruning` or
    * in non-multiline mode via initialization of CSV options by the SQL config:
    * `spark.sql.csv.parser.columnPruning.enabled`.
    * The feature is disabled in the `multiLine` mode because of the issue:
    * https://github.com/uniVocity/univocity-parsers/issues/529
+   *
+   * We disable column pruning when there are any column defaults, instead preferring to reach in
+   * each row and then post-process it to substitute the default values after.
    */
-  val isColumnPruningEnabled: Boolean = getBool(COLUMN_PRUNING, !multiLine && columnPruning)
+  def isColumnPruningEnabled(schema: StructType): Boolean = {
+    var result = !multiLine && columnPruning
+    if (parameters != null) {

Review Comment:
   Good point. I converted this parameter lookup back to a `val` stored in the class and serialized to executors, so we no longer try to look up the CSV options on executors.
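   
   A self-contained sketch of the driver-side capture pattern in question (hypothetical names, not the actual `CSVFileFormat` code): the option is evaluated on the driver into a plain value, so the reader closure captures only that value instead of reaching into the options map on executors.
   
   ```scala
   // Sketch only: the closure captures the precomputed Boolean `pruning`, which is
   // trivially serializable, rather than the options object it was derived from.
   def buildReaderSketch(columnPruningEnabled: Boolean): Iterator[String] => Iterator[String] = {
     // Computed on the driver; only this Boolean travels with the closure to executors.
     val pruning = columnPruningEnabled
     rows => if (pruning) rows.map(_.split(",", -1).head) else rows
   }
   ```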





Re: [PR] [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on PR #44939:
URL: https://github.com/apache/spark/pull/44939#issuecomment-1920150550

   @MaxGekk I tried several different ways of testing this bug with DSV2 CSV scans, but was unable to both use a schema with column defaults and hit that breakpoint in the DSV2 `CSVPartitionReaderFactory`. I can hit the breakpoint, but not when loading the table schema from Hive, which is necessary to get the column default metadata in there.
   
   At any rate, I updated `CSVOptions.isColumnPruningEnabled` to have only one overload, which takes the required schema and checks for column defaults, so all V1 and V2 callers now use the same code. We should be OK.




Re: [PR] [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on PR #44939:
URL: https://github.com/apache/spark/pull/44939#issuecomment-1915670580

   @MaxGekk here is the fix!




Re: [PR] [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #44939:
URL: https://github.com/apache/spark/pull/44939#discussion_r1476612504


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala:
##########
@@ -125,7 +125,11 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
         actualRequiredSchema,
         parsedOptions,
         actualFilters)
-      val schema = if (isColumnPruningEnabled) actualRequiredSchema else actualDataSchema
+      // Use column pruning when specified by Catalyst, except when one or more columns have
+      // existence default value(s), since in that case we instruct the CSV parser to disable column
+      // pruning and instead read each entire row in order to correctly assign the default value(s).
+      val useColumnPruningForCheckingHeader = isColumnPruningEnabled
+      val schema = if (useColumnPruningForCheckingHeader) actualRequiredSchema else actualDataSchema

Review Comment:
   Sounds good, done.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -3248,6 +3249,44 @@ abstract class CSVSuite
       }
     }
   }
+
+  test("SPARK-46890: CSV fails on a column with default and without enforcing schema") {
+    withTable("CarsTable") {
+      spark.sql(
+        s"""
+           |CREATE TABLE CarsTable(
+           |  year INT,
+           |  make STRING,
+           |  model STRING,
+           |  comment STRING DEFAULT '',
+           |  blank STRING DEFAULT '')
+           |USING csv
+           |OPTIONS (
+           |  header "true",
+           |  inferSchema "false",
+           |  enforceSchema "false",
+           |  path "${testFile(carsFile)}"
+           |)
+       """.stripMargin)
+      val expected = Seq(
+        Row("No comment"),
+        Row("Go get one now they are going fast"))
+      checkAnswer(
+        sql("SELECT comment FROM CarsTable WHERE year < 2014"),
+        expected)
+      checkAnswer(
+        spark.read.format("csv")
+          .options(
+      Map(

Review Comment:
   Done.





Re: [PR] [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning [spark]

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk commented on PR #44939:
URL: https://github.com/apache/spark/pull/44939#issuecomment-1916370859

   @dtenedor Thanks for the ping. I will review it today.




Re: [PR] [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning [spark]

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk commented on PR #44939:
URL: https://github.com/apache/spark/pull/44939#issuecomment-1925147440

   @dtenedor Should we backport it to `branch-3.5` and `branch-3.4`? If so, please, open separate PRs.




Re: [PR] [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning [spark]

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk commented on PR #44939:
URL: https://github.com/apache/spark/pull/44939#issuecomment-1925146975

   +1, LGTM. Merging to master.
   Thank you, @dtenedor and @cloud-fan for review.




Re: [PR] [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44939:
URL: https://github.com/apache/spark/pull/44939#discussion_r1471206040


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala:
##########
@@ -286,6 +288,16 @@ class CSVOptions(
    */
   val isColumnPruningEnabled: Boolean = getBool(COLUMN_PRUNING, !multiLine && columnPruning)
 
+  /**
+   * Returns true if column pruning is enabled and there are no existence column default values in
+   * the [[schema]]. This is useful when we want to disable column pruning when there are such
+   * defaults, instead preferring to reach in each row and then post-process it to substitute the
+   * default values after.

Review Comment:
   do we know why CSV column pruning doesn't work with default values?





Re: [PR] [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning [spark]

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk commented on code in PR #44939:
URL: https://github.com/apache/spark/pull/44939#discussion_r1471368855


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -3248,6 +3249,58 @@ abstract class CSVSuite
       }
     }
   }
+
+  test("SPARK-46890: CSV fails on a column with default and without enforcing schema") {
+    withTable("CarsTable") {
+      spark.sql(
+        s"""
+           |CREATE TABLE CarsTable(
+           |  year INT,
+           |  make STRING,
+           |  model STRING,
+           |  comment STRING DEFAULT '',
+           |  blank STRING DEFAULT '')
+           |USING csv
+           |OPTIONS (
+           |  header "true",
+           |  inferSchema "false",
+           |  enforceSchema "false",
+           |  path "${testFile(carsFile)}"
+           |)
+       """.stripMargin)
+      checkAnswer(
+        sql("SELECT comment FROM CarsTable"),
+        Seq(
+          Row("No comment"),
+          Row("Go get one now they are going fast"),
+          Row("")
+        ))
+    }
+    withTable("Products") {
+      spark.sql(
+        s"""
+           |CREATE TABLE IF NOT EXISTS Products (
+           |  product_id INT,
+           |  name STRING,
+           |  price FLOAT default 0.0,
+           |  quantity INT default 0
+           |)
+           |USING CSV
+           |OPTIONS (
+           |  header 'true',
+           |  inferSchema 'false',
+           |  enforceSchema 'false',
+           |  path "${testFile(productsFile)}"

Review Comment:
   If this data is used only once, please, create a temporary file as in other tests in the suite.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -3248,6 +3249,58 @@ abstract class CSVSuite
       }
     }
   }
+
+  test("SPARK-46890: CSV fails on a column with default and without enforcing schema") {
+    withTable("CarsTable") {
+      spark.sql(
+        s"""
+           |CREATE TABLE CarsTable(
+           |  year INT,
+           |  make STRING,
+           |  model STRING,
+           |  comment STRING DEFAULT '',
+           |  blank STRING DEFAULT '')
+           |USING csv
+           |OPTIONS (
+           |  header "true",
+           |  inferSchema "false",
+           |  enforceSchema "false",
+           |  path "${testFile(carsFile)}"
+           |)
+       """.stripMargin)
+      checkAnswer(

Review Comment:
   Why do you need this check? The next one reproduces the issue, doesn't it?





Re: [PR] [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on PR #44939:
URL: https://github.com/apache/spark/pull/44939#issuecomment-1919526449

   > @dtenedor Could you write or modify your new test to check V2 DS implementation. I put a breakpoint in CSVPartitionReaderFactory.
   > And your test didn't hit the breakpoint.
   
   @MaxGekk Good question. Reproducing it now: requesting to run this new `test("SPARK-46890: CSV fails on a column with default and without enforcing schema")` in IntelliJ brings up a drop-down to choose between `CSVv1Suite` and `CSVv2Suite`. I am able to hit that breakpoint in the latter but not the former.
   
   <img width="851" alt="image" src="https://github.com/apache/spark/assets/99207096/d57c6901-db41-4740-ab75-0d83dcf877f9">
   
   <img width="934" alt="image" src="https://github.com/apache/spark/assets/99207096/dd0b118c-c142-49cb-a74c-c20920ac6e88">
   
   Interestingly, I copied the unit test into a new PR without this fix, and there it fails for both CSV V1 and CSV V2. So this PR fixes the bug for both versions, but only V2 hits the breakpoint you suggested.
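   
   (Aside, as a hedged sketch: the V2 suite presumably exercises the DSv2 path by clearing `spark.sql.sources.useV1SourceList`; a minimal session configured that way, assuming that config controls the V1 fallback, might look like the following. The file path is a placeholder.)
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   // Sketch only: removing "csv" from the V1 fallback list so that CSV reads are planned
   // through the DSv2 path (CSVPartitionReaderFactory) rather than CSVFileFormat.
   // Assumes spark.sql.sources.useV1SourceList governs this fallback.
   val spark = SparkSession.builder()
     .master("local[*]")
     .appName("csv-dsv2-sketch")
     .config("spark.sql.sources.useV1SourceList", "")
     .getOrCreate()
   
   val comments = spark.read
     .options(Map("header" -> "true", "inferSchema" -> "true", "enforceSchema" -> "false"))
     .csv("/path/to/cars.csv") // placeholder path
     .where("year < 2014")
     .select("comment")
   comments.show()
   ```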




Re: [PR] [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning [spark]

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk commented on PR #44939:
URL: https://github.com/apache/spark/pull/44939#issuecomment-1919608098

   @dtenedor Could you remove the first part of the test:
   ```scala
     test("SPARK-46890: CSV fails on a column with default and without enforcing schema") {
       withTable("Products") {
         spark.sql(
           s"""
              |CREATE TABLE IF NOT EXISTS Products (
              |  product_id INT,
              |  name STRING,
              |  price FLOAT default 0.0,
              |  quantity INT default 0
              |)
              |USING CSV
              |OPTIONS (
              |  header 'true',
              |  inferSchema 'false',
              |  enforceSchema 'false',
              |  path "${testFile(productsFile)}"
              |)
          """.stripMargin)
         checkAnswer(
           sql("SELECT price FROM Products"),
           Seq(
             Row(0.50),
             Row(0.25),
             Row(0.75)))
       }
     }
   ```
   Set a breakpoint inside the main constructor of `CSVHeaderChecker`:
   
   [Screenshot 2024-01-31 at 20 43 42: https://github.com/apache/spark/assets/1580697/47f0e377-7ab7-4d89-b68c-b240b81ae036]
   
   You should see where we hit the breakpoint from:
   
   [Screenshot 2024-01-31 at 20 45 49: https://github.com/apache/spark/assets/1580697/09bfe2cf-a3ae-473a-9c93-35bb54a0f924]
   
   `CSVFileFormat` is the V1 implementation. It seems we fall back from the V2 to the V1 datasource implementation for some reason.




Re: [PR] [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning [spark]

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk commented on code in PR #44939:
URL: https://github.com/apache/spark/pull/44939#discussion_r1476130520


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -3248,6 +3249,44 @@ abstract class CSVSuite
       }
     }
   }
+
+  test("SPARK-46890: CSV fails on a column with default and without enforcing schema") {
+    withTable("CarsTable") {
+      spark.sql(
+        s"""
+           |CREATE TABLE CarsTable(
+           |  year INT,
+           |  make STRING,
+           |  model STRING,
+           |  comment STRING DEFAULT '',
+           |  blank STRING DEFAULT '')
+           |USING csv
+           |OPTIONS (
+           |  header "true",
+           |  inferSchema "false",
+           |  enforceSchema "false",
+           |  path "${testFile(carsFile)}"
+           |)
+       """.stripMargin)
+      val expected = Seq(
+        Row("No comment"),
+        Row("Go get one now they are going fast"))
+      checkAnswer(
+        sql("SELECT comment FROM CarsTable WHERE year < 2014"),
+        expected)
+      checkAnswer(
+        spark.read.format("csv")
+          .options(
+      Map(

Review Comment:
   Could you fix indentation here, please.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -3248,6 +3249,44 @@ abstract class CSVSuite
       }
     }
   }
+
+  test("SPARK-46890: CSV fails on a column with default and without enforcing schema") {
+    withTable("CarsTable") {
+      spark.sql(
+        s"""
+           |CREATE TABLE CarsTable(
+           |  year INT,
+           |  make STRING,
+           |  model STRING,
+           |  comment STRING DEFAULT '',
+           |  blank STRING DEFAULT '')
+           |USING csv
+           |OPTIONS (
+           |  header "true",
+           |  inferSchema "false",
+           |  enforceSchema "false",
+           |  path "${testFile(carsFile)}"
+           |)
+       """.stripMargin)
+      val expected = Seq(
+        Row("No comment"),
+        Row("Go get one now they are going fast"))
+      checkAnswer(
+        sql("SELECT comment FROM CarsTable WHERE year < 2014"),
+        expected)
+      checkAnswer(
+        spark.read.format("csv")
+          .options(
+      Map(
+        "header" -> "true",
+        "inferSchema" -> "true",
+        "enforceSchema" -> "false"))
+        .load(testFile(carsFile))
+          .select("comment")
+          .where("year < 2014"),
+        expected)

Review Comment:
   ```suggestion
           spark.read.format("csv")
             .options(Map("header" -> "true", "inferSchema" -> "true", "enforceSchema" -> "false"))
             .load(testFile(carsFile)).select("comment").where("year < 2014"),
           expected)
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala:
##########
@@ -125,7 +125,11 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
         actualRequiredSchema,
         parsedOptions,
         actualFilters)
-      val schema = if (isColumnPruningEnabled) actualRequiredSchema else actualDataSchema
+      // Use column pruning when specified by Catalyst, except when one or more columns have
+      // existence default value(s), since in that case we instruct the CSV parser to disable column
+      // pruning and instead read each entire row in order to correctly assign the default value(s).
+      val useColumnPruningForCheckingHeader = isColumnPruningEnabled
+      val schema = if (useColumnPruningForCheckingHeader) actualRequiredSchema else actualDataSchema

Review Comment:
   Let's inline `useColumnPruningForCheckingHeader`:
   ```suggestion
         val schema = if (isColumnPruningEnabled) actualRequiredSchema else actualDataSchema
   ```





Re: [PR] [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning [spark]

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk commented on code in PR #44939:
URL: https://github.com/apache/spark/pull/44939#discussion_r1474938302


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala:
##########
@@ -105,8 +105,6 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       sparkSession.sessionState.conf.csvColumnPruning,
       sparkSession.sessionState.conf.sessionLocalTimeZone,
       sparkSession.sessionState.conf.columnNameOfCorruptRecord)
-    val isColumnPruningEnabled = parsedOptions.isColumnPruningEnabled

Review Comment:
   This one is stored in a `val` so it can be transferred (in serializable form) from the driver to executors, because you cannot access `parameters` on executors.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala:
##########
@@ -278,13 +280,28 @@ class CSVOptions(
     .getOrElse(UNESCAPED_QUOTE_HANDLING, "STOP_AT_DELIMITER").toUpperCase(Locale.ROOT))
 
   /**
+   * Returns true if column pruning is enabled and there are no existence column default values in
+   * the [[schema]].
+   *
    * The column pruning feature can be enabled either via the CSV option `columnPruning` or
    * in non-multiline mode via initialization of CSV options by the SQL config:
    * `spark.sql.csv.parser.columnPruning.enabled`.
    * The feature is disabled in the `multiLine` mode because of the issue:
    * https://github.com/uniVocity/univocity-parsers/issues/529
+   *
+   * We disable column pruning when there are any column defaults, instead preferring to reach in
+   * each row and then post-process it to substitute the default values after.
    */
-  val isColumnPruningEnabled: Boolean = getBool(COLUMN_PRUNING, !multiLine && columnPruning)
+  def isColumnPruningEnabled(schema: StructType): Boolean = {
+    var result = !multiLine && columnPruning
+    if (parameters != null) {

Review Comment:
   We shouldn't even try to access `parameters` when it is `null` because:
   1. We never pass `null` as `parameters`.
   2. `parameters` is `@transient`, so it is not serialized and therefore not available on executors.
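   
   To illustrate the point with a minimal, self-contained sketch (hypothetical `Options` class, not Spark's actual `CSVOptions`): a `@transient` field is dropped during Java serialization, so anything the executor side needs has to be precomputed into a regular field on the driver.
   
   ```scala
   import java.io._
   
   // Hypothetical stand-in for an options class: the raw map is @transient, while the
   // value derived from it is a regular field and survives serialization.
   class Options(@transient val parameters: Map[String, String]) extends Serializable {
     val columnPruning: Boolean = parameters.getOrElse("columnPruning", "true").toBoolean
   }
   
   object TransientDemo {
     private def roundTrip[T <: Serializable](obj: T): T = {
       val buffer = new ByteArrayOutputStream()
       val out = new ObjectOutputStream(buffer)
       out.writeObject(obj)
       out.close()
       val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
       in.readObject().asInstanceOf[T]
     }
   
     def main(args: Array[String]): Unit = {
       val restored = roundTrip(new Options(Map("columnPruning" -> "false")))
       assert(restored.parameters == null) // the @transient map is gone after deserialization
       assert(!restored.columnPruning)     // the precomputed val made it across intact
     }
   }
   ```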





Re: [PR] [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #44939:
URL: https://github.com/apache/spark/pull/44939#discussion_r1476613270


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -3248,6 +3249,44 @@ abstract class CSVSuite
       }
     }
   }
+
+  test("SPARK-46890: CSV fails on a column with default and without enforcing schema") {
+    withTable("CarsTable") {
+      spark.sql(
+        s"""
+           |CREATE TABLE CarsTable(
+           |  year INT,
+           |  make STRING,
+           |  model STRING,
+           |  comment STRING DEFAULT '',
+           |  blank STRING DEFAULT '')
+           |USING csv
+           |OPTIONS (
+           |  header "true",
+           |  inferSchema "false",
+           |  enforceSchema "false",
+           |  path "${testFile(carsFile)}"
+           |)
+       """.stripMargin)
+      val expected = Seq(
+        Row("No comment"),
+        Row("Go get one now they are going fast"))
+      checkAnswer(
+        sql("SELECT comment FROM CarsTable WHERE year < 2014"),
+        expected)
+      checkAnswer(
+        spark.read.format("csv")
+          .options(
+      Map(
+        "header" -> "true",
+        "inferSchema" -> "true",
+        "enforceSchema" -> "false"))
+        .load(testFile(carsFile))
+          .select("comment")
+          .where("year < 2014"),
+        expected)

Review Comment:
   Done.


