You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/04/21 08:17:48 UTC
[spark] branch branch-3.3 updated: [SPARK-38432][SQL][FOLLOWUP] Fix problems in And/Or/Not to V2 Filter
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new c2fa3b80e68 [SPARK-38432][SQL][FOLLOWUP] Fix problems in And/Or/Not to V2 Filter
c2fa3b80e68 is described below
commit c2fa3b80e6807d4f66d23795d54c2ee59478358b
Author: huaxingao <hu...@apple.com>
AuthorDate: Thu Apr 21 16:16:47 2022 +0800
[SPARK-38432][SQL][FOLLOWUP] Fix problems in And/Or/Not to V2 Filter
### What changes were proposed in this pull request?
Instead of having
```
override def toV2: Predicate = new Predicate("AND", Seq(left, right).map(_.toV2).toArray)
```
I think we should construct a V2 `And` directly.
```
override def toV2: Predicate = new org.apache.spark.sql.connector.expressions.filter.And(left.toV2, right.toV2)
```
same for `Or` and `Not`.
### Why are the changes needed?
bug fixing
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New tests
Closes #36290 from huaxingao/toV1.
Authored-by: huaxingao <hu...@apple.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 36fc8bd185da99b64954ca0dd393b452fb788226)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../org/apache/spark/sql/sources/filters.scala | 8 +-
.../datasources/v2/V2PredicateSuite.scala | 93 ++++++++++++++++++++++
2 files changed, 97 insertions(+), 4 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala
index 9954821e7ce..66ec4a6c7b9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.{Evolving, Stable}
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath
import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue}
-import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse => V2AlwaysFalse, AlwaysTrue => V2AlwaysTrue, Predicate}
+import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse => V2AlwaysFalse, AlwaysTrue => V2AlwaysTrue, And => V2And, Not => V2Not, Or => V2Or, Predicate}
import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String
@@ -270,7 +270,7 @@ case class IsNotNull(attribute: String) extends Filter {
@Stable
case class And(left: Filter, right: Filter) extends Filter {
override def references: Array[String] = left.references ++ right.references
- override def toV2: Predicate = new Predicate("AND", Seq(left, right).map(_.toV2).toArray)
+ override def toV2: Predicate = new V2And(left.toV2, right.toV2)
}
/**
@@ -281,7 +281,7 @@ case class And(left: Filter, right: Filter) extends Filter {
@Stable
case class Or(left: Filter, right: Filter) extends Filter {
override def references: Array[String] = left.references ++ right.references
- override def toV2: Predicate = new Predicate("OR", Seq(left, right).map(_.toV2).toArray)
+ override def toV2: Predicate = new V2Or(left.toV2, right.toV2)
}
/**
@@ -292,7 +292,7 @@ case class Or(left: Filter, right: Filter) extends Filter {
@Stable
case class Not(child: Filter) extends Filter {
override def references: Array[String] = child.references
- override def toV2: Predicate = new Predicate("NOT", Array(child.toV2))
+ override def toV2: Predicate = new V2Not(child.toV2)
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2PredicateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2PredicateSuite.scala
index 2d6e6fcf161..2df8b8e56c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2PredicateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2PredicateSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, Literal, LiteralValue}
import org.apache.spark.sql.connector.expressions.filter._
import org.apache.spark.sql.execution.datasources.v2.V2PredicateSuite.ref
+import org.apache.spark.sql.sources.{AlwaysFalse => V1AlwaysFalse, AlwaysTrue => V1AlwaysTrue, And => V1And, EqualNullSafe, EqualTo, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Not => V1Not, Or => V1Or, StringContains, StringEndsWith, StringStartsWith}
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.unsafe.types.UTF8String
@@ -31,16 +32,22 @@ class V2PredicateSuite extends SparkFunSuite {
new Predicate("=", Array[Expression](ref("a", "B"), LiteralValue(1, IntegerType)))
assert(predicate1.references.map(_.describe()).toSeq == Seq("a.B"))
assert(predicate1.describe.equals("a.B = 1"))
+ val v1Filter1 = EqualTo(ref("a", "B").describe(), 1)
+ assert(v1Filter1.toV2 == predicate1)
val predicate2 =
new Predicate("=", Array[Expression](ref("a", "b.c"), LiteralValue(1, IntegerType)))
assert(predicate2.references.map(_.describe()).toSeq == Seq("a.`b.c`"))
assert(predicate2.describe.equals("a.`b.c` = 1"))
+ val v1Filter2 = EqualTo(ref("a", "b.c").describe(), 1)
+ assert(v1Filter2.toV2 == predicate2)
val predicate3 =
new Predicate("=", Array[Expression](ref("`a`.b", "c"), LiteralValue(1, IntegerType)))
assert(predicate3.references.map(_.describe()).toSeq == Seq("```a``.b`.c"))
assert(predicate3.describe.equals("```a``.b`.c = 1"))
+ val v1Filter3 = EqualTo(ref("`a`.b", "c").describe(), 1)
+ assert(v1Filter3.toV2 == predicate3)
}
test("AlwaysTrue") {
@@ -49,6 +56,9 @@ class V2PredicateSuite extends SparkFunSuite {
assert(predicate1.equals(predicate2))
assert(predicate1.references.map(_.describe()).length == 0)
assert(predicate1.describe.equals("TRUE"))
+
+ val v1Filter = V1AlwaysTrue
+ assert(v1Filter.toV2 == predicate1)
}
test("AlwaysFalse") {
@@ -57,6 +67,9 @@ class V2PredicateSuite extends SparkFunSuite {
assert(predicate1.equals(predicate2))
assert(predicate1.references.map(_.describe()).length == 0)
assert(predicate1.describe.equals("FALSE"))
+
+ val v1Filter = V1AlwaysFalse
+ assert(v1Filter.toV2 == predicate1)
}
test("EqualTo") {
@@ -65,6 +78,9 @@ class V2PredicateSuite extends SparkFunSuite {
assert(predicate1.equals(predicate2))
assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
assert(predicate1.describe.equals("a = 1"))
+
+ val v1Filter = EqualTo("a", 1)
+ assert(v1Filter.toV2 == predicate1)
}
test("EqualNullSafe") {
@@ -73,6 +89,53 @@ class V2PredicateSuite extends SparkFunSuite {
assert(predicate1.equals(predicate2))
assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
assert(predicate1.describe.equals("(a = 1) OR (a IS NULL AND 1 IS NULL)"))
+
+ val v1Filter = EqualNullSafe("a", 1)
+ assert(v1Filter.toV2 == predicate1)
+ }
+
+ test("LessThan") {
+ val predicate1 = new Predicate("<", Array[Expression](ref("a"), LiteralValue(1, IntegerType)))
+ val predicate2 = new Predicate("<", Array[Expression](ref("a"), LiteralValue(1, IntegerType)))
+ assert(predicate1.equals(predicate2))
+ assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
+ assert(predicate1.describe.equals("a < 1"))
+
+ val v1Filter = LessThan("a", 1)
+ assert(v1Filter.toV2 == predicate1)
+ }
+
+ test("LessThanOrEqual") {
+ val predicate1 = new Predicate("<=", Array[Expression](ref("a"), LiteralValue(1, IntegerType)))
+ val predicate2 = new Predicate("<=", Array[Expression](ref("a"), LiteralValue(1, IntegerType)))
+ assert(predicate1.equals(predicate2))
+ assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
+ assert(predicate1.describe.equals("a <= 1"))
+
+ val v1Filter = LessThanOrEqual("a", 1)
+ assert(v1Filter.toV2 == predicate1)
+ }
+
+ test("GreaterThan") {
+ val predicate1 = new Predicate(">", Array[Expression](ref("a"), LiteralValue(1, IntegerType)))
+ val predicate2 = new Predicate(">", Array[Expression](ref("a"), LiteralValue(1, IntegerType)))
+ assert(predicate1.equals(predicate2))
+ assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
+ assert(predicate1.describe.equals("a > 1"))
+
+ val v1Filter = GreaterThan("a", 1)
+ assert(v1Filter.toV2 == predicate1)
+ }
+
+ test("GreaterThanOrEqual") {
+ val predicate1 = new Predicate(">=", Array[Expression](ref("a"), LiteralValue(1, IntegerType)))
+ val predicate2 = new Predicate(">=", Array[Expression](ref("a"), LiteralValue(1, IntegerType)))
+ assert(predicate1.equals(predicate2))
+ assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
+ assert(predicate1.describe.equals("a >= 1"))
+
+ val v1Filter = GreaterThanOrEqual("a", 1)
+ assert(v1Filter.toV2 == predicate1)
}
test("In") {
@@ -95,6 +158,12 @@ class V2PredicateSuite extends SparkFunSuite {
expected = expected.dropRight(2) // remove the last ", "
expected += ")"
assert(predicate3.describe.equals(expected))
+
+ val v1Filter1 = In("a", Array(1, 2, 3, 4))
+ assert(v1Filter1.toV2 == predicate1)
+
+ val v1Filter2 = In("a", values.map(_.value()))
+ assert(v1Filter2.toV2 == predicate3)
}
test("IsNull") {
@@ -103,6 +172,9 @@ class V2PredicateSuite extends SparkFunSuite {
assert(predicate1.equals(predicate2))
assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
assert(predicate1.describe.equals("a IS NULL"))
+
+ val v1Filter = IsNull("a")
+ assert(v1Filter.toV2 == predicate1)
}
test("IsNotNull") {
@@ -111,6 +183,9 @@ class V2PredicateSuite extends SparkFunSuite {
assert(predicate1.equals(predicate2))
assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
assert(predicate1.describe.equals("a IS NOT NULL"))
+
+ val v1Filter = IsNotNull("a")
+ assert(v1Filter.toV2 == predicate1)
}
test("Not") {
@@ -121,6 +196,9 @@ class V2PredicateSuite extends SparkFunSuite {
assert(predicate1.equals(predicate2))
assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
assert(predicate1.describe.equals("NOT (a < 1)"))
+
+ val v1Filter = V1Not(LessThan("a", 1))
+ assert(v1Filter.toV2 == predicate1)
}
test("And") {
@@ -133,6 +211,9 @@ class V2PredicateSuite extends SparkFunSuite {
assert(predicate1.equals(predicate2))
assert(predicate1.references.map(_.describe()).toSeq == Seq("a", "b"))
assert(predicate1.describe.equals("(a = 1) AND (b = 1)"))
+
+ val v1Filter = V1And(EqualTo("a", 1), EqualTo("b", 1))
+ assert(v1Filter.toV2 == predicate1)
}
test("Or") {
@@ -145,6 +226,9 @@ class V2PredicateSuite extends SparkFunSuite {
assert(predicate1.equals(predicate2))
assert(predicate1.references.map(_.describe()).toSeq == Seq("a", "b"))
assert(predicate1.describe.equals("(a = 1) OR (b = 1)"))
+
+ val v1Filter = V1Or(EqualTo("a", 1), EqualTo("b", 1))
+ assert(v1Filter.toV2.equals(predicate1))
}
test("StringStartsWith") {
@@ -156,6 +240,9 @@ class V2PredicateSuite extends SparkFunSuite {
assert(predicate1.equals(predicate2))
assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
assert(predicate1.describe.equals("a LIKE 'str%'"))
+
+ val v1Filter = StringStartsWith("a", "str")
+ assert(v1Filter.toV2.equals(predicate1))
}
test("StringEndsWith") {
@@ -167,6 +254,9 @@ class V2PredicateSuite extends SparkFunSuite {
assert(predicate1.equals(predicate2))
assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
assert(predicate1.describe.equals("a LIKE '%str'"))
+
+ val v1Filter = StringEndsWith("a", "str")
+ assert(v1Filter.toV2.equals(predicate1))
}
test("StringContains") {
@@ -178,6 +268,9 @@ class V2PredicateSuite extends SparkFunSuite {
assert(predicate1.equals(predicate2))
assert(predicate1.references.map(_.describe()).toSeq == Seq("a"))
assert(predicate1.describe.equals("a LIKE '%str%'"))
+
+ val v1Filter = StringContains("a", "str")
+ assert(v1Filter.toV2.equals(predicate1))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org