You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hu...@apache.org on 2022/07/22 20:49:19 UTC

[spark] branch master updated: [SPARK-39784][SQL] Put Literal values on the right side of the data source filter after translating Catalyst Expression to data source filter

This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e2b1ae1021 [SPARK-39784][SQL] Put Literal values on the right side of the data source filter after translating Catalyst Expression to data source filter
2e2b1ae1021 is described below

commit 2e2b1ae1021bc4bc99f9749e05e4770be3aec43f
Author: huaxingao <hu...@apple.com>
AuthorDate: Fri Jul 22 13:49:00 2022 -0700

    [SPARK-39784][SQL] Put Literal values on the right side of the data source filter after translating Catalyst Expression to data source filter
    
    ### What changes were proposed in this pull request?
    
    Even though the literal value can appear on either side of a filter (e.g. both `a > 1` and `1 < a` are valid), after translating a Catalyst Expression to a data source filter we want the literal value on the right side, so that it's easier for the data source to handle these filters. We already do this kind of normalization for V1 Filter, and V2 Filter should behave the same way.
    
    Before this PR, filters that have the literal value on the left side, e.g. `1 > a`, were kept as is. After this PR, such a filter is normalized to `a < 1`, so the data source doesn't need to check each of the filters (and do the flip itself).
    
    ### Why are the changes needed?
    I think we should follow V1 Filter's behavior and normalize the filters at Catalyst Expression to data source Filter translation time, so that literal values are always on the right side. That way, the data source doesn't need to check every single filter to figure out whether it needs to flip the sides.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    Added a new test in `DataSourceV2StrategySuite`.
    
    Closes #37197 from huaxingao/flip.
    
    Authored-by: huaxingao <hu...@apple.com>
    Signed-off-by: huaxingao <hu...@apple.com>
---
 .../sql/catalyst/util/V2ExpressionBuilder.scala    | 21 +++++++
 .../datasources/v2/DataSourceV2StrategySuite.scala | 67 +++++++++++++++++++++-
 2 files changed, 86 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
index 8bb65a88044..59cbcf48334 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
@@ -233,6 +233,10 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) {
       val r = generateExpression(b.right)
       if (l.isDefined && r.isDefined) {
         b match {
+          case _: Predicate if isBinaryComparisonOperator(b.sqlOperator) &&
+              l.get.isInstanceOf[LiteralValue[_]] && r.get.isInstanceOf[FieldReference] =>
+            Some(new V2Predicate(flipComparisonOperatorName(b.sqlOperator),
+              Array[V2Expression](r.get, l.get)))
           case _: Predicate =>
             Some(new V2Predicate(b.sqlOperator, Array[V2Expression](l.get, r.get)))
           case _ =>
@@ -408,6 +412,23 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) {
       }
     case _ => None
   }
+
+  private def isBinaryComparisonOperator(operatorName: String): Boolean = {
+    operatorName match {
+      case ">" | "<" | ">=" | "<=" | "=" | "<=>" => true
+      case _ => false
+    }
+  }
+
+  private def flipComparisonOperatorName(operatorName: String): String = {
+    operatorName match {
+      case ">" => "<"
+      case "<" => ">"
+      case ">=" => "<="
+      case "<=" => ">="
+      case _ => operatorName
+    }
+  }
 }
 
 object ColumnOrField {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StrategySuite.scala
index 66dc65cf681..c3f51bed269 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StrategySuite.scala
@@ -18,14 +18,77 @@
 package org.apache.spark.sql.execution.datasources.v2
 
 import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue}
 import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.BooleanType
