Posted to commits@hudi.apache.org by si...@apache.org on 2022/09/22 22:43:52 UTC

[hudi] branch master updated: [HUDI-4851] Fixing handling of `UTF8String` w/in `InSet` operator (#6739)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0558b6cfd9 [HUDI-4851] Fixing handling of `UTF8String` w/in `InSet` operator (#6739)
0558b6cfd9 is described below

commit 0558b6cfd916c9b48f79adfa4e0cf2ce821d51ec
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Thu Sep 22 15:43:45 2022 -0700

    [HUDI-4851] Fixing handling of `UTF8String` w/in `InSet` operator (#6739)
    
    
    Co-authored-by: Raymond Xu <27...@users.noreply.github.com>
---
 .../apache/spark/sql/hudi/DataSkippingUtils.scala  | 12 +++++++--
 .../org/apache/hudi/TestDataSkippingUtils.scala    | 29 ++++++++++++++++++++--
 .../catalyst/encoders/DummyExpressionHolder.scala  | 29 ++++++++++++++++++++++
 3 files changed, 66 insertions(+), 4 deletions(-)
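
For context on the fix below: a `Literal` can hold a `UTF8String` internally
(a `StringType` literal stores its value in that form), but the single-argument
`Literal.apply` factory only pattern-matches on external Java/Scala types and
throws for `UTF8String`. A minimal sketch of the gap and two safe
constructions, assuming a standard Spark classpath (`viaToString` mirrors what
this patch does; `viaConstructor` is an alternative the patch does not use):

    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.unsafe.types.UTF8String

    val value = UTF8String.fromString("a0")

    // Literal(value) would throw here: Literal.apply matches external types
    // (String, Int, ...) only, not Catalyst-internal ones like UTF8String.

    // Option 1 (what the patch does): round-trip through java.lang.String.
    val viaToString = Literal(value.toString)

    // Option 2: the two-argument constructor stores the value as-is, and
    // UTF8String is already the internal representation for StringType.
    val viaConstructor = Literal(value, StringType)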

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
index 0fe62da0de..4a3cf38895 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, EqualNullSafe, EqualTo, Expression, ExtractValue, GetStructField, GreaterThan, GreaterThanOrEqual, In, InSet, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not, Or, StartsWith, SubqueryExpression}
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.hudi.ColumnStatsExpressionUtils._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.{AnalysisException, HoodieCatalystExpressionUtils}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -234,7 +234,15 @@ object DataSkippingUtils extends Logging {
         getTargetIndexedColumnName(attrRef, indexSchema)
           .map { colName =>
             val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
-            hset.map(value => genColumnValuesEqualToExpression(colName, Literal(value), targetExprBuilder)).reduce(Or)
+            hset.map { value =>
+              // NOTE: [[Literal]] has a gap where it could hold [[UTF8String]], but [[Literal#apply]] doesn't
+              //       accept [[UTF8String]]. As such we have to handle it separately
+              val lit = value match {
+                case str: UTF8String => Literal(str.toString)
+                case _ => Literal(value)
+              }
+              genColumnValuesEqualToExpression(colName, lit, targetExprBuilder)
+            }.reduce(Or)
           }
 
       // Filter "expr(colA) not in (B1, B2, ...)"
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
index da3fd52e97..63db2f52fc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
@@ -21,7 +21,11 @@ import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.spark.sql.HoodieCatalystExpressionUtils.resolveExpr
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.encoders.DummyExpressionHolder
 import org.apache.spark.sql.catalyst.expressions.{Expression, InSet, Not}
+import org.apache.spark.sql.catalyst.optimizer.OptimizeIn
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.functions.{col, lower}
 import org.apache.spark.sql.hudi.DataSkippingUtils
 import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
@@ -93,7 +97,8 @@ class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSuppor
     spark.sqlContext.setConf(SESSION_LOCAL_TIMEZONE.key, "UTC")
 
     val resolvedFilterExpr: Expression = resolveExpr(spark, sourceFilterExprStr, sourceTableSchema)
-    val rows: Seq[String] = applyFilterExpr(resolvedFilterExpr, input)
+    val optimizedExpr = optimize(resolvedFilterExpr)
+    val rows: Seq[String] = applyFilterExpr(optimizedExpr, input)
 
     assertEquals(expectedOutput, rows)
   }
@@ -113,7 +118,6 @@ class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSuppor
     assertEquals(expectedOutput, rows)
   }
 
-
   @ParameterizedTest
   @MethodSource(Array("testStringsLookupFilterExpressionsSource"))
   def testStringsLookupFilterExpressions(sourceExpr: Expression, input: Seq[IndexRow], output: Seq[String]): Unit = {
@@ -134,6 +138,19 @@ class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSuppor
     assertEquals(output, rows)
   }
 
+
+  private def optimize(expr: Expression): Expression = {
+    val rules: Seq[Rule[LogicalPlan]] =
+      OptimizeIn ::
+        Nil
+
+    val plan: LogicalPlan = DummyExpressionHolder(Seq(expr))
+
+    rules.foldLeft(plan) {
+      case (plan, rule) => rule.apply(plan)
+    }.asInstanceOf[DummyExpressionHolder].exprs.head
+  }
+
   private def applyFilterExpr(resolvedExpr: Expression, input: Seq[IndexRow]): Seq[String] = {
     val lookupFilter = DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr(resolvedExpr, indexSchema)
 
@@ -324,6 +341,14 @@ object TestDataSkippingUtils {
           IndexRow("file_3", valueCount = 1, -2, -1, 0)
         ),
         Seq("file_1", "file_2")),
+      arguments(
+        s"B in (${(0 to 10).map(i => s"'a$i'").mkString(",")})",
+        Seq(
+          IndexRow("file_1", valueCount = 1, B_minValue = "a0", B_maxValue = "a10", B_nullCount = 0),
+          IndexRow("file_2", valueCount = 1, B_minValue = "b0", B_maxValue = "b10", B_nullCount = 0),
+          IndexRow("file_3", valueCount = 1, B_minValue = "a10", B_maxValue = "b20", B_nullCount = 0)
+        ),
+        Seq("file_1", "file_3")),
       arguments(
         "A not in (0, 1)",
         Seq(
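
Note that the new test case above uses 11 IN-list values ('a0' through 'a10')
precisely so that `OptimizeIn` crosses the default conversion threshold of 10
and produces an `InSet`; with 10 or fewer values the expression would remain
an `In` and never exercise the fixed code path. A hedged sketch of checking
that the optimizer really produced an `InSet`, reusing `optimize`,
`resolveExpr` and `sourceTableSchema` from the diff above (the assertion
itself is illustrative, not part of the committed test):

    import org.apache.spark.unsafe.types.UTF8String

    val filter = s"B in (${(0 to 10).map(i => s"'a$i'").mkString(",")})"
    val resolved = resolveExpr(spark, filter, sourceTableSchema)

    optimize(resolved) match {
      case InSet(_, hset) =>
        // Elements are Catalyst-internal UTF8String values at this point.
        assert(hset.forall(_.isInstanceOf[UTF8String]))
      case other =>
        throw new AssertionError(s"Expected InSet, got: $other")
    }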
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/encoders/DummyExpressionHolder.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/encoders/DummyExpressionHolder.scala
new file mode 100644
index 0000000000..87dc8c0879
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/encoders/DummyExpressionHolder.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.encoders
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+
+// A dummy logical plan that can hold expressions and go through optimizer rules.
+case class DummyExpressionHolder(exprs: Seq[Expression]) extends LeafNode {
+  override lazy val resolved = true
+  override def output: Seq[Attribute] = Nil
+}
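
The holder works because Catalyst collects constructor arguments of type
`Expression` (or `Seq[Expression]`) into a plan node's `expressions`, so rules
that transform plan expressions, such as `OptimizeIn`, rewrite `exprs` in
place. A minimal standalone sketch of driving the rule through the holder,
assuming only the class above plus a Spark catalyst dependency:

    import org.apache.spark.sql.catalyst.encoders.DummyExpressionHolder
    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, In, Literal}
    import org.apache.spark.sql.catalyst.optimizer.OptimizeIn
    import org.apache.spark.sql.types.StringType

    val attr = AttributeReference("B", StringType)()
    // 11 values exceed the default inSetConversionThreshold of 10.
    val in = In(attr, (0 to 10).map(i => Literal(s"a$i")))

    // OptimizeIn is a Rule[LogicalPlan]; applying it to the holder rewrites
    // the wrapped In into an InSet whose hset holds UTF8String values.
    val optimized = OptimizeIn(DummyExpressionHolder(Seq(in)))
      .asInstanceOf[DummyExpressionHolder]
      .exprs.head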