Posted to commits@spark.apache.org by we...@apache.org on 2020/02/15 12:12:19 UTC

[spark] branch branch-2.4 updated: [SPARK-30826][SQL] Respect reference case in `StringStartsWith` pushed down to parquet

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 6294cb8  [SPARK-30826][SQL] Respect reference case in `StringStartsWith` pushed down to parquet
6294cb8 is described below

commit 6294cb8af552e7e62ad7cafa05eb33763db02dc4
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Sat Feb 15 19:49:58 2020 +0800

    [SPARK-30826][SQL] Respect reference case in `StringStartsWith` pushed down to parquet
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to convert the attribute name of `StringStartsWith`, when it is pushed down to the Parquet datasource, to a column reference via the `nameToParquetField` map. The same conversion is already performed for the other source filters pushed down to Parquet.
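    
    For illustration, here is a minimal, self-contained sketch of the name-mapping idea; the `ParquetField` case class and `buildNameToParquetField` helper are hypothetical simplifications, not the actual `ParquetFilters` internals:
    ```scala
    import java.util.Locale
    
    // Hypothetical stand-in for the field metadata kept by ParquetFilters.
    case class ParquetField(fieldName: String)
    
    // Build the name -> field lookup. Under case-insensitive analysis the keys
    // are lower-cased, so the catalyst attribute `col` resolves to the physical
    // Parquet column `COL`.
    def buildNameToParquetField(
        fieldNames: Seq[String],
        caseSensitive: Boolean): Map[String, ParquetField] = {
      val fields = fieldNames.map(n => n -> ParquetField(n))
      if (caseSensitive) fields.toMap
      else fields.map { case (n, f) => n.toLowerCase(Locale.ROOT) -> f }.toMap
    }
    
    val nameToParquetField = buildNameToParquetField(Seq("COL"), caseSensitive = false)
    // The fix routes the StringStartsWith attribute through this map, i.e.
    // binaryColumn(nameToParquetField(name).fieldName) instead of binaryColumn(name):
    assert(nameToParquetField("col").fieldName == "COL")
    ```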
    
    ### Why are the changes needed?
    This fixes the bug described in [SPARK-30826](https://issues.apache.org/jira/browse/SPARK-30826). A query over an external table:
    ```sql
    CREATE TABLE t1 (col STRING)
    USING parquet
    OPTIONS (path '$path')
    ```
    created on top of Parquet files written by `Seq("42").toDF("COL").write.parquet(path)` returns a wrong, empty result:
    ```scala
    spark.sql("SELECT * FROM t1 WHERE col LIKE '4%'").show
    +---+
    |col|
    +---+
    +---+
    ```
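    
    A self-contained reproduction sketch of the above (assumes a `SparkSession` named `spark` with the default `spark.sql.caseSensitive=false`; the scratch path is hypothetical):
    ```scala
    import spark.implicits._
    
    val path = "/tmp/spark-30826"
    Seq("42").toDF("COL").write.mode("overwrite").parquet(path)
    
    spark.sql(s"CREATE TABLE t1 (col STRING) USING parquet OPTIONS (path '$path')")
    // Before the fix, the pushed-down StartsWith filter referenced the
    // lower-cased name `col`, which does not exist in the data files, so the
    // query returned no rows.
    spark.sql("SELECT * FROM t1 WHERE col LIKE '4%'").show()
    ```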
    
    ### Does this PR introduce any user-facing change?
    Yes. After the changes, the result is correct for the example above:
    ```scala
    spark.sql("SELECT * FROM t1 WHERE col LIKE '4%'").show
    +---+
    |col|
    +---+
    | 42|
    +---+
    ```
    
    ### How was this patch tested?
    Added a test to `ParquetFilterSuite`.
    
    Closes #27574 from MaxGekk/parquet-StringStartsWith-case-sens.
    
    Authored-by: Maxim Gekk <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 8b73b92aadd685b29ef3d9b33366f5e1fd3dae99)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../datasources/parquet/ParquetFilters.scala        |  2 +-
 .../datasources/parquet/ParquetFilterSuite.scala    | 21 +++++++++++++++++++++
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 7e420d3..5a833e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -519,7 +519,7 @@ private[parquet] class ParquetFilters(
       case sources.StringStartsWith(name, prefix)
           if pushDownStartWith && canMakeFilterOn(name, prefix) =>
         Option(prefix).map { v =>
-          FilterApi.userDefined(binaryColumn(name),
+          FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldName),
             new UserDefinedPredicate[Binary] with Serializable {
               private val strToBinary = Binary.fromReusedByteArray(v.getBytes)
               private val size = strToBinary.length
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 0f04e82..22cbacb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -1142,6 +1142,27 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("SPARK-30826: case insensitivity of StringStartsWith attribute") {
+    import testImplicits._
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      withTable("t1") {
+        withTempPath { dir =>
+          val path = dir.toURI.toString
+          Seq("42").toDF("COL").write.parquet(path)
+          spark.sql(
+            s"""
+               |CREATE TABLE t1 (col STRING)
+               |USING parquet
+               |OPTIONS (path '$path')
+           """.stripMargin)
+          checkAnswer(
+            spark.sql("SELECT * FROM t1 WHERE col LIKE '4%'"),
+            Row("42"))
+        }
+      }
+    }
+  }
 }
 
 class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {

