You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/05/04 07:16:52 UTC

[GitHub] [spark] WangGuangxin commented on a diff in pull request #36328: [SPARK-39002][SQL] StringEndsWith/Contains support push down to Parquet

WangGuangxin commented on code in PR #36328:
URL: https://github.com/apache/spark/pull/36328#discussion_r864518551


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala:
##########
@@ -1423,65 +1442,123 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     }
   }
 
-  test("filter pushdown - StringStartsWith") {
+  private def checkStringFilterPushdown(
+      stringPredicate: String => Expression,
+      sourceFilter: (String, String) => sources.Filter): Unit = {
     withParquetDataFrame((1 to 4).map(i => Tuple1(i + "str" + i))) { implicit df =>
       checkFilterPredicate(
-        $"_1".startsWith("").asInstanceOf[Predicate],
+        stringPredicate("").asInstanceOf[Predicate],
         classOf[UserDefinedByInstance[_, _]],
         Seq("1str1", "2str2", "3str3", "4str4").map(Row(_)))
 
-      Seq("2", "2s", "2st", "2str", "2str2").foreach { prefix =>
+      Seq("2", "2str2").foreach { str =>
         checkFilterPredicate(
-          $"_1".startsWith(prefix).asInstanceOf[Predicate],
+          stringPredicate(str).asInstanceOf[Predicate],
           classOf[UserDefinedByInstance[_, _]],
           "2str2")
       }
 
-      Seq("2S", "null", "2str22").foreach { prefix =>
+      Seq("2S", "null", "2str22").foreach { str =>
         checkFilterPredicate(
-          $"_1".startsWith(prefix).asInstanceOf[Predicate],
+          stringPredicate(str).asInstanceOf[Predicate],
           classOf[UserDefinedByInstance[_, _]],
           Seq.empty[Row])
       }
 
       checkFilterPredicate(
-        !$"_1".startsWith("").asInstanceOf[Predicate],
+        !stringPredicate("").asInstanceOf[Predicate],
         classOf[Operators.Not],
         Seq().map(Row(_)))
 
-      Seq("2", "2s", "2st", "2str", "2str2").foreach { prefix =>
+      Seq("2", "2str2").foreach { str =>
         checkFilterPredicate(
-          !$"_1".startsWith(prefix).asInstanceOf[Predicate],
+          !stringPredicate(str).asInstanceOf[Predicate],
           classOf[Operators.Not],
           Seq("1str1", "3str3", "4str4").map(Row(_)))
       }
 
-      Seq("2S", "null", "2str22").foreach { prefix =>
+      Seq("2S", "null", "2str22").foreach { str =>
         checkFilterPredicate(
-          !$"_1".startsWith(prefix).asInstanceOf[Predicate],
+          !stringPredicate(str).asInstanceOf[Predicate],
           classOf[Operators.Not],
           Seq("1str1", "2str2", "3str3", "4str4").map(Row(_)))
       }
 
       val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
       assertResult(None) {
-        createParquetFilters(schema).createFilter(sources.StringStartsWith("_1", null))
+        createParquetFilters(schema).createFilter(sourceFilter("_1", null))
       }
     }
 
     // SPARK-28371: make sure filter is null-safe.
     withParquetDataFrame(Seq(Tuple1[String](null))) { implicit df =>
       checkFilterPredicate(
-        $"_1".startsWith("blah").asInstanceOf[Predicate],
+        stringPredicate("blah").asInstanceOf[Predicate],
         classOf[UserDefinedByInstance[_, _]],
         Seq.empty[Row])
     }
+  }
+
+  test("filter pushdown - StringStartsWith") {
+    checkStringFilterPushdown(
+      str => $"_1".startsWith(str),
+      (attr, value) => sources.StringStartsWith(attr, value))
+  }
+
+  test("filter pushdown - StringEndsWith") {
+    checkStringFilterPushdown(
+      str => $"_1".endsWith(str),
+      (attr, value) => sources.StringEndsWith(attr, value))
+  }
+
+  test("filter pushdown - StringContains") {
+    checkStringFilterPushdown(
+      str => $"_1".contains(str),
+      (attr, value) => sources.StringContains(attr, value))
+  }
 
+  test("filter pushdown - StringPredicate") {
     import testImplicits._
-    // Test canDrop() has taken effect
-    testStringStartsWith(spark.range(1024).map(_.toString).toDF(), "value like 'a%'")
-    // Test inverseCanDrop() has taken effect
-    testStringStartsWith(spark.range(1024).map(c => "100").toDF(), "value not like '10%'")
+    // keep() should take effect on StartsWith/EndsWith/Contains
+    Seq(
+      "value like 'a%'", // StartsWith
+      "value like '%a'", // EndsWith
+      "value like '%a%'" // Contains

Review Comment:
   Hi @sadikovi, the NumRowGroupsAcc counts the row groups that actually remain after filtering; you can find it here: https://github.com/apache/spark/blob/a1aa200bdf32e55ea3b1f220da882b29a7a2bf9b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java#L130
   
   As for the `keep()` test, the dictionary filter is enabled and the test data contains duplicate records, so Parquet generates a dictionary when writing the data, and the dictionary filter is used when reading it.
   
   When we test `canDrop`, the test data has no duplicates, so Parquet does not generate a dictionary; the statistics-based row group filter is used instead, which calls `canDrop`.
   
   Correct me if I'm wrong.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org