You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/12/02 13:57:49 UTC
[flink-table-store] branch master updated: [FLINK-30272] Introduce a Predicate Visitor
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 69ebaa2a [FLINK-30272] Introduce a Predicate Visitor
69ebaa2a is described below
commit 69ebaa2a861da54dd21e62778a9e9eab73f72c18
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Dec 2 21:57:44 2022 +0800
[FLINK-30272] Introduce a Predicate Visitor
This closes #417
---
.../store/file/predicate/CompoundPredicate.java | 5 ++
.../table/store/file/predicate/LeafPredicate.java | 5 ++
.../table/store/file/predicate/Predicate.java | 2 +
...Predicate.java => PredicateReplaceVisitor.java} | 39 ++++++------
.../{Predicate.java => PredicateVisitor.java} | 27 ++------
.../store/file/schema/SchemaEvolutionUtil.java | 71 ++++++++--------------
6 files changed, 57 insertions(+), 92 deletions(-)
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
index 042b269b..d707a6d5 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
@@ -62,6 +62,11 @@ public class CompoundPredicate implements Predicate {
return function.negate(children);
}
+ @Override
+ public <T> T visit(PredicateVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
@Override
public boolean equals(Object o) {
if (!(o instanceof CompoundPredicate)) {
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
index 9ffdaf19..0f89e579 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
@@ -103,6 +103,11 @@ public class LeafPredicate implements Predicate {
.map(negate -> new LeafPredicate(negate, type, fieldIndex, fieldName, literals));
}
+ @Override
+ public <T> T visit(PredicateVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
index 3f6748d8..70f8906f 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
@@ -43,4 +43,6 @@ public interface Predicate extends Serializable {
/** @return the negation predicate of this predicate if possible. */
Optional<Predicate> negate();
+
+ <T> T visit(PredicateVisitor<T> visitor);
}
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateReplaceVisitor.java
similarity index 52%
copy from flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
copy to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateReplaceVisitor.java
index 3f6748d8..6277d17e 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateReplaceVisitor.java
@@ -18,29 +18,24 @@
package org.apache.flink.table.store.file.predicate;
-import org.apache.flink.table.store.format.FieldStats;
-
-import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
-/** Predicate which returns Boolean and provides testing by stats. */
-public interface Predicate extends Serializable {
-
- /**
- * Test based on the specific input column values.
- *
- * @return return true when hit, false when not hit.
- */
- boolean test(Object[] values);
-
- /**
- * Test based on the statistical information to determine whether a hit is possible.
- *
- * @return return true is likely to hit (there may also be false positives), return false is
- * absolutely not possible to hit.
- */
- boolean test(long rowCount, FieldStats[] fieldStats);
+/** A {@link PredicateVisitor} to replace {@link Predicate}. */
+public interface PredicateReplaceVisitor extends PredicateVisitor<Optional<Predicate>> {
- /** @return the negation predicate of this predicate if possible. */
- Optional<Predicate> negate();
+ @Override
+ default Optional<Predicate> visit(CompoundPredicate predicate) {
+ List<Predicate> converted = new ArrayList<>();
+ for (Predicate child : predicate.children()) {
+ Optional<Predicate> optional = child.visit(this);
+ if (optional.isPresent()) {
+ converted.add(optional.get());
+ } else {
+ return Optional.empty();
+ }
+ }
+ return Optional.of(new CompoundPredicate(predicate.function(), converted));
+ }
}
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateVisitor.java
similarity index 51%
copy from flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
copy to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateVisitor.java
index 3f6748d8..6663f19a 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateVisitor.java
@@ -18,29 +18,10 @@
package org.apache.flink.table.store.file.predicate;
-import org.apache.flink.table.store.format.FieldStats;
+/** A visitor to visit {@link Predicate}. */
+public interface PredicateVisitor<T> {
-import java.io.Serializable;
-import java.util.Optional;
+ T visit(LeafPredicate predicate);
-/** Predicate which returns Boolean and provides testing by stats. */
-public interface Predicate extends Serializable {
-
- /**
- * Test based on the specific input column values.
- *
- * @return return true when hit, false when not hit.
- */
- boolean test(Object[] values);
-
- /**
- * Test based on the statistical information to determine whether a hit is possible.
- *
- * @return return true is likely to hit (there may also be false positives), return false is
- * absolutely not possible to hit.
- */
- boolean test(long rowCount, FieldStats[] fieldStats);
-
- /** @return the negation predicate of this predicate if possible. */
- Optional<Predicate> negate();
+ T visit(CompoundPredicate predicate);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
index 53a523ee..820ce6d4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
@@ -20,9 +20,9 @@ package org.apache.flink.table.store.file.schema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.predicate.CompoundPredicate;
import org.apache.flink.table.store.file.predicate.LeafPredicate;
import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateReplaceVisitor;
import org.apache.flink.table.store.utils.ProjectedRowData;
import javax.annotation.Nullable;
@@ -264,55 +264,32 @@ public class SchemaEvolutionUtil {
LinkedHashMap<Integer, DataField> idToDataFields = new LinkedHashMap<>();
dataFields.forEach(f -> idToDataFields.put(f.id(), f));
List<Predicate> dataFilters = new ArrayList<>(filters.size());
- for (Predicate predicate : filters) {
- createDataPredicate(nameToTableFields, idToDataFields, predicate)
- .ifPresent(dataFilters::add);
- }
- return dataFilters;
- }
- private static Optional<Predicate> createDataPredicate(
- Map<String, DataField> tableFields,
- LinkedHashMap<Integer, DataField> dataFields,
- Predicate predicate) {
- if (predicate instanceof CompoundPredicate) {
- CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
- List<Predicate> children = compoundPredicate.children();
- List<Predicate> dataChildren = new ArrayList<>(children.size());
- for (Predicate child : children) {
- Optional<Predicate> childPredicate =
- createDataPredicate(tableFields, dataFields, child);
- if (childPredicate.isPresent()) {
- dataChildren.add(childPredicate.get());
- } else {
- return Optional.empty();
- }
- }
- return Optional.of(new CompoundPredicate(compoundPredicate.function(), dataChildren));
- } else if (predicate instanceof LeafPredicate) {
- LeafPredicate leafPredicate = (LeafPredicate) predicate;
- DataField tableField =
- checkNotNull(
- tableFields.get(leafPredicate.fieldName()),
- String.format("Find no field %s", leafPredicate.fieldName()));
- DataField dataField = dataFields.get(tableField.id());
- if (dataField == null) {
- return Optional.empty();
- }
+ PredicateReplaceVisitor visitor =
+ predicate -> {
+ DataField tableField =
+ checkNotNull(
+ nameToTableFields.get(predicate.fieldName()),
+ String.format("Find no field %s", predicate.fieldName()));
+ DataField dataField = idToDataFields.get(tableField.id());
+ if (dataField == null) {
+ return Optional.empty();
+ }
+
+ /// TODO Should deal with column type schema evolution here
+ return Optional.of(
+ new LeafPredicate(
+ predicate.function(),
+ predicate.type(),
+ indexOf(dataField, idToDataFields),
+ dataField.name(),
+ predicate.literals()));
+ };
- /// TODO Should deal with column type schema evolution here
- return Optional.of(
- new LeafPredicate(
- leafPredicate.function(),
- leafPredicate.type(),
- indexOf(dataField, dataFields),
- dataField.name(),
- leafPredicate.literals()));
- } else {
- throw new UnsupportedOperationException(
- String.format(
- "Not support to create data predicate from %s", predicate.getClass()));
+ for (Predicate predicate : filters) {
+ predicate.visit(visitor).ifPresent(dataFilters::add);
}
+ return dataFilters;
}
private static int indexOf(DataField dataField, LinkedHashMap<Integer, DataField> dataFields) {