You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/04 04:26:44 UTC
[flink-table-store] branch master updated: [FLINK-28063] Introduce dedicated In and NotIn predicate function
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new d11a287c [FLINK-28063] Introduce dedicated In and NotIn predicate function
d11a287c is described below
commit d11a287c756713c6b16a6c93291566a780f97b4a
Author: tsreaper <ts...@gmail.com>
AuthorDate: Mon Jul 4 12:26:40 2022 +0800
[FLINK-28063] Introduce dedicated In and NotIn predicate function
This closes #189
---
.../flink/table/store/file/predicate/In.java | 71 ++
.../flink/table/store/file/predicate/NotIn.java | 71 ++
.../store/file/predicate/PredicateBuilder.java | 12 +-
.../store/file/predicate/PredicateConverter.java | 9 +
.../store/file/predicate/PredicateBuilderTest.java | 70 --
.../file/predicate/PredicateConverterTest.java | 553 +++++++++++++++-
.../table/store/file/predicate/PredicateTest.java | 736 +++------------------
.../SearchArgumentToPredicateConverterTest.java | 20 +-
.../table/store/spark/SparkFilterConverter.java | 12 +
.../store/spark/SparkFilterConverterTest.java | 7 +
10 files changed, 817 insertions(+), 744 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java
new file mode 100644
index 00000000..aa338945
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
+
+/** A {@link LeafFunction} that evaluates an IN test: a field matches when it compares equal to any non-null literal. */
+public class In implements LeafFunction {
+
+ private static final long serialVersionUID = 1L;
+ /** Shared instance; the constructor below is private, so no other instance can be created from outside this class. */
+ public static final In INSTANCE = new In();
+
+ private In() {}
+ /** Row-level check: true iff {@code field} is non-null and {@code compareLiteral} reports 0 for at least one non-null literal. */
+ @Override
+ public boolean test(LogicalType type, Object field, List<Object> literals) {
+ if (field == null) { // a null field never matches
+ return false;
+ }
+ for (Object literal : literals) {
+ if (literal != null && compareLiteral(type, literal, field) == 0) { // null literals are skipped, not matched
+ return true;
+ }
+ }
+ return false; // also the result for an empty literal list
+ }
+ /** Stats-level check: true iff not every row is null and some non-null literal presumably lies within [minValue, maxValue]. */
+ @Override
+ public boolean test(
+ LogicalType type, long rowCount, FieldStats fieldStats, List<Object> literals) {
+ if (rowCount == fieldStats.nullCount()) { // every row is null, so nothing can match
+ return false;
+ }
+ for (Object literal : literals) {
+ if (literal != null
+ && compareLiteral(type, literal, fieldStats.minValue()) >= 0 // assumes result orders literal against min - TODO confirm
+ && compareLiteral(type, literal, fieldStats.maxValue()) <= 0) {
+ return true;
+ }
+ }
+ return false;
+ }
+ /** Returns {@link NotIn#INSTANCE} wrapped in an {@link Optional} as the negation of this function. */
+ @Override
+ public Optional<LeafFunction> negate() {
+ return Optional.of(NotIn.INSTANCE);
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
new file mode 100644
index 00000000..d710da0f
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
+
+/** A {@link LeafFunction} that evaluates a NOT IN test: any null literal, or any literal equal to the field, makes it false. */
+public class NotIn implements LeafFunction {
+
+ private static final long serialVersionUID = 1L;
+ /** Shared instance; the constructor below is private, so no other instance can be created from outside this class. */
+ public static final NotIn INSTANCE = new NotIn();
+
+ private NotIn() {}
+ /** Row-level check: true iff {@code field} is non-null, no literal is null, and no literal compares equal to the field. */
+ @Override
+ public boolean test(LogicalType type, Object field, List<Object> literals) {
+ if (field == null) { // a null field never passes
+ return false;
+ }
+ for (Object literal : literals) {
+ if (literal == null || compareLiteral(type, literal, field) == 0) { // a single null literal rejects every row
+ return false;
+ }
+ }
+ return true; // also the result for an empty literal list
+ }
+ /** Stats-level check: false if every row is null, any literal is null, or a literal equals both minValue and maxValue. */
+ @Override
+ public boolean test(
+ LogicalType type, long rowCount, FieldStats fieldStats, List<Object> literals) {
+ if (rowCount == fieldStats.nullCount()) { // every row is null, so nothing can pass
+ return false;
+ }
+ for (Object literal : literals) {
+ if (literal == null
+ || (compareLiteral(type, literal, fieldStats.minValue()) == 0 // min == max == literal: no other value recorded in the stats
+ && compareLiteral(type, literal, fieldStats.maxValue()) == 0)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ /** Returns {@link In#INSTANCE} wrapped in an {@link Optional} as the negation of this function. */
+ @Override
+ public Optional<LeafFunction> negate() {
+ return Optional.of(In.INSTANCE);
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
index c3888ecf..93921a50 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
@@ -95,13 +95,11 @@ public class PredicateBuilder {
}
public Predicate in(int idx, List<Object> literals) {
- Preconditions.checkArgument(
- literals.size() > 0,
- "There must be at least 1 literal to construct an IN predicate");
- return literals.stream()
- .map(l -> equal(idx, l))
- .reduce((a, b) -> new CompoundPredicate(Or.INSTANCE, Arrays.asList(a, b)))
- .get();
+ return new LeafPredicate(In.INSTANCE, rowType.getTypeAt(idx), idx, literals);
+ }
+ /** Builds a {@link LeafPredicate} applying {@link NotIn} to the field at {@code idx}, typed via {@code rowType.getTypeAt(idx)}. */
+ public Predicate notIn(int idx, List<Object> literals) {
+ return new LeafPredicate(NotIn.INSTANCE, rowType.getTypeAt(idx), idx, literals);
 }
public Predicate between(int idx, Object includedLowerBound, Object includedUpperBound) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
index f53ee9bc..baaf768d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
@@ -36,6 +36,7 @@ import org.apache.flink.table.types.logical.RowType;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -83,6 +84,14 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
return visitBiFunction(children, builder::lessThan, builder::greaterThan);
} else if (func == BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL) {
return visitBiFunction(children, builder::lessOrEqual, builder::greaterOrEqual);
+ } else if (func == BuiltInFunctionDefinitions.IN) {
+ FieldReferenceExpression fieldRefExpr =
+ extractFieldReference(children.get(0)).orElseThrow(UnsupportedExpression::new);
+ List<Object> literals = new ArrayList<>();
+ for (int i = 1; i < children.size(); i++) {
+ literals.add(extractLiteral(fieldRefExpr.getOutputDataType(), children.get(i)));
+ }
+ return builder.in(fieldRefExpr.getInputIndex(), literals);
} else if (func == BuiltInFunctionDefinitions.IS_NULL) {
return extractFieldReference(children.get(0))
.map(FieldReferenceExpression::getFieldIndex)
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
index 4f3bcbeb..4152b272 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
@@ -31,76 +31,6 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link PredicateBuilder}. */
public class PredicateBuilderTest {
- @Test
- public void testIn() {
- PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
- Predicate predicate = builder.in(0, Arrays.asList(1, 3));
-
- assertThat(predicate.test(new Object[] {1})).isEqualTo(true);
- assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
- assertThat(predicate.test(new Object[] {3})).isEqualTo(true);
- assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-
- assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
- assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
- assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
- .isEqualTo(false);
- }
-
- @Test
- public void testInNull() {
- PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
- Predicate predicate = builder.in(0, Arrays.asList(1, null, 3));
-
- assertThat(predicate.test(new Object[] {1})).isEqualTo(true);
- assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
- assertThat(predicate.test(new Object[] {3})).isEqualTo(true);
- assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-
- assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
- assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
- assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
- .isEqualTo(false);
- }
-
- @Test
- public void testNotIn() {
- PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
- Predicate predicate = builder.in(0, Arrays.asList(1, 3)).negate().get();
-
- assertThat(predicate.test(new Object[] {1})).isEqualTo(false);
- assertThat(predicate.test(new Object[] {2})).isEqualTo(true);
- assertThat(predicate.test(new Object[] {3})).isEqualTo(false);
- assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-
- assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 0)})).isEqualTo(false);
- assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 0)})).isEqualTo(false);
- assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 0)})).isEqualTo(true);
- assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
- assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(true);
- assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
- .isEqualTo(false);
- }
-
- @Test
- public void testNotInNull() {
- PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
- Predicate predicate = builder.in(0, Arrays.asList(1, null, 3)).negate().get();
-
- assertThat(predicate.test(new Object[] {1})).isEqualTo(false);
- assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
- assertThat(predicate.test(new Object[] {3})).isEqualTo(false);
- assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-
- assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 0)})).isEqualTo(false);
- assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 0)})).isEqualTo(false);
- assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 0)})).isEqualTo(false);
- assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(false);
- assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
- assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
- .isEqualTo(false);
- }
-
@Test
public void testBetween() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
index 7ffa337b..137232ad 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
@@ -19,23 +19,35 @@
package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.StringData;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.literal;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -63,6 +75,7 @@ public class PredicateConverterTest {
FieldReferenceExpression longRefExpr =
new FieldReferenceExpression("long1", DataTypes.BIGINT(), 0, 0);
ValueLiteralExpression intLitExpr = new ValueLiteralExpression(10);
+ ValueLiteralExpression intLitExpr2 = new ValueLiteralExpression(20);
long longLit = 10L;
ValueLiteralExpression nullLongLitExpr =
new ValueLiteralExpression(null, DataTypes.BIGINT());
@@ -189,6 +202,544 @@ public class PredicateConverterTest {
DataTypes.BOOLEAN())),
DataTypes.BOOLEAN()),
PredicateBuilder.or(
- BUILDER.notEqual(0, longLit), BUILDER.equal(1, doubleLit))));
+ BUILDER.notEqual(0, longLit), BUILDER.equal(1, doubleLit))),
+ Arguments.of(
+ CallExpression.permanent(
+ BuiltInFunctionDefinitions.IN,
+ Arrays.asList(
+ longRefExpr, intLitExpr, nullLongLitExpr, intLitExpr2),
+ DataTypes.BOOLEAN()),
+ BUILDER.in(0, Arrays.asList(10L, null, 20L))));
+ }
+ /** Converts the call over a single VARCHAR column, then checks the predicate on each row and each (rowCount, stats) pair. */
+ @MethodSource("provideLikeExpressions")
+ @ParameterizedTest
+ public void testStartsWith(
+ CallExpression callExpression,
+ List<Object[]> valuesList,
+ List<Boolean> expectedForValues,
+ List<Long> rowCountList,
+ List<FieldStats[]> statsList,
+ List<Boolean> expectedForStats) {
+ Predicate predicate =
+ callExpression.accept(new PredicateConverter(RowType.of(new VarCharType())));
+ IntStream.range(0, valuesList.size()) // row-level expectations, index-aligned with valuesList
+ .forEach(
+ i ->
+ assertThat(predicate.test(valuesList.get(i)))
+ .isEqualTo(expectedForValues.get(i)));
+ IntStream.range(0, rowCountList.size()) // stats-level expectations, index-aligned with rowCountList and statsList
+ .forEach(
+ i ->
+ assertThat(predicate.test(rowCountList.get(i), statsList.get(i)))
+ .isEqualTo(expectedForStats.get(i)));
+ }
+ /** Supplies four LIKE call expressions for {@code testStartsWith}, each with row values, stats and the expected results. */
+ public static Stream<Arguments> provideLikeExpressions() {
+ CallExpression expr1 =
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal("abd%", STRING()));
+ List<Object[]> valuesList1 =
+ Arrays.asList(
+ new Object[] {null},
+ new Object[] {StringData.fromString("a")},
+ new Object[] {StringData.fromString("ab")},
+ new Object[] {StringData.fromString("abd")},
+ new Object[] {StringData.fromString("abd%")},
+ new Object[] {StringData.fromString("abd1")},
+ new Object[] {StringData.fromString("abde@")},
+ new Object[] {StringData.fromString("abd_")},
+ new Object[] {StringData.fromString("abd_%")});
+ List<Boolean> expectedForValues1 =
+ Arrays.asList(false, false, false, true, true, true, true, true, true);
+ List<Long> rowCountList1 = Arrays.asList(0L, 3L, 3L, 3L);
+ List<FieldStats[]> statsList1 =
+ Arrays.asList(
+ new FieldStats[] {new FieldStats(null, null, 0L)},
+ new FieldStats[] {new FieldStats(null, null, 3L)},
+ new FieldStats[] {
+ new FieldStats(
+ StringData.fromString("ab"),
+ StringData.fromString("abc123"),
+ 1L)
+ },
+ new FieldStats[] {
+ new FieldStats(
+ StringData.fromString("abc"), StringData.fromString("abe"), 1L)
+ });
+ List<Boolean> expectedForStats1 = Arrays.asList(false, false, false, true);
+
+ CallExpression expr2 =
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal("test=_%", STRING()),
+ literal("=", STRING()));
+ List<Object[]> valuesList2 =
+ Arrays.asList(
+ new Object[] {StringData.fromString("test%")},
+ new Object[] {StringData.fromString("test_123")},
+ new Object[] {StringData.fromString("test_%")},
+ new Object[] {StringData.fromString("test__")});
+ List<Boolean> expectedForValues2 = Arrays.asList(false, true, true, true);
+ List<Long> rowCountList2 = Collections.singletonList(3L);
+ List<FieldStats[]> statsList2 =
+ Collections.singletonList(
+ new FieldStats[] {
+ new FieldStats(
+ StringData.fromString("test_123"),
+ StringData.fromString("test_789"),
+ 0L)
+ });
+ List<Boolean> expectedForStats2 = Collections.singletonList(true);
+
+ // currently, SQL wildcards '[]' and '[^]' are deemed as normal characters in Flink
+ CallExpression expr3 =
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal("[a-c]xyz%", STRING()));
+ List<Object[]> valuesList3 =
+ Arrays.asList(
+ new Object[] {StringData.fromString("axyz")},
+ new Object[] {StringData.fromString("bxyz")},
+ new Object[] {StringData.fromString("cxyz")},
+ new Object[] {StringData.fromString("[a-c]xyz")});
+ List<Boolean> expectedForValues3 = Arrays.asList(false, false, false, true);
+ List<Long> rowCountList3 = Collections.singletonList(3L);
+ List<FieldStats[]> statsList3 =
+ Collections.singletonList(
+ new FieldStats[] {
+ new FieldStats(
+ StringData.fromString("[a-c]xyz"),
+ StringData.fromString("[a-c]xyzz"),
+ 0L)
+ });
+ List<Boolean> expectedForStats3 = Collections.singletonList(true);
+
+ CallExpression expr4 =
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal("[^a-d]xyz%", STRING()));
+ List<Object[]> valuesList4 =
+ Arrays.asList(
+ new Object[] {StringData.fromString("exyz")},
+ new Object[] {StringData.fromString("fxyz")},
+ new Object[] {StringData.fromString("axyz")},
+ new Object[] {StringData.fromString("[^a-d]xyz")});
+ List<Boolean> expectedForValues4 = Arrays.asList(false, false, false, true);
+ List<Long> rowCountList4 = Collections.singletonList(3L);
+ List<FieldStats[]> statsList4 =
+ Collections.singletonList(
+ new FieldStats[] {
+ new FieldStats(
+ StringData.fromString("[^a-d]xyz"),
+ StringData.fromString("[^a-d]xyzz"),
+ 1L)
+ });
+ List<Boolean> expectedForStats4 = Collections.singletonList(true);
+
+ return Stream.of(
+ Arguments.of(
+ expr1,
+ valuesList1,
+ expectedForValues1,
+ rowCountList1,
+ statsList1,
+ expectedForStats1),
+ Arguments.of(
+ expr2,
+ valuesList2,
+ expectedForValues2,
+ rowCountList2,
+ statsList2,
+ expectedForStats2),
+ Arguments.of(
+ expr3,
+ valuesList3,
+ expectedForValues3,
+ rowCountList3,
+ statsList3,
+ expectedForStats3),
+ Arguments.of(
+ expr4,
+ valuesList4,
+ expectedForValues4,
+ rowCountList4,
+ statsList4,
+ expectedForStats4));
+ }
+ /** An AND whose second operand is a SIMILAR call is expected to be rejected with {@code UnsupportedExpression}. */
+ @Test
+ public void testUnsupportedExpression() {
+ CallExpression expression =
+ call(
+ BuiltInFunctionDefinitions.AND,
+ call(
+ BuiltInFunctionDefinitions.EQUALS,
+ field(0, DataTypes.INT()),
+ literal(3)),
+ call(
+ BuiltInFunctionDefinitions.SIMILAR,
+ field(1, DataTypes.INT()),
+ literal(5)));
+ assertThatThrownBy(
+ () ->
+ expression.accept(
+ new PredicateConverter(
+ RowType.of(new IntType(), new IntType()))))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+ }
+ /** Each LIKE pattern below (a prefix mixed with further or escaped wildcards) must throw {@code UnsupportedExpression}. */
+ @Test
+ public void testUnsupportedStartsPatternForLike() {
+ PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
+ // starts pattern with '_' as wildcard
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal("abc_", STRING()))
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+ // starts pattern like 'abc%xyz' or 'abc_xyz'
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal("abc%xyz", STRING()))
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal("abc_xyz", STRING()))
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+ // starts pattern like 'abc%xyz' or 'abc_xyz' with '%' or '_' to escape
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal(
+ "=%abc=%%xyz=_",
+ STRING()), // matches "%abc%(?s:.*)xyz_"
+ literal("=", STRING()))
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal(
+ "abc=%%xyz",
+ STRING()), // matches "abc%(?s:.*)xyz"
+ literal("=", STRING()))
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal(
+ "abc=%_xyz",
+ STRING()), // matches "abc%.xyz"
+ literal("=", STRING()))
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal(
+ "abc=_%xyz",
+ STRING()), // matches "abc_(?s:.*)xyz"
+ literal("=", STRING()))
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal(
+ "abc=__xyz",
+ STRING()), // matches "abc_.xyz"
+ literal("=", STRING()))
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+ // starts pattern with wildcard '%' at the beginning to escape
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal("=%%", STRING()), // matches "%(?s:.*)"
+ literal("=", STRING()))
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+ }
+ /** Each LIKE pattern below begins with a wildcard (a suffix match) and must throw {@code UnsupportedExpression}. */
+ @Test
+ public void testUnsupportedEndsPatternForLike() {
+ PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
+ // ends pattern with '%' as wildcard
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal("%456", STRING())) // matches "(?s:.*)456"
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+ // ends pattern with '_' as wildcard
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal("_456", STRING())) // matches ".456"
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+ // ends pattern with '[]' as wildcard
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal("_[456]", STRING())) // matches ".[456]"
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal(
+ "%[h-m]",
+ STRING())) // matches "(?s:.*)[h-m]"
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+ // ends pattern with '[^]' as wildcard
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal(
+ "%[^h-m]",
+ STRING())) // matches "(?s:.*)[^h-m]"
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal("_[^xyz]", STRING())) // matches ".[^xyz]"
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+ // ends pattern escape wildcard '%'
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal(
+ "%=%456",
+ STRING()), // matches "(?s:.*)%456"
+ literal("=", STRING()))
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal(
+ "%=_456",
+ STRING()), // matches "(?s:.*)_456"
+ literal("=", STRING()))
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+ // ends pattern escape wildcard '_'
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal("_=_456", STRING()), // matches "._456"
+ literal("=", STRING()))
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+ }
+ /** LIKE patterns with no unescaped wildcard (plain equality) are expected to throw {@code UnsupportedExpression}. */
+ @Test
+ public void testUnsupportedEqualsPatternForLike() {
+ PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
+ // equals pattern
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal("123456", STRING()))
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+ // equals pattern escape '%'
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal("12=%45", STRING()), // equals "12%45"
+ literal("=", STRING()))
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+ // equals pattern escape '_'
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal("12=_45", STRING()), // equals "12_45"
+ literal("=", STRING()))
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+ }
+ /** Each LIKE pattern below has wildcards on both sides of the text and must throw {@code UnsupportedExpression}. */
+ @Test
+ public void testUnsupportedMiddlePatternForLike() {
+ PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
+ // middle pattern with '%' as wildcard
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal("%345%", STRING()))
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+ // middle pattern with '_' as wildcard
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal("_345_", STRING()))
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+ // middle pattern with both '%' and '_' as wildcard
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal("_345%", STRING())) // matches ".345(?s:.*)"
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal("%345_", STRING())) // matches "(?s:.*)345."
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+ // middle pattern with '[]' as wildcard
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal(
+ "%[a-c]_",
+ STRING())) // matches "(?s:.*)[a-c]."
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+ // middle pattern with '[^]' as wildcard
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal(
+ "%[^abc]_",
+ STRING())) // matches "(?s:.*)[^abc]."
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+ // middle pattern escape '%'
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal(
+ "%34=%5%",
+ STRING()), // matches "(?s:.*)34%5(.*)"
+ literal("=", STRING()))
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+
+ // middle pattern escape '_'
+ assertThatThrownBy(
+ () ->
+ call(
+ BuiltInFunctionDefinitions.LIKE,
+ field(0, STRING()),
+ literal(
+ "%34=_5%",
+ STRING()), // matches "(?s:.*)34_5(.*)"
+ literal("=", STRING()))
+ .accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+ }
+ /** An EQUALS call on a ROW-typed field and literal is expected to be rejected with {@code UnsupportedExpression}. */
+ @Test
+ public void testUnsupportedType() {
+ PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
+ DataType structType = DataTypes.ROW(DataTypes.INT()).bridgedTo(Row.class);
+ CallExpression expression =
+ call(
+ BuiltInFunctionDefinitions.EQUALS,
+ field(0, structType),
+ literal(Row.of(1), structType));
+ assertThatThrownBy(() -> expression.accept(converter))
+ .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+ }
+ /** Builds a field reference called "name" of the given type; 0 and {@code i} are presumably input index and field index. */
+ private static FieldReferenceExpression field(int i, DataType type) {
+ return new FieldReferenceExpression("name", type, 0, i);
+ }
+ /** Wraps {@code function} and {@code args} in a {@link CallExpression} whose output type is BOOLEAN. */
+ private static CallExpression call(FunctionDefinition function, ResolvedExpression... args) {
+ return new CallExpression(function, Arrays.asList(args), DataTypes.BOOLEAN());
 }
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
index 36b8ba8e..ce15619f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
@@ -18,35 +18,15 @@
package org.apache.flink.table.store.file.predicate;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.expressions.CallExpression;
-import org.apache.flink.table.expressions.FieldReferenceExpression;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.store.format.FieldStats;
-import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
-import static org.apache.flink.table.api.DataTypes.STRING;
-import static org.apache.flink.table.planner.expressions.ExpressionBuilder.literal;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link Predicate}s. */
public class PredicateTest {
@@ -54,9 +34,7 @@ public class PredicateTest {
@Test
public void testEqual() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
- CallExpression expression =
- call(BuiltInFunctionDefinitions.EQUALS, field(0, DataTypes.INT()), literal(5));
- Predicate predicate = expression.accept(new PredicateConverter(builder));
+ Predicate predicate = builder.equal(0, 5);
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {5})).isEqualTo(true);
@@ -74,12 +52,7 @@ public class PredicateTest {
@Test
public void testEqualNull() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
- CallExpression expression =
- call(
- BuiltInFunctionDefinitions.EQUALS,
- field(0, DataTypes.INT()),
- literal(null, DataTypes.INT()));
- Predicate predicate = expression.accept(new PredicateConverter(builder));
+ Predicate predicate = builder.equal(0, null);
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
@@ -92,9 +65,7 @@ public class PredicateTest {
@Test
public void testNotEqual() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
- CallExpression expression =
- call(BuiltInFunctionDefinitions.NOT_EQUALS, field(0, DataTypes.INT()), literal(5));
- Predicate predicate = expression.accept(new PredicateConverter(builder));
+ Predicate predicate = builder.notEqual(0, 5);
assertThat(predicate.test(new Object[] {4})).isEqualTo(true);
assertThat(predicate.test(new Object[] {5})).isEqualTo(false);
@@ -113,12 +84,7 @@ public class PredicateTest {
@Test
public void testNotEqualNull() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
- CallExpression expression =
- call(
- BuiltInFunctionDefinitions.NOT_EQUALS,
- field(0, DataTypes.INT()),
- literal(null, DataTypes.INT()));
- Predicate predicate = expression.accept(new PredicateConverter(builder));
+ Predicate predicate = builder.notEqual(0, null);
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
@@ -131,12 +97,7 @@ public class PredicateTest {
@Test
public void testGreater() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
- CallExpression expression =
- call(
- BuiltInFunctionDefinitions.GREATER_THAN,
- field(0, DataTypes.INT()),
- literal(5));
- Predicate predicate = expression.accept(new PredicateConverter(builder));
+ Predicate predicate = builder.greaterThan(0, 5);
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {5})).isEqualTo(false);
@@ -156,12 +117,7 @@ public class PredicateTest {
@Test
public void testGreaterNull() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
- CallExpression expression =
- call(
- BuiltInFunctionDefinitions.GREATER_THAN,
- field(0, DataTypes.INT()),
- literal(null, DataTypes.INT()));
- Predicate predicate = expression.accept(new PredicateConverter(builder));
+ Predicate predicate = builder.greaterThan(0, null);
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
@@ -174,12 +130,7 @@ public class PredicateTest {
@Test
public void testGreaterOrEqual() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
- CallExpression expression =
- call(
- BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
- field(0, DataTypes.INT()),
- literal(5));
- Predicate predicate = expression.accept(new PredicateConverter(builder));
+ Predicate predicate = builder.greaterOrEqual(0, 5);
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {5})).isEqualTo(true);
@@ -199,12 +150,7 @@ public class PredicateTest {
@Test
public void testGreaterOrEqualNull() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
- CallExpression expression =
- call(
- BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
- field(0, DataTypes.INT()),
- literal(null, DataTypes.INT()));
- Predicate predicate = expression.accept(new PredicateConverter(builder));
+ Predicate predicate = builder.greaterOrEqual(0, null);
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
@@ -217,9 +163,7 @@ public class PredicateTest {
@Test
public void testLess() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
- CallExpression expression =
- call(BuiltInFunctionDefinitions.LESS_THAN, field(0, DataTypes.INT()), literal(5));
- Predicate predicate = expression.accept(new PredicateConverter(builder));
+ Predicate predicate = builder.lessThan(0, 5);
assertThat(predicate.test(new Object[] {4})).isEqualTo(true);
assertThat(predicate.test(new Object[] {5})).isEqualTo(false);
@@ -238,12 +182,7 @@ public class PredicateTest {
@Test
public void testLessNull() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
- CallExpression expression =
- call(
- BuiltInFunctionDefinitions.LESS_THAN,
- field(0, DataTypes.INT()),
- literal(null, DataTypes.INT()));
- Predicate predicate = expression.accept(new PredicateConverter(builder));
+ Predicate predicate = builder.lessThan(0, null);
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
@@ -256,12 +195,7 @@ public class PredicateTest {
@Test
public void testLessOrEqual() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
- CallExpression expression =
- call(
- BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
- field(0, DataTypes.INT()),
- literal(5));
- Predicate predicate = expression.accept(new PredicateConverter(builder));
+ Predicate predicate = builder.lessOrEqual(0, 5);
assertThat(predicate.test(new Object[] {4})).isEqualTo(true);
assertThat(predicate.test(new Object[] {5})).isEqualTo(true);
@@ -280,12 +214,7 @@ public class PredicateTest {
@Test
public void testLessOrEqualNull() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
- CallExpression expression =
- call(
- BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
- field(0, DataTypes.INT()),
- literal(null, DataTypes.INT()));
- Predicate predicate = expression.accept(new PredicateConverter(builder));
+ Predicate predicate = builder.lessOrEqual(0, null);
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
@@ -298,12 +227,7 @@ public class PredicateTest {
@Test
public void testIsNull() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
- CallExpression expression =
- call(
- BuiltInFunctionDefinitions.IS_NULL,
- field(0, DataTypes.INT()),
- literal(null, DataTypes.INT()));
- Predicate predicate = expression.accept(new PredicateConverter(builder));
+ Predicate predicate = builder.isNull(0);
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(true);
@@ -317,12 +241,7 @@ public class PredicateTest {
@Test
public void testIsNotNull() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
- CallExpression expression =
- call(
- BuiltInFunctionDefinitions.IS_NOT_NULL,
- field(0, DataTypes.INT()),
- literal(null, DataTypes.INT()));
- Predicate predicate = expression.accept(new PredicateConverter(builder));
+ Predicate predicate = builder.isNotNull(0);
assertThat(predicate.test(new Object[] {4})).isEqualTo(true);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
@@ -335,21 +254,80 @@ public class PredicateTest {
assertThat(predicate.negate().orElse(null)).isEqualTo(builder.isNull(0));
}
+ @Test
+ public void testIn() { // IN over a literal list without null: field 0 IN (1, 3)
+ PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
+ Predicate predicate = builder.in(0, Arrays.asList(1, 3));
+ // Row-level checks: 1 and 3 match, 2 does not, and a null field value does not match.
+ assertThat(predicate.test(new Object[] {1})).isEqualTo(true);
+ assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
+ assertThat(predicate.test(new Object[] {3})).isEqualTo(true);
+ assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
+ // Stats-level checks; first argument is presumably the row count and FieldStats is (min, max, nullCount) -- TODO confirm.
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
+ assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+ .isEqualTo(false);
+ }
+
+ @Test
+ public void testInNull() { // IN over a literal list containing null: field 0 IN (1, null, 3)
+ PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
+ Predicate predicate = builder.in(0, Arrays.asList(1, null, 3));
+ // Row-level checks: same expectations as the null-free list; a null field value still does not match.
+ assertThat(predicate.test(new Object[] {1})).isEqualTo(true);
+ assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
+ assertThat(predicate.test(new Object[] {3})).isEqualTo(true);
+ assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
+ // Stats-level checks; first argument is presumably the row count and FieldStats is (min, max, nullCount) -- TODO confirm.
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
+ assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+ .isEqualTo(false);
+ }
+
+ @Test
+ public void testNotIn() { // NOT IN over a literal list without null: field 0 NOT IN (1, 3)
+ PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
+ Predicate predicate = builder.notIn(0, Arrays.asList(1, 3));
+ // Row-level checks: only 2 passes; listed values and a null field value are rejected.
+ assertThat(predicate.test(new Object[] {1})).isEqualTo(false);
+ assertThat(predicate.test(new Object[] {2})).isEqualTo(true);
+ assertThat(predicate.test(new Object[] {3})).isEqualTo(false);
+ assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
+ // Stats-level checks: (1, 1) and (3, 3) are rejected, wider or disjoint bounds pass; args presumably (min, max, nullCount) -- TODO confirm.
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 0)})).isEqualTo(false);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 0)})).isEqualTo(false);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 0)})).isEqualTo(true);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(true);
+ assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+ .isEqualTo(false);
+ }
+
+ @Test
+ public void testNotInNull() { // NOT IN over a literal list containing null: field 0 NOT IN (1, null, 3)
+ PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
+ Predicate predicate = builder.notIn(0, Arrays.asList(1, null, 3));
+ // Row-level checks: with a null literal in the list, every value (listed, unlisted, or null) is rejected.
+ assertThat(predicate.test(new Object[] {1})).isEqualTo(false);
+ assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
+ assertThat(predicate.test(new Object[] {3})).isEqualTo(false);
+ assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
+ // Stats-level checks: every stats combination below is likewise expected to be rejected.
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 0)})).isEqualTo(false);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 0)})).isEqualTo(false);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 0)})).isEqualTo(false);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(false);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
+ assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+ .isEqualTo(false);
+ }
+
@Test
public void testAnd() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType(), new IntType()));
- CallExpression expression =
- call(
- BuiltInFunctionDefinitions.AND,
- call(
- BuiltInFunctionDefinitions.EQUALS,
- field(0, DataTypes.INT()),
- literal(3)),
- call(
- BuiltInFunctionDefinitions.EQUALS,
- field(1, DataTypes.INT()),
- literal(5)));
- Predicate predicate = expression.accept(new PredicateConverter(builder));
+ Predicate predicate = PredicateBuilder.and(builder.equal(0, 3), builder.equal(1, 5));
assertThat(predicate.test(new Object[] {4, 5})).isEqualTo(false);
assertThat(predicate.test(new Object[] {3, 6})).isEqualTo(false);
@@ -385,18 +363,7 @@ public class PredicateTest {
@Test
public void testOr() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType(), new IntType()));
- CallExpression expression =
- call(
- BuiltInFunctionDefinitions.OR,
- call(
- BuiltInFunctionDefinitions.EQUALS,
- field(0, DataTypes.INT()),
- literal(3)),
- call(
- BuiltInFunctionDefinitions.EQUALS,
- field(1, DataTypes.INT()),
- literal(5)));
- Predicate predicate = expression.accept(new PredicateConverter(builder));
+ Predicate predicate = PredicateBuilder.or(builder.equal(0, 3), builder.equal(1, 5));
assertThat(predicate.test(new Object[] {4, 6})).isEqualTo(false);
assertThat(predicate.test(new Object[] {3, 6})).isEqualTo(true);
@@ -428,535 +395,4 @@ public class PredicateTest {
assertThat(predicate.negate().orElse(null))
.isEqualTo(PredicateBuilder.and(builder.notEqual(0, 3), builder.notEqual(1, 5)));
}
-
- @MethodSource("provideLikeExpressions")
- @ParameterizedTest
- public void testStartsWith(
- CallExpression callExpression,
- List<Object[]> valuesList,
- List<Boolean> expectedForValues,
- List<Long> rowCountList,
- List<FieldStats[]> statsList,
- List<Boolean> expectedForStats) {
- Predicate predicate =
- callExpression.accept(new PredicateConverter(RowType.of(new VarCharType())));
- IntStream.range(0, valuesList.size())
- .forEach(
- i ->
- assertThat(predicate.test(valuesList.get(i)))
- .isEqualTo(expectedForValues.get(i)));
- IntStream.range(0, rowCountList.size())
- .forEach(
- i ->
- assertThat(predicate.test(rowCountList.get(i), statsList.get(i)))
- .isEqualTo(expectedForStats.get(i)));
- }
-
- @Test
- public void testUnsupportedExpression() {
- CallExpression expression =
- call(
- BuiltInFunctionDefinitions.AND,
- call(
- BuiltInFunctionDefinitions.EQUALS,
- field(0, DataTypes.INT()),
- literal(3)),
- call(
- BuiltInFunctionDefinitions.SIMILAR,
- field(1, DataTypes.INT()),
- literal(5)));
- assertThatThrownBy(
- () ->
- expression.accept(
- new PredicateConverter(
- RowType.of(new IntType(), new IntType()))))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
- }
-
- @Test
- public void testUnsupportedStartsPatternForLike() {
- PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
- // starts pattern with '_' as wildcard
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal("abc_", STRING()))
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
- // starts pattern like 'abc%xyz' or 'abc_xyz'
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal("abc%xyz", STRING()))
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal("abc_xyz", STRING()))
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
- // starts pattern like 'abc%xyz' or 'abc_xyz' with '%' or '_' to escape
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal(
- "=%abc=%%xyz=_",
- STRING()), // matches "%abc%(?s:.*)xyz_"
- literal("=", STRING()))
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal(
- "abc=%%xyz",
- STRING()), // matches "abc%(?s:.*)xyz"
- literal("=", STRING()))
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal(
- "abc=%_xyz",
- STRING()), // matches "abc%.xyz"
- literal("=", STRING()))
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal(
- "abc=_%xyz",
- STRING()), // matches "abc_(?s:.*)xyz"
- literal("=", STRING()))
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal(
- "abc=__xyz",
- STRING()), // matches "abc_.xyz"
- literal("=", STRING()))
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
- // starts pattern with wildcard '%' at the beginning to escape
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal("=%%", STRING()), // matches "%(?s:.*)"
- literal("=", STRING()))
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
- }
-
- @Test
- public void testUnsupportedEndsPatternForLike() {
- PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
- // ends pattern with '%' as wildcard
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal("%456", STRING())) // matches "(?s:.*)456"
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
- // ends pattern with '_' as wildcard
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal("_456", STRING())) // matches ".456"
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
- // ends pattern with '[]' as wildcard
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal("_[456]", STRING())) // matches ".[456]"
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal(
- "%[h-m]",
- STRING())) // matches "(?s:.*)[h-m]"
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
- // ends pattern with '[^]' as wildcard
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal(
- "%[^h-m]",
- STRING())) // matches "(?s:.*)[^h-m]"
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal("_[^xyz]", STRING())) // matches ".[^xyz]"
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
- // ends pattern escape wildcard '%'
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal(
- "%=%456",
- STRING()), // matches "(?s:.*)%456"
- literal("=", STRING()))
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal(
- "%=_456",
- STRING()), // matches "(?s:.*)_456"
- literal("=", STRING()))
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
- // ends pattern escape wildcard '_'
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal("_=_456", STRING()), // matches "._456"
- literal("=", STRING()))
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
- }
-
- @Test
- public void testUnsupportedEqualsPatternForLike() {
- PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
- // equals pattern
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal("123456", STRING()))
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
- // equals pattern escape '%'
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal("12=%45", STRING()), // equals "12%45"
- literal("=", STRING()))
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
- // equals pattern escape '_'
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal("12=_45", STRING()), // equals "12_45"
- literal("=", STRING()))
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
- }
-
- @Test
- public void testUnsupportedMiddlePatternForLike() {
- PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
- // middle pattern with '%' as wildcard
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal("%345%", STRING()))
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
- // middle pattern with '_' as wildcard
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal("_345_", STRING()))
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
- // middle pattern with both '%' and '_' as wildcard
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal("_345%", STRING())) // matches ".345(?s:.*)"
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal("%345_", STRING())) // matches "(?s:.*)345."
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
- // middle pattern with '[]' as wildcard
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal(
- "%[a-c]_",
- STRING())) // matches "(?s:.*)[a-c]."
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
- // middle pattern with '[^]' as wildcard
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal(
- "%[^abc]_",
- STRING())) // matches "(?s:.*)[^abc]."
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
- // middle pattern escape '%'
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal(
- "%34=%5%",
- STRING()), // matches "(?s:.*)34%5(.*)"
- literal("=", STRING()))
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
-
- // middle pattern escape '_'
- assertThatThrownBy(
- () ->
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal(
- "%34=_5%",
- STRING()), // matches "(?s:.*)34_5(.*)"
- literal("=", STRING()))
- .accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
- }
-
- @Test
- public void testUnsupportedType() {
- PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
- DataType structType = DataTypes.ROW(DataTypes.INT()).bridgedTo(Row.class);
- CallExpression expression =
- call(
- BuiltInFunctionDefinitions.EQUALS,
- field(0, structType),
- literal(Row.of(1), structType));
- assertThatThrownBy(() -> expression.accept(converter))
- .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
- }
-
- public static Stream<Arguments> provideLikeExpressions() {
- CallExpression expr1 =
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal("abd%", STRING()));
- List<Object[]> valuesList1 =
- Arrays.asList(
- new Object[] {null},
- new Object[] {StringData.fromString("a")},
- new Object[] {StringData.fromString("ab")},
- new Object[] {StringData.fromString("abd")},
- new Object[] {StringData.fromString("abd%")},
- new Object[] {StringData.fromString("abd1")},
- new Object[] {StringData.fromString("abde@")},
- new Object[] {StringData.fromString("abd_")},
- new Object[] {StringData.fromString("abd_%")});
- List<Boolean> expectedForValues1 =
- Arrays.asList(false, false, false, true, true, true, true, true, true);
- List<Long> rowCountList1 = Arrays.asList(0L, 3L, 3L, 3L);
- List<FieldStats[]> statsList1 =
- Arrays.asList(
- new FieldStats[] {new FieldStats(null, null, 0L)},
- new FieldStats[] {new FieldStats(null, null, 3L)},
- new FieldStats[] {
- new FieldStats(
- StringData.fromString("ab"),
- StringData.fromString("abc123"),
- 1L)
- },
- new FieldStats[] {
- new FieldStats(
- StringData.fromString("abc"), StringData.fromString("abe"), 1L)
- });
- List<Boolean> expectedForStats1 = Arrays.asList(false, false, false, true);
-
- CallExpression expr2 =
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal("test=_%", STRING()),
- literal("=", STRING()));
- List<Object[]> valuesList2 =
- Arrays.asList(
- new Object[] {StringData.fromString("test%")},
- new Object[] {StringData.fromString("test_123")},
- new Object[] {StringData.fromString("test_%")},
- new Object[] {StringData.fromString("test__")});
- List<Boolean> expectedForValues2 = Arrays.asList(false, true, true, true);
- List<Long> rowCountList2 = Collections.singletonList(3L);
- List<FieldStats[]> statsList2 =
- Collections.singletonList(
- new FieldStats[] {
- new FieldStats(
- StringData.fromString("test_123"),
- StringData.fromString("test_789"),
- 0L)
- });
- List<Boolean> expectedForStats2 = Collections.singletonList(true);
-
- // currently, SQL wildcards '[]' and '[^]' are deemed as normal characters in Flink
- CallExpression expr3 =
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal("[a-c]xyz%", STRING()));
- List<Object[]> valuesList3 =
- Arrays.asList(
- new Object[] {StringData.fromString("axyz")},
- new Object[] {StringData.fromString("bxyz")},
- new Object[] {StringData.fromString("cxyz")},
- new Object[] {StringData.fromString("[a-c]xyz")});
- List<Boolean> expectedForValues3 = Arrays.asList(false, false, false, true);
- List<Long> rowCountList3 = Collections.singletonList(3L);
- List<FieldStats[]> statsList3 =
- Collections.singletonList(
- new FieldStats[] {
- new FieldStats(
- StringData.fromString("[a-c]xyz"),
- StringData.fromString("[a-c]xyzz"),
- 0L)
- });
- List<Boolean> expectedForStats3 = Collections.singletonList(true);
-
- CallExpression expr4 =
- call(
- BuiltInFunctionDefinitions.LIKE,
- field(0, STRING()),
- literal("[^a-d]xyz%", STRING()));
- List<Object[]> valuesList4 =
- Arrays.asList(
- new Object[] {StringData.fromString("exyz")},
- new Object[] {StringData.fromString("fxyz")},
- new Object[] {StringData.fromString("axyz")},
- new Object[] {StringData.fromString("[^a-d]xyz")});
- List<Boolean> expectedForValues4 = Arrays.asList(false, false, false, true);
- List<Long> rowCountList4 = Collections.singletonList(3L);
- List<FieldStats[]> statsList4 =
- Collections.singletonList(
- new FieldStats[] {
- new FieldStats(
- StringData.fromString("[^a-d]xyz"),
- StringData.fromString("[^a-d]xyzz"),
- 1L)
- });
- List<Boolean> expectedForStats4 = Collections.singletonList(true);
-
- return Stream.of(
- Arguments.of(
- expr1,
- valuesList1,
- expectedForValues1,
- rowCountList1,
- statsList1,
- expectedForStats1),
- Arguments.of(
- expr2,
- valuesList2,
- expectedForValues2,
- rowCountList2,
- statsList2,
- expectedForStats2),
- Arguments.of(
- expr3,
- valuesList3,
- expectedForValues3,
- rowCountList3,
- statsList3,
- expectedForStats3),
- Arguments.of(
- expr4,
- valuesList4,
- expectedForValues4,
- rowCountList4,
- statsList4,
- expectedForStats4));
- }
-
- private static FieldReferenceExpression field(int i, DataType type) {
- return new FieldReferenceExpression("name", type, 0, i);
- }
-
- private static CallExpression call(FunctionDefinition function, ResolvedExpression... args) {
- return new CallExpression(function, Arrays.asList(args), DataTypes.BOOLEAN());
- }
}
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
index 24406d79..0ab02e1a 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
@@ -174,9 +174,7 @@ public class SearchArgumentToPredicateConverterTest {
SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
SearchArgument sarg =
builder.in("f_bigint", PredicateLeaf.Type.LONG, 100L, 200L, 300L).build();
- Predicate expected =
- PredicateBuilder.or(
- BUILDER.equal(1, 100L), BUILDER.equal(1, 200L), BUILDER.equal(1, 300L));
+ Predicate expected = BUILDER.in(1, Arrays.asList(100L, 200L, 300L));
assertExpected(sarg, expected);
}
@@ -185,9 +183,7 @@ public class SearchArgumentToPredicateConverterTest {
SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
SearchArgument sarg =
builder.in("f_bigint", PredicateLeaf.Type.LONG, 100L, null, 300L).build();
- Predicate expected =
- PredicateBuilder.or(
- BUILDER.equal(1, 100L), BUILDER.equal(1, null), BUILDER.equal(1, 300L));
+ Predicate expected = BUILDER.in(1, Arrays.asList(100L, null, 300L));
assertExpected(sarg, expected);
}
@@ -199,11 +195,7 @@ public class SearchArgumentToPredicateConverterTest {
.in("f_bigint", PredicateLeaf.Type.LONG, 100L, 200L, 300L)
.end()
.build();
- Predicate expected =
- PredicateBuilder.and(
- BUILDER.notEqual(1, 100L),
- BUILDER.notEqual(1, 200L),
- BUILDER.notEqual(1, 300L));
+ Predicate expected = BUILDER.notIn(1, Arrays.asList(100L, 200L, 300L));
assertExpected(sarg, expected);
}
@@ -215,11 +207,7 @@ public class SearchArgumentToPredicateConverterTest {
.in("f_bigint", PredicateLeaf.Type.LONG, 100L, null, 300L)
.end()
.build();
- Predicate expected =
- PredicateBuilder.and(
- BUILDER.notEqual(1, 100L),
- BUILDER.notEqual(1, null),
- BUILDER.notEqual(1, 300L));
+ Predicate expected = BUILDER.notIn(1, Arrays.asList(100L, null, 300L));
assertExpected(sarg, expected);
}
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
index 6213ca39..285911bb 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
@@ -28,6 +28,7 @@ import org.apache.spark.sql.sources.EqualTo;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.GreaterThan;
import org.apache.spark.sql.sources.GreaterThanOrEqual;
+import org.apache.spark.sql.sources.In;
import org.apache.spark.sql.sources.IsNotNull;
import org.apache.spark.sql.sources.IsNull;
import org.apache.spark.sql.sources.LessThan;
@@ -36,6 +37,9 @@ import org.apache.spark.sql.sources.Not;
import org.apache.spark.sql.sources.Or;
import org.apache.spark.sql.sources.StringStartsWith;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
import static org.apache.flink.table.store.file.predicate.PredicateBuilder.convertJavaObject;
/** Conversion from {@link Filter} to {@link Predicate}. */
@@ -76,6 +80,14 @@ public class SparkFilterConverter {
int index = fieldIndex(lt.attribute());
Object literal = convertLiteral(index, lt.value());
return builder.lessOrEqual(index, literal);
+ } else if (filter instanceof In) {
+ In in = (In) filter;
+ int index = fieldIndex(in.attribute());
+ return builder.in(
+ index,
+ Arrays.stream(in.values())
+ .map(v -> convertLiteral(index, v))
+ .collect(Collectors.toList()));
} else if (filter instanceof IsNull) {
return builder.isNull(fieldIndex(((IsNull) filter).attribute()));
} else if (filter instanceof IsNotNull) {
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
index f2c3a711..8b35a31f 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.types.logical.TimestampType;
import org.apache.spark.sql.sources.EqualTo;
import org.apache.spark.sql.sources.GreaterThan;
import org.apache.spark.sql.sources.GreaterThanOrEqual;
+import org.apache.spark.sql.sources.In;
import org.apache.spark.sql.sources.IsNotNull;
import org.apache.spark.sql.sources.IsNull;
import org.apache.spark.sql.sources.LessThan;
@@ -41,6 +42,7 @@ import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
+import java.util.Arrays;
import java.util.Collections;
import static org.assertj.core.api.Assertions.assertThat;
@@ -115,6 +117,11 @@ public class SparkFilterConverterTest {
Predicate expectedEqNull = builder.equal(0, null);
Predicate actualEqNull = converter.convert(eqNull);
assertThat(actualEqNull).isEqualTo(expectedEqNull);
+
+ In in = In.apply(field, new Object[] {1, null, 2});
+ Predicate expectedIn = builder.in(0, Arrays.asList(1, null, 2));
+ Predicate actualIn = converter.convert(in);
+ assertThat(actualIn).isEqualTo(expectedIn);
}
@Test