You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/24 11:41:25 UTC

[flink-table-store] branch master updated: [FLINK-27897] SearchArgumentToPredicateConverter can now keep partial results and throw away unsupported filters

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new a415eb7  [FLINK-27897] SearchArgumentToPredicateConverter can now keep partial results and throw away unsupported filters
a415eb7 is described below

commit a415eb7c910e9851db0a7e4efa40f815c62df8f4
Author: tsreaper <ts...@gmail.com>
AuthorDate: Fri Jun 24 19:41:21 2022 +0800

    [FLINK-27897] SearchArgumentToPredicateConverter can now keep partial results and throw away unsupported filters
    
    This closes #174
---
 .../store/SearchArgumentToPredicateConverter.java  | 31 ++++++++++++++++------
 .../SearchArgumentToPredicateConverterTest.java    | 28 ++++++++++++++++++-
 2 files changed, 50 insertions(+), 9 deletions(-)

diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java
index 4c4ef43..3fb28ff 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -63,15 +64,29 @@ public class SearchArgumentToPredicateConverter {
     }
 
     public Optional<Predicate> convert() {
-        try {
-            return Optional.of(convertTree(root));
-        } catch (UnsupportedOperationException e) {
-            LOG.warn(
-                    "Failed to convert predicate due to unsupported feature. "
-                            + "Filter will be processed by Hive instead.",
-                    e);
-            return Optional.empty();
+        List<ExpressionTree> trees = new ArrayList<>();
+        if (root.getOperator() == ExpressionTree.Operator.AND) {
+            trees.addAll(root.getChildren());
+        } else {
+            trees.add(root);
         }
+
+        List<Predicate> converted = new ArrayList<>();
+        for (ExpressionTree tree : trees) {
+            try {
+                converted.add(convertTree(tree));
+            } catch (UnsupportedOperationException e) {
+                LOG.warn(
+                        "Failed to convert predicate "
+                                + tree
+                                + "  due to unsupported feature. "
+                                + "This part of filter will be processed by Hive instead.",
+                        e);
+            }
+        }
+        return converted.isEmpty()
+                ? Optional.empty()
+                : Optional.of(PredicateBuilder.and(converted));
     }
 
     private Predicate convertTree(ExpressionTree tree) {
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
index ca576e0..af27005 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
@@ -105,7 +105,6 @@ public class SearchArgumentToPredicateConverterTest {
                     DataTypes.INT().getLogicalType(),
                     DataTypes.BIGINT().getLogicalType(),
                     DataTypes.DOUBLE().getLogicalType());
-    private static final LogicalType BIGINT_TYPE = DataTypes.BIGINT().getLogicalType();
     private static final PredicateBuilder BUILDER =
             new PredicateBuilder(
                     RowType.of(
@@ -321,6 +320,33 @@ public class SearchArgumentToPredicateConverterTest {
         assertExpected(sarg, expected);
     }
 
+    @Test
+    public void testUnsupported() {
+        SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
+        SearchArgument sarg =
+                builder.nullSafeEquals("f_bigint", PredicateLeaf.Type.LONG, 100L).build();
+        assertExpected(sarg, null);
+    }
+
+    @Test
+    public void testKeepPartialResult() {
+        SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
+        SearchArgument sarg =
+                builder.startAnd()
+                        .startNot()
+                        .nullSafeEquals("f_bigint", PredicateLeaf.Type.LONG, 100L)
+                        .end()
+                        .lessThanEquals("f_bigint", PredicateLeaf.Type.LONG, 200L)
+                        .startNot()
+                        .lessThan("f_bigint", PredicateLeaf.Type.LONG, 0L)
+                        .end()
+                        .end()
+                        .build();
+        Predicate expected =
+                PredicateBuilder.and(BUILDER.lessOrEqual(1, 200L), BUILDER.greaterOrEqual(1, 0L));
+        assertExpected(sarg, expected);
+    }
+
     private void assertExpected(SearchArgument sarg, Predicate expected) {
         SearchArgumentToPredicateConverter converter =
                 new SearchArgumentToPredicateConverter(sarg, COLUMN_NAMES, COLUMN_TYPES);