You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/04/21 08:17:48 UTC

[spark] branch branch-3.3 updated: [SPARK-38432][SQL][FOLLOWUP] Fix problems in And/Or/Not to V2 Filter

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new c2fa3b80e68 [SPARK-38432][SQL][FOLLOWUP] Fix problems in And/Or/Not to V2 Filter
c2fa3b80e68 is described below

commit c2fa3b80e6807d4f66d23795d54c2ee59478358b
Author: huaxingao <hu...@apple.com>
AuthorDate: Thu Apr 21 16:16:47 2022 +0800

    [SPARK-38432][SQL][FOLLOWUP] Fix problems in And/Or/Not to V2 Filter
    
    ### What changes were proposed in this pull request?
    Instead of having
    ```
    override def toV2: Predicate = new Predicate("AND", Seq(left, right).map(_.toV2).toArray)
    ```
    I think we should construct a V2 `And` directly.
    ```
    override def toV2: Predicate = new org.apache.spark.sql.connector.expressions.filter.And(left.toV2, right.toV2)
    ```
    same for `Or` and `Not`.
    
    ### Why are the changes needed?
    bug fixing
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    New tests
    
    Closes #36290 from huaxingao/toV1.
    
    Authored-by: huaxingao <hu...@apple.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 36fc8bd185da99b64954ca0dd393b452fb788226)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/sql/sources/filters.scala     |  8 +-
 .../datasources/v2/V2PredicateSuite.scala          | 93 ++++++++++++++++++++++
 2 files changed, 97 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala
index 9954821e7ce..66ec4a6c7b9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.{Evolving, Stable}
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath
 import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue}
