You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/09/22 22:43:52 UTC
[hudi] branch master updated: [HUDI-4851] Fixing handling of `UTF8String` w/in `InSet` operator (#6739)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0558b6cfd9 [HUDI-4851] Fixing handling of `UTF8String` w/in `InSet` operator (#6739)
0558b6cfd9 is described below
commit 0558b6cfd916c9b48f79adfa4e0cf2ce821d51ec
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Thu Sep 22 15:43:45 2022 -0700
[HUDI-4851] Fixing handling of `UTF8String` w/in `InSet` operator (#6739)
Co-authored-by: Raymond Xu <27...@users.noreply.github.com>
---
.../apache/spark/sql/hudi/DataSkippingUtils.scala | 12 +++++++--
.../org/apache/hudi/TestDataSkippingUtils.scala | 29 ++++++++++++++++++++--
.../catalyst/encoders/DummyExpressionHolder.scala | 29 ++++++++++++++++++++++
3 files changed, 66 insertions(+), 4 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
index 0fe62da0de..4a3cf38895 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, EqualNullSafe, EqualTo, Expression, ExtractValue, GetStructField, GreaterThan, GreaterThanOrEqual, In, InSet, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not, Or, StartsWith, SubqueryExpression}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.hudi.ColumnStatsExpressionUtils._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{AnalysisException, HoodieCatalystExpressionUtils}
import org.apache.spark.unsafe.types.UTF8String
@@ -234,7 +234,15 @@ object DataSkippingUtils extends Logging {
getTargetIndexedColumnName(attrRef, indexSchema)
.map { colName =>
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
- hset.map(value => genColumnValuesEqualToExpression(colName, Literal(value), targetExprBuilder)).reduce(Or)
+ hset.map { value =>
+ // NOTE: A [[Literal]] may hold a [[UTF8String]] value, however [[Literal#apply]] doesn't accept
+ // [[UTF8String]] as an input. As such, we have to convert it into a [[String]] ourselves first.
+ val lit = value match {
+ case str: UTF8String => Literal(str.toString)
+ case _ => Literal(value)
+ }
+ genColumnValuesEqualToExpression(colName, lit, targetExprBuilder)
+ }.reduce(Or)
}
// Filter "expr(colA) not in (B1, B2, ...)"
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
index da3fd52e97..63db2f52fc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
@@ -21,7 +21,11 @@ import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.spark.sql.HoodieCatalystExpressionUtils.resolveExpr
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.encoders.DummyExpressionHolder
import org.apache.spark.sql.catalyst.expressions.{Expression, InSet, Not}
+import org.apache.spark.sql.catalyst.optimizer.OptimizeIn
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.functions.{col, lower}
import org.apache.spark.sql.hudi.DataSkippingUtils
import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
@@ -93,7 +97,8 @@ class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSuppor
spark.sqlContext.setConf(SESSION_LOCAL_TIMEZONE.key, "UTC")
val resolvedFilterExpr: Expression = resolveExpr(spark, sourceFilterExprStr, sourceTableSchema)
- val rows: Seq[String] = applyFilterExpr(resolvedFilterExpr, input)
+ val optimizedExpr = optimize(resolvedFilterExpr)
+ val rows: Seq[String] = applyFilterExpr(optimizedExpr, input)
assertEquals(expectedOutput, rows)
}
@@ -113,7 +118,6 @@ class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSuppor
assertEquals(expectedOutput, rows)
}
-
@ParameterizedTest
@MethodSource(Array("testStringsLookupFilterExpressionsSource"))
def testStringsLookupFilterExpressions(sourceExpr: Expression, input: Seq[IndexRow], output: Seq[String]): Unit = {
@@ -134,6 +138,19 @@ class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSuppor
assertEquals(output, rows)
}
+
+ private def optimize(expr: Expression): Expression = { // Runs the optimizer rules listed below over a standalone expression
+ val rules: Seq[Rule[LogicalPlan]] =
+ OptimizeIn ::
+ Nil
+ // Rules are typed against [[LogicalPlan]], so the expression is wrapped into a holder plan first
+ val plan: LogicalPlan = DummyExpressionHolder(Seq(expr))
+ // Apply every rule in order, then unwrap the single expression carried by the resulting plan
+ rules.foldLeft(plan) {
+ case (plan, rule) => rule.apply(plan)
+ }.asInstanceOf[DummyExpressionHolder].exprs.head
+ }
+
private def applyFilterExpr(resolvedExpr: Expression, input: Seq[IndexRow]): Seq[String] = {
val lookupFilter = DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr(resolvedExpr, indexSchema)
@@ -324,6 +341,14 @@ object TestDataSkippingUtils {
IndexRow("file_3", valueCount = 1, -2, -1, 0)
),
Seq("file_1", "file_2")),
+ arguments(
+ s"B in (${(0 to 10).map(i => s"'a$i'").mkString(",")})",
+ Seq(
+ IndexRow("file_1", valueCount = 1, B_minValue = "a0", B_maxValue = "a10", B_nullCount = 0),
+ IndexRow("file_2", valueCount = 1, B_minValue = "b0", B_maxValue = "b10", B_nullCount = 0),
+ IndexRow("file_3", valueCount = 1, B_minValue = "a10", B_maxValue = "b20", B_nullCount = 0)
+ ),
+ Seq("file_1", "file_3")),
arguments(
"A not in (0, 1)",
Seq(
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/encoders/DummyExpressionHolder.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/encoders/DummyExpressionHolder.scala
new file mode 100644
index 0000000000..87dc8c0879
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/encoders/DummyExpressionHolder.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.encoders
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+
+// A dummy leaf logical plan that merely holds expressions so that they can go through optimizer rules; it always reports itself as resolved and exposes no output attributes.
+case class DummyExpressionHolder(exprs: Seq[Expression]) extends LeafNode {
+ override lazy val resolved = true
+ override def output: Seq[Attribute] = Nil
+}