You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/22 04:03:54 UTC
[flink-table-store] branch master updated: [FLINK-28179] LeafPredicate accepts multiple literals
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new cceca50 [FLINK-28179] LeafPredicate accepts multiple literals
cceca50 is described below
commit cceca50c778a0c450f959d279c69f98a5960ad3f
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Wed Jun 22 12:03:49 2022 +0800
[FLINK-28179] LeafPredicate accepts multiple literals
This closes #168
---
.../store/connector/source/TableStoreSource.java | 4 +-
.../file/operation/AbstractFileStoreScan.java | 9 +-
.../store/file/operation/FileStoreCommitImpl.java | 2 +-
.../predicate/{IsNull.java => CompareUtils.java} | 42 ++--
.../flink/table/store/file/predicate/Equal.java | 25 +--
.../table/store/file/predicate/GreaterOrEqual.java | 23 +--
.../table/store/file/predicate/GreaterThan.java | 23 +--
.../table/store/file/predicate/IsNotNull.java | 17 +-
.../flink/table/store/file/predicate/IsNull.java | 17 +-
.../{IsNotNull.java => LeafBinaryFunction.java} | 26 +--
.../predicate/{IsNull.java => LeafFunction.java} | 26 +--
.../table/store/file/predicate/LeafPredicate.java | 79 ++++++--
.../{IsNotNull.java => LeafUnaryFunction.java} | 25 ++-
.../table/store/file/predicate/LessOrEqual.java | 23 +--
.../flink/table/store/file/predicate/LessThan.java | 23 +--
.../flink/table/store/file/predicate/Literal.java | 173 ----------------
.../flink/table/store/file/predicate/NotEqual.java | 25 +--
.../store/file/predicate/PredicateBuilder.java | 172 ++++++++++++----
.../store/file/predicate/PredicateConverter.java | 98 +++++----
.../table/store/file/predicate/StartsWith.java | 27 ++-
.../flink/table/store/table/sink/TableCompact.java | 3 +-
.../flink/table/store/table/source/TableScan.java | 5 +-
.../file/operation/KeyValueFileStoreScanTest.java | 4 +-
.../store/file/predicate/PredicateBuilderTest.java | 32 +--
.../file/predicate/PredicateConverterTest.java | 34 ++--
.../table/store/file/predicate/PredicateTest.java | 220 +++++++--------------
.../store/table/AppendOnlyFileStoreTableTest.java | 12 +-
.../ChangelogValueCountFileStoreTableTest.java | 12 +-
.../table/ChangelogWithKeyFileStoreTableTest.java | 20 +-
.../store/SearchArgumentToPredicateConverter.java | 26 ++-
.../SearchArgumentToPredicateConverterTest.java | 63 +++---
.../table/store/spark/SparkFilterConverter.java | 37 ++--
.../store/spark/SparkFilterConverterTest.java | 53 +++--
33 files changed, 624 insertions(+), 756 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 5b92912..d181e42 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -47,6 +47,7 @@ import org.apache.flink.table.store.table.ChangelogWithKeyFileStoreTable;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
import javax.annotation.Nullable;
@@ -184,8 +185,9 @@ public class TableStoreSource
@Override
public Result applyFilters(List<ResolvedExpression> filters) {
List<Predicate> converted = new ArrayList<>();
+ RowType rowType = table.schema().logicalRowType();
for (ResolvedExpression filter : filters) {
- PredicateConverter.convert(filter).ifPresent(converted::add);
+ PredicateConverter.convert(rowType, filter).ifPresent(converted::add);
}
predicate = converted.isEmpty() ? null : PredicateBuilder.and(converted);
return Result.of(filters, filters);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index 1a82f29..4745d41 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.manifest.ManifestFile;
import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestList;
-import org.apache.flink.table.store.file.predicate.Literal;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
@@ -90,16 +89,14 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
@Override
public FileStoreScan withPartitionFilter(List<BinaryRowData> partitions) {
+ PredicateBuilder builder = new PredicateBuilder(partitionConverter.rowType());
Function<BinaryRowData, Predicate> partitionToPredicate =
p -> {
List<Predicate> fieldPredicates = new ArrayList<>();
Object[] partitionObjects = partitionConverter.convert(p);
for (int i = 0; i < partitionConverter.getArity(); i++) {
- Literal l =
- new Literal(
- partitionConverter.rowType().getTypeAt(i),
- partitionObjects[i]);
- fieldPredicates.add(PredicateBuilder.equal(i, l));
+ Object partition = partitionObjects[i];
+ fieldPredicates.add(builder.equal(i, partition));
}
return PredicateBuilder.and(fieldPredicates);
};
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 79b0f8c..24d3301 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -204,7 +204,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
// sanity check, all changes must be done within the given partition
- Predicate partitionFilter = PredicateConverter.CONVERTER.fromMap(partition, partitionType);
+ Predicate partitionFilter = PredicateConverter.fromMap(partition, partitionType);
if (partitionFilter != null) {
for (ManifestEntry entry : appendChanges) {
if (!partitionFilter.test(partitionObjectConverter.convert(entry.partition()))) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompareUtils.java
similarity index 53%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompareUtils.java
index a227972..8c8ae4d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompareUtils.java
@@ -18,29 +18,23 @@
package org.apache.flink.table.store.file.predicate;
-import org.apache.flink.table.store.file.stats.FieldStats;
-
-import java.util.Optional;
-
-/** A {@link LeafPredicate.Function} to eval is null. */
-public class IsNull implements LeafPredicate.Function {
-
- public static final IsNull INSTANCE = new IsNull();
-
- private IsNull() {}
-
- @Override
- public boolean test(Object[] values, int index, Literal literal) {
- return values[index] == null;
- }
-
- @Override
- public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
- return fieldStats[index].nullCount() > 0;
- }
-
- @Override
- public Optional<Predicate> negate(int index, Literal literal) {
- return Optional.of(new LeafPredicate(IsNotNull.INSTANCE, index, literal));
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/** Utilities for comparing literal values. */
+public class CompareUtils {
+ private CompareUtils() {}
+
+ private static final BytePrimitiveArrayComparator BINARY_COMPARATOR =
+ new BytePrimitiveArrayComparator(true);
+
+ public static int compareLiteral(LogicalType type, Object v1, Object v2) {
+ if (v1 instanceof Comparable) {
+ return ((Comparable<Object>) v1).compareTo(v2);
+ } else if (v1 instanceof byte[]) {
+ return BINARY_COMPARATOR.compare((byte[]) v1, (byte[]) v2);
+ } else {
+ throw new RuntimeException("Unsupported type: " + type);
+ }
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
index 074acf1..cf9a0f9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
@@ -19,34 +19,35 @@
package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
import java.util.Optional;
-/** A {@link LeafPredicate.Function} to eval equal. */
-public class Equal implements LeafPredicate.Function {
+import static org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
+
+/** A {@link LeafBinaryFunction} to eval equal. */
+public class Equal extends LeafBinaryFunction {
public static final Equal INSTANCE = new Equal();
private Equal() {}
@Override
- public boolean test(Object[] values, int index, Literal literal) {
- Object field = values[index];
- return field != null && literal.compareValueTo(field) == 0;
+ public boolean test(LogicalType type, Object field, Object literal) {
+ return field != null && compareLiteral(type, literal, field) == 0;
}
@Override
- public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
- FieldStats stats = fieldStats[index];
- if (rowCount == stats.nullCount()) {
+ public boolean test(LogicalType type, long rowCount, FieldStats fieldStats, Object literal) {
+ if (rowCount == fieldStats.nullCount()) {
return false;
}
- return literal.compareValueTo(stats.minValue()) >= 0
- && literal.compareValueTo(stats.maxValue()) <= 0;
+ return compareLiteral(type, literal, fieldStats.minValue()) >= 0
+ && compareLiteral(type, literal, fieldStats.maxValue()) <= 0;
}
@Override
- public Optional<Predicate> negate(int index, Literal literal) {
- return Optional.of(new LeafPredicate(NotEqual.INSTANCE, index, literal));
+ public Optional<LeafFunction> negate() {
+ return Optional.of(NotEqual.INSTANCE);
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
index 9527d3b..e93857e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
@@ -19,33 +19,34 @@
package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
import java.util.Optional;
-/** A {@link LeafPredicate.Function} to eval greater or equal. */
-public class GreaterOrEqual implements LeafPredicate.Function {
+import static org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
+
+/** A {@link LeafBinaryFunction} to eval greater or equal. */
+public class GreaterOrEqual extends LeafBinaryFunction {
public static final GreaterOrEqual INSTANCE = new GreaterOrEqual();
private GreaterOrEqual() {}
@Override
- public boolean test(Object[] values, int index, Literal literal) {
- Object field = values[index];
- return field != null && literal.compareValueTo(field) <= 0;
+ public boolean test(LogicalType type, Object field, Object literal) {
+ return field != null && compareLiteral(type, literal, field) <= 0;
}
@Override
- public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
- FieldStats stats = fieldStats[index];
- if (rowCount == stats.nullCount()) {
+ public boolean test(LogicalType type, long rowCount, FieldStats fieldStats, Object literal) {
+ if (rowCount == fieldStats.nullCount()) {
return false;
}
- return literal.compareValueTo(stats.maxValue()) <= 0;
+ return compareLiteral(type, literal, fieldStats.maxValue()) <= 0;
}
@Override
- public Optional<Predicate> negate(int index, Literal literal) {
- return Optional.of(new LeafPredicate(LessThan.INSTANCE, index, literal));
+ public Optional<LeafFunction> negate() {
+ return Optional.of(LessThan.INSTANCE);
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
index d097f88..3e771ae 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
@@ -19,33 +19,34 @@
package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
import java.util.Optional;
-/** A {@link LeafPredicate.Function} to eval greater. */
-public class GreaterThan implements LeafPredicate.Function {
+import static org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
+
+/** A {@link LeafBinaryFunction} to eval greater. */
+public class GreaterThan extends LeafBinaryFunction {
public static final GreaterThan INSTANCE = new GreaterThan();
private GreaterThan() {}
@Override
- public boolean test(Object[] values, int index, Literal literal) {
- Object field = values[index];
- return field != null && literal.compareValueTo(field) < 0;
+ public boolean test(LogicalType type, Object field, Object literal) {
+ return field != null && compareLiteral(type, literal, field) < 0;
}
@Override
- public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
- FieldStats stats = fieldStats[index];
- if (rowCount == stats.nullCount()) {
+ public boolean test(LogicalType type, long rowCount, FieldStats fieldStats, Object literal) {
+ if (rowCount == fieldStats.nullCount()) {
return false;
}
- return literal.compareValueTo(stats.maxValue()) < 0;
+ return compareLiteral(type, literal, fieldStats.maxValue()) < 0;
}
@Override
- public Optional<Predicate> negate(int index, Literal literal) {
- return Optional.of(new LeafPredicate(LessOrEqual.INSTANCE, index, literal));
+ public Optional<LeafFunction> negate() {
+ return Optional.of(LessOrEqual.INSTANCE);
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
index e4e19c5..093ad66 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
@@ -19,28 +19,29 @@
package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
import java.util.Optional;
-/** A {@link LeafPredicate.Function} to eval is not null. */
-public class IsNotNull implements LeafPredicate.Function {
+/** A {@link LeafUnaryFunction} to eval is not null. */
+public class IsNotNull extends LeafUnaryFunction {
public static final IsNotNull INSTANCE = new IsNotNull();
private IsNotNull() {}
@Override
- public boolean test(Object[] values, int index, Literal literal) {
- return values[index] != null;
+ public boolean test(LogicalType type, Object field) {
+ return field != null;
}
@Override
- public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
- return fieldStats[index].nullCount() < rowCount;
+ public boolean test(LogicalType type, long rowCount, FieldStats fieldStats) {
+ return fieldStats.nullCount() < rowCount;
}
@Override
- public Optional<Predicate> negate(int index, Literal literal) {
- return Optional.of(new LeafPredicate(IsNull.INSTANCE, index, literal));
+ public Optional<LeafFunction> negate() {
+ return Optional.of(IsNull.INSTANCE);
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
index a227972..5aa4842 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
@@ -19,28 +19,29 @@
package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
import java.util.Optional;
-/** A {@link LeafPredicate.Function} to eval is null. */
-public class IsNull implements LeafPredicate.Function {
+/** A {@link LeafUnaryFunction} to eval is null. */
+public class IsNull extends LeafUnaryFunction {
public static final IsNull INSTANCE = new IsNull();
private IsNull() {}
@Override
- public boolean test(Object[] values, int index, Literal literal) {
- return values[index] == null;
+ public boolean test(LogicalType type, Object field) {
+ return field == null;
}
@Override
- public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
- return fieldStats[index].nullCount() > 0;
+ public boolean test(LogicalType type, long rowCount, FieldStats fieldStats) {
+ return fieldStats.nullCount() > 0;
}
@Override
- public Optional<Predicate> negate(int index, Literal literal) {
- return Optional.of(new LeafPredicate(IsNotNull.INSTANCE, index, literal));
+ public Optional<LeafFunction> negate() {
+ return Optional.of(IsNotNull.INSTANCE);
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafBinaryFunction.java
similarity index 55%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafBinaryFunction.java
index e4e19c5..0b91cb7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafBinaryFunction.java
@@ -19,28 +19,28 @@
package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
-import java.util.Optional;
+import java.util.List;
-/** A {@link LeafPredicate.Function} to eval is not null. */
-public class IsNotNull implements LeafPredicate.Function {
+/** Function to test a field with a literal. */
+public abstract class LeafBinaryFunction implements LeafFunction {
- public static final IsNotNull INSTANCE = new IsNotNull();
+ private static final long serialVersionUID = 1L;
- private IsNotNull() {}
+ public abstract boolean test(LogicalType type, Object field, Object literal);
- @Override
- public boolean test(Object[] values, int index, Literal literal) {
- return values[index] != null;
- }
+ public abstract boolean test(
+ LogicalType type, long rowCount, FieldStats fieldStats, Object literal);
@Override
- public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
- return fieldStats[index].nullCount() < rowCount;
+ public boolean test(LogicalType type, Object field, List<Object> literals) {
+ return test(type, field, literals.get(0));
}
@Override
- public Optional<Predicate> negate(int index, Literal literal) {
- return Optional.of(new LeafPredicate(IsNull.INSTANCE, index, literal));
+ public boolean test(
+ LogicalType type, long rowCount, FieldStats fieldStats, List<Object> literals) {
+ return test(type, rowCount, fieldStats, literals.get(0));
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
similarity index 59%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
index a227972..97bfceb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
@@ -19,28 +19,18 @@
package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
+import java.io.Serializable;
+import java.util.List;
import java.util.Optional;
-/** A {@link LeafPredicate.Function} to eval is null. */
-public class IsNull implements LeafPredicate.Function {
+/** Function to test a field with literals. */
+public interface LeafFunction extends Serializable {
- public static final IsNull INSTANCE = new IsNull();
+ boolean test(LogicalType type, Object field, List<Object> literals);
- private IsNull() {}
+ boolean test(LogicalType type, long rowCount, FieldStats fieldStats, List<Object> literals);
- @Override
- public boolean test(Object[] values, int index, Literal literal) {
- return values[index] == null;
- }
-
- @Override
- public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
- return fieldStats[index].nullCount() > 0;
- }
-
- @Override
- public Optional<Predicate> negate(int index, Literal literal) {
- return Optional.of(new LeafPredicate(IsNotNull.INSTANCE, index, literal));
- }
+ Optional<LeafFunction> negate();
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
index 244058e..2e8d6c8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
@@ -18,70 +18,107 @@
package org.apache.flink.table.store.file.predicate;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.java.typeutils.runtime.NullableSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
-import java.io.Serializable;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
import java.util.Objects;
import java.util.Optional;
-/** Leaf node of a {@link Predicate} tree. Compares a field in the row with an {@link Literal}. */
+/** Leaf node of a {@link Predicate} tree. Compares a field in the row with literals. */
public class LeafPredicate implements Predicate {
- private final Function function;
+ private static final long serialVersionUID = 1L;
+
+ private final LeafFunction function;
+ private final LogicalType type;
private final int index;
- private final Literal literal;
- public LeafPredicate(Function function, int index, Literal literal) {
+ private transient List<Object> literals;
+
+ public LeafPredicate(
+ LeafFunction function, LogicalType type, int index, List<Object> literals) {
this.function = function;
+ this.type = type;
this.index = index;
- this.literal = literal;
+ this.literals = literals;
}
- public Function function() {
+ public LeafFunction function() {
return function;
}
+ @Nullable
+ public LogicalType type() {
+ return type;
+ }
+
public int index() {
return index;
}
- public Literal literal() {
- return literal;
+ public List<Object> literals() {
+ return literals;
}
@Override
public boolean test(Object[] values) {
- return function.test(values, index, literal);
+ return function.test(type, values[index], literals);
}
@Override
public boolean test(long rowCount, FieldStats[] fieldStats) {
- return function.test(rowCount, fieldStats, index, literal);
+ return function.test(type, rowCount, fieldStats[index], literals);
}
@Override
public Optional<Predicate> negate() {
- return function.negate(index, literal);
+ return function.negate().map(negate -> new LeafPredicate(negate, type, index, literals));
}
@Override
public boolean equals(Object o) {
- if (!(o instanceof LeafPredicate)) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
return false;
}
LeafPredicate that = (LeafPredicate) o;
- return Objects.equals(function, that.function)
- && index == that.index
- && Objects.equals(literal, that.literal);
+ return index == that.index
+ && Objects.equals(function, that.function)
+ && Objects.equals(type, that.type)
+ && Objects.equals(literals, that.literals);
}
- /** Function to compare a field in the row with an {@link Literal}. */
- public interface Function extends Serializable {
+ @Override
+ public int hashCode() {
+ return Objects.hash(function, type, index, literals);
+ }
- boolean test(Object[] values, int index, Literal literal);
+ private ListSerializer<Object> objectsSerializer() {
+ return new ListSerializer<>(
+ NullableSerializer.wrapIfNullIsNotSupported(
+ InternalSerializers.create(type), false));
+ }
- boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal);
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+ objectsSerializer().serialize(literals, new DataOutputViewStreamWrapper(out));
+ }
- Optional<Predicate> negate(int index, Literal literal);
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ literals = objectsSerializer().deserialize(new DataInputViewStreamWrapper(in));
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
similarity index 59%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
index e4e19c5..aa414c6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
@@ -19,28 +19,27 @@
package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
-import java.util.Optional;
+import java.util.List;
-/** A {@link LeafPredicate.Function} to eval is not null. */
-public class IsNotNull implements LeafPredicate.Function {
+/** Function to test a field. */
+public abstract class LeafUnaryFunction implements LeafFunction {
- public static final IsNotNull INSTANCE = new IsNotNull();
+ private static final long serialVersionUID = 1L;
- private IsNotNull() {}
+ public abstract boolean test(LogicalType type, Object value);
- @Override
- public boolean test(Object[] values, int index, Literal literal) {
- return values[index] != null;
- }
+ public abstract boolean test(LogicalType type, long rowCount, FieldStats fieldStats);
@Override
- public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
- return fieldStats[index].nullCount() < rowCount;
+ public boolean test(LogicalType type, Object value, List<Object> literals) {
+ return test(type, value);
}
@Override
- public Optional<Predicate> negate(int index, Literal literal) {
- return Optional.of(new LeafPredicate(IsNull.INSTANCE, index, literal));
+ public boolean test(
+ LogicalType type, long rowCount, FieldStats fieldStats, List<Object> literals) {
+ return test(type, rowCount, fieldStats);
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
index 5b544ea..784fef3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
@@ -19,33 +19,34 @@
package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
import java.util.Optional;
-/** A {@link LeafPredicate.Function} to eval less or equal. */
-public class LessOrEqual implements LeafPredicate.Function {
+import static org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
+
+/** A {@link LeafBinaryFunction} to eval less or equal. */
+public class LessOrEqual extends LeafBinaryFunction {
public static final LessOrEqual INSTANCE = new LessOrEqual();
private LessOrEqual() {}
@Override
- public boolean test(Object[] values, int index, Literal literal) {
- Object field = values[index];
- return field != null && literal.compareValueTo(field) >= 0;
+ public boolean test(LogicalType type, Object field, Object literal) {
+ return field != null && compareLiteral(type, literal, field) >= 0;
}
@Override
- public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
- FieldStats stats = fieldStats[index];
- if (rowCount == stats.nullCount()) {
+ public boolean test(LogicalType type, long rowCount, FieldStats fieldStats, Object literal) {
+ if (rowCount == fieldStats.nullCount()) {
return false;
}
- return literal.compareValueTo(stats.minValue()) >= 0;
+ return compareLiteral(type, literal, fieldStats.minValue()) >= 0;
}
@Override
- public Optional<Predicate> negate(int index, Literal literal) {
- return Optional.of(new LeafPredicate(GreaterThan.INSTANCE, index, literal));
+ public Optional<LeafFunction> negate() {
+ return Optional.of(GreaterThan.INSTANCE);
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
index 7fc3f2c..5ef2175 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
@@ -19,33 +19,34 @@
package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
import java.util.Optional;
-/** A {@link LeafPredicate.Function} to eval less. */
-public class LessThan implements LeafPredicate.Function {
+import static org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
+
+/** A {@link LeafBinaryFunction} to eval less than. */
+public class LessThan extends LeafBinaryFunction {
public static final LessThan INSTANCE = new LessThan();
private LessThan() {}
@Override
- public boolean test(Object[] values, int index, Literal literal) {
- Object field = values[index];
- return field != null && literal.compareValueTo(field) > 0;
+ public boolean test(LogicalType type, Object field, Object literal) {
+ return field != null && compareLiteral(type, literal, field) > 0;
}
@Override
- public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
- FieldStats stats = fieldStats[index];
- if (rowCount == stats.nullCount()) {
+ public boolean test(LogicalType type, long rowCount, FieldStats fieldStats, Object literal) {
+ if (rowCount == fieldStats.nullCount()) {
return false;
}
- return literal.compareValueTo(stats.minValue()) > 0;
+ return compareLiteral(type, literal, fieldStats.minValue()) > 0;
}
@Override
- public Optional<Predicate> negate(int index, Literal literal) {
- return Optional.of(new LeafPredicate(GreaterOrEqual.INSTANCE, index, literal));
+ public Optional<LeafFunction> negate() {
+ return Optional.of(GreaterOrEqual.INSTANCE);
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Literal.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Literal.java
deleted file mode 100644
index 4cbdb09..0000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Literal.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.predicate;
-
-import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.runtime.typeutils.InternalSerializers;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.LogicalType;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.time.temporal.ChronoUnit;
-import java.util.Objects;
-
-/** A serializable literal class. */
-public class Literal implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private static final BytePrimitiveArrayComparator BINARY_COMPARATOR =
- new BytePrimitiveArrayComparator(true);
-
- private final LogicalType type;
-
- private transient Object value;
-
- public Literal(LogicalType type, Object value) {
- this.type = type;
- this.value = value;
- }
-
- public LogicalType type() {
- return type;
- }
-
- public Object value() {
- return value;
- }
-
- public int compareValueTo(Object o) {
- if (value instanceof Comparable) {
- return ((Comparable<Object>) value).compareTo(o);
- } else if (value instanceof byte[]) {
- return BINARY_COMPARATOR.compare((byte[]) value, (byte[]) o);
- } else {
- throw new RuntimeException("Unsupported type: " + type);
- }
- }
-
- private void writeObject(ObjectOutputStream out) throws IOException {
- out.defaultWriteObject();
- InternalSerializers.create(type).serialize(value, new DataOutputViewStreamWrapper(out));
- }
-
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.defaultReadObject();
- value = InternalSerializers.create(type).deserialize(new DataInputViewStreamWrapper(in));
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof Literal)) {
- return false;
- }
- Literal literal = (Literal) o;
- return type.equals(literal.type) && value.equals(literal.value);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(type, value);
- }
-
- public static Literal fromJavaObject(LogicalType literalType, Object o) {
- if (o == null) {
- throw new UnsupportedOperationException("Null literals are currently unsupported");
- }
- switch (literalType.getTypeRoot()) {
- case BOOLEAN:
- return new Literal(literalType, o);
- case BIGINT:
- return new Literal(literalType, ((Number) o).longValue());
- case DOUBLE:
- return new Literal(literalType, ((Number) o).doubleValue());
- case TINYINT:
- return new Literal(literalType, ((Number) o).byteValue());
- case SMALLINT:
- return new Literal(literalType, ((Number) o).shortValue());
- case INTEGER:
- return new Literal(literalType, ((Number) o).intValue());
- case FLOAT:
- return new Literal(literalType, ((Number) o).floatValue());
- case VARCHAR:
- return new Literal(literalType, StringData.fromString(o.toString()));
- case DATE:
- // Hive uses `java.sql.Date.valueOf(lit.toString());` to convert a literal to Date
- // Which uses `java.util.Date()` internally to create the object and that uses the
- // TimeZone.getDefaultRef()
- // To get back the expected date we have to use the LocalDate which gets rid of the
- // TimeZone misery as it uses the year/month/day to generate the object
- LocalDate localDate;
- if (o instanceof Timestamp) {
- localDate = ((Timestamp) o).toLocalDateTime().toLocalDate();
- } else if (o instanceof Date) {
- localDate = ((Date) o).toLocalDate();
- } else if (o instanceof LocalDate) {
- localDate = (LocalDate) o;
- } else {
- throw new UnsupportedOperationException(
- "Unexpected date literal of class " + o.getClass().getName());
- }
- LocalDate epochDay =
- Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC).toLocalDate();
- int numberOfDays = (int) ChronoUnit.DAYS.between(epochDay, localDate);
- return new Literal(literalType, numberOfDays);
- case DECIMAL:
- DecimalType decimalType = (DecimalType) literalType;
- int precision = decimalType.getPrecision();
- int scale = decimalType.getScale();
- return new Literal(
- literalType, DecimalData.fromBigDecimal((BigDecimal) o, precision, scale));
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- TimestampData timestampData;
- if (o instanceof Timestamp) {
- timestampData = TimestampData.fromTimestamp((Timestamp) o);
- } else if (o instanceof Instant) {
- timestampData = TimestampData.fromInstant((Instant) o);
- } else if (o instanceof LocalDateTime) {
- timestampData = TimestampData.fromLocalDateTime((LocalDateTime) o);
- } else {
- throw new UnsupportedOperationException("Unsupported object: " + o);
- }
- return new Literal(literalType, timestampData);
- default:
- throw new UnsupportedOperationException(
- "Unsupported predicate leaf type " + literalType.getTypeRoot().name());
- }
- }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
index f3e9066..7137951 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
@@ -19,34 +19,35 @@
package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
import java.util.Optional;
-/** A {@link LeafPredicate.Function} to eval not equal. */
-public class NotEqual implements LeafPredicate.Function {
+import static org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
+
+/** A {@link LeafBinaryFunction} to eval not equal. */
+public class NotEqual extends LeafBinaryFunction {
public static final NotEqual INSTANCE = new NotEqual();
private NotEqual() {}
@Override
- public boolean test(Object[] values, int index, Literal literal) {
- Object field = values[index];
- return field != null && literal.compareValueTo(field) != 0;
+ public boolean test(LogicalType type, Object field, Object literal) {
+ return field != null && compareLiteral(type, literal, field) != 0;
}
@Override
- public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal literal) {
- FieldStats stats = fieldStats[index];
- if (rowCount == stats.nullCount()) {
+ public boolean test(LogicalType type, long rowCount, FieldStats fieldStats, Object literal) {
+ if (rowCount == fieldStats.nullCount()) {
return false;
}
- return literal.compareValueTo(stats.minValue()) != 0
- || literal.compareValueTo(stats.maxValue()) != 0;
+ return compareLiteral(type, literal, fieldStats.minValue()) != 0
+ || compareLiteral(type, literal, fieldStats.maxValue()) != 0;
}
@Override
- public Optional<Predicate> negate(int index, Literal literal) {
- return Optional.of(new LeafPredicate(Equal.INSTANCE, index, literal));
+ public Optional<LeafFunction> negate() {
+ return Optional.of(Equal.INSTANCE);
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
index 03285ee..620900e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
@@ -18,49 +18,98 @@
package org.apache.flink.table.store.file.predicate;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import static java.util.Collections.singletonList;
+
/** A utility class to create {@link Predicate} object for common filter conditions. */
public class PredicateBuilder {
- public static Predicate equal(int idx, Literal literal) {
- return new LeafPredicate(Equal.INSTANCE, idx, literal);
+ private final RowType rowType;
+
+ public PredicateBuilder(RowType rowType) {
+ this.rowType = rowType;
+ }
+
+ public Predicate equal(int idx, Object literal) {
+ return leaf(Equal.INSTANCE, idx, literal);
+ }
+
+ public Predicate notEqual(int idx, Object literal) {
+ return leaf(NotEqual.INSTANCE, idx, literal);
}
- public static Predicate notEqual(int idx, Literal literal) {
- return new LeafPredicate(NotEqual.INSTANCE, idx, literal);
+ public Predicate lessThan(int idx, Object literal) {
+ return leaf(LessThan.INSTANCE, idx, literal);
}
- public static Predicate lessThan(int idx, Literal literal) {
- return new LeafPredicate(LessThan.INSTANCE, idx, literal);
+ public Predicate lessOrEqual(int idx, Object literal) {
+ return leaf(LessOrEqual.INSTANCE, idx, literal);
}
- public static Predicate lessOrEqual(int idx, Literal literal) {
- return new LeafPredicate(LessOrEqual.INSTANCE, idx, literal);
+ public Predicate greaterThan(int idx, Object literal) {
+ return leaf(GreaterThan.INSTANCE, idx, literal);
}
- public static Predicate greaterThan(int idx, Literal literal) {
- return new LeafPredicate(GreaterThan.INSTANCE, idx, literal);
+ public Predicate greaterOrEqual(int idx, Object literal) {
+ return leaf(GreaterOrEqual.INSTANCE, idx, literal);
}
- public static Predicate greaterOrEqual(int idx, Literal literal) {
- return new LeafPredicate(GreaterOrEqual.INSTANCE, idx, literal);
+ public Predicate isNull(int idx) {
+ return leaf(IsNull.INSTANCE, idx);
}
- public static Predicate isNull(int idx) {
- return new LeafPredicate(IsNull.INSTANCE, idx, null);
+ public Predicate isNotNull(int idx) {
+ return leaf(IsNotNull.INSTANCE, idx);
}
- public static Predicate isNotNull(int idx) {
- return new LeafPredicate(IsNotNull.INSTANCE, idx, null);
+ public Predicate startsWith(int idx, Object patternLiteral) {
+ return leaf(StartsWith.INSTANCE, idx, patternLiteral);
}
- public static Predicate startsWith(int idx, Literal patternLiteral) {
- return new LeafPredicate(StartsWith.INSTANCE, idx, patternLiteral);
+ public Predicate leaf(LeafBinaryFunction function, int idx, Object literal) {
+ return new LeafPredicate(function, rowType.getTypeAt(idx), idx, singletonList(literal));
+ }
+
+ public Predicate leaf(LeafUnaryFunction function, int idx) {
+ return new LeafPredicate(function, rowType.getTypeAt(idx), idx, Collections.emptyList());
+ }
+
+ public Predicate in(int idx, List<Object> literals) {
+ Preconditions.checkArgument(
+ literals.size() > 0,
+ "There must be at least 1 literal to construct an IN predicate");
+ return literals.stream()
+ .map(l -> equal(idx, l))
+ .reduce((a, b) -> new CompoundPredicate(Or.INSTANCE, Arrays.asList(a, b)))
+ .get();
+ }
+
+ public Predicate between(int idx, Object includedLowerBound, Object includedUpperBound) {
+ return new CompoundPredicate(
+ And.INSTANCE,
+ Arrays.asList(
+ greaterOrEqual(idx, includedLowerBound),
+ lessOrEqual(idx, includedUpperBound)));
}
public static Predicate and(Predicate... predicates) {
@@ -71,6 +120,9 @@ public class PredicateBuilder {
Preconditions.checkArgument(
predicates.size() > 0,
"There must be at least 1 inner predicate to construct an AND predicate");
+ if (predicates.size() == 1) {
+ return predicates.get(0);
+ }
return predicates.stream()
.reduce((a, b) -> new CompoundPredicate(And.INSTANCE, Arrays.asList(a, b)))
.get();
@@ -89,25 +141,6 @@ public class PredicateBuilder {
.get();
}
- public static Predicate in(int idx, List<Literal> literals) {
- Preconditions.checkArgument(
- literals.size() > 0,
- "There must be at least 1 literal to construct an IN predicate");
- return literals.stream()
- .map(l -> equal(idx, l))
- .reduce((a, b) -> new CompoundPredicate(Or.INSTANCE, Arrays.asList(a, b)))
- .get();
- }
-
- public static Predicate between(
- int idx, Literal includedLowerBound, Literal includedUpperBound) {
- return new CompoundPredicate(
- And.INSTANCE,
- Arrays.asList(
- greaterOrEqual(idx, includedLowerBound),
- lessOrEqual(idx, includedUpperBound)));
- }
-
public static List<Predicate> splitAnd(Predicate predicate) {
List<Predicate> result = new ArrayList<>();
splitAnd(predicate, result);
@@ -124,4 +157,69 @@ public class PredicateBuilder {
result.add(predicate);
}
}
+
+ public static Object convertJavaObject(LogicalType literalType, Object o) {
+ if (o == null) {
+ throw new UnsupportedOperationException("Null literals are currently unsupported");
+ }
+ switch (literalType.getTypeRoot()) {
+ case BOOLEAN:
+ return o;
+ case BIGINT:
+ return ((Number) o).longValue();
+ case DOUBLE:
+ return ((Number) o).doubleValue();
+ case TINYINT:
+ return ((Number) o).byteValue();
+ case SMALLINT:
+ return ((Number) o).shortValue();
+ case INTEGER:
+ return ((Number) o).intValue();
+ case FLOAT:
+ return ((Number) o).floatValue();
+ case VARCHAR:
+ return StringData.fromString(o.toString());
+ case DATE:
+ // Hive converts a date literal with `java.sql.Date.valueOf(lit.toString())`,
+ // which builds the object through `java.util.Date` internally and therefore
+ // depends on TimeZone.getDefaultRef().
+ // To get back the expected date we go through LocalDate instead, which avoids the
+ // time zone problem because it builds the object from year/month/day only.
+ LocalDate localDate;
+ if (o instanceof Timestamp) {
+ localDate = ((Timestamp) o).toLocalDateTime().toLocalDate();
+ } else if (o instanceof Date) {
+ localDate = ((Date) o).toLocalDate();
+ } else if (o instanceof LocalDate) {
+ localDate = (LocalDate) o;
+ } else {
+ throw new UnsupportedOperationException(
+ "Unexpected date literal of class " + o.getClass().getName());
+ }
+ LocalDate epochDay =
+ Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC).toLocalDate();
+ return (int) ChronoUnit.DAYS.between(epochDay, localDate);
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) literalType;
+ int precision = decimalType.getPrecision();
+ int scale = decimalType.getScale();
+ return DecimalData.fromBigDecimal((BigDecimal) o, precision, scale);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ TimestampData timestampData;
+ if (o instanceof Timestamp) {
+ timestampData = TimestampData.fromTimestamp((Timestamp) o);
+ } else if (o instanceof Instant) {
+ timestampData = TimestampData.fromInstant((Instant) o);
+ } else if (o instanceof LocalDateTime) {
+ timestampData = TimestampData.fromLocalDateTime((LocalDateTime) o);
+ } else {
+ throw new UnsupportedOperationException("Unsupported object: " + o);
+ }
+ return timestampData;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported predicate leaf type " + literalType.getTypeRoot().name());
+ }
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
index 377d0fa..2683561 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
@@ -34,7 +34,6 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.VarCharType;
import javax.annotation.Nullable;
@@ -51,29 +50,19 @@ import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.suppor
/** Convert {@link Expression} to {@link Predicate}. */
public class PredicateConverter implements ExpressionVisitor<Predicate> {
- public static final PredicateConverter CONVERTER = new PredicateConverter();
+ private final PredicateBuilder builder;
- /** Accepts simple LIKE patterns like "abc%". */
- private static final Pattern BEGIN_PATTERN = Pattern.compile("([^%]+)%");
+ public PredicateConverter(RowType type) {
+ this(new PredicateBuilder(type));
+ }
- @Nullable
- public Predicate fromMap(Map<String, String> map, RowType rowType) {
- // TODO: It is somewhat misleading that an empty map creates a null predicate filter
- List<String> fieldNames = rowType.getFieldNames();
- Predicate predicate = null;
- for (Map.Entry<String, String> entry : map.entrySet()) {
- int idx = fieldNames.indexOf(entry.getKey());
- LogicalType type = rowType.getTypeAt(idx);
- Literal literal = new Literal(type, TypeUtils.castFromString(entry.getValue(), type));
- if (predicate == null) {
- predicate = PredicateBuilder.equal(idx, literal);
- } else {
- predicate = PredicateBuilder.and(predicate, PredicateBuilder.equal(idx, literal));
- }
- }
- return predicate;
+ public PredicateConverter(PredicateBuilder builder) {
+ this.builder = builder;
}
+ /** Accepts simple LIKE patterns like "abc%". */
+ private static final Pattern BEGIN_PATTERN = Pattern.compile("([^%]+)%");
+
@Override
public Predicate visit(CallExpression call) {
FunctionDefinition func = call.getFunctionDefinition();
@@ -84,31 +73,26 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
} else if (func == BuiltInFunctionDefinitions.OR) {
return PredicateBuilder.or(children.get(0).accept(this), children.get(1).accept(this));
} else if (func == BuiltInFunctionDefinitions.EQUALS) {
- return visitBiFunction(children, PredicateBuilder::equal, PredicateBuilder::equal);
+ return visitBiFunction(children, builder::equal, builder::equal);
} else if (func == BuiltInFunctionDefinitions.NOT_EQUALS) {
- return visitBiFunction(
- children, PredicateBuilder::notEqual, PredicateBuilder::notEqual);
+ return visitBiFunction(children, builder::notEqual, builder::notEqual);
} else if (func == BuiltInFunctionDefinitions.GREATER_THAN) {
- return visitBiFunction(
- children, PredicateBuilder::greaterThan, PredicateBuilder::lessThan);
+ return visitBiFunction(children, builder::greaterThan, builder::lessThan);
} else if (func == BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL) {
- return visitBiFunction(
- children, PredicateBuilder::greaterOrEqual, PredicateBuilder::lessOrEqual);
+ return visitBiFunction(children, builder::greaterOrEqual, builder::lessOrEqual);
} else if (func == BuiltInFunctionDefinitions.LESS_THAN) {
- return visitBiFunction(
- children, PredicateBuilder::lessThan, PredicateBuilder::greaterThan);
+ return visitBiFunction(children, builder::lessThan, builder::greaterThan);
} else if (func == BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL) {
- return visitBiFunction(
- children, PredicateBuilder::lessOrEqual, PredicateBuilder::greaterOrEqual);
+ return visitBiFunction(children, builder::lessOrEqual, builder::greaterOrEqual);
} else if (func == BuiltInFunctionDefinitions.IS_NULL) {
return extractFieldReference(children.get(0))
.map(FieldReferenceExpression::getFieldIndex)
- .map(PredicateBuilder::isNull)
+ .map(builder::isNull)
.orElseThrow(UnsupportedExpression::new);
} else if (func == BuiltInFunctionDefinitions.IS_NOT_NULL) {
return extractFieldReference(children.get(0))
.map(FieldReferenceExpression::getFieldIndex)
- .map(PredicateBuilder::isNotNull)
+ .map(builder::isNotNull)
.orElseThrow(UnsupportedExpression::new);
} else if (func == BuiltInFunctionDefinitions.LIKE) {
FieldReferenceExpression fieldRefExpr =
@@ -120,14 +104,12 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
String sqlPattern =
extractLiteral(fieldRefExpr.getOutputDataType(), children.get(1))
.orElseThrow(UnsupportedExpression::new)
- .value()
.toString();
String escape =
children.size() <= 2
? null
: extractLiteral(fieldRefExpr.getOutputDataType(), children.get(2))
.orElseThrow(UnsupportedExpression::new)
- .value()
.toString();
String escapedSqlPattern = sqlPattern;
boolean allowQuick = false;
@@ -171,11 +153,9 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
if (allowQuick) {
Matcher beginMatcher = BEGIN_PATTERN.matcher(escapedSqlPattern);
if (beginMatcher.matches()) {
- return PredicateBuilder.startsWith(
+ return builder.startsWith(
fieldRefExpr.getFieldIndex(),
- new Literal(
- VarCharType.STRING_TYPE,
- BinaryStringData.fromString(beginMatcher.group(1))));
+ BinaryStringData.fromString(beginMatcher.group(1)));
}
}
}
@@ -188,10 +168,10 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
private Predicate visitBiFunction(
List<Expression> children,
- BiFunction<Integer, Literal, Predicate> visit1,
- BiFunction<Integer, Literal, Predicate> visit2) {
+ BiFunction<Integer, Object, Predicate> visit1,
+ BiFunction<Integer, Object, Predicate> visit2) {
Optional<FieldReferenceExpression> fieldRefExpr = extractFieldReference(children.get(0));
- Optional<Literal> literal;
+ Optional<Object> literal;
if (fieldRefExpr.isPresent()) {
literal = extractLiteral(fieldRefExpr.get().getOutputDataType(), children.get(1));
if (literal.isPresent()) {
@@ -217,12 +197,12 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
return Optional.empty();
}
- private Optional<Literal> extractLiteral(DataType expectedType, Expression expression) {
+ private Optional<Object> extractLiteral(DataType expectedType, Expression expression) {
LogicalType expectedLogicalType = expectedType.getLogicalType();
if (!supportsPredicate(expectedLogicalType)) {
return Optional.empty();
}
- Literal literal = null;
+ Object literal = null;
if (expression instanceof ValueLiteralExpression) {
ValueLiteralExpression valueExpression = (ValueLiteralExpression) expression;
DataType actualType = valueExpression.getOutputDataType();
@@ -233,14 +213,11 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
}
Object value = valueOpt.get();
if (actualLogicalType.getTypeRoot().equals(expectedLogicalType.getTypeRoot())) {
- literal =
- new Literal(
- expectedLogicalType,
- getConverter(expectedType).toInternalOrNull(value));
+ literal = getConverter(expectedType).toInternalOrNull(value);
} else if (supportsImplicitCast(actualLogicalType, expectedLogicalType)) {
try {
value = TypeUtils.castFromString(value.toString(), expectedLogicalType);
- literal = new Literal(expectedLogicalType, value);
+ literal = value;
} catch (Exception ignored) {
// ignore here, let #visit throw UnsupportedExpression
}
@@ -303,14 +280,33 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
* @param filter a resolved expression
* @return {@link Predicate} if no {@link UnsupportedExpression} thrown.
*/
- public static Optional<Predicate> convert(ResolvedExpression filter) {
+ public static Optional<Predicate> convert(RowType rowType, ResolvedExpression filter) {
try {
- return Optional.ofNullable(filter.accept(PredicateConverter.CONVERTER));
+ return Optional.ofNullable(filter.accept(new PredicateConverter(rowType)));
} catch (UnsupportedExpression e) {
return Optional.empty();
}
}
+ @Nullable
+ public static Predicate fromMap(Map<String, String> map, RowType rowType) {
+ // TODO: It is somewhat misleading that an empty map creates a null predicate filter
+ List<String> fieldNames = rowType.getFieldNames();
+ Predicate predicate = null;
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+ int idx = fieldNames.indexOf(entry.getKey());
+ LogicalType type = rowType.getTypeAt(idx);
+ Object literal = TypeUtils.castFromString(entry.getValue(), type);
+ if (predicate == null) {
+ predicate = builder.equal(idx, literal);
+ } else {
+ predicate = PredicateBuilder.and(predicate, builder.equal(idx, literal));
+ }
+ }
+ return predicate;
+ }
+
/** Encounter an unsupported expression, the caller can choose to ignore this filter branch. */
public static class UnsupportedExpression extends RuntimeException {}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java
index af03163..cf1b3db 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java
@@ -20,39 +20,38 @@ package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
import java.util.Optional;
-/**
- * A {@link LeafPredicate.Function} to evaluate {@code filter like 'abc%' or filter like 'abc_'}.
- */
-public class StartsWith implements LeafPredicate.Function {
+/** A {@link LeafBinaryFunction} to evaluate {@code filter like 'abc%' or filter like 'abc_'}. */
+public class StartsWith extends LeafBinaryFunction {
public static final StartsWith INSTANCE = new StartsWith();
private StartsWith() {}
@Override
- public boolean test(Object[] values, int index, Literal patternLiteral) {
- BinaryStringData field = (BinaryStringData) values[index];
- return field != null && field.startsWith((BinaryStringData) patternLiteral.value());
+ public boolean test(LogicalType type, Object field, Object patternLiteral) {
+ BinaryStringData fieldString = (BinaryStringData) field;
+ return fieldString != null && fieldString.startsWith((BinaryStringData) patternLiteral);
}
@Override
- public boolean test(long rowCount, FieldStats[] fieldStats, int index, Literal patternLiteral) {
- FieldStats stats = fieldStats[index];
- if (rowCount == stats.nullCount()) {
+ public boolean test(
+ LogicalType type, long rowCount, FieldStats fieldStats, Object patternLiteral) {
+ if (rowCount == fieldStats.nullCount()) {
return false;
}
- BinaryStringData min = (BinaryStringData) stats.minValue();
- BinaryStringData max = (BinaryStringData) stats.maxValue();
- BinaryStringData pattern = (BinaryStringData) patternLiteral.value();
+ BinaryStringData min = (BinaryStringData) fieldStats.minValue();
+ BinaryStringData max = (BinaryStringData) fieldStats.maxValue();
+ BinaryStringData pattern = (BinaryStringData) patternLiteral;
return (min.startsWith(pattern) || min.compareTo(pattern) <= 0)
&& (max.startsWith(pattern) || max.compareTo(pattern) >= 0);
}
@Override
- public Optional<Predicate> negate(int index, Literal literal) {
+ public Optional<LeafFunction> negate() {
return Optional.empty();
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java
index 9632d6d..90afb94 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java
@@ -56,8 +56,7 @@ public class TableCompact {
}
public TableCompact withPartitions(Map<String, String> partitionSpec) {
- scan.withPartitionFilter(
- PredicateConverter.CONVERTER.fromMap(partitionSpec, partitionType));
+ scan.withPartitionFilter(PredicateConverter.fromMap(partitionSpec, partitionType));
return this;
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
index 4a0e52f..e858470 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
@@ -146,7 +146,10 @@ public abstract class TableScan {
if (mapped >= 0) {
return Optional.of(
new LeafPredicate(
- leafPredicate.function(), mapped, leafPredicate.literal()));
+ leafPredicate.function(),
+ leafPredicate.type(),
+ mapped,
+ leafPredicate.literals()));
} else {
return Optional.empty();
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
index 74f7eca..201eaab 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
@@ -26,10 +26,10 @@ import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.flink.table.store.file.predicate.Literal;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -115,7 +115,7 @@ public class KeyValueFileStoreScanTest {
KeyValueFileStoreScan scan = store.newScan();
scan.withSnapshot(snapshot.id());
scan.withKeyFilter(
- PredicateBuilder.equal(0, new Literal(new IntType(false), wantedShopId)));
+ new PredicateBuilder(RowType.of(new IntType(false))).equal(0, wantedShopId));
Map<BinaryRowData, BinaryRowData> expected =
store.toKvMap(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
index dd19099..39cd4b0 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
@@ -18,6 +18,9 @@
package org.apache.flink.table.store.file.predicate;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
import org.junit.jupiter.api.Test;
import java.util.Arrays;
@@ -29,25 +32,30 @@ public class PredicateBuilderTest {
@Test
public void testSplitAnd() {
+ PredicateBuilder builder =
+ new PredicateBuilder(
+ RowType.of(
+ new IntType(),
+ new IntType(),
+ new IntType(),
+ new IntType(),
+ new IntType(),
+ new IntType(),
+ new IntType()));
+
Predicate child1 =
- PredicateBuilder.or(
- PredicateBuilder.isNull(0),
- PredicateBuilder.isNull(1),
- PredicateBuilder.isNull(2));
+ PredicateBuilder.or(builder.isNull(0), builder.isNull(1), builder.isNull(2));
Predicate child2 =
- PredicateBuilder.and(
- PredicateBuilder.isNull(3),
- PredicateBuilder.isNull(4),
- PredicateBuilder.isNull(5));
- Predicate child3 = PredicateBuilder.isNull(6);
+ PredicateBuilder.and(builder.isNull(3), builder.isNull(4), builder.isNull(5));
+ Predicate child3 = builder.isNull(6);
assertThat(PredicateBuilder.splitAnd(PredicateBuilder.and(child1, child2, child3)))
.isEqualTo(
Arrays.asList(
child1,
- PredicateBuilder.isNull(3),
- PredicateBuilder.isNull(4),
- PredicateBuilder.isNull(5),
+ builder.isNull(3),
+ builder.isNull(4),
+ builder.isNull(5),
child3));
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
index c138d2b..8106249 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
@@ -24,6 +24,9 @@ import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.RowType;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -39,7 +42,10 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link PredicateConverter}. */
public class PredicateConverterTest {
- private static final PredicateConverter CONVERTER = new PredicateConverter();
+ private static final PredicateBuilder BUILDER =
+ new PredicateBuilder(RowType.of(new BigIntType(), new DoubleType()));
+
+ private static final PredicateConverter CONVERTER = new PredicateConverter(BUILDER);
@MethodSource("provideResolvedExpression")
@ParameterizedTest
@@ -57,12 +63,12 @@ public class PredicateConverterTest {
FieldReferenceExpression longRefExpr =
new FieldReferenceExpression("long1", DataTypes.BIGINT(), 0, 0);
ValueLiteralExpression intLitExpr = new ValueLiteralExpression(10);
- Literal longLit = new Literal(DataTypes.BIGINT().getLogicalType(), 10L);
+ long longLit = 10L;
FieldReferenceExpression doubleRefExpr =
new FieldReferenceExpression("double1", DataTypes.DOUBLE(), 0, 1);
ValueLiteralExpression floatLitExpr = new ValueLiteralExpression(3.14f);
- Literal doubleLit = new Literal(DataTypes.DOUBLE().getLogicalType(), 3.14d);
+ double doubleLit = 3.14d;
return Stream.of(
Arguments.of(longRefExpr, null),
@@ -72,50 +78,50 @@ public class PredicateConverterTest {
BuiltInFunctionDefinitions.IS_NULL,
Collections.singletonList(longRefExpr),
DataTypes.BOOLEAN()),
- PredicateBuilder.isNull(0)),
+ BUILDER.isNull(0)),
Arguments.of(
CallExpression.permanent(
BuiltInFunctionDefinitions.IS_NOT_NULL,
Collections.singletonList(doubleRefExpr),
DataTypes.BOOLEAN()),
- PredicateBuilder.isNotNull(1)),
+ BUILDER.isNotNull(1)),
Arguments.of(
CallExpression.permanent(
BuiltInFunctionDefinitions.EQUALS,
// test literal on left
Arrays.asList(intLitExpr, longRefExpr),
DataTypes.BOOLEAN()),
- PredicateBuilder.equal(0, longLit)),
+ BUILDER.equal(0, longLit)),
Arguments.of(
CallExpression.permanent(
BuiltInFunctionDefinitions.NOT_EQUALS,
Arrays.asList(longRefExpr, intLitExpr),
DataTypes.BOOLEAN()),
- PredicateBuilder.notEqual(0, longLit)),
+ BUILDER.notEqual(0, longLit)),
Arguments.of(
CallExpression.permanent(
BuiltInFunctionDefinitions.GREATER_THAN,
Arrays.asList(longRefExpr, intLitExpr),
DataTypes.BOOLEAN()),
- PredicateBuilder.greaterThan(0, longLit)),
+ BUILDER.greaterThan(0, longLit)),
Arguments.of(
CallExpression.permanent(
BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
Arrays.asList(longRefExpr, intLitExpr),
DataTypes.BOOLEAN()),
- PredicateBuilder.greaterOrEqual(0, longLit)),
+ BUILDER.greaterOrEqual(0, longLit)),
Arguments.of(
CallExpression.permanent(
BuiltInFunctionDefinitions.LESS_THAN,
Arrays.asList(longRefExpr, intLitExpr),
DataTypes.BOOLEAN()),
- PredicateBuilder.lessThan(0, longLit)),
+ BUILDER.lessThan(0, longLit)),
Arguments.of(
CallExpression.permanent(
BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
Arrays.asList(longRefExpr, intLitExpr),
DataTypes.BOOLEAN()),
- PredicateBuilder.lessOrEqual(0, longLit)),
+ BUILDER.lessOrEqual(0, longLit)),
Arguments.of(
CallExpression.permanent(
BuiltInFunctionDefinitions.AND,
@@ -130,8 +136,7 @@ public class PredicateConverterTest {
DataTypes.BOOLEAN())),
DataTypes.BOOLEAN()),
PredicateBuilder.and(
- PredicateBuilder.lessOrEqual(0, longLit),
- PredicateBuilder.equal(1, doubleLit))),
+ BUILDER.lessOrEqual(0, longLit), BUILDER.equal(1, doubleLit))),
Arguments.of(
CallExpression.permanent(
BuiltInFunctionDefinitions.OR,
@@ -146,7 +151,6 @@ public class PredicateConverterTest {
DataTypes.BOOLEAN())),
DataTypes.BOOLEAN()),
PredicateBuilder.or(
- PredicateBuilder.notEqual(0, longLit),
- PredicateBuilder.equal(1, doubleLit))));
+ BUILDER.notEqual(0, longLit), BUILDER.equal(1, doubleLit))));
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
index 744127f..3d396ee 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
@@ -26,17 +26,9 @@ import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.store.file.stats.FieldStats;
-import org.apache.flink.table.store.utils.TypeUtils;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.BinaryType;
-import org.apache.flink.table.types.logical.BooleanType;
-import org.apache.flink.table.types.logical.CharType;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.DoubleType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.SmallIntType;
-import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.Row;
@@ -45,11 +37,6 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -58,7 +45,6 @@ import java.util.stream.Stream;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.planner.expressions.ExpressionBuilder.literal;
-import static org.apache.flink.table.store.file.predicate.PredicateConverter.CONVERTER;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -67,9 +53,10 @@ public class PredicateTest {
@Test
public void testEqual() {
+ PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
CallExpression expression =
call(BuiltInFunctionDefinitions.EQUALS, field(0, DataTypes.INT()), literal(5));
- Predicate predicate = expression.accept(CONVERTER);
+ Predicate predicate = expression.accept(new PredicateConverter(builder));
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {5})).isEqualTo(true);
@@ -81,17 +68,15 @@ public class PredicateTest {
assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
.isEqualTo(false);
- assertThat(predicate.negate().orElse(null))
- .isEqualTo(
- PredicateBuilder.notEqual(
- 0, new Literal(DataTypes.INT().getLogicalType(), 5)));
+ assertThat(predicate.negate().orElse(null)).isEqualTo(builder.notEqual(0, 5));
}
@Test
public void testNotEqual() {
+ PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
CallExpression expression =
call(BuiltInFunctionDefinitions.NOT_EQUALS, field(0, DataTypes.INT()), literal(5));
- Predicate predicate = expression.accept(CONVERTER);
+ Predicate predicate = expression.accept(new PredicateConverter(builder));
assertThat(predicate.test(new Object[] {4})).isEqualTo(true);
assertThat(predicate.test(new Object[] {5})).isEqualTo(false);
@@ -104,20 +89,18 @@ public class PredicateTest {
assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
.isEqualTo(false);
- assertThat(predicate.negate().orElse(null))
- .isEqualTo(
- PredicateBuilder.equal(
- 0, new Literal(DataTypes.INT().getLogicalType(), 5)));
+ assertThat(predicate.negate().orElse(null)).isEqualTo(builder.equal(0, 5));
}
@Test
public void testGreater() {
+ PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
CallExpression expression =
call(
BuiltInFunctionDefinitions.GREATER_THAN,
field(0, DataTypes.INT()),
literal(5));
- Predicate predicate = expression.accept(CONVERTER);
+ Predicate predicate = expression.accept(new PredicateConverter(builder));
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {5})).isEqualTo(false);
@@ -131,20 +114,18 @@ public class PredicateTest {
assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
.isEqualTo(false);
- assertThat(predicate.negate().orElse(null))
- .isEqualTo(
- PredicateBuilder.lessOrEqual(
- 0, new Literal(DataTypes.INT().getLogicalType(), 5)));
+ assertThat(predicate.negate().orElse(null)).isEqualTo(builder.lessOrEqual(0, 5));
}
@Test
public void testGreaterOrEqual() {
+ PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
CallExpression expression =
call(
BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
field(0, DataTypes.INT()),
literal(5));
- Predicate predicate = expression.accept(CONVERTER);
+ Predicate predicate = expression.accept(new PredicateConverter(builder));
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {5})).isEqualTo(true);
@@ -158,17 +139,15 @@ public class PredicateTest {
assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
.isEqualTo(false);
- assertThat(predicate.negate().orElse(null))
- .isEqualTo(
- PredicateBuilder.lessThan(
- 0, new Literal(DataTypes.INT().getLogicalType(), 5)));
+ assertThat(predicate.negate().orElse(null)).isEqualTo(builder.lessThan(0, 5));
}
@Test
public void testLess() {
+ PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
CallExpression expression =
call(BuiltInFunctionDefinitions.LESS_THAN, field(0, DataTypes.INT()), literal(5));
- Predicate predicate = expression.accept(CONVERTER);
+ Predicate predicate = expression.accept(new PredicateConverter(builder));
assertThat(predicate.test(new Object[] {4})).isEqualTo(true);
assertThat(predicate.test(new Object[] {5})).isEqualTo(false);
@@ -181,20 +160,18 @@ public class PredicateTest {
assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
.isEqualTo(false);
- assertThat(predicate.negate().orElse(null))
- .isEqualTo(
- PredicateBuilder.greaterOrEqual(
- 0, new Literal(DataTypes.INT().getLogicalType(), 5)));
+ assertThat(predicate.negate().orElse(null)).isEqualTo(builder.greaterOrEqual(0, 5));
}
@Test
public void testLessOrEqual() {
+ PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
CallExpression expression =
call(
BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
field(0, DataTypes.INT()),
literal(5));
- Predicate predicate = expression.accept(CONVERTER);
+ Predicate predicate = expression.accept(new PredicateConverter(builder));
assertThat(predicate.test(new Object[] {4})).isEqualTo(true);
assertThat(predicate.test(new Object[] {5})).isEqualTo(true);
@@ -207,20 +184,18 @@ public class PredicateTest {
assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
.isEqualTo(false);
- assertThat(predicate.negate().orElse(null))
- .isEqualTo(
- PredicateBuilder.greaterThan(
- 0, new Literal(DataTypes.INT().getLogicalType(), 5)));
+ assertThat(predicate.negate().orElse(null)).isEqualTo(builder.greaterThan(0, 5));
}
@Test
public void testIsNull() {
+ PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
CallExpression expression =
call(
BuiltInFunctionDefinitions.IS_NULL,
field(0, DataTypes.INT()),
literal(null, DataTypes.INT()));
- Predicate predicate = expression.accept(CONVERTER);
+ Predicate predicate = expression.accept(new PredicateConverter(builder));
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(true);
@@ -228,17 +203,18 @@ public class PredicateTest {
assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 1)})).isEqualTo(true);
- assertThat(predicate.negate().orElse(null)).isEqualTo(PredicateBuilder.isNotNull(0));
+ assertThat(predicate.negate().orElse(null)).isEqualTo(builder.isNotNull(0));
}
@Test
public void testIsNotNull() {
+ PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
CallExpression expression =
call(
BuiltInFunctionDefinitions.IS_NOT_NULL,
field(0, DataTypes.INT()),
literal(null, DataTypes.INT()));
- Predicate predicate = expression.accept(CONVERTER);
+ Predicate predicate = expression.accept(new PredicateConverter(builder));
assertThat(predicate.test(new Object[] {4})).isEqualTo(true);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
@@ -248,11 +224,12 @@ public class PredicateTest {
assertThat(predicate.test(3, new FieldStats[] {new FieldStats(null, null, 3)}))
.isEqualTo(false);
- assertThat(predicate.negate().orElse(null)).isEqualTo(PredicateBuilder.isNull(0));
+ assertThat(predicate.negate().orElse(null)).isEqualTo(builder.isNull(0));
}
@Test
public void testAnd() {
+ PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType(), new IntType()));
CallExpression expression =
call(
BuiltInFunctionDefinitions.AND,
@@ -264,7 +241,7 @@ public class PredicateTest {
BuiltInFunctionDefinitions.EQUALS,
field(1, DataTypes.INT()),
literal(5)));
- Predicate predicate = expression.accept(CONVERTER);
+ Predicate predicate = expression.accept(new PredicateConverter(builder));
assertThat(predicate.test(new Object[] {4, 5})).isEqualTo(false);
assertThat(predicate.test(new Object[] {3, 6})).isEqualTo(false);
@@ -294,16 +271,12 @@ public class PredicateTest {
.isEqualTo(false);
assertThat(predicate.negate().orElse(null))
- .isEqualTo(
- PredicateBuilder.or(
- PredicateBuilder.notEqual(
- 0, new Literal(DataTypes.INT().getLogicalType(), 3)),
- PredicateBuilder.notEqual(
- 1, new Literal(DataTypes.INT().getLogicalType(), 5))));
+ .isEqualTo(PredicateBuilder.or(builder.notEqual(0, 3), builder.notEqual(1, 5)));
}
@Test
public void testOr() {
+ PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType(), new IntType()));
CallExpression expression =
call(
BuiltInFunctionDefinitions.OR,
@@ -315,7 +288,7 @@ public class PredicateTest {
BuiltInFunctionDefinitions.EQUALS,
field(1, DataTypes.INT()),
literal(5)));
- Predicate predicate = expression.accept(CONVERTER);
+ Predicate predicate = expression.accept(new PredicateConverter(builder));
assertThat(predicate.test(new Object[] {4, 6})).isEqualTo(false);
assertThat(predicate.test(new Object[] {3, 6})).isEqualTo(true);
@@ -345,12 +318,7 @@ public class PredicateTest {
.isEqualTo(false);
assertThat(predicate.negate().orElse(null))
- .isEqualTo(
- PredicateBuilder.and(
- PredicateBuilder.notEqual(
- 0, new Literal(DataTypes.INT().getLogicalType(), 3)),
- PredicateBuilder.notEqual(
- 1, new Literal(DataTypes.INT().getLogicalType(), 5))));
+ .isEqualTo(PredicateBuilder.and(builder.notEqual(0, 3), builder.notEqual(1, 5)));
}
@MethodSource("provideLikeExpressions")
@@ -362,7 +330,8 @@ public class PredicateTest {
List<Long> rowCountList,
List<FieldStats[]> statsList,
List<Boolean> expectedForStats) {
- Predicate predicate = callExpression.accept(CONVERTER);
+ Predicate predicate =
+ callExpression.accept(new PredicateConverter(RowType.of(new VarCharType())));
IntStream.range(0, valuesList.size())
.forEach(
i ->
@@ -388,12 +357,17 @@ public class PredicateTest {
BuiltInFunctionDefinitions.SIMILAR,
field(1, DataTypes.INT()),
literal(5)));
- assertThatThrownBy(() -> expression.accept(CONVERTER))
+ assertThatThrownBy(
+ () ->
+ expression.accept(
+ new PredicateConverter(
+ RowType.of(new IntType(), new IntType()))))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
}
@Test
public void testUnsupportedStartsPatternForLike() {
+ PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
// starts pattern with '_' as wildcard
assertThatThrownBy(
() ->
@@ -401,7 +375,7 @@ public class PredicateTest {
BuiltInFunctionDefinitions.LIKE,
field(0, STRING()),
literal("abc_", STRING()))
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
// starts pattern like 'abc%xyz' or 'abc_xyz'
@@ -411,7 +385,7 @@ public class PredicateTest {
BuiltInFunctionDefinitions.LIKE,
field(0, STRING()),
literal("abc%xyz", STRING()))
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
assertThatThrownBy(
() ->
@@ -419,7 +393,7 @@ public class PredicateTest {
BuiltInFunctionDefinitions.LIKE,
field(0, STRING()),
literal("abc_xyz", STRING()))
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
// starts pattern like 'abc%xyz' or 'abc_xyz' with '%' or '_' to escape
@@ -432,7 +406,7 @@ public class PredicateTest {
"=%abc=%%xyz=_",
STRING()), // matches "%abc%(?s:.*)xyz_"
literal("=", STRING()))
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
assertThatThrownBy(
() ->
@@ -443,7 +417,7 @@ public class PredicateTest {
"abc=%%xyz",
STRING()), // matches "abc%(?s:.*)xyz"
literal("=", STRING()))
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
assertThatThrownBy(
() ->
@@ -454,7 +428,7 @@ public class PredicateTest {
"abc=%_xyz",
STRING()), // matches "abc%.xyz"
literal("=", STRING()))
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
assertThatThrownBy(
() ->
@@ -465,7 +439,7 @@ public class PredicateTest {
"abc=_%xyz",
STRING()), // matches "abc_(?s:.*)xyz"
literal("=", STRING()))
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
assertThatThrownBy(
() ->
@@ -476,7 +450,7 @@ public class PredicateTest {
"abc=__xyz",
STRING()), // matches "abc_.xyz"
literal("=", STRING()))
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
// starts pattern with wildcard '%' at the beginning to escape
@@ -487,12 +461,13 @@ public class PredicateTest {
field(0, STRING()),
literal("=%%", STRING()), // matches "%(?s:.*)"
literal("=", STRING()))
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
}
@Test
public void testUnsupportedEndsPatternForLike() {
+ PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
// ends pattern with '%' as wildcard
assertThatThrownBy(
() ->
@@ -500,7 +475,7 @@ public class PredicateTest {
BuiltInFunctionDefinitions.LIKE,
field(0, STRING()),
literal("%456", STRING())) // matches "(?s:.*)456"
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
// ends pattern with '_' as wildcard
@@ -510,7 +485,7 @@ public class PredicateTest {
BuiltInFunctionDefinitions.LIKE,
field(0, STRING()),
literal("_456", STRING())) // matches ".456"
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
// ends pattern with '[]' as wildcard
@@ -520,7 +495,7 @@ public class PredicateTest {
BuiltInFunctionDefinitions.LIKE,
field(0, STRING()),
literal("_[456]", STRING())) // matches ".[456]"
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
assertThatThrownBy(
() ->
@@ -530,7 +505,7 @@ public class PredicateTest {
literal(
"%[h-m]",
STRING())) // matches "(?s:.*)[h-m]"
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
// ends pattern with '[^]' as wildcard
@@ -542,7 +517,7 @@ public class PredicateTest {
literal(
"%[^h-m]",
STRING())) // matches "(?s:.*)[^h-m]"
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
assertThatThrownBy(
@@ -551,7 +526,7 @@ public class PredicateTest {
BuiltInFunctionDefinitions.LIKE,
field(0, STRING()),
literal("_[^xyz]", STRING())) // matches ".[^xyz]"
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
// ends pattern escape wildcard '%'
@@ -564,7 +539,7 @@ public class PredicateTest {
"%=%456",
STRING()), // matches "(?s:.*)%456"
literal("=", STRING()))
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
assertThatThrownBy(
() ->
@@ -575,7 +550,7 @@ public class PredicateTest {
"%=_456",
STRING()), // matches "(?s:.*)_456"
literal("=", STRING()))
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
// ends pattern escape wildcard '_'
@@ -586,12 +561,13 @@ public class PredicateTest {
field(0, STRING()),
literal("_=_456", STRING()), // matches "._456"
literal("=", STRING()))
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
}
@Test
public void testUnsupportedEqualsPatternForLike() {
+ PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
// equals pattern
assertThatThrownBy(
() ->
@@ -599,7 +575,7 @@ public class PredicateTest {
BuiltInFunctionDefinitions.LIKE,
field(0, STRING()),
literal("123456", STRING()))
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
// equals pattern escape '%'
@@ -610,7 +586,7 @@ public class PredicateTest {
field(0, STRING()),
literal("12=%45", STRING()), // equals "12%45"
literal("=", STRING()))
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
// equals pattern escape '_'
@@ -621,12 +597,13 @@ public class PredicateTest {
field(0, STRING()),
literal("12=_45", STRING()), // equals "12_45"
literal("=", STRING()))
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
}
@Test
public void testUnsupportedMiddlePatternForLike() {
+ PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
// middle pattern with '%' as wildcard
assertThatThrownBy(
() ->
@@ -634,7 +611,7 @@ public class PredicateTest {
BuiltInFunctionDefinitions.LIKE,
field(0, STRING()),
literal("%345%", STRING()))
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
// middle pattern with '_' as wildcard
@@ -644,7 +621,7 @@ public class PredicateTest {
BuiltInFunctionDefinitions.LIKE,
field(0, STRING()),
literal("_345_", STRING()))
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
// middle pattern with both '%' and '_' as wildcard
@@ -654,7 +631,7 @@ public class PredicateTest {
BuiltInFunctionDefinitions.LIKE,
field(0, STRING()),
literal("_345%", STRING())) // matches ".345(?s:.*)"
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
assertThatThrownBy(
() ->
@@ -662,7 +639,7 @@ public class PredicateTest {
BuiltInFunctionDefinitions.LIKE,
field(0, STRING()),
literal("%345_", STRING())) // matches "(?s:.*)345."
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
// middle pattern with '[]' as wildcard
@@ -674,7 +651,7 @@ public class PredicateTest {
literal(
"%[a-c]_",
STRING())) // matches "(?s:.*)[a-c]."
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
// middle pattern with '[^]' as wildcard
@@ -686,7 +663,7 @@ public class PredicateTest {
literal(
"%[^abc]_",
STRING())) // matches "(?s:.*)[^abc]."
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
// middle pattern escape '%'
@@ -699,7 +676,7 @@ public class PredicateTest {
"%34=%5%",
STRING()), // matches "(?s:.*)34%5(.*)"
literal("=", STRING()))
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
// middle pattern escape '_'
@@ -712,56 +689,23 @@ public class PredicateTest {
"%34=_5%",
STRING()), // matches "(?s:.*)34_5(.*)"
literal("=", STRING()))
- .accept(CONVERTER))
+ .accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
}
@Test
public void testUnsupportedType() {
+ PredicateConverter converter = new PredicateConverter(RowType.of(new VarCharType()));
DataType structType = DataTypes.ROW(DataTypes.INT()).bridgedTo(Row.class);
CallExpression expression =
call(
BuiltInFunctionDefinitions.EQUALS,
field(0, structType),
literal(Row.of(1), structType));
- assertThatThrownBy(() -> expression.accept(CONVERTER))
+ assertThatThrownBy(() -> expression.accept(converter))
.isInstanceOf(PredicateConverter.UnsupportedExpression.class);
}
- @MethodSource("provideLiterals")
- @ParameterizedTest
- public void testSerDeLiteral(LogicalType type, Object data) throws Exception {
- Literal literal = new Literal(type, data);
- Object object = readObject(writeObject(literal));
- assertThat(object).isInstanceOf(Literal.class);
- assertThat(((Literal) object).type()).isEqualTo(literal.type());
- assertThat(((Literal) object).compareValueTo(literal.value())).isEqualTo(0);
- }
-
- public static Stream<Arguments> provideLiterals() {
- CharType charType = new CharType();
- VarCharType varCharType = VarCharType.STRING_TYPE;
- BooleanType booleanType = new BooleanType();
- BinaryType binaryType = new BinaryType();
- DecimalType decimalType = new DecimalType(2);
- SmallIntType smallIntType = new SmallIntType();
- BigIntType bigIntType = new BigIntType();
- DoubleType doubleType = new DoubleType();
- TimestampType timestampType = new TimestampType();
- return Stream.of(
- Arguments.of(charType, TypeUtils.castFromString("s", charType)),
- Arguments.of(varCharType, TypeUtils.castFromString("AbCd1Xy%@*", varCharType)),
- Arguments.of(booleanType, TypeUtils.castFromString("false", booleanType)),
- Arguments.of(binaryType, TypeUtils.castFromString("0101", binaryType)),
- Arguments.of(smallIntType, TypeUtils.castFromString("-2", smallIntType)),
- Arguments.of(decimalType, TypeUtils.castFromString("22.10", decimalType)),
- Arguments.of(bigIntType, TypeUtils.castFromString("-9999999999", bigIntType)),
- Arguments.of(doubleType, TypeUtils.castFromString("3.14159265357", doubleType)),
- Arguments.of(
- timestampType,
- TypeUtils.castFromString("2022-03-25 15:00:02", timestampType)));
- }
-
public static Stream<Arguments> provideLikeExpressions() {
CallExpression expr1 =
call(
@@ -900,22 +844,6 @@ public class PredicateTest {
expectedForStats4));
}
- private byte[] writeObject(Literal literal) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(literal);
- oos.close();
- return baos.toByteArray();
- }
-
- private Object readObject(byte[] bytes) throws IOException, ClassNotFoundException {
- ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
- ObjectInputStream ois = new ObjectInputStream(bais);
- Object object = ois.readObject();
- ois.close();
- return object;
- }
-
private static FieldReferenceExpression field(int i, DataType type) {
return new FieldReferenceExpression("name", type, 0, i);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 0ee8e79..f15bf5f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -20,11 +20,9 @@ package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.WriteMode;
-import org.apache.flink.table.store.file.predicate.Literal;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.Schema;
@@ -79,10 +77,9 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
public void testBatchFilter() throws Exception {
writeData();
FileStoreTable table = createFileStoreTable();
+ PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType());
- Predicate predicate =
- PredicateBuilder.equal(
- 2, Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 201L));
+ Predicate predicate = builder.equal(2, 201L);
List<Split> splits = table.newScan().withFilter(predicate).plan().splits;
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEmpty();
@@ -125,10 +122,9 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
public void testStreamingFilter() throws Exception {
writeData();
FileStoreTable table = createFileStoreTable();
+ PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType());
- Predicate predicate =
- PredicateBuilder.equal(
- 2, Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 101L));
+ Predicate predicate = builder.equal(2, 101L);
List<Split> splits =
table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
TableRead read = table.newRead().withIncremental(true);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index b6fa2ab..2c1509f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -20,11 +20,9 @@ package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.WriteMode;
-import org.apache.flink.table.store.file.predicate.Literal;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.Schema;
@@ -79,10 +77,9 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
public void testBatchFilter() throws Exception {
writeData();
FileStoreTable table = createFileStoreTable();
+ PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType());
- Predicate predicate =
- PredicateBuilder.equal(
- 2, Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 201L));
+ Predicate predicate = builder.equal(2, 201L);
List<Split> splits = table.newScan().withFilter(predicate).plan().splits;
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEmpty();
@@ -124,10 +121,9 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
public void testStreamingFilter() throws Exception {
writeData();
FileStoreTable table = createFileStoreTable();
+ PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType());
- Predicate predicate =
- PredicateBuilder.equal(
- 2, Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 201L));
+ Predicate predicate = builder.equal(2, 201L);
List<Split> splits =
table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
TableRead read = table.newRead().withIncremental(true);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 0846327..d248cc4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -20,11 +20,9 @@ package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.WriteMode;
-import org.apache.flink.table.store.file.predicate.Literal;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.Schema;
@@ -79,14 +77,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
public void testBatchFilter() throws Exception {
writeData();
FileStoreTable table = createFileStoreTable();
+ PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType());
- Predicate predicate =
- PredicateBuilder.and(
- PredicateBuilder.equal(
- 2,
- Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 201L)),
- PredicateBuilder.equal(
- 1, Literal.fromJavaObject(DataTypes.INT().getLogicalType(), 21)));
+ Predicate predicate = PredicateBuilder.and(builder.equal(2, 201L), builder.equal(1, 21));
List<Split> splits = table.newScan().withFilter(predicate).plan().splits;
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEmpty();
@@ -129,14 +122,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
public void testStreamingFilter() throws Exception {
writeData();
FileStoreTable table = createFileStoreTable();
+ PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType());
- Predicate predicate =
- PredicateBuilder.and(
- PredicateBuilder.equal(
- 2,
- Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 201L)),
- PredicateBuilder.equal(
- 1, Literal.fromJavaObject(DataTypes.INT().getLogicalType(), 21)));
+ Predicate predicate = PredicateBuilder.and(builder.equal(2, 201L), builder.equal(1, 21));
List<Split> splits =
table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
TableRead read = table.newRead().withIncremental(true);
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java
index 9a090c2..4c4ef43 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java
@@ -18,10 +18,10 @@
package org.apache.flink.table.store;
-import org.apache.flink.table.store.file.predicate.Literal;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
@@ -35,6 +35,8 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.convertJavaObject;
+
/** Converts {@link SearchArgument} to {@link Predicate} with best effort. */
public class SearchArgumentToPredicateConverter {
@@ -45,6 +47,7 @@ public class SearchArgumentToPredicateConverter {
private final List<PredicateLeaf> leaves;
private final List<String> columnNames;
private final List<LogicalType> columnTypes;
+ private final PredicateBuilder builder;
public SearchArgumentToPredicateConverter(
SearchArgument sarg, List<String> columnNames, List<LogicalType> columnTypes) {
@@ -52,6 +55,11 @@ public class SearchArgumentToPredicateConverter {
this.leaves = sarg.getLeaves();
this.columnNames = columnNames;
this.columnTypes = columnTypes;
+ this.builder =
+ new PredicateBuilder(
+ RowType.of(
+ columnTypes.toArray(new LogicalType[0]),
+ columnNames.toArray(new String[0])));
}
public Optional<Predicate> convert() {
@@ -98,35 +106,35 @@ public class SearchArgumentToPredicateConverter {
LogicalType columnType = columnTypes.get(idx);
switch (leaf.getOperator()) {
case EQUALS:
- return PredicateBuilder.equal(idx, toLiteral(columnType, leaf.getLiteral()));
+ return builder.equal(idx, toLiteral(columnType, leaf.getLiteral()));
case LESS_THAN:
- return PredicateBuilder.lessThan(idx, toLiteral(columnType, leaf.getLiteral()));
+ return builder.lessThan(idx, toLiteral(columnType, leaf.getLiteral()));
case LESS_THAN_EQUALS:
- return PredicateBuilder.lessOrEqual(idx, toLiteral(columnType, leaf.getLiteral()));
+ return builder.lessOrEqual(idx, toLiteral(columnType, leaf.getLiteral()));
case IN:
- return PredicateBuilder.in(
+ return builder.in(
idx,
leaf.getLiteralList().stream()
.map(o -> toLiteral(columnType, o))
.collect(Collectors.toList()));
case BETWEEN:
List<Object> literalList = leaf.getLiteralList();
- return PredicateBuilder.between(
+ return builder.between(
idx,
toLiteral(columnType, literalList.get(0)),
toLiteral(columnType, literalList.get(1)));
case IS_NULL:
- return PredicateBuilder.isNull(idx);
+ return builder.isNull(idx);
default:
throw new UnsupportedOperationException(
"Unsupported operator " + leaf.getOperator());
}
}
- private Literal toLiteral(LogicalType literalType, Object o) {
+ private Object toLiteral(LogicalType literalType, Object o) {
if (o instanceof HiveDecimalWritable) {
o = ((HiveDecimalWritable) o).getHiveDecimal().bigDecimalValue();
}
- return Literal.fromJavaObject(literalType, o);
+ return convertJavaObject(literalType, o);
}
}
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
index 54583ae..ca576e0 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
@@ -22,10 +22,10 @@ import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.store.file.predicate.Literal;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
@@ -94,7 +94,7 @@ public class SearchArgumentToPredicateConverterTest {
new SearchArgumentToPredicateConverter(
sarg, Collections.singletonList("a"), Collections.singletonList(flinkType));
- Predicate expected = PredicateBuilder.equal(0, new Literal(flinkType, flinkLiteral));
+ Predicate expected = new PredicateBuilder(RowType.of(flinkType)).equal(0, flinkLiteral);
Predicate actual = converter.convert().orElse(null);
assertThat(actual).isEqualTo(expected);
}
@@ -106,12 +106,17 @@ public class SearchArgumentToPredicateConverterTest {
DataTypes.BIGINT().getLogicalType(),
DataTypes.DOUBLE().getLogicalType());
private static final LogicalType BIGINT_TYPE = DataTypes.BIGINT().getLogicalType();
+ private static final PredicateBuilder BUILDER =
+ new PredicateBuilder(
+ RowType.of(
+ COLUMN_TYPES.toArray(new LogicalType[0]),
+ COLUMN_NAMES.toArray(new String[0])));
@Test
public void testEqual() {
SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
SearchArgument sarg = builder.equals("f_bigint", PredicateLeaf.Type.LONG, 100L).build();
- Predicate expected = PredicateBuilder.equal(1, new Literal(BIGINT_TYPE, 100L));
+ Predicate expected = BUILDER.equal(1, 100L);
assertExpected(sarg, expected);
}
@@ -120,7 +125,7 @@ public class SearchArgumentToPredicateConverterTest {
SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
SearchArgument sarg =
builder.startNot().equals("f_bigint", PredicateLeaf.Type.LONG, 100L).end().build();
- Predicate expected = PredicateBuilder.notEqual(1, new Literal(BIGINT_TYPE, 100L));
+ Predicate expected = BUILDER.notEqual(1, 100L);
assertExpected(sarg, expected);
}
@@ -128,7 +133,7 @@ public class SearchArgumentToPredicateConverterTest {
public void testLessThan() {
SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
SearchArgument sarg = builder.lessThan("f_bigint", PredicateLeaf.Type.LONG, 100L).build();
- Predicate expected = PredicateBuilder.lessThan(1, new Literal(BIGINT_TYPE, 100L));
+ Predicate expected = BUILDER.lessThan(1, 100L);
assertExpected(sarg, expected);
}
@@ -140,7 +145,7 @@ public class SearchArgumentToPredicateConverterTest {
.lessThan("f_bigint", PredicateLeaf.Type.LONG, 100L)
.end()
.build();
- Predicate expected = PredicateBuilder.greaterOrEqual(1, new Literal(BIGINT_TYPE, 100L));
+ Predicate expected = BUILDER.greaterOrEqual(1, 100L);
assertExpected(sarg, expected);
}
@@ -149,7 +154,7 @@ public class SearchArgumentToPredicateConverterTest {
SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
SearchArgument sarg =
builder.lessThanEquals("f_bigint", PredicateLeaf.Type.LONG, 100L).build();
- Predicate expected = PredicateBuilder.lessOrEqual(1, new Literal(BIGINT_TYPE, 100L));
+ Predicate expected = BUILDER.lessOrEqual(1, 100L);
assertExpected(sarg, expected);
}
@@ -161,7 +166,7 @@ public class SearchArgumentToPredicateConverterTest {
.lessThanEquals("f_bigint", PredicateLeaf.Type.LONG, 100L)
.end()
.build();
- Predicate expected = PredicateBuilder.greaterThan(1, new Literal(BIGINT_TYPE, 100L));
+ Predicate expected = BUILDER.greaterThan(1, 100L);
assertExpected(sarg, expected);
}
@@ -172,9 +177,7 @@ public class SearchArgumentToPredicateConverterTest {
builder.in("f_bigint", PredicateLeaf.Type.LONG, 100L, 200L, 300L).build();
Predicate expected =
PredicateBuilder.or(
- PredicateBuilder.equal(1, new Literal(BIGINT_TYPE, 100L)),
- PredicateBuilder.equal(1, new Literal(BIGINT_TYPE, 200L)),
- PredicateBuilder.equal(1, new Literal(BIGINT_TYPE, 300L)));
+ BUILDER.equal(1, 100L), BUILDER.equal(1, 200L), BUILDER.equal(1, 300L));
assertExpected(sarg, expected);
}
@@ -188,9 +191,9 @@ public class SearchArgumentToPredicateConverterTest {
.build();
Predicate expected =
PredicateBuilder.and(
- PredicateBuilder.notEqual(1, new Literal(BIGINT_TYPE, 100L)),
- PredicateBuilder.notEqual(1, new Literal(BIGINT_TYPE, 200L)),
- PredicateBuilder.notEqual(1, new Literal(BIGINT_TYPE, 300L)));
+ BUILDER.notEqual(1, 100L),
+ BUILDER.notEqual(1, 200L),
+ BUILDER.notEqual(1, 300L));
assertExpected(sarg, expected);
}
@@ -200,9 +203,7 @@ public class SearchArgumentToPredicateConverterTest {
SearchArgument sarg =
builder.between("f_bigint", PredicateLeaf.Type.LONG, 100L, 200L).build();
Predicate expected =
- PredicateBuilder.and(
- PredicateBuilder.greaterOrEqual(1, new Literal(BIGINT_TYPE, 100L)),
- PredicateBuilder.lessOrEqual(1, new Literal(BIGINT_TYPE, 200L)));
+ PredicateBuilder.and(BUILDER.greaterOrEqual(1, 100L), BUILDER.lessOrEqual(1, 200L));
assertExpected(sarg, expected);
}
@@ -215,9 +216,7 @@ public class SearchArgumentToPredicateConverterTest {
.end()
.build();
Predicate expected =
- PredicateBuilder.or(
- PredicateBuilder.lessThan(1, new Literal(BIGINT_TYPE, 100L)),
- PredicateBuilder.greaterThan(1, new Literal(BIGINT_TYPE, 200L)));
+ PredicateBuilder.or(BUILDER.lessThan(1, 100L), BUILDER.greaterThan(1, 200L));
assertExpected(sarg, expected);
}
@@ -225,7 +224,7 @@ public class SearchArgumentToPredicateConverterTest {
public void testIsNull() {
SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
SearchArgument sarg = builder.isNull("f_bigint", PredicateLeaf.Type.LONG).build();
- Predicate expected = PredicateBuilder.isNull(1);
+ Predicate expected = BUILDER.isNull(1);
assertExpected(sarg, expected);
}
@@ -234,7 +233,7 @@ public class SearchArgumentToPredicateConverterTest {
SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
SearchArgument sarg =
builder.startNot().isNull("f_bigint", PredicateLeaf.Type.LONG).end().build();
- Predicate expected = PredicateBuilder.isNotNull(1);
+ Predicate expected = BUILDER.isNotNull(1);
assertExpected(sarg, expected);
}
@@ -250,9 +249,7 @@ public class SearchArgumentToPredicateConverterTest {
.build();
Predicate expected =
PredicateBuilder.or(
- PredicateBuilder.equal(1, new Literal(BIGINT_TYPE, 100L)),
- PredicateBuilder.equal(1, new Literal(BIGINT_TYPE, 200L)),
- PredicateBuilder.equal(1, new Literal(BIGINT_TYPE, 300L)));
+ BUILDER.equal(1, 100L), BUILDER.equal(1, 200L), BUILDER.equal(1, 300L));
assertExpected(sarg, expected);
}
@@ -270,9 +267,9 @@ public class SearchArgumentToPredicateConverterTest {
.build();
Predicate expected =
PredicateBuilder.and(
- PredicateBuilder.notEqual(1, new Literal(BIGINT_TYPE, 100L)),
- PredicateBuilder.notEqual(1, new Literal(BIGINT_TYPE, 200L)),
- PredicateBuilder.notEqual(1, new Literal(BIGINT_TYPE, 300L)));
+ BUILDER.notEqual(1, 100L),
+ BUILDER.notEqual(1, 200L),
+ BUILDER.notEqual(1, 300L));
assertExpected(sarg, expected);
}
@@ -294,9 +291,9 @@ public class SearchArgumentToPredicateConverterTest {
.build();
Predicate expected =
PredicateBuilder.and(
- PredicateBuilder.notEqual(1, new Literal(BIGINT_TYPE, 100L)),
- PredicateBuilder.notEqual(1, new Literal(BIGINT_TYPE, 200L)),
- PredicateBuilder.notEqual(1, new Literal(BIGINT_TYPE, 300L)));
+ BUILDER.notEqual(1, 100L),
+ BUILDER.notEqual(1, 200L),
+ BUILDER.notEqual(1, 300L));
assertExpected(sarg, expected);
}
@@ -320,9 +317,7 @@ public class SearchArgumentToPredicateConverterTest {
.build();
Predicate expected =
PredicateBuilder.or(
- PredicateBuilder.equal(1, new Literal(BIGINT_TYPE, 100L)),
- PredicateBuilder.equal(1, new Literal(BIGINT_TYPE, 200L)),
- PredicateBuilder.equal(1, new Literal(BIGINT_TYPE, 300L)));
+ BUILDER.equal(1, 100L), BUILDER.equal(1, 200L), BUILDER.equal(1, 300L));
assertExpected(sarg, expected);
}
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
index ec077c2..6213ca3 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.spark;
-import org.apache.flink.table.store.file.predicate.Literal;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.types.logical.LogicalType;
@@ -37,13 +36,17 @@ import org.apache.spark.sql.sources.Not;
import org.apache.spark.sql.sources.Or;
import org.apache.spark.sql.sources.StringStartsWith;
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.convertJavaObject;
+
/** Conversion from {@link Filter} to {@link Predicate}. */
public class SparkFilterConverter {
private final RowType rowType;
+ private final PredicateBuilder builder;
public SparkFilterConverter(RowType rowType) {
this.rowType = rowType;
+ this.builder = new PredicateBuilder(rowType);
}
public Predicate convert(Filter filter) {
@@ -51,32 +54,32 @@ public class SparkFilterConverter {
EqualTo eq = (EqualTo) filter;
// TODO deal with isNaN
int index = fieldIndex(eq.attribute());
- Literal literal = convertLiteral(index, eq.value());
- return PredicateBuilder.equal(index, literal);
+ Object literal = convertLiteral(index, eq.value());
+ return builder.equal(index, literal);
} else if (filter instanceof GreaterThan) {
GreaterThan gt = (GreaterThan) filter;
int index = fieldIndex(gt.attribute());
- Literal literal = convertLiteral(index, gt.value());
- return PredicateBuilder.greaterThan(index, literal);
+ Object literal = convertLiteral(index, gt.value());
+ return builder.greaterThan(index, literal);
} else if (filter instanceof GreaterThanOrEqual) {
GreaterThanOrEqual gt = (GreaterThanOrEqual) filter;
int index = fieldIndex(gt.attribute());
- Literal literal = convertLiteral(index, gt.value());
- return PredicateBuilder.greaterOrEqual(index, literal);
+ Object literal = convertLiteral(index, gt.value());
+ return builder.greaterOrEqual(index, literal);
} else if (filter instanceof LessThan) {
LessThan lt = (LessThan) filter;
int index = fieldIndex(lt.attribute());
- Literal literal = convertLiteral(index, lt.value());
- return PredicateBuilder.lessThan(index, literal);
+ Object literal = convertLiteral(index, lt.value());
+ return builder.lessThan(index, literal);
} else if (filter instanceof LessThanOrEqual) {
LessThanOrEqual lt = (LessThanOrEqual) filter;
int index = fieldIndex(lt.attribute());
- Literal literal = convertLiteral(index, lt.value());
- return PredicateBuilder.lessOrEqual(index, literal);
+ Object literal = convertLiteral(index, lt.value());
+ return builder.lessOrEqual(index, literal);
} else if (filter instanceof IsNull) {
- return PredicateBuilder.isNull(fieldIndex(((IsNull) filter).attribute()));
+ return builder.isNull(fieldIndex(((IsNull) filter).attribute()));
} else if (filter instanceof IsNotNull) {
- return PredicateBuilder.isNotNull(fieldIndex(((IsNotNull) filter).attribute()));
+ return builder.isNotNull(fieldIndex(((IsNotNull) filter).attribute()));
} else if (filter instanceof And) {
And and = (And) filter;
return PredicateBuilder.and(convert(and.left()), convert(and.right()));
@@ -89,8 +92,8 @@ public class SparkFilterConverter {
} else if (filter instanceof StringStartsWith) {
StringStartsWith startsWith = (StringStartsWith) filter;
int index = fieldIndex(startsWith.attribute());
- Literal literal = convertLiteral(index, startsWith.value());
- return PredicateBuilder.startsWith(index, literal);
+ Object literal = convertLiteral(index, startsWith.value());
+ return builder.startsWith(index, literal);
}
// TODO: In, NotIn, AlwaysTrue, AlwaysFalse, EqualNullSafe
@@ -106,8 +109,8 @@ public class SparkFilterConverter {
return index;
}
- private Literal convertLiteral(int index, Object value) {
+ private Object convertLiteral(int index, Object value) {
LogicalType type = rowType.getTypeAt(index);
- return Literal.fromJavaObject(type, value);
+ return convertJavaObject(type, value);
}
}
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
index dead152..dcc10ed 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.spark;
import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.store.file.predicate.Literal;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.types.logical.DateType;
@@ -51,55 +50,55 @@ public class SparkFilterConverterTest {
@Test
public void testAll() {
- SparkFilterConverter converter =
- new SparkFilterConverter(
- new RowType(
- Collections.singletonList(
- new RowType.RowField("id", new IntType()))));
+ RowType rowType =
+ new RowType(Collections.singletonList(new RowType.RowField("id", new IntType())));
+ SparkFilterConverter converter = new SparkFilterConverter(rowType);
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+
String field = "id";
IsNull isNull = IsNull.apply(field);
- Predicate expectedIsNull = PredicateBuilder.isNull(0);
+ Predicate expectedIsNull = builder.isNull(0);
Predicate actualIsNull = converter.convert(isNull);
assertThat(actualIsNull).isEqualTo(expectedIsNull);
IsNotNull isNotNull = IsNotNull.apply(field);
- Predicate expectedIsNotNull = PredicateBuilder.isNotNull(0);
+ Predicate expectedIsNotNull = builder.isNotNull(0);
Predicate actualIsNotNull = converter.convert(isNotNull);
assertThat(actualIsNotNull).isEqualTo(expectedIsNotNull);
LessThan lt = LessThan.apply(field, 1);
- Predicate expectedLt = PredicateBuilder.lessThan(0, new Literal(new IntType(), 1));
+ Predicate expectedLt = builder.lessThan(0, 1);
Predicate actualLt = converter.convert(lt);
assertThat(actualLt).isEqualTo(expectedLt);
LessThanOrEqual ltEq = LessThanOrEqual.apply(field, 1);
- Predicate expectedLtEq = PredicateBuilder.lessOrEqual(0, new Literal(new IntType(), 1));
+ Predicate expectedLtEq = builder.lessOrEqual(0, 1);
Predicate actualLtEq = converter.convert(ltEq);
assertThat(actualLtEq).isEqualTo(expectedLtEq);
GreaterThan gt = GreaterThan.apply(field, 1);
- Predicate expectedGt = PredicateBuilder.greaterThan(0, new Literal(new IntType(), 1));
+ Predicate expectedGt = builder.greaterThan(0, 1);
Predicate actualGt = converter.convert(gt);
assertThat(actualGt).isEqualTo(expectedGt);
GreaterThanOrEqual gtEq = GreaterThanOrEqual.apply(field, 1);
- Predicate expectedGtEq = PredicateBuilder.greaterOrEqual(0, new Literal(new IntType(), 1));
+ Predicate expectedGtEq = builder.greaterOrEqual(0, 1);
Predicate actualGtEq = converter.convert(gtEq);
assertThat(actualGtEq).isEqualTo(expectedGtEq);
EqualTo eq = EqualTo.apply(field, 1);
- Predicate expectedEq = PredicateBuilder.equal(0, new Literal(new IntType(), 1));
+ Predicate expectedEq = builder.equal(0, 1);
Predicate actualEq = converter.convert(eq);
assertThat(actualEq).isEqualTo(expectedEq);
}
@Test
public void testTimestamp() {
- SparkFilterConverter converter =
- new SparkFilterConverter(
- new RowType(
- Collections.singletonList(
- new RowType.RowField("x", new TimestampType()))));
+ RowType rowType =
+ new RowType(
+ Collections.singletonList(new RowType.RowField("x", new TimestampType())));
+ SparkFilterConverter converter = new SparkFilterConverter(rowType);
+ PredicateBuilder builder = new PredicateBuilder(rowType);
Timestamp timestamp = Timestamp.valueOf("2018-10-18 00:00:57.907");
LocalDateTime localDateTime = LocalDateTime.parse("2018-10-18T00:00:57.907");
@@ -108,11 +107,7 @@ public class SparkFilterConverterTest {
Predicate instantExpression = converter.convert(GreaterThan.apply("x", instant));
Predicate timestampExpression = converter.convert(GreaterThan.apply("x", timestamp));
Predicate rawExpression =
- PredicateBuilder.greaterThan(
- 0,
- new Literal(
- new TimestampType(),
- TimestampData.fromLocalDateTime(localDateTime)));
+ builder.greaterThan(0, TimestampData.fromLocalDateTime(localDateTime));
assertThat(timestampExpression).isEqualTo(rawExpression);
assertThat(instantExpression).isEqualTo(rawExpression);
@@ -120,11 +115,10 @@ public class SparkFilterConverterTest {
@Test
public void testDate() {
- SparkFilterConverter converter =
- new SparkFilterConverter(
- new RowType(
- Collections.singletonList(
- new RowType.RowField("x", new DateType()))));
+ RowType rowType =
+ new RowType(Collections.singletonList(new RowType.RowField("x", new DateType())));
+ SparkFilterConverter converter = new SparkFilterConverter(rowType);
+ PredicateBuilder builder = new PredicateBuilder(rowType);
LocalDate localDate = LocalDate.parse("2018-10-18");
Date date = Date.valueOf(localDate);
@@ -132,8 +126,7 @@ public class SparkFilterConverterTest {
Predicate localDateExpression = converter.convert(GreaterThan.apply("x", localDate));
Predicate dateExpression = converter.convert(GreaterThan.apply("x", date));
- Predicate rawExpression =
- PredicateBuilder.greaterThan(0, new Literal(new DateType(), epochDay));
+ Predicate rawExpression = builder.greaterThan(0, epochDay);
assertThat(dateExpression).isEqualTo(rawExpression);
assertThat(localDateExpression).isEqualTo(rawExpression);