-import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse => V2AlwaysFalse, AlwaysTrue => V2AlwaysTrue, Predicate}
+import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse => V2AlwaysFalse, AlwaysTrue => V2AlwaysTrue, And => V2And, Not => V2Not, Or => V2Or, Predicate}
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -270,7 +270,7 @@ case class IsNotNull(attribute: String) extends Filter {
 @Stable
 case class And(left: Filter, right: Filter) extends Filter {
   override def references: Array[String] = left.references ++ right.references
-  override def toV2: Predicate = new Predicate("AND", Seq(left, right).map(_.toV2).toArray)
+  override def toV2: Predicate = new V2And(left.toV2, right.toV2)
 }
 
 /**
@@ -281,7 +281,7 @@ case class And(left: Filter, right: Filter) extends Filter {
 @Stable
 case class Or(left: Filter, right: Filter) extends Filter {
   override def references: Array[String] = left.references ++ right.references
-  override def toV2: Predicate = new Predicate("OR", Seq(left, right).map(_.toV2).toArray)
+  override def toV2: Predicate = new V2Or(left.toV2, right.toV2)
 }
 
 /**
@@ -292,7 +292,7 @@ case class Or(left: Filter, right: Filter) extends Filter {
 @Stable
 case class Not(child: Filter) extends Filter {
   override def references: Array[String] = child.references
-  override def toV2: Predicate = new Predicate("NOT", Array(child.toV2))
+  override def toV2: Predicate = new V2Not(child.toV2)
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2PredicateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2PredicateSuite.scala
index 2d6e6fcf161..2df8b8e56c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2PredicateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2PredicateSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, Literal, LiteralValue}
 import org.apache.spark.sql.connector.expressions.filter._
 import org.apache.spark.sql.execution.datasources.v2.V2PredicateSuite.ref
+import org.apache.spark.sql.sources.{AlwaysFalse => V1AlwaysFalse, AlwaysTrue => V1AlwaysTrue, And => V1And, EqualNullSafe, EqualTo, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Not => V1Not, Or => V1Or, StringContains, StringEndsWith, StringStartsWith}
 import org.apache.spark.sql.types.{IntegerType, StringType}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -31,16 +32,22 @@ class V2PredicateSuite extends SparkFunSuite {
       new Predicate("=", Array[Expression](ref("a", "B"), LiteralValue(1, IntegerType)))
     assert(predicate1.references.map(_.describe()).toSeq == Seq("a.B"))
     assert(predicate1.describe.equals("a.B = 1"))
+    val v1Filter1 = EqualTo(ref("a", "B").describe(), 1)
+    assert(v1Filter1.toV2 == predicate1)
 
     val predicate2 =
       new Predicate("=", Array[Expression](ref("a", "b.c"), LiteralValue(1, IntegerType)))
     assert(predicate2.references.map(_.describe()).toSeq == Seq("a.`b.c`"))
     assert(predicate2.describe.equals("a.`b.c` = 1"))
+    val v1Filter2 = EqualTo(ref("a", "b.c").describe(), 1)
+    assert(v1Filter2.toV2 == predicate2)
 
     val predicate3 =
       new Predicate("=", Array[Expression](ref("`a`.b", "c"), LiteralValue(1, IntegerType)))
     assert(predicate3.references.map(_.describe()).toSeq == Seq("```a``.b`.c"))
     assert(predicate3.describe.equals("```a``.b`.c = 1"))
+    val v1Filter3 = EqualTo(ref("`a`.b", "c").describe(), 1)
+    assert(v1Filter3.toV2 == predicate3)
   }
 
   test("AlwaysTrue") {
@@ -49,6 +56,9 @@ class V2PredicateSuite extends SparkFunSuite {
     assert(predicate1.equals(predicate2))
     assert(predicate1.references.map(_.describe()).length == 0)
     assert(predicate1.describe.equals("TRUE"))
+
+    val v1Filter = V1AlwaysTrue
+    assert(v1Filter.toV2 == predicate1)
   }
 
   test("AlwaysFalse") {
@@ -57,6 +67,9 @@ class V2PredicateSuite extends SparkFunSuite {
     assert(predicate1.equals(predicate2))
     assert(predicate1.references.map(_.describe()).length == 0)
     assert(predicate1.describe.equals("FALSE"))
+
+    val v1Filter = V1AlwaysFalse
+    assert(v1Filter.toV2 == predicate1)
   }
 
   test("EqualTo") {
@@ -65,6 +78,9 @@ class V2PredicateSuite extends SparkFunSuite {
     assert(predicate1.equals(predicate2))
     assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
     assert(predicate1.describe.equals("a = 1"))
+
+    val v1Filter = EqualTo("a", 1)
+    assert(v1Filter.toV2 == predicate1)
   }
 
   test("EqualNullSafe") {
@@ -73,6 +89,53 @@ class V2PredicateSuite extends SparkFunSuite {
     assert(predicate1.equals(predicate2))
     assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
     assert(predicate1.describe.equals("(a = 1) OR (a IS NULL AND 1 IS NULL)"))
+
+    val v1Filter = EqualNullSafe("a", 1)
+    assert(v1Filter.toV2 == predicate1)
+  }
+
+  test("LessThan") {
+    val predicate1 = new Predicate("<", Array[Expression](ref("a"), LiteralValue(1, IntegerType)))
+    val predicate2 = new Predicate("<", Array[Expression](ref("a"), LiteralValue(1, IntegerType)))
+    assert(predicate1.equals(predicate2))
+    assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
+    assert(predicate1.describe.equals("a < 1"))
+
+    val v1Filter = LessThan("a", 1)
+    assert(v1Filter.toV2 == predicate1)
+  }
+
+  test("LessThanOrEqual") {
+    val predicate1 = new Predicate("<=", Array[Expression](ref("a"), LiteralValue(1, IntegerType)))
+    val predicate2 = new Predicate("<=", Array[Expression](ref("a"), LiteralValue(1, IntegerType)))
+    assert(predicate1.equals(predicate2))
+    assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
+    assert(predicate1.describe.equals("a <= 1"))
+
+    val v1Filter = LessThanOrEqual("a", 1)
+    assert(v1Filter.toV2 == predicate1)
+  }
+
+  test("GreaterThan") {
+    val predicate1 = new Predicate(">", Array[Expression](ref("a"), LiteralValue(1, IntegerType)))
+    val predicate2 = new Predicate(">", Array[Expression](ref("a"), LiteralValue(1, IntegerType)))
+    assert(predicate1.equals(predicate2))
+    assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
+    assert(predicate1.describe.equals("a > 1"))
+
+    val v1Filter = GreaterThan("a", 1)
+    assert(v1Filter.toV2 == predicate1)
+  }
+
+  test("GreaterThanOrEqual") {
+    val predicate1 = new Predicate(">=", Array[Expression](ref("a"), LiteralValue(1, IntegerType)))
+    val predicate2 = new Predicate(">=", Array[Expression](ref("a"), LiteralValue(1, IntegerType)))
+    assert(predicate1.equals(predicate2))
+    assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
+    assert(predicate1.describe.equals("a >= 1"))
+
+    val v1Filter = GreaterThanOrEqual("a", 1)
+    assert(v1Filter.toV2 == predicate1)
   }
 
   test("In") {
@@ -95,6 +158,12 @@ class V2PredicateSuite extends SparkFunSuite {
     expected = expected.dropRight(2)  // remove the last ", "
     expected += ")"
     assert(predicate3.describe.equals(expected))
+
+    val v1Filter1 = In("a", Array(1, 2, 3, 4))
+    assert(v1Filter1.toV2 == predicate1)
+
+    val v1Filter2 = In("a", values.map(_.value()))
+    assert(v1Filter2.toV2 == predicate3)
   }
 
   test("IsNull") {
@@ -103,6 +172,9 @@ class V2PredicateSuite extends SparkFunSuite {
     assert(predicate1.equals(predicate2))
     assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
     assert(predicate1.describe.equals("a IS NULL"))
+
+    val v1Filter = IsNull("a")
+    assert(v1Filter.toV2 == predicate1)
   }
 
   test("IsNotNull") {
@@ -111,6 +183,9 @@ class V2PredicateSuite extends SparkFunSuite {
     assert(predicate1.equals(predicate2))
     assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
     assert(predicate1.describe.equals("a IS NOT NULL"))
+
+    val v1Filter = IsNotNull("a")
+    assert(v1Filter.toV2 == predicate1)
   }
 
   test("Not") {
@@ -121,6 +196,9 @@ class V2PredicateSuite extends SparkFunSuite {
     assert(predicate1.equals(predicate2))
     assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
     assert(predicate1.describe.equals("NOT (a < 1)"))
+
+    val v1Filter = V1Not(LessThan("a", 1))
+    assert(v1Filter.toV2 == predicate1)
   }
 
   test("And") {
@@ -133,6 +211,9 @@ class V2PredicateSuite extends SparkFunSuite {
     assert(predicate1.equals(predicate2))
     assert(predicate1.references.map(_.describe()).toSeq == Seq("a", "b"))
     assert(predicate1.describe.equals("(a = 1) AND (b = 1)"))
+
+    val v1Filter = V1And(EqualTo("a", 1), EqualTo("b", 1))
+    assert(v1Filter.toV2 == predicate1)
   }
 
   test("Or") {
@@ -145,6 +226,9 @@ class V2PredicateSuite extends SparkFunSuite {
     assert(predicate1.equals(predicate2))
     assert(predicate1.references.map(_.describe()).toSeq == Seq("a", "b"))
     assert(predicate1.describe.equals("(a = 1) OR (b = 1)"))
+
+    val v1Filter = V1Or(EqualTo("a", 1), EqualTo("b", 1))
+    assert(v1Filter.toV2.equals(predicate1))
   }
 
   test("StringStartsWith") {
@@ -156,6 +240,9 @@ class V2PredicateSuite extends SparkFunSuite {
     assert(predicate1.equals(predicate2))
     assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
     assert(predicate1.describe.equals("a LIKE 'str%'"))
+
+    val v1Filter = StringStartsWith("a", "str")
+    assert(v1Filter.toV2.equals(predicate1))
   }
 
   test("StringEndsWith") {
@@ -167,6 +254,9 @@ class V2PredicateSuite extends SparkFunSuite {
     assert(predicate1.equals(predicate2))
     assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
     assert(predicate1.describe.equals("a LIKE '%str'"))
+
+    val v1Filter = StringEndsWith("a", "str")
+    assert(v1Filter.toV2.equals(predicate1))
   }
 
   test("StringContains") {
@@ -178,6 +268,9 @@ class V2PredicateSuite extends SparkFunSuite {
     assert(predicate1.equals(predicate2))
     assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
     assert(predicate1.describe.equals("a LIKE '%str%'"))
+
+    val v1Filter = StringContains("a", "str")
+    assert(v1Filter.toV2.equals(predicate1))
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org