+import org.apache.spark.sql.types.{BooleanType, IntegerType, StringType, StructField, StructType}
 
 class DataSourceV2StrategySuite extends PlanTest with SharedSparkSession {
+  val attrInts = Seq(
+    $"cint".int,
+    $"c.int".int,
+    GetStructField($"a".struct(StructType(
+      StructField("cstr", StringType, nullable = true) ::
+        StructField("cint", IntegerType, nullable = true) :: Nil)), 1, None),
+    GetStructField($"a".struct(StructType(
+      StructField("c.int", IntegerType, nullable = true) ::
+        StructField("cstr", StringType, nullable = true) :: Nil)), 0, None),
+    GetStructField($"a.b".struct(StructType(
+      StructField("cstr1", StringType, nullable = true) ::
+        StructField("cstr2", StringType, nullable = true) ::
+        StructField("cint", IntegerType, nullable = true) :: Nil)), 2, None),
+    GetStructField($"a.b".struct(StructType(
+      StructField("c.int", IntegerType, nullable = true) :: Nil)), 0, None),
+    GetStructField(GetStructField($"a".struct(StructType(
+      StructField("cstr1", StringType, nullable = true) ::
+        StructField("b", StructType(StructField("cint", IntegerType, nullable = true) ::
+          StructField("cstr2", StringType, nullable = true) :: Nil)) :: Nil)), 1, None), 0, None)
+  ).zip(Seq(
+    "cint",
+    "`c.int`", // single level field that contains `dot` in name
+    "a.cint", // two level nested field
+    "a.`c.int`", // two level nested field, and nested level contains `dot`
+    "`a.b`.cint", // two level nested field, and top level contains `dot`
+    "`a.b`.`c.int`", // two level nested field, and both levels contain `dot`
+    "a.b.cint" // three level nested field
+  ))
+
+  test("SPARK-39784: translate binary expression") { attrInts
+    .foreach { case (attrInt, intColName) =>
+      testTranslateFilter(EqualTo(attrInt, 1),
+        Some(new Predicate("=", Array(FieldReference(intColName), LiteralValue(1, IntegerType)))))
+      testTranslateFilter(EqualTo(1, attrInt),
+        Some(new Predicate("=", Array(FieldReference(intColName), LiteralValue(1, IntegerType)))))
+
+      testTranslateFilter(EqualNullSafe(attrInt, 1),
+        Some(new Predicate("<=>", Array(FieldReference(intColName), LiteralValue(1, IntegerType)))))
+      testTranslateFilter(EqualNullSafe(1, attrInt),
+        Some(new Predicate("<=>", Array(FieldReference(intColName), LiteralValue(1, IntegerType)))))
+
+      testTranslateFilter(GreaterThan(attrInt, 1),
+        Some(new Predicate(">", Array(FieldReference(intColName), LiteralValue(1, IntegerType)))))
+      testTranslateFilter(GreaterThan(1, attrInt),
+        Some(new Predicate("<", Array(FieldReference(intColName), LiteralValue(1, IntegerType)))))
+
+      testTranslateFilter(LessThan(attrInt, 1),
+        Some(new Predicate("<", Array(FieldReference(intColName), LiteralValue(1, IntegerType)))))
+      testTranslateFilter(LessThan(1, attrInt),
+        Some(new Predicate(">", Array(FieldReference(intColName), LiteralValue(1, IntegerType)))))
+
+      testTranslateFilter(GreaterThanOrEqual(attrInt, 1),
+        Some(new Predicate(">=", Array(FieldReference(intColName), LiteralValue(1, IntegerType)))))
+      testTranslateFilter(GreaterThanOrEqual(1, attrInt),
+        Some(new Predicate("<=", Array(FieldReference(intColName), LiteralValue(1, IntegerType)))))
+
+      testTranslateFilter(LessThanOrEqual(attrInt, 1),
+        Some(new Predicate("<=", Array(FieldReference(intColName), LiteralValue(1, IntegerType)))))
+      testTranslateFilter(LessThanOrEqual(1, attrInt),
+        Some(new Predicate(">=", Array(FieldReference(intColName), LiteralValue(1, IntegerType)))))
+    }
+  }
+
   test("SPARK-36644: Push down boolean column filter") {
     testTranslateFilter($"col".boolean,
       Some(new Predicate("=", Array(FieldReference("col"), LiteralValue(true, BooleanType)))))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org