You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@paimon.apache.org by "Zouxxyy (via GitHub)" <gi...@apache.org> on 2023/11/23 04:02:50 UTC

[PR] [spark] Enhance spark push down filter [incubator-paimon]

Zouxxyy opened a new pull request, #2376:
URL: https://github.com/apache/incubator-paimon/pull/2376

   <!-- Please specify the module before the PR name: [core] ... or [flink] ... -->
   
   ### Purpose
   
   <!-- Linking this pull request to the issue -->
   Linked issue: close #xxx
   
   <!-- What is the purpose of the change -->
   
   ### Tests
   
   <!-- List UT and IT cases to verify this change -->
   
   ### API and Format
   
   <!-- Does this change affect API or storage format -->
   
   ### Documentation
   
   <!-- Does this change introduce a new feature -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [spark] Enhance spark push down filter [incubator-paimon]

Posted by "YannByron (via GitHub)" <gi...@apache.org>.
YannByron commented on code in PR #2376:
URL: https://github.com/apache/incubator-paimon/pull/2376#discussion_r1402948593


##########
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala:
##########
@@ -90,4 +90,8 @@ class PaimonSparkTestBase extends QueryTest with SharedSparkSession with WithTab
   def loadTable(tableName: String): AbstractFileStoreTable = {
     catalog.getTable(Identifier.create(dbName0, tableName)).asInstanceOf[AbstractFileStoreTable]
   }
+
+  def explain(sql: String): String = {
+    spark.sql(s"EXPLAIN $sql").collect().map(_.mkString).mkString

Review Comment:
   please use `queryExecution` to determine logical/physical plan.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [spark] Enhance spark push down filter [incubator-paimon]

Posted by "YannByron (via GitHub)" <gi...@apache.org>.
YannByron commented on code in PR #2376:
URL: https://github.com/apache/incubator-paimon/pull/2376#discussion_r1404134108


##########
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/SparkPushDownTest.scala:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal, Or}
+import org.apache.spark.sql.catalyst.plans.logical.Filter
+import org.assertj.core.api.Assertions.assertThat
+
+class SparkPushDownTest extends PaimonSparkTestBase {
+
+  test(s"Paimon push down: apply partition filter push down with non-partitioned table") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, pt STRING)
+                 |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p1'), (3, 'c', 'p2')")
+
+    assertThat(spark.sql("SELECT * FROM T WHERE pt = 'p1'").queryExecution.optimizedPlan.exists {
+      case Filter(c: Expression, _) =>
+        c.exists {
+          case EqualTo(a: AttributeReference, l: Literal) =>
+            a.name.equals("pt") && l.value.toString.equals("p1")
+          case _ => false
+        }
+      case _ => false
+    }).isTrue
+  }
+
+  test(s"Paimon push down: apply partition filter push down with partitioned table") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, pt STRING)
+                 |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
+                 |PARTITIONED BY (pt)
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p1'), (3, 'c', 'p2'), (4, 'd', 'p3')")
+
+    // partition filter push down did not hit
+    assertThat(spark.sql("SELECT * FROM T WHERE id = '1'").queryExecution.optimizedPlan.exists {
+      case Filter(_: Expression, _) => true
+      case _ => false
+    }).isTrue
+    checkAnswer(spark.sql("SELECT * FROM T WHERE id = '1'"), Row(1, "a", "p1") :: Nil)
+
+    assertThat(
+      spark.sql("SELECT * FROM T WHERE id = '1' or pt = 'p1'").queryExecution.optimizedPlan.exists {
+        case Filter(_: Or, _) => true
+        case _ => false
+      }).isTrue
+    checkAnswer(
+      spark.sql("SELECT * FROM T WHERE id = '1' or pt = 'p1'"),
+      Row(1, "a", "p1") :: Row(2, "b", "p1") :: Nil)
+
+    // partition filter push down hit
+    assertThat(spark.sql("SELECT * FROM T WHERE pt = 'p1'").queryExecution.optimizedPlan.exists {
+      case Filter(_: Expression, _) => true
+      case _ => false
+    }).isFalse
+    checkAnswer(
+      spark.sql("SELECT * FROM T WHERE pt = 'p1'"),
+      Row(1, "a", "p1") :: Row(2, "b", "p1") :: Nil)
+
+    assertThat(
+      spark
+        .sql("SELECT * FROM T WHERE id = '1' and pt = 'p1'")
+        .queryExecution
+        .optimizedPlan
+        .exists {
+          case Filter(c: Expression, _) =>
+            c.exists {
+              case EqualTo(a: AttributeReference, l: Literal) =>
+                a.name.equals("pt") && l.value.toString.equals("p1")
+              case _ => false
+            }
+          case _ => false
+        }).isFalse
+    checkAnswer(spark.sql("SELECT * FROM T WHERE id = '1' and pt = 'p1'"), Row(1, "a", "p1") :: Nil)
+
+    assertThat(spark.sql("SELECT * FROM T WHERE pt < 'p3'").queryExecution.optimizedPlan.exists {
+      case Filter(_: Expression, _) => true
+      case _ => false
+    }).isFalse
+    checkAnswer(
+      spark.sql("SELECT * FROM T WHERE pt < 'p3'"),
+      Row(1, "a", "p1") :: Row(2, "b", "p1") :: Row(3, "c", "p2") :: Nil)
+  }
+

