You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/14 02:55:12 UTC
[flink-table-store] branch master updated: [FLINK-28285] Push filter into orc reader
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 088955f7 [FLINK-28285] Push filter into orc reader
088955f7 is described below
commit 088955f72bddf8134934dfd21152e1933c0cab77
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Jul 14 10:55:08 2022 +0800
[FLINK-28285] Push filter into orc reader
This closes #212
---
.../flink/table/store/file/predicate/And.java | 0
.../table/store/file/predicate/CompareUtils.java | 0
.../store/file/predicate/CompoundPredicate.java | 0
.../flink/table/store/file/predicate/Equal.java | 0
.../table/store/file/predicate/GreaterOrEqual.java | 0
.../table/store/file/predicate/GreaterThan.java | 0
.../flink/table/store/file/predicate/In.java | 0
.../table/store/file/predicate/IsNotNull.java | 0
.../flink/table/store/file/predicate/IsNull.java | 0
.../table/store/file/predicate/LeafFunction.java | 0
.../table/store/file/predicate/LeafPredicate.java | 33 ++-
.../store/file/predicate/LeafUnaryFunction.java | 0
.../table/store/file/predicate/LessOrEqual.java | 0
.../flink/table/store/file/predicate/LessThan.java | 0
.../flink/table/store/file/predicate/NotEqual.java | 0
.../flink/table/store/file/predicate/NotIn.java | 0
.../predicate/NullFalseLeafBinaryFunction.java | 0
.../flink/table/store/file/predicate/Or.java | 0
.../table/store/file/predicate/Predicate.java | 0
.../store/file/predicate/PredicateBuilder.java | 49 +++-
.../store/file/predicate/PredicateConverter.java | 0
.../table/store/file/predicate/StartsWith.java | 0
.../flink/table/store/format/FileFormat.java | 4 +-
.../store/connector/source/FileStoreSource.java | 3 +
.../source/TestChangelogDataReadWrite.java | 6 +
flink-table-store-core/pom.xml | 42 ++++
.../table/store/file/data/DataFileReader.java | 10 +-
.../file/operation/AppendOnlyFileStoreRead.java | 13 +-
.../table/store/file/operation/FileStoreRead.java | 3 +
.../file/operation/KeyValueFileStoreRead.java | 14 +-
.../store/table/AppendOnlyFileStoreTable.java | 6 +
.../table/ChangelogValueCountFileStoreTable.java | 6 +
.../table/ChangelogWithKeyFileStoreTable.java | 32 ++-
.../flink/table/store/table/source/TableRead.java | 3 +-
.../file/format/FileStatsExtractingAvroFormat.java | 4 +-
.../store/file/format/FlushingFileFormat.java | 4 +-
.../table/store/file/predicate/PredicateTest.java | 111 ++++++++++
.../store/table/AppendOnlyFileStoreTableTest.java | 1 -
.../ChangelogValueCountFileStoreTableTest.java | 1 -
.../table/ChangelogWithKeyFileStoreTableTest.java | 42 +++-
.../table/store/table/FileStoreTableTestBase.java | 29 +++
.../table/store/table/WritePreemptMemoryTest.java | 5 +-
.../table/store/format/avro/AvroFileFormat.java | 4 +-
.../table/store/format/orc/OrcFileFormat.java | 9 +-
.../table/store/format/orc/OrcFilterConverter.java | 246 +++++++++++++++++++++
.../store/format/orc/OrcFilterConverterTest.java | 80 +++++++
.../table/store/mapred/TableStoreInputFormat.java | 7 +-
.../SearchArgumentToPredicateConverterTest.java | 40 +++-
.../table/store/spark/SparkReaderFactory.java | 19 +-
.../apache/flink/table/store/spark/SparkScan.java | 2 +-
.../store/spark/SparkFilterConverterTest.java | 10 +
51 files changed, 785 insertions(+), 53 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/And.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/And.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompareUtils.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/CompareUtils.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompareUtils.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/CompareUtils.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/In.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/In.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
similarity index 80%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
index 17a52f76..5e48776a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
@@ -26,8 +26,6 @@ import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.types.logical.LogicalType;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -42,15 +40,21 @@ public class LeafPredicate implements Predicate {
private final LeafFunction function;
private final LogicalType type;
- private final int index;
+ private final int fieldIndex;
+ private final String fieldName;
private transient List<Object> literals;
public LeafPredicate(
- LeafFunction function, LogicalType type, int index, List<Object> literals) {
+ LeafFunction function,
+ LogicalType type,
+ int fieldIndex,
+ String fieldName,
+ List<Object> literals) {
this.function = function;
this.type = type;
- this.index = index;
+ this.fieldIndex = fieldIndex;
+ this.fieldName = fieldName;
this.literals = literals;
}
@@ -58,13 +62,16 @@ public class LeafPredicate implements Predicate {
return function;
}
- @Nullable
public LogicalType type() {
return type;
}
public int index() {
- return index;
+ return fieldIndex;
+ }
+
+ public String fieldName() {
+ return fieldName;
}
public List<Object> literals() {
@@ -73,17 +80,18 @@ public class LeafPredicate implements Predicate {
@Override
public boolean test(Object[] values) {
- return function.test(type, values[index], literals);
+ return function.test(type, values[fieldIndex], literals);
}
@Override
public boolean test(long rowCount, FieldStats[] fieldStats) {
- return function.test(type, rowCount, fieldStats[index], literals);
+ return function.test(type, rowCount, fieldStats[fieldIndex], literals);
}
@Override
public Optional<Predicate> negate() {
- return function.negate().map(negate -> new LeafPredicate(negate, type, index, literals));
+ return function.negate()
+ .map(negate -> new LeafPredicate(negate, type, fieldIndex, fieldName, literals));
}
@Override
@@ -95,7 +103,8 @@ public class LeafPredicate implements Predicate {
return false;
}
LeafPredicate that = (LeafPredicate) o;
- return index == that.index
+ return fieldIndex == that.fieldIndex
+ && Objects.equals(fieldName, that.fieldName)
&& Objects.equals(function, that.function)
&& Objects.equals(type, that.type)
&& Objects.equals(literals, that.literals);
@@ -103,7 +112,7 @@ public class LeafPredicate implements Predicate {
@Override
public int hashCode() {
- return Objects.hash(function, type, index, literals);
+ return Objects.hash(function, type, fieldIndex, fieldName, literals);
}
private ListSerializer<Object> objectsSerializer() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
similarity index 85%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
index 035809d9..a1b86a2d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
@@ -26,6 +26,8 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
+
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
@@ -39,6 +41,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import static java.util.Collections.singletonList;
@@ -88,19 +91,32 @@ public class PredicateBuilder {
}
public Predicate leaf(NullFalseLeafBinaryFunction function, int idx, Object literal) {
- return new LeafPredicate(function, rowType.getTypeAt(idx), idx, singletonList(literal));
+ RowType.RowField field = rowType.getFields().get(idx);
+ return new LeafPredicate(
+ function, field.getType(), idx, field.getName(), singletonList(literal));
}
public Predicate leaf(LeafUnaryFunction function, int idx) {
- return new LeafPredicate(function, rowType.getTypeAt(idx), idx, Collections.emptyList());
+ RowType.RowField field = rowType.getFields().get(idx);
+ return new LeafPredicate(
+ function, field.getType(), idx, field.getName(), Collections.emptyList());
}
public Predicate in(int idx, List<Object> literals) {
- return new LeafPredicate(In.INSTANCE, rowType.getTypeAt(idx), idx, literals);
+ if (literals.size() > 20) {
+ RowType.RowField field = rowType.getFields().get(idx);
+ return new LeafPredicate(In.INSTANCE, field.getType(), idx, field.getName(), literals);
+ }
+
+ List<Predicate> equals = new ArrayList<>(literals.size());
+ for (Object literal : literals) {
+ equals.add(equal(idx, literal));
+ }
+ return or(equals);
}
public Predicate notIn(int idx, List<Object> literals) {
- return new LeafPredicate(NotIn.INSTANCE, rowType.getTypeAt(idx), idx, literals);
+ return in(idx, literals).negate().get();
}
public Predicate between(int idx, Object includedLowerBound, Object includedUpperBound) {
@@ -140,13 +156,19 @@ public class PredicateBuilder {
.get();
}
- public static List<Predicate> splitAnd(Predicate predicate) {
+ public static List<Predicate> splitAnd(@Nullable Predicate predicate) {
+ if (predicate == null) {
+ return Collections.emptyList();
+ }
List<Predicate> result = new ArrayList<>();
splitCompound(And.INSTANCE, predicate, result);
return result;
}
- public static List<Predicate> splitOr(Predicate predicate) {
+ public static List<Predicate> splitOr(@Nullable Predicate predicate) {
+ if (predicate == null) {
+ return Collections.emptyList();
+ }
List<Predicate> result = new ArrayList<>();
splitCompound(Or.INSTANCE, predicate, result);
return result;
@@ -268,10 +290,25 @@ public class PredicateBuilder {
leafPredicate.function(),
leafPredicate.type(),
mapped,
+ leafPredicate.fieldName(),
leafPredicate.literals()));
} else {
return Optional.empty();
}
}
}
+
+ public static boolean containsFields(Predicate predicate, Set<String> fields) {
+ if (predicate instanceof CompoundPredicate) {
+ for (Predicate child : ((CompoundPredicate) predicate).children()) {
+ if (containsFields(child, fields)) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ LeafPredicate leafPredicate = (LeafPredicate) predicate;
+ return fields.contains(leafPredicate.fieldName());
+ }
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java
similarity index 100%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
index 4fad3648..d457e258 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
@@ -26,7 +26,7 @@ import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.types.logical.RowType;
import java.util.ArrayList;
@@ -59,7 +59,7 @@ public abstract class FileFormat {
* @param filters A list of filters in conjunctive form for filtering on a best-effort basis.
*/
public abstract BulkFormat<RowData, FileSourceSplit> createReaderFactory(
- RowType type, int[][] projection, List<ResolvedExpression> filters);
+ RowType type, int[][] projection, List<Predicate> filters);
/** Create a {@link BulkWriter.Factory} from the type. */
public abstract BulkWriter.Factory<RowData> createWriterFactory(RowType type);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index 2752b64a..cd2c7a28 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -84,6 +84,9 @@ public class FileStoreSource
if (projectedFields != null) {
read.withProjection(projectedFields);
}
+ if (predicate != null) {
+ read.withFilter(predicate);
+ }
return new FileStoreSourceReader(context, read);
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 1d26808c..8c7a504b 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.store.file.memory.MemoryOwner;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreRead;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreWrite;
+import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
@@ -108,6 +109,11 @@ public class TestChangelogDataReadWrite {
avro,
pathFactory);
return new KeyValueTableRead(read) {
+ @Override
+ public TableRead withFilter(Predicate predicate) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public TableRead withProjection(int[][] projection) {
throw new UnsupportedOperationException();
diff --git a/flink-table-store-core/pom.xml b/flink-table-store-core/pom.xml
index c364d497..ac25fab5 100644
--- a/flink-table-store-core/pom.xml
+++ b/flink-table-store-core/pom.xml
@@ -89,6 +89,48 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
index a57044f3..26cd5c65 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueSerializer;
+import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
@@ -37,6 +38,8 @@ import org.apache.flink.table.types.logical.RowType;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
/**
* Reads {@link KeyValue}s from data files.
@@ -173,10 +176,11 @@ public class DataFileReader {
}
public DataFileReader create(BinaryRowData partition, int bucket) {
- return create(partition, bucket, true);
+ return create(partition, bucket, true, Collections.emptyList());
}
- public DataFileReader create(BinaryRowData partition, int bucket, boolean projectKeys) {
+ public DataFileReader create(
+ BinaryRowData partition, int bucket, boolean projectKeys, List<Predicate> filters) {
int[][] keyProjection = projectKeys ? this.keyProjection : fullKeyProjection;
RowType projectedKeyType = projectKeys ? this.projectedKeyType : keyType;
@@ -188,7 +192,7 @@ public class DataFileReader {
schemaId,
projectedKeyType,
projectedValueType,
- fileFormat.createReaderFactory(recordType, projection),
+ fileFormat.createReaderFactory(recordType, projection, filters),
pathFactory.createDataFilePathFactory(partition, bucket));
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
index c527f415..176b7524 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.store.file.data.AppendOnlyReader;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFilePathFactory;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
+import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
@@ -37,6 +38,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.splitAnd;
+
/** {@link FileStoreRead} for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
@@ -48,6 +51,8 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
private int[][] projection;
+ private List<Predicate> filters;
+
public AppendOnlyFileStoreRead(
SchemaManager schemaManager,
long schemaId,
@@ -68,10 +73,16 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
return this;
}
+ @Override
+ public FileStoreRead<RowData> withFilter(Predicate predicate) {
+ this.filters = splitAnd(predicate);
+ return this;
+ }
+
@Override
public RecordReader<RowData> createReader(Split split) throws IOException {
BulkFormat<RowData, FileSourceSplit> readerFactory =
- fileFormat.createReaderFactory(rowType, projection);
+ fileFormat.createReaderFactory(rowType, projection, filters);
DataFilePathFactory dataFilePathFactory =
pathFactory.createDataFilePathFactory(split.partition(), split.bucket());
List<ConcatRecordReader.ReaderSupplier<RowData>> suppliers = new ArrayList<>();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
index 43729878..f540f0cb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.file.operation;
+import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.table.source.Split;
@@ -30,6 +31,8 @@ import java.io.IOException;
*/
public interface FileStoreRead<T> {
+ FileStoreRead<T> withFilter(Predicate predicate);
+
/** Create a {@link RecordReader} from split. */
RecordReader<T> createReader(Split split) throws IOException;
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
index de83b87c..40bd85c5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.ProjectKeyRecordReader;
@@ -41,6 +42,7 @@ import java.util.List;
import java.util.Optional;
import static org.apache.flink.table.store.file.data.DataFilePathFactory.CHANGELOG_FILE_PREFIX;
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.splitAnd;
/**
* {@link FileStoreRead} implementation for {@link
@@ -54,6 +56,8 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
private int[][] keyProjectedFields;
+ private List<Predicate> filters;
+
public KeyValueFileStoreRead(
SchemaManager schemaManager,
long schemaId,
@@ -81,11 +85,17 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
return this;
}
+ @Override
+ public FileStoreRead<KeyValue> withFilter(Predicate predicate) {
+ this.filters = splitAnd(predicate);
+ return this;
+ }
+
@Override
public RecordReader<KeyValue> createReader(Split split) throws IOException {
if (split.isIncremental()) {
DataFileReader dataFileReader =
- dataFileReaderFactory.create(split.partition(), split.bucket(), true);
+ dataFileReaderFactory.create(split.partition(), split.bucket(), true, filters);
// Return the raw file contents without merging
List<ConcatRecordReader.ReaderSupplier<KeyValue>> suppliers = new ArrayList<>();
for (DataFileMeta file : split.files()) {
@@ -97,7 +107,7 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
// in this case merge tree should merge records with same key
// Do not project key in MergeTreeReader.
DataFileReader dataFileReader =
- dataFileReaderFactory.create(split.partition(), split.bucket(), false);
+ dataFileReaderFactory.create(split.partition(), split.bucket(), false, filters);
MergeTreeReader reader =
new MergeTreeReader(
new IntervalPartition(split.files(), keyComparator).partition(),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 16f4be28..65975cc6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -85,6 +85,12 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
public TableRead newRead() {
AppendOnlyFileStoreRead read = store.newRead();
return new TableRead() {
+ @Override
+ public TableRead withFilter(Predicate predicate) {
+ read.withFilter(predicate);
+ return this;
+ }
+
@Override
public TableRead withProjection(int[][] projection) {
read.withProjection(projection);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index ad90ede0..9dea5237 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -98,6 +98,12 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
public TableRead newRead() {
return new KeyValueTableRead(store.newRead()) {
+ @Override
+ public TableRead withFilter(Predicate predicate) {
+ read.withFilter(predicate);
+ return this;
+ }
+
@Override
public TableRead withProjection(int[][] projection) {
read.withKeyProjection(projection);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 37a29062..a47a1557 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -50,9 +49,13 @@ import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterat
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.and;
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.containsFields;
import static org.apache.flink.table.store.file.predicate.PredicateBuilder.pickTransformFieldMapping;
import static org.apache.flink.table.store.file.predicate.PredicateBuilder.splitAnd;
@@ -142,7 +145,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
tableSchema.fieldNames(),
tableSchema.trimmedPrimaryKeys());
if (keyFilters.size() > 0) {
- scan.withKeyFilter(PredicateBuilder.and(keyFilters));
+ scan.withKeyFilter(and(keyFilters));
}
}
};
@@ -150,7 +153,32 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
@Override
public TableRead newRead() {
+ List<String> primaryKeys = tableSchema.trimmedPrimaryKeys();
+ Set<String> nonPrimaryKeys =
+ tableSchema.fieldNames().stream()
+ .filter(name -> !primaryKeys.contains(name))
+ .collect(Collectors.toSet());
return new KeyValueTableRead(store.newRead()) {
+
+ @Override
+ public TableRead withFilter(Predicate predicate) {
+ List<Predicate> predicates = new ArrayList<>();
+ for (Predicate sub : splitAnd(predicate)) {
+ // TODO support value filter
+ if (containsFields(sub, nonPrimaryKeys)) {
+ continue;
+ }
+
+ // TODO Actually, the index is wrong, but it is OK. The orc filter
+ // just use name instead of index.
+ predicates.add(sub);
+ }
+ if (predicates.size() > 0) {
+ read.withFilter(and(predicates));
+ }
+ return this;
+ }
+
@Override
public TableRead withProjection(int[][] projection) {
read.withValueProjection(projection);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
index b8fa4061..440e3d2d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.table.source;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.utils.RecordReader;
import java.io.IOException;
@@ -28,7 +29,7 @@ import java.util.Arrays;
/** An abstraction layer above {@link FileStoreRead} to provide reading of {@link RowData}. */
public interface TableRead {
- // TODO support filter push down
+ TableRead withFilter(Predicate predicate);
default TableRead withProjection(int[] projection) {
int[][] nestedProjection =
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
index 29ebd0c3..692ba1b5 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
@@ -23,7 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.stats.TestFileStatsExtractor;
import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.store.format.FileStatsExtractor;
@@ -44,7 +44,7 @@ public class FileStatsExtractingAvroFormat extends FileFormat {
@Override
public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
- RowType type, int[][] projection, List<ResolvedExpression> filters) {
+ RowType type, int[][] projection, List<Predicate> filters) {
return avro.createReaderFactory(type, projection, filters);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
index 23569b21..94e11003 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
@@ -23,7 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.types.logical.RowType;
@@ -42,7 +42,7 @@ public class FlushingFileFormat extends FileFormat {
@Override
public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
- RowType type, int[][] projection, List<ResolvedExpression> filters) {
+ RowType type, int[][] projection, List<Predicate> filters) {
return format.createReaderFactory(type, projection, filters);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
index ce15619f..5e58e73b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
@@ -24,7 +24,9 @@ import org.apache.flink.table.types.logical.RowType;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@@ -258,6 +260,7 @@ public class PredicateTest {
public void testIn() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
Predicate predicate = builder.in(0, Arrays.asList(1, 3));
+ assertThat(predicate).isInstanceOf(CompoundPredicate.class);
assertThat(predicate.test(new Object[] {1})).isEqualTo(true);
assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
@@ -274,6 +277,7 @@ public class PredicateTest {
public void testInNull() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
Predicate predicate = builder.in(0, Arrays.asList(1, null, 3));
+ assertThat(predicate).isInstanceOf(CompoundPredicate.class);
assertThat(predicate.test(new Object[] {1})).isEqualTo(true);
assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
@@ -290,6 +294,7 @@ public class PredicateTest {
public void testNotIn() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
Predicate predicate = builder.notIn(0, Arrays.asList(1, 3));
+ assertThat(predicate).isInstanceOf(CompoundPredicate.class);
assertThat(predicate.test(new Object[] {1})).isEqualTo(false);
assertThat(predicate.test(new Object[] {2})).isEqualTo(true);
@@ -309,6 +314,7 @@ public class PredicateTest {
public void testNotInNull() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
Predicate predicate = builder.notIn(0, Arrays.asList(1, null, 3));
+ assertThat(predicate).isInstanceOf(CompoundPredicate.class);
assertThat(predicate.test(new Object[] {1})).isEqualTo(false);
assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
@@ -324,6 +330,111 @@ public class PredicateTest {
.isEqualTo(false);
}
+ @Test
+ public void testLargeIn() {
+ PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
+ List<Object> literals = new ArrayList<>();
+ literals.add(1);
+ literals.add(3);
+ for (int i = 10; i < 30; i++) {
+ literals.add(i);
+ }
+ Predicate predicate = builder.in(0, literals);
+ assertThat(predicate).isInstanceOf(LeafPredicate.class);
+
+ assertThat(predicate.test(new Object[] {1})).isEqualTo(true);
+ assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
+ assertThat(predicate.test(new Object[] {3})).isEqualTo(true);
+ assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
+
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
+ assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+ .isEqualTo(false);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 0)})).isEqualTo(true);
+ }
+
+ @Test
+ public void testLargeInNull() {
+ PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
+ List<Object> literals = new ArrayList<>();
+ literals.add(1);
+ literals.add(null);
+ literals.add(3);
+ for (int i = 10; i < 30; i++) {
+ literals.add(i);
+ }
+ Predicate predicate = builder.in(0, literals);
+ assertThat(predicate).isInstanceOf(LeafPredicate.class);
+
+ assertThat(predicate.test(new Object[] {1})).isEqualTo(true);
+ assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
+ assertThat(predicate.test(new Object[] {3})).isEqualTo(true);
+ assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
+
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
+ assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+ .isEqualTo(false);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 0)})).isEqualTo(true);
+ }
+
+ @Test
+ public void testLargeNotIn() {
+ PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
+ List<Object> literals = new ArrayList<>();
+ literals.add(1);
+ literals.add(3);
+ for (int i = 10; i < 30; i++) {
+ literals.add(i);
+ }
+ Predicate predicate = builder.notIn(0, literals);
+ assertThat(predicate).isInstanceOf(LeafPredicate.class);
+
+ assertThat(predicate.test(new Object[] {1})).isEqualTo(false);
+ assertThat(predicate.test(new Object[] {2})).isEqualTo(true);
+ assertThat(predicate.test(new Object[] {3})).isEqualTo(false);
+ assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
+
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 0)})).isEqualTo(false);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 0)})).isEqualTo(false);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 0)})).isEqualTo(true);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(true);
+ assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+ .isEqualTo(false);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 0)})).isEqualTo(true);
+ }
+
+ @Test
+ public void testLargeNotInNull() {
+ PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
+ List<Object> literals = new ArrayList<>();
+ literals.add(1);
+ literals.add(null);
+ literals.add(3);
+ for (int i = 10; i < 30; i++) {
+ literals.add(i);
+ }
+ Predicate predicate = builder.notIn(0, literals);
+ assertThat(predicate).isInstanceOf(LeafPredicate.class);
+
+ assertThat(predicate.test(new Object[] {1})).isEqualTo(false);
+ assertThat(predicate.test(new Object[] {2})).isEqualTo(false);
+ assertThat(predicate.test(new Object[] {3})).isEqualTo(false);
+ assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
+
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 0)})).isEqualTo(false);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 0)})).isEqualTo(false);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 0)})).isEqualTo(false);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(false);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
+ assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+ .isEqualTo(false);
+ assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 0)}))
+ .isEqualTo(false);
+ }
+
@Test
public void testAnd() {
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType(), new IntType()));
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index f4498949..57cfb1bd 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -168,7 +168,6 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
Path tablePath = new Path(tempDir.toString());
Configuration conf = new Configuration();
conf.set(CoreOptions.PATH, tablePath.toString());
- conf.set(CoreOptions.FILE_FORMAT, "avro");
conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
configure.accept(conf);
SchemaManager schemaManager = new SchemaManager(tablePath);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index 24733a7b..9f3604d3 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -171,7 +171,6 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
Path tablePath = new Path(tempDir.toString());
Configuration conf = new Configuration();
conf.set(CoreOptions.PATH, tablePath.toString());
- conf.set(CoreOptions.FILE_FORMAT, "avro");
conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
configure.accept(conf);
SchemaManager schemaManager = new SchemaManager(tablePath);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index a7a5a318..6a19f043 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -209,6 +209,47 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
write.close();
}
+ @Override
+ @Test
+ public void testReadFilter() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+
+ TableWrite write = table.newWrite();
+ TableCommit commit = table.newCommit("user");
+
+ write.write(GenericRowData.of(1, 10, 100L));
+ write.write(GenericRowData.of(1, 20, 200L));
+ commit.commit("0", write.prepareCommit(true));
+
+ write.write(GenericRowData.of(1, 30, 300L));
+ write.write(GenericRowData.of(1, 40, 400L));
+ commit.commit("1", write.prepareCommit(true));
+
+ write.write(GenericRowData.of(1, 50, 500L));
+ write.write(GenericRowData.of(1, 60, 600L));
+ commit.commit("2", write.prepareCommit(true));
+
+ write.close();
+
+ PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
+ List<Split> splits = table.newScan().plan().splits;
+
+ TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
+ assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+ .hasSameElementsAs(
+ Arrays.asList(
+ "1|10|100",
+ "1|20|200",
+ "1|30|300",
+ "1|40|400",
+ "1|50|500",
+ "1|60|600"));
+
+ read = table.newRead().withFilter(builder.equal(1, 30));
+ assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+ .hasSameElementsAs(Arrays.asList("1|30|300", "1|40|400"));
+ }
+
protected FileStoreTable createFileStoreTable(boolean changelogFile) throws Exception {
return createFileStoreTable(conf -> conf.set(CoreOptions.CHANGELOG_FILE, changelogFile));
}
@@ -219,7 +260,6 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
Path tablePath = new Path(tempDir.toString());
Configuration conf = new Configuration();
conf.set(CoreOptions.PATH, tablePath.toString());
- conf.set(CoreOptions.FILE_FORMAT, "avro");
conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
configure.accept(conf);
SchemaManager schemaManager = new SchemaManager(tablePath);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 2b818185..8b9250bf 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -40,6 +40,7 @@ import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -133,6 +134,34 @@ public abstract class FileStoreTableTestBase {
assertThat(splits.get(0).bucket()).isEqualTo(1);
}
+ @Test
+ public void testReadFilter() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+
+ TableWrite write = table.newWrite();
+ TableCommit commit = table.newCommit("user");
+
+ write.write(GenericRowData.of(1, 10, 100L));
+ write.write(GenericRowData.of(1, 20, 200L));
+ commit.commit("0", write.prepareCommit(true));
+
+ write.write(GenericRowData.of(1, 30, 300L));
+ write.write(GenericRowData.of(1, 40, 400L));
+ commit.commit("1", write.prepareCommit(true));
+
+ write.write(GenericRowData.of(1, 50, 500L));
+ write.write(GenericRowData.of(1, 60, 600L));
+ commit.commit("2", write.prepareCommit(true));
+
+ write.close();
+
+ PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
+ List<Split> splits = table.newScan().plan().splits;
+ TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
+ assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+ .hasSameElementsAs(Arrays.asList("1|30|300", "1|40|400"));
+ }
+
protected List<String> getResult(
TableRead read,
List<Split> splits,
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index d4ee1d13..7252a6a6 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -60,6 +60,10 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
testWritePreemptMemory(true);
}
+ @Override // this has been tested in ChangelogWithKeyFileStoreTableTest
+ @Test
+ public void testReadFilter() {}
+
private void testWritePreemptMemory(boolean singlePartition) throws Exception {
// write
FileStoreTable table = createFileStoreTable();
@@ -93,7 +97,6 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
Path tablePath = new Path(tempDir.toString());
Configuration conf = new Configuration();
conf.set(CoreOptions.PATH, tablePath.toString());
- conf.set(CoreOptions.FILE_FORMAT, "avro");
conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(30 * 1024));
conf.set(CoreOptions.PAGE_SIZE, new MemorySize(1024));
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
index 66e1901a..90618f1d 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
@@ -31,8 +31,8 @@ import org.apache.flink.formats.avro.RowDataToAvroConverters;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.store.utils.Projection;
import org.apache.flink.table.types.logical.LogicalType;
@@ -65,7 +65,7 @@ public class AvroFileFormat extends FileFormat {
@Override
public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
- RowType type, int[][] projection, List<ResolvedExpression> filters) {
+ RowType type, int[][] projection, List<Predicate> filters) {
// avro is a file format that keeps schemas in file headers,
// if the schema given to the reader is not equal to the schema in header,
// reader will automatically map the fields and give back records with our desired
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
index 60ebece8..880074c2 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
@@ -29,8 +29,7 @@ import org.apache.flink.orc.OrcSplitReaderUtil;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.store.format.FileStatsExtractor;
import org.apache.flink.table.store.utils.Projection;
@@ -68,12 +67,12 @@ public class OrcFileFormat extends FileFormat {
@Override
public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
- RowType type, int[][] projection, List<ResolvedExpression> filters) {
+ RowType type, int[][] projection, List<Predicate> filters) {
List<OrcFilters.Predicate> orcPredicates = new ArrayList<>();
if (filters != null) {
- for (Expression pred : filters) {
- OrcFilters.Predicate orcPred = OrcFilters.toOrcPredicate(pred);
+ for (Predicate pred : filters) {
+ OrcFilters.Predicate orcPred = OrcFilterConverter.toOrcPredicate(pred);
if (orcPred != null) {
orcPredicates.add(orcPred);
}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFilterConverter.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFilterConverter.java
new file mode 100644
index 00000000..acccd161
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFilterConverter.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.orc.OrcFilters;
+import org.apache.flink.orc.OrcFilters.Predicate;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.store.file.predicate.CompoundPredicate;
+import org.apache.flink.table.store.file.predicate.Equal;
+import org.apache.flink.table.store.file.predicate.GreaterOrEqual;
+import org.apache.flink.table.store.file.predicate.GreaterThan;
+import org.apache.flink.table.store.file.predicate.IsNotNull;
+import org.apache.flink.table.store.file.predicate.IsNull;
+import org.apache.flink.table.store.file.predicate.LeafFunction;
+import org.apache.flink.table.store.file.predicate.LeafPredicate;
+import org.apache.flink.table.store.file.predicate.LessOrEqual;
+import org.apache.flink.table.store.file.predicate.LessThan;
+import org.apache.flink.table.store.file.predicate.NotEqual;
+import org.apache.flink.table.store.file.predicate.Or;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.function.TriFunction;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.sql.Date;
+import java.time.LocalDate;
+import java.util.function.Function;
+
+/**
+ * Converts table store {@link org.apache.flink.table.store.file.predicate.Predicate}s into ORC
+ * {@link Predicate}s (see {@link OrcFilters}) for filter push-down.
+ *
+ * <p>Conversion is best effort: {@link #toOrcPredicate} returns {@code null} for any predicate it
+ * cannot express, and such a predicate is simply not handed to the ORC reader.
+ */
+public class OrcFilterConverter {
+
+    /** Converters for the supported leaf functions, keyed by the {@link LeafFunction} class. */
+    private static final ImmutableMap<
+                    Class<? extends LeafFunction>, Function<LeafPredicate, Predicate>>
+            FILTERS =
+                    new ImmutableMap.Builder<
+                                    Class<? extends LeafFunction>,
+                                    Function<LeafPredicate, Predicate>>()
+                            .put(IsNull.class, OrcFilterConverter::convertIsNull)
+                            .put(IsNotNull.class, OrcFilterConverter::convertIsNotNull)
+                            .put(
+                                    Equal.class,
+                                    call -> convertBinary(call, OrcFilterConverter::convertEquals))
+                            .put(
+                                    NotEqual.class,
+                                    call ->
+                                            convertBinary(
+                                                    call, OrcFilterConverter::convertNotEquals))
+                            .put(
+                                    GreaterThan.class,
+                                    call ->
+                                            convertBinary(
+                                                    call, OrcFilterConverter::convertGreaterThan))
+                            .put(
+                                    GreaterOrEqual.class,
+                                    call ->
+                                            convertBinary(
+                                                    call,
+                                                    OrcFilterConverter::convertGreaterThanEquals))
+                            .put(
+                                    LessThan.class,
+                                    call ->
+                                            convertBinary(
+                                                    call, OrcFilterConverter::convertLessThan))
+                            .put(
+                                    LessOrEqual.class,
+                                    call ->
+                                            convertBinary(
+                                                    call,
+                                                    OrcFilterConverter::convertLessThanEquals))
+                            .build();
+
+    /** Converts {@code IS NULL}; returns {@code null} if the column type has no ORC mapping. */
+    @Nullable
+    private static Predicate convertIsNull(LeafPredicate predicate) {
+        PredicateLeaf.Type colType = toOrcType(predicate.type());
+        if (colType == null) {
+            return null;
+        }
+
+        return new OrcFilters.IsNull(predicate.fieldName(), colType);
+    }
+
+    /** Converts {@code IS NOT NULL} as {@code NOT (IS NULL)}; {@code null} if unsupported. */
+    @Nullable
+    private static Predicate convertIsNotNull(LeafPredicate predicate) {
+        Predicate isNull = convertIsNull(predicate);
+        if (isNull == null) {
+            return null;
+        }
+        return new OrcFilters.Not(isNull);
+    }
+
+    /**
+     * Converts an OR compound predicate by converting each child in order and left-folding the
+     * results with {@link OrcFilters.Or}, i.e. {@code a OR b OR c} becomes {@code Or(Or(a, b), c)}.
+     *
+     * <p>The OR is only converted when every child is convertible: dropping a disjunct would make
+     * the pushed-down filter stricter than the original predicate.
+     *
+     * @param or compound predicate whose function is {@link Or}
+     * @return the ORC predicate, or {@code null} if the OR has no children or any child cannot be
+     *     converted
+     */
+    @Nullable
+    private static Predicate convertOr(CompoundPredicate or) {
+        Predicate result = null;
+        for (org.apache.flink.table.store.file.predicate.Predicate child : or.children()) {
+            Predicate converted = toOrcPredicate(child);
+            if (converted == null) {
+                return null;
+            }
+            result = result == null ? converted : new OrcFilters.Or(result, converted);
+        }
+        return result;
+    }
+
+    /**
+     * Converts a single-literal comparison: maps the column type and the first literal to their
+     * ORC representations and delegates to {@code func} to build the predicate.
+     *
+     * @return the ORC predicate, or {@code null} if the column type has no ORC mapping or the
+     *     converted literal is {@code null} or not {@link Serializable}
+     */
+    @Nullable
+    private static Predicate convertBinary(
+            LeafPredicate predicate,
+            TriFunction<String, PredicateLeaf.Type, Serializable, Predicate> func) {
+        PredicateLeaf.Type litType = toOrcType(predicate.type());
+        if (litType == null) {
+            return null;
+        }
+
+        Object orcObj = toOrcObject(litType, predicate.literals().get(0));
+        // the ORC predicate constructors take a Serializable literal; this check also rejects null
+        if (!(orcObj instanceof Serializable)) {
+            return null;
+        }
+        return func.apply(predicate.fieldName(), litType, (Serializable) orcObj);
+    }
+
+    /** Builds {@code col = literal}. */
+    private static Predicate convertEquals(
+            String colName, PredicateLeaf.Type litType, Serializable literal) {
+        return new OrcFilters.Equals(colName, litType, literal);
+    }
+
+    /** Builds {@code col <> literal} as {@code NOT (col = literal)}. */
+    private static Predicate convertNotEquals(
+            String colName, PredicateLeaf.Type litType, Serializable literal) {
+        return new OrcFilters.Not(convertEquals(colName, litType, literal));
+    }
+
+    /** Builds {@code col > literal} as {@code NOT (col <= literal)}. */
+    private static Predicate convertGreaterThan(
+            String colName, PredicateLeaf.Type litType, Serializable literal) {
+        return new OrcFilters.Not(new OrcFilters.LessThanEquals(colName, litType, literal));
+    }
+
+    /** Builds {@code col >= literal} as {@code NOT (col < literal)}. */
+    private static Predicate convertGreaterThanEquals(
+            String colName, PredicateLeaf.Type litType, Serializable literal) {
+        return new OrcFilters.Not(new OrcFilters.LessThan(colName, litType, literal));
+    }
+
+    /** Builds {@code col < literal}. */
+    private static Predicate convertLessThan(
+            String colName, PredicateLeaf.Type litType, Serializable literal) {
+        return new OrcFilters.LessThan(colName, litType, literal);
+    }
+
+    /** Builds {@code col <= literal}. */
+    private static Predicate convertLessThanEquals(
+            String colName, PredicateLeaf.Type litType, Serializable literal) {
+        return new OrcFilters.LessThanEquals(colName, litType, literal);
+    }
+
+    /**
+     * Converts a table store predicate into an ORC predicate.
+     *
+     * <p>Supported are OR compound predicates whose children are all convertible, and leaf
+     * predicates using IS NULL, IS NOT NULL, =, &lt;&gt;, &lt;, &lt;=, &gt; or &gt;= on a column
+     * type with an ORC mapping. Anything else (including AND compound predicates) yields {@code
+     * null}.
+     *
+     * @param expression the table store predicate to convert
+     * @return the equivalent ORC predicate, or {@code null} if it cannot be pushed down
+     */
+    @Nullable
+    public static Predicate toOrcPredicate(
+            org.apache.flink.table.store.file.predicate.Predicate expression) {
+        if (expression instanceof CompoundPredicate) {
+            CompoundPredicate compound = (CompoundPredicate) expression;
+            if (compound.function().equals(Or.INSTANCE)) {
+                return convertOr(compound);
+            }
+        } else if (expression instanceof LeafPredicate) {
+            LeafPredicate leaf = (LeafPredicate) expression;
+            Function<LeafPredicate, Predicate> function = FILTERS.get(leaf.function().getClass());
+            if (function == null) {
+                return null;
+            }
+            return function.apply(leaf);
+        }
+        return null;
+    }
+
+    /**
+     * Converts a table store literal into the Java object used for an ORC literal of {@code
+     * litType}.
+     *
+     * <p>DATE literals are read as a day count since the epoch, and DECIMAL and TIMESTAMP literals
+     * are cast to {@link DecimalData} and {@link TimestampData}. STRING literals are converted with
+     * {@code toString()}; all other literals are returned unchanged.
+     *
+     * @return the converted literal, or {@code null} if {@code literalObj} is {@code null}
+     */
+    @Nullable
+    private static Object toOrcObject(PredicateLeaf.Type litType, Object literalObj) {
+        if (literalObj == null) {
+            return null;
+        }
+
+        switch (litType) {
+            case STRING:
+                return literalObj.toString();
+            case DECIMAL:
+                return ((DecimalData) literalObj).toBigDecimal();
+            case DATE:
+                return Date.valueOf(LocalDate.ofEpochDay(((Number) literalObj).longValue()));
+            case TIMESTAMP:
+                return ((TimestampData) literalObj).toTimestamp();
+            default:
+                return literalObj;
+        }
+    }
+
+    /**
+     * Maps a Flink logical type to the ORC predicate leaf type. All integral types map to LONG and
+     * FLOAT/DOUBLE both map to FLOAT.
+     *
+     * @return the ORC leaf type, or {@code null} if the type is not supported for push-down
+     */
+    @Nullable
+    private static PredicateLeaf.Type toOrcType(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+                return PredicateLeaf.Type.LONG;
+            case FLOAT:
+            case DOUBLE:
+                return PredicateLeaf.Type.FLOAT;
+            case BOOLEAN:
+                return PredicateLeaf.Type.BOOLEAN;
+            case CHAR:
+            case VARCHAR:
+                return PredicateLeaf.Type.STRING;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return PredicateLeaf.Type.TIMESTAMP;
+            case DATE:
+                return PredicateLeaf.Type.DATE;
+            case DECIMAL:
+                return PredicateLeaf.Type.DECIMAL;
+            default:
+                return null;
+        }
+    }
+}
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java
new file mode 100644
index 00000000..1c11af1c
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.orc.OrcFilters;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link OrcFilterConverter#toOrcPredicate} on a single BIGINT column. */
+public class OrcFilterConverterTest {
+
+    /** Name of the only column in the test row type. */
+    private static final String COLUMN = "long1";
+
+    @Test
+    public void testApplyPredicate() {
+        RowType rowType =
+                new RowType(
+                        Collections.singletonList(
+                                new RowType.RowField(COLUMN, new BigIntType())));
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        PredicateLeaf.Type type = PredicateLeaf.Type.LONG;
+
+        // IS NULL / IS NOT NULL
+        assertConverted(builder.isNull(0), new OrcFilters.IsNull(COLUMN, type));
+        assertConverted(
+                builder.isNotNull(0), new OrcFilters.Not(new OrcFilters.IsNull(COLUMN, type)));
+
+        // = and <>
+        assertConverted(builder.equal(0, 10L), new OrcFilters.Equals(COLUMN, type, 10));
+        assertConverted(
+                builder.notEqual(0, 10L),
+                new OrcFilters.Not(new OrcFilters.Equals(COLUMN, type, 10)));
+
+        // < and <= map directly
+        assertConverted(builder.lessThan(0, 10L), new OrcFilters.LessThan(COLUMN, type, 10));
+        assertConverted(
+                builder.lessOrEqual(0, 10L), new OrcFilters.LessThanEquals(COLUMN, type, 10));
+
+        // > and >= are expressed as the negation of <= and <
+        assertConverted(
+                builder.greaterThan(0, 10L),
+                new OrcFilters.Not(new OrcFilters.LessThanEquals(COLUMN, type, 10)));
+        assertConverted(
+                builder.greaterOrEqual(0, 10L),
+                new OrcFilters.Not(new OrcFilters.LessThan(COLUMN, type, 10)));
+
+        // a three-element IN is expected as a left-nested OR of equalities
+        OrcFilters.Predicate eqOne = new OrcFilters.Equals(COLUMN, type, 1);
+        OrcFilters.Predicate eqTwo = new OrcFilters.Equals(COLUMN, type, 2);
+        OrcFilters.Predicate eqThree = new OrcFilters.Equals(COLUMN, type, 3);
+        assertConverted(
+                builder.in(0, Arrays.asList(1L, 2L, 3L)),
+                new OrcFilters.Or(new OrcFilters.Or(eqOne, eqTwo), eqThree));
+    }
+
+    /** Asserts that {@code predicate} converts to an ORC predicate rendering like {@code expected}. */
+    private void assertConverted(Predicate predicate, OrcFilters.Predicate expected) {
+        assertThat(OrcFilterConverter.toOrcPredicate(predicate))
+                .hasToString(expected.toString());
+    }
+}
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index bcdf4814..5d24bef9 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
@@ -52,7 +53,7 @@ import java.util.Optional;
public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer> {
@Override
- public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
+ public InputSplit[] getSplits(JobConf jobConf, int numSplits) {
FileStoreTable table = createFileStoreTable(jobConf);
TableScan scan = table.newScan();
createPredicate(table.schema(), jobConf).ifPresent(scan::withFilter);
@@ -66,8 +67,10 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
FileStoreTable table = createFileStoreTable(jobConf);
TableStoreInputSplit split = (TableStoreInputSplit) inputSplit;
+ TableRead read = table.newRead();
+ createPredicate(table.schema(), jobConf).ifPresent(read::withFilter);
return new TableStoreRecordReader(
- table.newRead(),
+ read,
split,
table.schema().fieldNames(),
Arrays.asList(ColumnProjectionUtils.getReadColumnNames(jobConf)));
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
index 0ab02e1a..37d799ea 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
@@ -94,7 +94,9 @@ public class SearchArgumentToPredicateConverterTest {
new SearchArgumentToPredicateConverter(
sarg, Collections.singletonList("a"), Collections.singletonList(flinkType));
- Predicate expected = new PredicateBuilder(RowType.of(flinkType)).equal(0, flinkLiteral);
+ Predicate expected =
+ new PredicateBuilder(RowType.of(new LogicalType[] {flinkType}, new String[] {"a"}))
+ .equal(0, flinkLiteral);
Predicate actual = converter.convert().orElse(null);
assertThat(actual).isEqualTo(expected);
}
@@ -176,6 +178,15 @@ public class SearchArgumentToPredicateConverterTest {
builder.in("f_bigint", PredicateLeaf.Type.LONG, 100L, 200L, 300L).build();
Predicate expected = BUILDER.in(1, Arrays.asList(100L, 200L, 300L));
assertExpected(sarg, expected);
+
+ builder = SearchArgumentFactory.newBuilder();
+ Object[] literals = new Object[30];
+ for (int i = 0; i < literals.length; i++) {
+ literals[i] = i * 100L;
+ }
+ sarg = builder.in("f_bigint", PredicateLeaf.Type.LONG, literals).build();
+ expected = BUILDER.in(1, Arrays.asList(literals));
+ assertExpected(sarg, expected);
}
@Test
@@ -211,6 +222,33 @@ public class SearchArgumentToPredicateConverterTest {
assertExpected(sarg, expected);
}
+    @Test
+    public void testLargeIn() {
+        // 30 literals: index 0 stays null (array default), the rest are 100, 200, ..., 2900
+        Object[] literals = new Object[30];
+        for (int idx = 1; idx < literals.length; idx++) {
+            literals[idx] = idx * 100L;
+        }
+        SearchArgument sarg =
+                SearchArgumentFactory.newBuilder()
+                        .in("f_bigint", PredicateLeaf.Type.LONG, literals)
+                        .build();
+        assertExpected(sarg, BUILDER.in(1, Arrays.asList(literals)));
+    }
+
+    @Test
+    public void testLargeNotIn() {
+        // 30 literals: index 0 stays null (array default), the rest are 100, 200, ..., 2900
+        Object[] literals = new Object[30];
+        for (int idx = 1; idx < literals.length; idx++) {
+            literals[idx] = idx * 100L;
+        }
+        SearchArgument sarg =
+                SearchArgumentFactory.newBuilder()
+                        .startNot()
+                        .in("f_bigint", PredicateLeaf.Type.LONG, literals)
+                        .end()
+                        .build();
+        assertExpected(sarg, BUILDER.notIn(1, Arrays.asList(literals)));
+    }
+
@Test
public void testBetween() {
SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
index f247f908..c94db2eb 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
@@ -18,9 +18,11 @@
package org.apache.flink.table.store.spark;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.utils.TypeUtils;
import org.apache.flink.table.types.logical.RowType;
@@ -31,6 +33,9 @@ import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.List;
+
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.and;
/** A Spark {@link PartitionReaderFactory} for table store. */
public class SparkReaderFactory implements PartitionReaderFactory {
@@ -39,10 +44,13 @@ public class SparkReaderFactory implements PartitionReaderFactory {
private final FileStoreTable table;
private final int[] projectedFields;
+ /**
+  * Filters pushed into every reader; when non-empty they are combined with AND in createReader.
+  * Must not be null: createReader calls size() on it.
+  */
+ private final List<Predicate> predicates;
- public SparkReaderFactory(FileStoreTable table, int[] projectedFields) {
+ /**
+  * @param table table to read from
+  * @param projectedFields indexes of the fields to read
+  * @param predicates filters to push into each reader; may be empty but not null
+  */
+ public SparkReaderFactory(
+ FileStoreTable table, int[] projectedFields, List<Predicate> predicates) {
this.table = table;
this.projectedFields = projectedFields;
+ this.predicates = predicates;
}
private RowType readRowType() {
@@ -52,11 +60,12 @@ public class SparkReaderFactory implements PartitionReaderFactory {
@Override
public PartitionReader<InternalRow> createReader(InputPartition partition) {
RecordReader<RowData> reader;
+ TableRead read = table.newRead().withProjection(projectedFields);
+ if (predicates.size() > 0) {
+ read.withFilter(and(predicates));
+ }
try {
- reader =
- table.newRead()
- .withProjection(projectedFields)
- .createReader(((SparkInputPartition) partition).split());
+ reader = read.createReader(((SparkInputPartition) partition).split());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
index da96491d..5db24a09 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
@@ -79,7 +79,7 @@ public class SparkScan implements Scan, SupportsReportStatistics {
@Override
public PartitionReaderFactory createReaderFactory() {
- return new SparkReaderFactory(table, projectedFields);
+ return new SparkReaderFactory(table, projectedFields, predicates);
}
};
}
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
index 8b35a31f..7180f3d6 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkFilterConverterTest.java
@@ -122,6 +122,16 @@ public class SparkFilterConverterTest {
Predicate expectedIn = builder.in(0, Arrays.asList(1, null, 2));
Predicate actualIn = converter.convert(in);
assertThat(actualIn).isEqualTo(expectedIn);
+
+ Object[] literals = new Object[30];
+ literals[0] = null;
+ for (int i = 1; i < literals.length; i++) {
+ literals[i] = i * 100;
+ }
+ In largeIn = In.apply(field, literals);
+ Predicate expectedLargeIn = builder.in(0, Arrays.asList(literals));
+ Predicate actualLargeIn = converter.convert(largeIn);
+ assertThat(actualLargeIn).isEqualTo(expectedLargeIn);
}
@Test