You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/24 11:41:25 UTC
[flink-table-store] branch master updated: [FLINK-27897] SearchArgumentToPredicateConverter can now keep partial results and throw away unsupported filters
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new a415eb7 [FLINK-27897] SearchArgumentToPredicateConverter can now keep partial results and throw away unsupported filters
a415eb7 is described below
commit a415eb7c910e9851db0a7e4efa40f815c62df8f4
Author: tsreaper <ts...@gmail.com>
AuthorDate: Fri Jun 24 19:41:21 2022 +0800
[FLINK-27897] SearchArgumentToPredicateConverter can now keep partial results and throw away unsupported filters
This closes #174
---
.../store/SearchArgumentToPredicateConverter.java | 31 ++++++++++++++++------
.../SearchArgumentToPredicateConverterTest.java | 28 ++++++++++++++++++-
2 files changed, 50 insertions(+), 9 deletions(-)
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java
index 4c4ef43..3fb28ff 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -63,15 +64,29 @@ public class SearchArgumentToPredicateConverter {
}
public Optional<Predicate> convert() {
- try {
- return Optional.of(convertTree(root));
- } catch (UnsupportedOperationException e) {
- LOG.warn(
- "Failed to convert predicate due to unsupported feature. "
- + "Filter will be processed by Hive instead.",
- e);
- return Optional.empty();
+ List<ExpressionTree> trees = new ArrayList<>();
+ if (root.getOperator() == ExpressionTree.Operator.AND) {
+ trees.addAll(root.getChildren());
+ } else {
+ trees.add(root);
}
+
+ List<Predicate> converted = new ArrayList<>();
+ for (ExpressionTree tree : trees) {
+ try {
+ converted.add(convertTree(tree));
+ } catch (UnsupportedOperationException e) {
+ LOG.warn(
+ "Failed to convert predicate "
+ + tree
+ + " due to unsupported feature. "
+ + "This part of filter will be processed by Hive instead.",
+ e);
+ }
+ }
+ return converted.isEmpty()
+ ? Optional.empty()
+ : Optional.of(PredicateBuilder.and(converted));
}
private Predicate convertTree(ExpressionTree tree) {
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
index ca576e0..af27005 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
@@ -105,7 +105,6 @@ public class SearchArgumentToPredicateConverterTest {
DataTypes.INT().getLogicalType(),
DataTypes.BIGINT().getLogicalType(),
DataTypes.DOUBLE().getLogicalType());
- private static final LogicalType BIGINT_TYPE = DataTypes.BIGINT().getLogicalType();
private static final PredicateBuilder BUILDER =
new PredicateBuilder(
RowType.of(
@@ -321,6 +320,33 @@ public class SearchArgumentToPredicateConverterTest {
assertExpected(sarg, expected);
}
+ @Test
+ public void testUnsupported() {
+ SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
+ SearchArgument sarg =
+ builder.nullSafeEquals("f_bigint", PredicateLeaf.Type.LONG, 100L).build();
+ assertExpected(sarg, null);
+ }
+
+ @Test
+ public void testKeepPartialResult() {
+ SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
+ SearchArgument sarg =
+ builder.startAnd()
+ .startNot()
+ .nullSafeEquals("f_bigint", PredicateLeaf.Type.LONG, 100L)
+ .end()
+ .lessThanEquals("f_bigint", PredicateLeaf.Type.LONG, 200L)
+ .startNot()
+ .lessThan("f_bigint", PredicateLeaf.Type.LONG, 0L)
+ .end()
+ .end()
+ .build();
+ Predicate expected =
+ PredicateBuilder.and(BUILDER.lessOrEqual(1, 200L), BUILDER.greaterOrEqual(1, 0L));
+ assertExpected(sarg, expected);
+ }
+
private void assertExpected(SearchArgument sarg, Predicate expected) {
SearchArgumentToPredicateConverter converter =
new SearchArgumentToPredicateConverter(sarg, COLUMN_NAMES, COLUMN_TYPES);