Review Comment:
   maybe define two private methods (`checkFilterExists` and `checkFilterEquals`). These can be reused.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [spark] Enhance spark push down filter [incubator-paimon]

Posted by "YannByron (via GitHub)" <gi...@apache.org>.
YannByron commented on code in PR #2376:
URL: https://github.com/apache/incubator-paimon/pull/2376#discussion_r1404130961


##########
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/SparkPushDownTest.scala:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal, Or}
+import org.apache.spark.sql.catalyst.plans.logical.Filter
+import org.assertj.core.api.Assertions.assertThat
+
+class SparkPushDownTest extends PaimonSparkTestBase {
+
+  test(s"Paimon push down: apply partition filter push down with non-partitioned table") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, pt STRING)
+                 |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p1'), (3, 'c', 'p2')")
+
+    assertThat(spark.sql("SELECT * FROM T WHERE pt = 'p1'").queryExecution.optimizedPlan.exists {
+      case Filter(c: Expression, _) =>
+        c.exists {
+          case EqualTo(a: AttributeReference, l: Literal) =>
+            a.name.equals("pt") && l.value.toString.equals("p1")
+          case _ => false
+        }
+      case _ => false
+    }).isTrue
+  }
+
+  test(s"Paimon push down: apply partition filter push down with partitioned table") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, pt STRING)
+                 |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
+                 |PARTITIONED BY (pt)
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p1'), (3, 'c', 'p2'), (4, 'd', 'p3')")
+
+    // partition filter push down did not hit
+    assertThat(spark.sql("SELECT * FROM T WHERE id = '1'").queryExecution.optimizedPlan.exists {
+      case Filter(_: Expression, _) => true
+      case _ => false
+    }).isTrue
+    checkAnswer(spark.sql("SELECT * FROM T WHERE id = '1'"), Row(1, "a", "p1") :: Nil)
+
+    assertThat(

Review Comment:
   please add some comments for this test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [spark] Enhance spark push down filter [incubator-paimon]

Posted by "YannByron (via GitHub)" <gi...@apache.org>.
YannByron commented on code in PR #2376:
URL: https://github.com/apache/incubator-paimon/pull/2376#discussion_r1404130095


##########
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/SparkPushDownTest.scala:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal, Or}
+import org.apache.spark.sql.catalyst.plans.logical.Filter
+import org.assertj.core.api.Assertions.assertThat
+
+class SparkPushDownTest extends PaimonSparkTestBase {
+
+  test(s"Paimon push down: apply partition filter push down with non-partitioned table") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, pt STRING)
+                 |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p1'), (3, 'c', 'p2')")
+
+    assertThat(spark.sql("SELECT * FROM T WHERE pt = 'p1'").queryExecution.optimizedPlan.exists {

Review Comment:
   Assertions.assertTrue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [spark] Enhance spark push down filter [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi merged PR #2376:
URL: https://github.com/apache/incubator-paimon/pull/2376


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [spark] Enhance spark push down filter [incubator-paimon]

Posted by "YannByron (via GitHub)" <gi...@apache.org>.
YannByron commented on PR #2376:
URL: https://github.com/apache/incubator-paimon/pull/2376#issuecomment-1829499703

   https://github.com/apache/incubator-paimon/issues/2404


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [spark] Enhance spark push down filter [incubator-paimon]

Posted by "YannByron (via GitHub)" <gi...@apache.org>.
YannByron commented on code in PR #2376:
URL: https://github.com/apache/incubator-paimon/pull/2376#discussion_r1404135346


##########
paimon-common/src/main/java/org/apache/paimon/predicate/PartitionPredicateVisitor.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import java.util.List;
+
+/** Visit the predicate and check if it only contains partition key's predicate. */
+public class PartitionPredicateVisitor implements PredicateVisitor<Boolean> {
+
+    private final List<String> partitionKeys;
+
+    public PartitionPredicateVisitor(List<String> partitionKeys) {
+        this.partitionKeys = partitionKeys;
+    }
+
+    @Override
+    public Boolean visit(LeafPredicate predicate) {
+        return partitionKeys.contains(predicate.fieldName());
+    }
+
+    @Override
+    public Boolean visit(CompoundPredicate predicate) {
+        for (Predicate child : predicate.children()) {

Review Comment:
   Thinking about a case that one child of `CompoundPredicate` is also `CompoundPredicate`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [spark] Enhance spark push down filter [incubator-paimon]

Posted by "YannByron (via GitHub)" <gi...@apache.org>.
YannByron commented on PR #2376:
URL: https://github.com/apache/incubator-paimon/pull/2376#issuecomment-1827043785

   +1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org