You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2023/02/08 00:34:43 UTC
[hudi] branch master updated: [HUDI-5657] Fix NPE if filters condition contains null literal when using column stats data skipping for flink (#7801)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new dee020e18ce [HUDI-5657] Fix NPE if filters condition contains null literal when using column stats data skipping for flink (#7801)
dee020e18ce is described below
commit dee020e18ce4ae719f18dab608f2ad65a96b9e3c
Author: Jing Zhang <be...@gmail.com>
AuthorDate: Wed Feb 8 08:34:37 2023 +0800
[HUDI-5657] Fix NPE if filters condition contains null literal when using column stats data skipping for flink (#7801)
- Fix NPE when a filter condition contains a null literal while using column stats data skipping for Flink
---
.../hudi/source/stats/ExpressionEvaluator.java | 107 +++++++++++++++------
.../hudi/source/stats/TestExpressionEvaluator.java | 44 +++++++--
.../apache/hudi/adapter/TestCallExpressions.java | 44 +++++++++
.../apache/hudi/adapter/TestCallExpressions.java | 44 +++++++++
.../apache/hudi/adapter/TestCallExpressions.java | 39 ++++++++
.../apache/hudi/adapter/TestCallExpressions.java | 39 ++++++++
6 files changed, 281 insertions(+), 36 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java
index 08ded144e0a..d1bb01126a0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java
@@ -40,6 +40,7 @@ import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
/**
* Tool to evaluate the {@link org.apache.flink.table.expressions.ResolvedExpression}s.
@@ -118,16 +119,6 @@ public class ExpressionEvaluator {
return ((Or) evaluator).bindEvaluator(evaluator1, evaluator2);
}
- // handle IN specifically
- if (BuiltInFunctionDefinitions.IN.equals(funDef)) {
- ValidationUtils.checkState(normalized, "The IN expression expects to be normalized");
- evaluator = In.getInstance();
- FieldReferenceExpression rExpr = (FieldReferenceExpression) childExprs.get(0);
- evaluator.bindFieldReference(rExpr);
- ((In) evaluator).bindVals(getInLiteralVals(childExprs));
- return evaluator.bindColStats(indexRow, queryFields, rExpr);
- }
-
// handle unary operators
if (BuiltInFunctionDefinitions.IS_NULL.equals(funDef)) {
FieldReferenceExpression rExpr = (FieldReferenceExpression) childExprs.get(0);
@@ -141,6 +132,25 @@ public class ExpressionEvaluator {
.bindColStats(indexRow, queryFields, rExpr);
}
+ boolean hasNullLiteral =
+ childExprs.stream().anyMatch(e ->
+ e instanceof ValueLiteralExpression
+ && ExpressionUtils.getValueFromLiteral((ValueLiteralExpression) e) == null);
+ if (hasNullLiteral) {
+ evaluator = AlwaysFalse.getInstance();
+ return evaluator;
+ }
+
+ // handle IN specifically
+ if (BuiltInFunctionDefinitions.IN.equals(funDef)) {
+ ValidationUtils.checkState(normalized, "The IN expression expects to be normalized");
+ evaluator = In.getInstance();
+ FieldReferenceExpression rExpr = (FieldReferenceExpression) childExprs.get(0);
+ evaluator.bindFieldReference(rExpr);
+ ((In) evaluator).bindVals(getInLiteralVals(childExprs));
+ return evaluator.bindColStats(indexRow, queryFields, rExpr);
+ }
+
// handle binary operators
if (BuiltInFunctionDefinitions.EQUALS.equals(funDef)) {
evaluator = EqualTo.getInstance();
@@ -206,37 +216,51 @@ public class ExpressionEvaluator {
public abstract boolean eval();
}
+ public abstract static class NullFalseEvaluator extends Evaluator {
+
+ @Override
+ public final boolean eval() {
+ if (this.val == null) {
+ return false;
+ } else {
+ return eval(this.val);
+ }
+ }
+
+ protected abstract boolean eval(@NotNull Object val);
+ }
+
/**
* To evaluate = expr.
*/
- public static class EqualTo extends Evaluator {
+ public static class EqualTo extends NullFalseEvaluator {
public static EqualTo getInstance() {
return new EqualTo();
}
@Override
- public boolean eval() {
- if (this.minVal == null || this.maxVal == null || this.val == null) {
+ protected boolean eval(@NotNull Object val) {
+ if (this.minVal == null || this.maxVal == null) {
return false;
}
- if (compare(this.minVal, this.val, this.type) > 0) {
+ if (compare(this.minVal, val, this.type) > 0) {
return false;
}
- return compare(this.maxVal, this.val, this.type) >= 0;
+ return compare(this.maxVal, val, this.type) >= 0;
}
}
/**
* To evaluate <> expr.
*/
- public static class NotEqualTo extends Evaluator {
+ public static class NotEqualTo extends NullFalseEvaluator {
public static NotEqualTo getInstance() {
return new NotEqualTo();
}
@Override
- public boolean eval() {
+ protected boolean eval(@NotNull Object val) {
// because the bounds are not necessarily a min or max value, this cannot be answered using
// them. notEq(col, X) with (X, Y) doesn't guarantee that X is a value in col.
return true;
@@ -275,68 +299,68 @@ public class ExpressionEvaluator {
/**
* To evaluate < expr.
*/
- public static class LessThan extends Evaluator {
+ public static class LessThan extends NullFalseEvaluator {
public static LessThan getInstance() {
return new LessThan();
}
@Override
- public boolean eval() {
+ public boolean eval(@NotNull Object val) {
if (this.minVal == null) {
return false;
}
- return compare(this.minVal, this.val, this.type) < 0;
+ return compare(this.minVal, val, this.type) < 0;
}
}
/**
* To evaluate > expr.
*/
- public static class GreaterThan extends Evaluator {
+ public static class GreaterThan extends NullFalseEvaluator {
public static GreaterThan getInstance() {
return new GreaterThan();
}
@Override
- public boolean eval() {
+ protected boolean eval(@NotNull Object val) {
if (this.maxVal == null) {
return false;
}
- return compare(this.maxVal, this.val, this.type) > 0;
+ return compare(this.maxVal, val, this.type) > 0;
}
}
/**
* To evaluate <= expr.
*/
- public static class LessThanOrEqual extends Evaluator {
+ public static class LessThanOrEqual extends NullFalseEvaluator {
public static LessThanOrEqual getInstance() {
return new LessThanOrEqual();
}
@Override
- public boolean eval() {
+ protected boolean eval(@NotNull Object val) {
if (this.minVal == null) {
return false;
}
- return compare(this.minVal, this.val, this.type) <= 0;
+ return compare(this.minVal, val, this.type) <= 0;
}
}
/**
* To evaluate >= expr.
*/
- public static class GreaterThanOrEqual extends Evaluator {
+ public static class GreaterThanOrEqual extends NullFalseEvaluator {
public static GreaterThanOrEqual getInstance() {
return new GreaterThanOrEqual();
}
@Override
- public boolean eval() {
+ protected boolean eval(@NotNull Object val) {
if (this.maxVal == null) {
return false;
}
- return compare(this.maxVal, this.val, this.type) >= 0;
+ return compare(this.maxVal, val, this.type) >= 0;
}
}
@@ -352,6 +376,9 @@ public class ExpressionEvaluator {
@Override
public boolean eval() {
+ if (Arrays.stream(vals).anyMatch(Objects::isNull)) {
+ return false;
+ }
if (this.minVal == null) {
return false; // values are all null and literalSet cannot contain null.
}
@@ -379,6 +406,28 @@ public class ExpressionEvaluator {
}
}
+ // A special evaluator that never matches: eval() always returns false
+ public static class AlwaysFalse extends Evaluator {
+
+ public static AlwaysFalse getInstance() {
+ return new AlwaysFalse();
+ }
+
+ @Override
+ public Evaluator bindColStats(
+ RowData indexRow,
+ RowType.RowField[] queryFields,
+ FieldReferenceExpression expr) {
+ // nothing to bind: the evaluation result never depends on column stats
+ return this;
+ }
+
+ @Override
+ public boolean eval() {
+ return false;
+ }
+ }
+
// component predicate
/**
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestExpressionEvaluator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestExpressionEvaluator.java
index 4ad286b780a..cb65c60ac40 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestExpressionEvaluator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestExpressionEvaluator.java
@@ -18,14 +18,18 @@
package org.apache.hudi.source.stats;
+import org.apache.hudi.adapter.TestCallExpressions;
import org.apache.hudi.utils.TestData;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.junit.jupiter.api.Test;
@@ -33,6 +37,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
+import static org.apache.hudi.source.stats.ExpressionEvaluator.Evaluator.bindCall;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -104,7 +109,7 @@ public class TestExpressionEvaluator {
assertFalse(equalTo.eval(), "12 <> null");
equalTo.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
- assertFalse(equalTo.eval(), "null <> null");
+ assertFalse(equalTo.eval(), "It is not possible to test for NULL values with '=' operator");
}
@Test
@@ -140,7 +145,7 @@ public class TestExpressionEvaluator {
assertTrue(notEqualTo.eval(), "12 <> null");
notEqualTo.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
- assertTrue(notEqualTo.eval(), "null <> null");
+ assertFalse(notEqualTo.eval(), "It is not possible to test for NULL values with '<>' operator");
}
@Test
@@ -206,7 +211,7 @@ public class TestExpressionEvaluator {
assertFalse(lessThan.eval(), "12 <> null");
lessThan.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
- assertFalse(lessThan.eval(), "null <> null");
+ assertFalse(lessThan.eval(), "It is not possible to test for NULL values with '<' operator");
}
@Test
@@ -242,7 +247,7 @@ public class TestExpressionEvaluator {
assertFalse(greaterThan.eval(), "12 <> null");
greaterThan.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
- assertFalse(greaterThan.eval(), "null <> null");
+ assertFalse(greaterThan.eval(), "It is not possible to test for NULL values with '>' operator");
}
@Test
@@ -278,7 +283,7 @@ public class TestExpressionEvaluator {
assertFalse(lessThanOrEqual.eval(), "12 <> null");
lessThanOrEqual.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
- assertFalse(lessThanOrEqual.eval(), "null <> null");
+ assertFalse(lessThanOrEqual.eval(), "It is not possible to test for NULL values with '<=' operator");
}
@Test
@@ -314,7 +319,7 @@ public class TestExpressionEvaluator {
assertFalse(greaterThanOrEqual.eval(), "12 <> null");
greaterThanOrEqual.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
- assertFalse(greaterThanOrEqual.eval(), "null <> null");
+ assertFalse(greaterThanOrEqual.eval(), "It is not possible to test for NULL values with '>=' operator");
}
@Test
@@ -349,7 +354,32 @@ public class TestExpressionEvaluator {
assertFalse(in.eval(), "12 <> null");
in.bindVals((Object) null);
- assertFalse(in.eval(), "null <> null");
+ assertFalse(in.eval(), "It is not possible to test for NULL values with 'in' operator");
+ }
+
+ @Test
+ void testAlwaysFalse() {
+ FieldReferenceExpression ref = new FieldReferenceExpression("f_int", DataTypes.INT(), 2, 2);
+ ValueLiteralExpression nullLiteral = new ValueLiteralExpression(null, DataTypes.INT());
+ RowData indexRow = intIndexRow(11, 13);
+ RowType.RowField[] queryFields = queryFields(2);
+ BuiltInFunctionDefinition[] funDefs = new BuiltInFunctionDefinition[] {
+ BuiltInFunctionDefinitions.EQUALS,
+ BuiltInFunctionDefinitions.NOT_EQUALS,
+ BuiltInFunctionDefinitions.LESS_THAN,
+ BuiltInFunctionDefinitions.GREATER_THAN,
+ BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
+ BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
+ BuiltInFunctionDefinitions.IN};
+ for (BuiltInFunctionDefinition funDef : funDefs) {
+ CallExpression expr =
+ TestCallExpressions.permanent(
+ funDef,
+ Arrays.asList(ref, nullLiteral),
+ DataTypes.BOOLEAN());
+ // always return false if the literal value is null
+ assertFalse(bindCall(expr, indexRow, queryFields).eval());
+ }
}
private static RowData intIndexRow(Integer minVal, Integer maxVal) {
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java b/hudi-flink-datasource/hudi-flink1.13.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java
new file mode 100644
index 00000000000..6a0d0902042
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.13.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.table.types.DataType;
+
+import java.util.List;
+
+/**
+ * Creates {@link CallExpression}s for testing purposes.
+ */
+public class TestCallExpressions {
+
+ public static CallExpression permanent(
+ BuiltInFunctionDefinition builtInFunctionDefinition,
+ List<ResolvedExpression> args,
+ DataType dataType) {
+ return new CallExpression(
+ FunctionIdentifier.of(builtInFunctionDefinition.getName()),
+ builtInFunctionDefinition,
+ args,
+ dataType);
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java b/hudi-flink-datasource/hudi-flink1.14.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java
new file mode 100644
index 00000000000..6a0d0902042
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.table.types.DataType;
+
+import java.util.List;
+
+/**
+ * Creates {@link CallExpression}s for testing purposes.
+ */
+public class TestCallExpressions {
+
+ public static CallExpression permanent(
+ BuiltInFunctionDefinition builtInFunctionDefinition,
+ List<ResolvedExpression> args,
+ DataType dataType) {
+ return new CallExpression(
+ FunctionIdentifier.of(builtInFunctionDefinition.getName()),
+ builtInFunctionDefinition,
+ args,
+ dataType);
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java b/hudi-flink-datasource/hudi-flink1.15.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java
new file mode 100644
index 00000000000..1b02f2bafab
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.15.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import org.apache.flink.table.types.DataType;
+
+import java.util.List;
+
+/**
+ * Creates {@link CallExpression}s for testing purposes.
+ */
+public class TestCallExpressions {
+
+ public static CallExpression permanent(
+ BuiltInFunctionDefinition builtInFunctionDefinition,
+ List<ResolvedExpression> args,
+ DataType dataType) {
+ return CallExpression.permanent(builtInFunctionDefinition, args, dataType);
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.16.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java b/hudi-flink-datasource/hudi-flink1.16.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java
new file mode 100644
index 00000000000..1b02f2bafab
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.16.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import org.apache.flink.table.types.DataType;
+
+import java.util.List;
+
+/**
+ * Creates {@link CallExpression}s for testing purposes.
+ */
+public class TestCallExpressions {
+
+ public static CallExpression permanent(
+ BuiltInFunctionDefinition builtInFunctionDefinition,
+ List<ResolvedExpression> args,
+ DataType dataType) {
+ return CallExpression.permanent(builtInFunctionDefinition, args, dataType);
+ }
+}