You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/12/02 13:57:49 UTC

[flink-table-store] branch master updated: [FLINK-30272] Introduce a Predicate Visitor

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 69ebaa2a [FLINK-30272] Introduce a Predicate Visitor
69ebaa2a is described below

commit 69ebaa2a861da54dd21e62778a9e9eab73f72c18
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Dec 2 21:57:44 2022 +0800

    [FLINK-30272] Introduce a Predicate Visitor
    
    This closes #417
---
 .../store/file/predicate/CompoundPredicate.java    |  5 ++
 .../table/store/file/predicate/LeafPredicate.java  |  5 ++
 .../table/store/file/predicate/Predicate.java      |  2 +
 ...Predicate.java => PredicateReplaceVisitor.java} | 39 ++++++------
 .../{Predicate.java => PredicateVisitor.java}      | 27 ++------
 .../store/file/schema/SchemaEvolutionUtil.java     | 71 ++++++++--------------
 6 files changed, 57 insertions(+), 92 deletions(-)

diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
index 042b269b..d707a6d5 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
@@ -62,6 +62,11 @@ public class CompoundPredicate implements Predicate {
         return function.negate(children);
     }
 
+    @Override
+    public <T> T visit(PredicateVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (!(o instanceof CompoundPredicate)) {
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
index 9ffdaf19..0f89e579 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
@@ -103,6 +103,11 @@ public class LeafPredicate implements Predicate {
                 .map(negate -> new LeafPredicate(negate, type, fieldIndex, fieldName, literals));
     }
 
+    @Override
+    public <T> T visit(PredicateVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
index 3f6748d8..70f8906f 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
@@ -43,4 +43,6 @@ public interface Predicate extends Serializable {
 
     /** @return the negation predicate of this predicate if possible. */
     Optional<Predicate> negate();
+
+    <T> T visit(PredicateVisitor<T> visitor);
 }
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateReplaceVisitor.java
similarity index 52%
copy from flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
copy to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateReplaceVisitor.java
index 3f6748d8..6277d17e 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateReplaceVisitor.java
@@ -18,29 +18,24 @@
 
 package org.apache.flink.table.store.file.predicate;
 
-import org.apache.flink.table.store.format.FieldStats;
-
-import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 
-/** Predicate which returns Boolean and provides testing by stats. */
-public interface Predicate extends Serializable {
-
-    /**
-     * Test based on the specific input column values.
-     *
-     * @return return true when hit, false when not hit.
-     */
-    boolean test(Object[] values);
-
-    /**
-     * Test based on the statistical information to determine whether a hit is possible.
-     *
-     * @return return true is likely to hit (there may also be false positives), return false is
-     *     absolutely not possible to hit.
-     */
-    boolean test(long rowCount, FieldStats[] fieldStats);
+/** A {@link PredicateVisitor} to replace {@link Predicate}. */
+public interface PredicateReplaceVisitor extends PredicateVisitor<Optional<Predicate>> {
 
-    /** @return the negation predicate of this predicate if possible. */
-    Optional<Predicate> negate();
+    @Override
+    default Optional<Predicate> visit(CompoundPredicate predicate) {
+        List<Predicate> converted = new ArrayList<>();
+        for (Predicate child : predicate.children()) {
+            Optional<Predicate> optional = child.visit(this);
+            if (optional.isPresent()) {
+                converted.add(optional.get());
+            } else {
+                return Optional.empty();
+            }
+        }
+        return Optional.of(new CompoundPredicate(predicate.function(), converted));
+    }
 }
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateVisitor.java
similarity index 51%
copy from flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
copy to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateVisitor.java
index 3f6748d8..6663f19a 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateVisitor.java
@@ -18,29 +18,10 @@
 
 package org.apache.flink.table.store.file.predicate;
 
-import org.apache.flink.table.store.format.FieldStats;
+/** A visitor to visit {@link Predicate}. */
+public interface PredicateVisitor<T> {
 
-import java.io.Serializable;
-import java.util.Optional;
+    T visit(LeafPredicate predicate);
 
-/** Predicate which returns Boolean and provides testing by stats. */
-public interface Predicate extends Serializable {
-
-    /**
-     * Test based on the specific input column values.
-     *
-     * @return return true when hit, false when not hit.
-     */
-    boolean test(Object[] values);
-
-    /**
-     * Test based on the statistical information to determine whether a hit is possible.
-     *
-     * @return return true is likely to hit (there may also be false positives), return false is
-     *     absolutely not possible to hit.
-     */
-    boolean test(long rowCount, FieldStats[] fieldStats);
-
-    /** @return the negation predicate of this predicate if possible. */
-    Optional<Predicate> negate();
+    T visit(CompoundPredicate predicate);
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
index 53a523ee..820ce6d4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
@@ -20,9 +20,9 @@ package org.apache.flink.table.store.file.schema;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.predicate.CompoundPredicate;
 import org.apache.flink.table.store.file.predicate.LeafPredicate;
 import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateReplaceVisitor;
 import org.apache.flink.table.store.utils.ProjectedRowData;
 
 import javax.annotation.Nullable;
@@ -264,55 +264,32 @@ public class SchemaEvolutionUtil {
         LinkedHashMap<Integer, DataField> idToDataFields = new LinkedHashMap<>();
         dataFields.forEach(f -> idToDataFields.put(f.id(), f));
         List<Predicate> dataFilters = new ArrayList<>(filters.size());
-        for (Predicate predicate : filters) {
-            createDataPredicate(nameToTableFields, idToDataFields, predicate)
-                    .ifPresent(dataFilters::add);
-        }
-        return dataFilters;
-    }
 
-    private static Optional<Predicate> createDataPredicate(
-            Map<String, DataField> tableFields,
-            LinkedHashMap<Integer, DataField> dataFields,
-            Predicate predicate) {
-        if (predicate instanceof CompoundPredicate) {
-            CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
-            List<Predicate> children = compoundPredicate.children();
-            List<Predicate> dataChildren = new ArrayList<>(children.size());
-            for (Predicate child : children) {
-                Optional<Predicate> childPredicate =
-                        createDataPredicate(tableFields, dataFields, child);
-                if (childPredicate.isPresent()) {
-                    dataChildren.add(childPredicate.get());
-                } else {
-                    return Optional.empty();
-                }
-            }
-            return Optional.of(new CompoundPredicate(compoundPredicate.function(), dataChildren));
-        } else if (predicate instanceof LeafPredicate) {
-            LeafPredicate leafPredicate = (LeafPredicate) predicate;
-            DataField tableField =
-                    checkNotNull(
-                            tableFields.get(leafPredicate.fieldName()),
-                            String.format("Find no field %s", leafPredicate.fieldName()));
-            DataField dataField = dataFields.get(tableField.id());
-            if (dataField == null) {
-                return Optional.empty();
-            }
+        PredicateReplaceVisitor visitor =
+                predicate -> {
+                    DataField tableField =
+                            checkNotNull(
+                                    nameToTableFields.get(predicate.fieldName()),
+                                    String.format("Find no field %s", predicate.fieldName()));
+                    DataField dataField = idToDataFields.get(tableField.id());
+                    if (dataField == null) {
+                        return Optional.empty();
+                    }
+
+                    /// TODO Should deal with column type schema evolution here
+                    return Optional.of(
+                            new LeafPredicate(
+                                    predicate.function(),
+                                    predicate.type(),
+                                    indexOf(dataField, idToDataFields),
+                                    dataField.name(),
+                                    predicate.literals()));
+                };
 
-            /// TODO Should deal with column type schema evolution here
-            return Optional.of(
-                    new LeafPredicate(
-                            leafPredicate.function(),
-                            leafPredicate.type(),
-                            indexOf(dataField, dataFields),
-                            dataField.name(),
-                            leafPredicate.literals()));
-        } else {
-            throw new UnsupportedOperationException(
-                    String.format(
-                            "Not support to create data predicate from %s", predicate.getClass()));
+        for (Predicate predicate : filters) {
+            predicate.visit(visitor).ifPresent(dataFilters::add);
         }
+        return dataFilters;
     }
 
     private static int indexOf(DataField dataField, LinkedHashMap<Integer, DataField> dataFields) {