You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2023/02/08 00:34:43 UTC

[hudi] branch master updated: [HUDI-5657] Fix NPE if filters condition contains null literal when using column stats data skipping for flink (#7801)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new dee020e18ce [HUDI-5657] Fix NPE if filters condition contains null literal when using column stats data skipping for flink (#7801)
dee020e18ce is described below

commit dee020e18ce4ae719f18dab608f2ad65a96b9e3c
Author: Jing Zhang <be...@gmail.com>
AuthorDate: Wed Feb 8 08:34:37 2023 +0800

    [HUDI-5657] Fix NPE if filters condition contains null literal when using column stats data skipping for flink (#7801)
    
    -  Fix NPE if filters condition contains null literal when using column stats data skipping for flink
---
 .../hudi/source/stats/ExpressionEvaluator.java     | 107 +++++++++++++++------
 .../hudi/source/stats/TestExpressionEvaluator.java |  44 +++++++--
 .../apache/hudi/adapter/TestCallExpressions.java   |  44 +++++++++
 .../apache/hudi/adapter/TestCallExpressions.java   |  44 +++++++++
 .../apache/hudi/adapter/TestCallExpressions.java   |  39 ++++++++
 .../apache/hudi/adapter/TestCallExpressions.java   |  39 ++++++++
 6 files changed, 281 insertions(+), 36 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java
index 08ded144e0a..d1bb01126a0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java
@@ -40,6 +40,7 @@ import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * Tool to evaluate the {@link org.apache.flink.table.expressions.ResolvedExpression}s.
@@ -118,16 +119,6 @@ public class ExpressionEvaluator {
         return ((Or) evaluator).bindEvaluator(evaluator1, evaluator2);
       }
 
-      // handle IN specifically
-      if (BuiltInFunctionDefinitions.IN.equals(funDef)) {
-        ValidationUtils.checkState(normalized, "The IN expression expects to be normalized");
-        evaluator = In.getInstance();
-        FieldReferenceExpression rExpr = (FieldReferenceExpression) childExprs.get(0);
-        evaluator.bindFieldReference(rExpr);
-        ((In) evaluator).bindVals(getInLiteralVals(childExprs));
-        return evaluator.bindColStats(indexRow, queryFields, rExpr);
-      }
-
       // handle unary operators
       if (BuiltInFunctionDefinitions.IS_NULL.equals(funDef)) {
         FieldReferenceExpression rExpr = (FieldReferenceExpression) childExprs.get(0);
@@ -141,6 +132,25 @@ public class ExpressionEvaluator {
             .bindColStats(indexRow, queryFields, rExpr);
       }
 
+      boolean hasNullLiteral =
+          childExprs.stream().anyMatch(e ->
+             e instanceof ValueLiteralExpression
+                 && ExpressionUtils.getValueFromLiteral((ValueLiteralExpression) e) == null);
+      if (hasNullLiteral) {
+        evaluator = AlwaysFalse.getInstance();
+        return evaluator;
+      }
+
+      // handle IN specifically
+      if (BuiltInFunctionDefinitions.IN.equals(funDef)) {
+        ValidationUtils.checkState(normalized, "The IN expression expects to be normalized");
+        evaluator = In.getInstance();
+        FieldReferenceExpression rExpr = (FieldReferenceExpression) childExprs.get(0);
+        evaluator.bindFieldReference(rExpr);
+        ((In) evaluator).bindVals(getInLiteralVals(childExprs));
+        return evaluator.bindColStats(indexRow, queryFields, rExpr);
+      }
+
       // handle binary operators
       if (BuiltInFunctionDefinitions.EQUALS.equals(funDef)) {
         evaluator = EqualTo.getInstance();
@@ -206,37 +216,51 @@ public class ExpressionEvaluator {
     public abstract boolean eval();
   }
 
+  public abstract static class NullFalseEvaluator extends Evaluator {
+
+    @Override
+    public final boolean eval() {
+      if (this.val == null) {
+        return false;
+      } else {
+        return eval(this.val);
+      }
+    }
+
+    protected abstract boolean eval(@NotNull Object val);
+  }
+
   /**
    * To evaluate = expr.
    */
-  public static class EqualTo extends Evaluator {
+  public static class EqualTo extends NullFalseEvaluator {
 
     public static EqualTo getInstance() {
       return new EqualTo();
     }
 
     @Override
-    public boolean eval() {
-      if (this.minVal == null || this.maxVal == null || this.val == null) {
+    protected boolean eval(@NotNull Object val) {
+      if (this.minVal == null || this.maxVal == null) {
         return false;
       }
-      if (compare(this.minVal, this.val, this.type) > 0) {
+      if (compare(this.minVal, val, this.type) > 0) {
         return false;
       }
-      return compare(this.maxVal, this.val, this.type) >= 0;
+      return compare(this.maxVal, val, this.type) >= 0;
     }
   }
 
   /**
    * To evaluate <> expr.
    */
-  public static class NotEqualTo extends Evaluator {
+  public static class NotEqualTo extends NullFalseEvaluator {
     public static NotEqualTo getInstance() {
       return new NotEqualTo();
     }
 
     @Override
-    public boolean eval() {
+    protected boolean eval(@NotNull Object val) {
       // because the bounds are not necessarily a min or max value, this cannot be answered using
       // them. notEq(col, X) with (X, Y) doesn't guarantee that X is a value in col.
       return true;
@@ -275,68 +299,68 @@ public class ExpressionEvaluator {
   /**
    * To evaluate < expr.
    */
-  public static class LessThan extends Evaluator {
+  public static class LessThan extends NullFalseEvaluator {
     public static LessThan getInstance() {
       return new LessThan();
     }
 
     @Override
-    public boolean eval() {
+    public boolean eval(@NotNull Object val) {
       if (this.minVal == null) {
         return false;
       }
-      return compare(this.minVal, this.val, this.type) < 0;
+      return compare(this.minVal, val, this.type) < 0;
     }
   }
 
   /**
    * To evaluate > expr.
    */
-  public static class GreaterThan extends Evaluator {
+  public static class GreaterThan extends NullFalseEvaluator {
     public static GreaterThan getInstance() {
       return new GreaterThan();
     }
 
     @Override
-    public boolean eval() {
+    protected boolean eval(@NotNull Object val) {
       if (this.maxVal == null) {
         return false;
       }
-      return compare(this.maxVal, this.val, this.type) > 0;
+      return compare(this.maxVal, val, this.type) > 0;
     }
   }
 
   /**
    * To evaluate <= expr.
    */
-  public static class LessThanOrEqual extends Evaluator {
+  public static class LessThanOrEqual extends NullFalseEvaluator {
     public static LessThanOrEqual getInstance() {
       return new LessThanOrEqual();
     }
 
     @Override
-    public boolean eval() {
+    protected boolean eval(@NotNull Object val) {
       if (this.minVal == null) {
         return false;
       }
-      return compare(this.minVal, this.val, this.type) <= 0;
+      return compare(this.minVal, val, this.type) <= 0;
     }
   }
 
   /**
    * To evaluate >= expr.
    */
-  public static class GreaterThanOrEqual extends Evaluator {
+  public static class GreaterThanOrEqual extends NullFalseEvaluator {
     public static GreaterThanOrEqual getInstance() {
       return new GreaterThanOrEqual();
     }
 
     @Override
-    public boolean eval() {
+    protected boolean eval(@NotNull Object val) {
       if (this.maxVal == null) {
         return false;
       }
-      return compare(this.maxVal, this.val, this.type) >= 0;
+      return compare(this.maxVal, val, this.type) >= 0;
     }
   }
 
@@ -352,6 +376,9 @@ public class ExpressionEvaluator {
 
     @Override
     public boolean eval() {
+      if (Arrays.stream(vals).anyMatch(Objects::isNull)) {
+        return false;
+      }
       if (this.minVal == null) {
         return false; // values are all null and literalSet cannot contain null.
       }
@@ -379,6 +406,28 @@ public class ExpressionEvaluator {
     }
   }
 
+  // A special evaluator that never matches any condition.
+  public static class AlwaysFalse extends Evaluator {
+
+    public static AlwaysFalse getInstance() {
+      return new AlwaysFalse();
+    }
+
+    @Override
+    public Evaluator bindColStats(
+        RowData indexRow,
+        RowType.RowField[] queryFields,
+        FieldReferenceExpression expr) {
+      // Nothing to bind here: this evaluator always returns false.
+      return this;
+    }
+
+    @Override
+    public boolean eval() {
+      return false;
+    }
+  }
+
   // component predicate
 
   /**
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestExpressionEvaluator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestExpressionEvaluator.java
index 4ad286b780a..cb65c60ac40 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestExpressionEvaluator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestExpressionEvaluator.java
@@ -18,14 +18,18 @@
 
 package org.apache.hudi.source.stats;
 
+import org.apache.hudi.adapter.TestCallExpressions;
 import org.apache.hudi.utils.TestData;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.junit.jupiter.api.Test;
@@ -33,6 +37,7 @@ import org.junit.jupiter.api.Test;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.hudi.source.stats.ExpressionEvaluator.Evaluator.bindCall;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -104,7 +109,7 @@ public class TestExpressionEvaluator {
     assertFalse(equalTo.eval(), "12 <> null");
 
     equalTo.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
-    assertFalse(equalTo.eval(), "null <> null");
+    assertFalse(equalTo.eval(), "It is not possible to test for NULL values with '=' operator");
   }
 
   @Test
@@ -140,7 +145,7 @@ public class TestExpressionEvaluator {
     assertTrue(notEqualTo.eval(), "12 <> null");
 
     notEqualTo.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
-    assertTrue(notEqualTo.eval(), "null <> null");
+    assertFalse(notEqualTo.eval(), "It is not possible to test for NULL values with '<>' operator");
   }
 
   @Test
@@ -206,7 +211,7 @@ public class TestExpressionEvaluator {
     assertFalse(lessThan.eval(), "12 <> null");
 
     lessThan.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
-    assertFalse(lessThan.eval(), "null <> null");
+    assertFalse(lessThan.eval(), "It is not possible to test for NULL values with '<' operator");
   }
 
   @Test
@@ -242,7 +247,7 @@ public class TestExpressionEvaluator {
     assertFalse(greaterThan.eval(), "12 <> null");
 
     greaterThan.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
-    assertFalse(greaterThan.eval(), "null <> null");
+    assertFalse(greaterThan.eval(), "It is not possible to test for NULL values with '>' operator");
   }
 
   @Test
@@ -278,7 +283,7 @@ public class TestExpressionEvaluator {
     assertFalse(lessThanOrEqual.eval(), "12 <> null");
 
     lessThanOrEqual.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
-    assertFalse(lessThanOrEqual.eval(), "null <> null");
+    assertFalse(lessThanOrEqual.eval(), "It is not possible to test for NULL values with '<=' operator");
   }
 
   @Test
@@ -314,7 +319,7 @@ public class TestExpressionEvaluator {
     assertFalse(greaterThanOrEqual.eval(), "12 <> null");
 
     greaterThanOrEqual.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
-    assertFalse(greaterThanOrEqual.eval(), "null <> null");
+    assertFalse(greaterThanOrEqual.eval(), "It is not possible to test for NULL values with '>=' operator");
   }
 
   @Test
@@ -349,7 +354,32 @@ public class TestExpressionEvaluator {
     assertFalse(in.eval(), "12 <> null");
 
     in.bindVals((Object) null);
-    assertFalse(in.eval(), "null <> null");
+    assertFalse(in.eval(), "It is not possible to test for NULL values with 'in' operator");
+  }
+
+  @Test
+  void testAlwaysFalse() {
+    FieldReferenceExpression ref = new FieldReferenceExpression("f_int", DataTypes.INT(), 2, 2);
+    ValueLiteralExpression nullLiteral = new ValueLiteralExpression(null, DataTypes.INT());
+    RowData indexRow = intIndexRow(11, 13);
+    RowType.RowField[] queryFields = queryFields(2);
+    BuiltInFunctionDefinition[] funDefs = new BuiltInFunctionDefinition[] {
+        BuiltInFunctionDefinitions.EQUALS,
+        BuiltInFunctionDefinitions.NOT_EQUALS,
+        BuiltInFunctionDefinitions.LESS_THAN,
+        BuiltInFunctionDefinitions.GREATER_THAN,
+        BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
+        BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
+        BuiltInFunctionDefinitions.IN};
+    for (BuiltInFunctionDefinition funDef : funDefs) {
+      CallExpression expr =
+          TestCallExpressions.permanent(
+              funDef,
+              Arrays.asList(ref, nullLiteral),
+              DataTypes.BOOLEAN());
+      // always return false if the literal value is null
+      assertFalse(bindCall(expr, indexRow, queryFields).eval());
+    }
   }
 
   private static RowData intIndexRow(Integer minVal, Integer maxVal) {
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java b/hudi-flink-datasource/hudi-flink1.13.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java
new file mode 100644
index 00000000000..6a0d0902042
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.13.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.table.types.DataType;
+
+import java.util.List;
+
+/**
+ * Call Expression creator for test goals.
+ */
+public class TestCallExpressions {
+
+  public static CallExpression permanent(
+      BuiltInFunctionDefinition builtInFunctionDefinition,
+      List<ResolvedExpression> args,
+      DataType dataType) {
+    return new CallExpression(
+        FunctionIdentifier.of(builtInFunctionDefinition.getName()),
+        builtInFunctionDefinition,
+        args,
+        dataType);
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java b/hudi-flink-datasource/hudi-flink1.14.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java
new file mode 100644
index 00000000000..6a0d0902042
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.table.types.DataType;
+
+import java.util.List;
+
+/**
+ * Call Expression creator for test goals.
+ */
+public class TestCallExpressions {
+
+  public static CallExpression permanent(
+      BuiltInFunctionDefinition builtInFunctionDefinition,
+      List<ResolvedExpression> args,
+      DataType dataType) {
+    return new CallExpression(
+        FunctionIdentifier.of(builtInFunctionDefinition.getName()),
+        builtInFunctionDefinition,
+        args,
+        dataType);
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java b/hudi-flink-datasource/hudi-flink1.15.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java
new file mode 100644
index 00000000000..1b02f2bafab
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.15.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import org.apache.flink.table.types.DataType;
+
+import java.util.List;
+
+/**
+ * Call Expression creator for test goals.
+ */
+public class TestCallExpressions {
+
+  public static CallExpression permanent(
+      BuiltInFunctionDefinition builtInFunctionDefinition,
+      List<ResolvedExpression> args,
+      DataType dataType) {
+    return CallExpression.permanent(builtInFunctionDefinition, args, dataType);
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.16.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java b/hudi-flink-datasource/hudi-flink1.16.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java
new file mode 100644
index 00000000000..1b02f2bafab
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.16.x/src/test/java/org/apache/hudi/adapter/TestCallExpressions.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import org.apache.flink.table.types.DataType;
+
+import java.util.List;
+
+/**
+ * Call Expression creator for test goals.
+ */
+public class TestCallExpressions {
+
+  public static CallExpression permanent(
+      BuiltInFunctionDefinition builtInFunctionDefinition,
+      List<ResolvedExpression> args,
+      DataType dataType) {
+    return CallExpression.permanent(builtInFunctionDefinition, args, dataType);
+  }
+}