Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/05 13:33:36 UTC
[flink-table-store] branch master updated: [FLINK-29129] Try best to push down value filters
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 769f19d8 [FLINK-29129] Try best to push down value filters
769f19d8 is described below
commit 769f19d8566f0a30d3a7c09b297b9dce3280ac1e
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Mon Sep 5 21:33:32 2022 +0800
[FLINK-29129] Try best to push down value filters
This closes #280
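The idea behind the change: a merge tree split is cut into sections, and inside a section whose sorted runs do not overlap in key range, every key occurs at most once, so merging cannot change a row and value filters are safe to push down. In a section with overlapping runs, a value filter could drop one version of a key while another version survives the merge, producing wrong results, so only key filters go there (in value-count mode the key covers every filterable field, so all filters remain safe). A minimal sketch of the per-section choice, assuming the two filter lists introduced below; the helper itself is hypothetical:

    // Hypothetical helper mirroring the per-section filter selection.
    List<Predicate> filtersFor(boolean overlapped) {
        // Overlapped sections may hold several versions of one key, so
        // dropping rows by value could hide a version the merge needs.
        return overlapped ? filtersForOverlappedSection : filtersForNonOverlappedSection;
    }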
---
.../flink/table/store/format/FileFormat.java | 4 +-
.../source/FileStoreSourceReaderTest.java | 24 +++++++
.../source/FileStoreSourceSplitReaderTest.java | 23 +++++++
.../table/store/file/data/DataFileReader.java | 5 +-
.../store/file/mergetree/MergeTreeReader.java | 6 ++
.../file/operation/AppendOnlyFileStoreRead.java | 4 +-
.../file/operation/KeyValueFileStoreRead.java | 74 ++++++++++++++++++----
.../table/store/file/schema/SchemaManager.java | 4 +-
.../table/ChangelogWithKeyFileStoreTable.java | 23 +------
.../flink/table/store/file/TestFileStore.java | 8 ++-
.../table/store/file/data/DataFileTestUtils.java | 16 +++++
.../file/format/FileStatsExtractingAvroFormat.java | 4 +-
.../store/file/format/FlushingFileFormat.java | 4 +-
.../store/file/operation/FileStoreCommitTest.java | 15 ++++-
.../store/file/operation/FileStoreExpireTest.java | 14 +++-
.../file/operation/KeyValueFileStoreReadTest.java | 26 ++++++--
.../file/operation/KeyValueFileStoreScanTest.java | 16 ++++-
.../store/table/AppendOnlyFileStoreTableTest.java | 1 -
.../table/ChangelogWithKeyFileStoreTableTest.java | 39 +++++++++---
.../table/store/format/avro/AvroFileFormat.java | 4 +-
.../table/store/format/orc/OrcFileFormat.java | 4 +-
21 files changed, 255 insertions(+), 63 deletions(-)
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
index d457e258..f34d4e1b 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
@@ -29,6 +29,8 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.types.logical.RowType;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -59,7 +61,7 @@ public abstract class FileFormat {
* @param filters A list of filters in conjunctive form for filtering on a best-effort basis.
*/
public abstract BulkFormat<RowData, FileSourceSplit> createReaderFactory(
- RowType type, int[][] projection, List<Predicate> filters);
+ RowType type, int[][] projection, @Nullable List<Predicate> filters);
/** Create a {@link BulkWriter.Factory} from the type. */
public abstract BulkWriter.Factory<RowData> createWriterFactory(RowType type);
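Since filters is now @Nullable throughout, callers may pass null to mean "no filtering", and every implementation has to tolerate that (OrcFileFormat below already guards with filters != null). A hedged usage sketch, with "format" standing in for any concrete FileFormat:

    // null filters = read everything; pruning is best-effort anyway.
    BulkFormat<RowData, FileSourceSplit> readerFactory =
            format.createReaderFactory(rowType, projection, null);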
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
index a7d9d2dc..d48b9c56 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
@@ -19,10 +19,18 @@
package org.apache.flink.table.store.connector.source;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import java.util.Arrays;
import java.util.Collections;
import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit;
@@ -34,6 +42,22 @@ public class FileStoreSourceReaderTest {
@TempDir java.nio.file.Path tempDir;
+ @BeforeEach
+ public void beforeEach() throws Exception {
+ SchemaManager schemaManager = new SchemaManager(new Path(tempDir.toUri()));
+ schemaManager.commitNewVersion(
+ new UpdateSchema(
+ new RowType(
+ Arrays.asList(
+ new RowType.RowField("k", new BigIntType()),
+ new RowType.RowField("v", new BigIntType()),
+ new RowType.RowField("default", new IntType()))),
+ Collections.singletonList("default"),
+ Arrays.asList("k", "default"),
+ Collections.emptyMap(),
+ null));
+ }
+
@Test
public void testRequestSplitWhenNoSplitRestored() throws Exception {
final TestingReaderContext context = new TestingReaderContext();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
index 32a40de1..25211fe6 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -23,15 +23,22 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -67,6 +74,22 @@ public class FileStoreSourceSplitReaderTest {
service = null;
}
+ @BeforeEach
+ public void beforeEach() throws Exception {
+ SchemaManager schemaManager = new SchemaManager(new Path(tempDir.toUri()));
+ schemaManager.commitNewVersion(
+ new UpdateSchema(
+ new RowType(
+ Arrays.asList(
+ new RowType.RowField("k", new BigIntType()),
+ new RowType.RowField("v", new BigIntType()),
+ new RowType.RowField("default", new IntType()))),
+ Collections.singletonList("default"),
+ Arrays.asList("k", "default"),
+ Collections.emptyMap(),
+ null));
+ }
+
@Test
public void testPrimaryKey() throws Exception {
innerTestOnce(false, 0);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
index 5da2a2b0..7a0c6eef 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
@@ -178,7 +178,10 @@ public class DataFileReader {
}
public DataFileReader create(
- BinaryRowData partition, int bucket, boolean projectKeys, List<Predicate> filters) {
+ BinaryRowData partition,
+ int bucket,
+ boolean projectKeys,
+ @Nullable List<Predicate> filters) {
int[][] keyProjection = projectKeys ? this.keyProjection : fullKeyProjection;
RowType projectedKeyType = projectKeys ? this.projectedKeyType : keyType;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
index da39d0a3..a3830bc1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
@@ -62,6 +62,12 @@ public class MergeTreeReader implements RecordReader<KeyValue> {
this.reader = ConcatRecordReader.create(readers);
}
+ public MergeTreeReader(boolean dropDelete, List<ReaderSupplier<KeyValue>> sectionReaders)
+ throws IOException {
+ this.dropDelete = dropDelete;
+ this.reader = ConcatRecordReader.create(sectionReaders);
+ }
+
@Nullable
@Override
public RecordIterator<KeyValue> readBatch() throws IOException {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
index 176b7524..7f4f21a6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
@@ -34,6 +34,8 @@ import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.utils.Projection;
import org.apache.flink.table.types.logical.RowType;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -51,7 +53,7 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
private int[][] projection;
- private List<Predicate> filters;
+ @Nullable private List<Predicate> filters;
public AppendOnlyFileStoreRead(
SchemaManager schemaManager,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
index 40bd85c5..0f7a6648 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
@@ -23,11 +23,13 @@ import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFileReader;
import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.ProjectKeyRecordReader;
import org.apache.flink.table.store.file.utils.RecordReader;
@@ -35,13 +37,18 @@ import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.types.logical.RowType;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
import static org.apache.flink.table.store.file.data.DataFilePathFactory.CHANGELOG_FILE_PREFIX;
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.containsFields;
import static org.apache.flink.table.store.file.predicate.PredicateBuilder.splitAnd;
/**
@@ -50,13 +57,17 @@ import static org.apache.flink.table.store.file.predicate.PredicateBuilder.split
*/
public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
+ private final TableSchema tableSchema;
private final DataFileReader.Factory dataFileReaderFactory;
private final Comparator<RowData> keyComparator;
private final MergeFunction mergeFunction;
+ private final boolean valueCountMode;
+
+ @Nullable private int[][] keyProjectedFields;
- private int[][] keyProjectedFields;
+ @Nullable private List<Predicate> filtersForOverlappedSection;
- private List<Predicate> filters;
+ @Nullable private List<Predicate> filtersForNonOverlappedSection;
public KeyValueFileStoreRead(
SchemaManager schemaManager,
@@ -67,11 +78,13 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
MergeFunction mergeFunction,
FileFormat fileFormat,
FileStorePathFactory pathFactory) {
+ this.tableSchema = schemaManager.schema(schemaId);
this.dataFileReaderFactory =
new DataFileReader.Factory(
schemaManager, schemaId, keyType, valueType, fileFormat, pathFactory);
this.keyComparator = keyComparator;
this.mergeFunction = mergeFunction;
+ this.valueCountMode = tableSchema.trimmedPrimaryKeys().isEmpty();
}
public KeyValueFileStoreRead withKeyProjection(int[][] projectedFields) {
@@ -87,7 +100,27 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
@Override
public FileStoreRead<KeyValue> withFilter(Predicate predicate) {
- this.filters = splitAnd(predicate);
+ List<Predicate> allFilters = new ArrayList<>();
+ List<Predicate> pkFilters = null;
+ List<String> primaryKeys = tableSchema.trimmedPrimaryKeys();
+ Set<String> nonPrimaryKeys =
+ tableSchema.fieldNames().stream()
+ .filter(name -> !primaryKeys.contains(name))
+ .collect(Collectors.toSet());
+ for (Predicate sub : splitAnd(predicate)) {
+ allFilters.add(sub);
+ if (!containsFields(sub, nonPrimaryKeys)) {
+ if (pkFilters == null) {
+ pkFilters = new ArrayList<>();
+ }
+ // TODO Actually, the index is wrong, but it is OK.
+ // The ORC filter just uses names instead of indexes.
+ pkFilters.add(sub);
+ }
+ }
+ // for sections whose key ranges do not overlap, push down value filters too
+ filtersForNonOverlappedSection = allFilters;
+ filtersForOverlappedSection = valueCountMode ? allFilters : pkFilters;
return this;
}
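To trace the split, take the schema exercised in ChangelogWithKeyFileStoreTableTest below, where field 1 is the key column a and field 2 is the value column b. A hedged sketch; the comments follow the assignments above:

    PredicateBuilder builder = new PredicateBuilder(rowType);
    Predicate predicate =
            PredicateBuilder.and(
                    Arrays.asList(builder.equal(1, 30), builder.equal(2, 300L)));
    // splitAnd(predicate)            -> [a = 30, b = 300]
    // containsFields(b = 300, {b})   -> true, so it never reaches pkFilters
    // filtersForOverlappedSection    -> [a = 30]          (pk filters only)
    // filtersForNonOverlappedSection -> [a = 30, b = 300] (all filters)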
@@ -95,7 +128,8 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
public RecordReader<KeyValue> createReader(Split split) throws IOException {
if (split.isIncremental()) {
DataFileReader dataFileReader =
- dataFileReaderFactory.create(split.partition(), split.bucket(), true, filters);
+ dataFileReaderFactory.create(
+ split.partition(), split.bucket(), true, filtersForOverlappedSection);
// Return the raw file contents without merging
List<ConcatRecordReader.ReaderSupplier<KeyValue>> suppliers = new ArrayList<>();
for (DataFileMeta file : split.files()) {
@@ -106,15 +140,7 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
} else {
// in this case the merge tree should merge records with the same key
// Do not project key in MergeTreeReader.
- DataFileReader dataFileReader =
- dataFileReaderFactory.create(split.partition(), split.bucket(), false, filters);
- MergeTreeReader reader =
- new MergeTreeReader(
- new IntervalPartition(split.files(), keyComparator).partition(),
- true,
- dataFileReader,
- keyComparator,
- mergeFunction.copy());
+ MergeTreeReader reader = new MergeTreeReader(true, createSectionReaders(split));
// project key using ProjectKeyRecordReader
return keyProjectedFields == null
@@ -123,6 +149,28 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
}
}
+ private List<ConcatRecordReader.ReaderSupplier<KeyValue>> createSectionReaders(Split split) {
+ List<ConcatRecordReader.ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
+ MergeFunction mergeFunc = mergeFunction.copy();
+ for (List<SortedRun> section :
+ new IntervalPartition(split.files(), keyComparator).partition()) {
+ DataFileReader dataFileReader = createDataFileReader(split, section.size() > 1);
+ sectionReaders.add(
+ () ->
+ MergeTreeReader.readerForSection(
+ section, dataFileReader, keyComparator, mergeFunc));
+ }
+ return sectionReaders;
+ }
+
+ private DataFileReader createDataFileReader(Split split, boolean overlapped) {
+ return dataFileReaderFactory.create(
+ split.partition(),
+ split.bucket(),
+ false,
+ overlapped ? filtersForOverlappedSection : filtersForNonOverlappedSection);
+ }
+
private Optional<String> changelogFile(DataFileMeta fileMeta) {
for (String file : fileMeta.extraFiles()) {
if (file.startsWith(CHANGELOG_FILE_PREFIX)) {
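Whether a section is overlapped falls straight out of IntervalPartition: files whose key ranges intersect land in the same section, packed into sorted runs whose ranges are disjoint within each run, so section.size() > 1 means some key may appear in more than one run. A hypothetical worked example:

    // Files with key ranges f1=[1,3], f2=[4,6], f3=[5,8]:
    //   section {f1}     -> one run,  not overlapped -> all filters pushed
    //   section {f2, f3} -> two runs, overlapped     -> key filters only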
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index d9ab4b75..0b387441 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.file.schema;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.CoreOptions;
@@ -314,7 +315,8 @@ public class SchemaManager implements Serializable {
return new Path(tableRoot + "/schema");
}
- private Path toSchemaPath(long id) {
+ @VisibleForTesting
+ public Path toSchemaPath(long id) {
return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index c7e3f0d8..8a983479 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -50,13 +50,10 @@ import org.apache.flink.table.store.utils.RowDataUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.flink.table.store.file.predicate.PredicateBuilder.and;
-import static org.apache.flink.table.store.file.predicate.PredicateBuilder.containsFields;
import static org.apache.flink.table.store.file.predicate.PredicateBuilder.pickTransformFieldMapping;
import static org.apache.flink.table.store.file.predicate.PredicateBuilder.splitAnd;
@@ -155,29 +152,11 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
@Override
public TableRead newRead() {
- List<String> primaryKeys = tableSchema.trimmedPrimaryKeys();
- Set<String> nonPrimaryKeys =
- tableSchema.fieldNames().stream()
- .filter(name -> !primaryKeys.contains(name))
- .collect(Collectors.toSet());
return new KeyValueTableRead(store.newRead()) {
@Override
public TableRead withFilter(Predicate predicate) {
- List<Predicate> predicates = new ArrayList<>();
- for (Predicate sub : splitAnd(predicate)) {
- // TODO support value filter
- if (containsFields(sub, nonPrimaryKeys)) {
- continue;
- }
-
- // TODO Actually, the index is wrong, but it is OK. The orc filter
- // just use name instead of index.
- predicates.add(sub);
- }
- if (predicates.size() > 0) {
- read.withFilter(and(predicates));
- }
+ read.withFilter(predicate);
return this;
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 1b347791..339f8011 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -378,14 +378,19 @@ public class TestFileStore extends KeyValueFileStore {
}
private Set<Path> getFilesInUse() {
+ Set<Path> result = new HashSet<>();
FileStorePathFactory pathFactory = pathFactory();
ManifestList manifestList = manifestListFactory().create();
FileStoreScan scan = newScan();
+ SchemaManager schemaManager = new SchemaManager(options.path());
+ schemaManager.listAllIds().forEach(id -> result.add(schemaManager.toSchemaPath(id)));
+
SnapshotManager snapshotManager = snapshotManager();
Long latestSnapshotId = snapshotManager.latestSnapshotId();
+
if (latestSnapshotId == null) {
- return Collections.emptySet();
+ return result;
}
long firstInUseSnapshotId = Snapshot.FIRST_SNAPSHOT_ID;
@@ -396,7 +401,6 @@ public class TestFileStore extends KeyValueFileStore {
}
}
- Set<Path> result = new HashSet<>();
for (long id = firstInUseSnapshotId; id <= latestSnapshotId; id++) {
Path snapshotPath = snapshotManager.snapshotPath(id);
Snapshot snapshot = Snapshot.fromPath(snapshotPath);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java
index b977c6a8..9b0736f5 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.file.data;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.store.file.stats.StatsTestUtils;
/** Utils for {@link DataFileMeta}. */
public class DataFileTestUtils {
@@ -47,6 +48,21 @@ public class DataFileTestUtils {
DataFileMeta.DUMMY_LEVEL);
}
+ public static DataFileMeta newFile() {
+ return new DataFileMeta(
+ "",
+ 0,
+ 0,
+ DataFileMeta.EMPTY_MIN_KEY,
+ DataFileMeta.EMPTY_MAX_KEY,
+ StatsTestUtils.newEmptyTableStats(),
+ StatsTestUtils.newEmptyTableStats(),
+ 0,
+ 0,
+ 0,
+ 0);
+ }
+
public static DataFileMeta newFile(
String name, int level, int minKey, int maxKey, long maxSequence) {
return new DataFileMeta(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
index 692ba1b5..d63305e2 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
@@ -29,6 +29,8 @@ import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.store.format.FileStatsExtractor;
import org.apache.flink.table.types.logical.RowType;
+import javax.annotation.Nullable;
+
import java.util.List;
import java.util.Optional;
@@ -44,7 +46,7 @@ public class FileStatsExtractingAvroFormat extends FileFormat {
@Override
public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
- RowType type, int[][] projection, List<Predicate> filters) {
+ RowType type, int[][] projection, @Nullable List<Predicate> filters) {
return avro.createReaderFactory(type, projection, filters);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
index 94e11003..0ffa1e12 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
@@ -27,6 +27,8 @@ import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.types.logical.RowType;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.List;
@@ -42,7 +44,7 @@ public class FlushingFileFormat extends FileFormat {
@Override
public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
- RowType type, int[][] projection, List<Predicate> filters) {
+ RowType type, int[][] projection, @Nullable List<Predicate> filters) {
return format.createReaderFactory(type, projection, filters);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index 13accde3..37c168a0 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.table.store.file.TestFileStore;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.SnapshotManager;
@@ -339,16 +341,25 @@ public class FileStoreCommitTest {
assertThat(store.snapshotManager().findLatest()).isEqualTo(snapshot.id() + 1);
}
- private TestFileStore createStore(boolean failing) {
+ private TestFileStore createStore(boolean failing) throws Exception {
return createStore(failing, 1);
}
- private TestFileStore createStore(boolean failing, int numBucket) {
+ private TestFileStore createStore(boolean failing, int numBucket) throws Exception {
String root =
failing
? FailingAtomicRenameFileSystem.getFailingPath(
failingName, tempDir.toString())
: TestAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString();
+ SchemaManager schemaManager = new SchemaManager(new Path(tempDir.toUri()));
+ schemaManager.commitNewVersion(
+ new UpdateSchema(
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+ TestKeyValueGenerator.getPrimaryKeys(
+ TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+ Collections.emptyMap(),
+ null));
return TestFileStore.create(
"avro",
root,
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index b9fac9fd..49587a20 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -29,6 +29,8 @@ import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.manifest.FileKind;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.SnapshotManager;
@@ -40,6 +42,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@@ -56,7 +59,7 @@ public class FileStoreExpireTest {
private SnapshotManager snapshotManager;
@BeforeEach
- public void beforeEach() throws IOException {
+ public void beforeEach() throws Exception {
gen = new TestKeyValueGenerator();
store =
TestFileStore.create(
@@ -68,6 +71,15 @@ public class FileStoreExpireTest {
TestKeyValueGenerator.DEFAULT_ROW_TYPE,
new DeduplicateMergeFunction());
snapshotManager = store.snapshotManager();
+ SchemaManager schemaManager = new SchemaManager(new Path(tempDir.toUri()));
+ schemaManager.commitNewVersion(
+ new UpdateSchema(
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+ TestKeyValueGenerator.getPrimaryKeys(
+ TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+ Collections.emptyMap(),
+ null));
}
@AfterEach
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
index a2086ba8..a48bbdc4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.file.operation;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
@@ -28,6 +29,8 @@ import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.table.source.Split;
@@ -42,12 +45,14 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
@@ -222,10 +227,23 @@ public class KeyValueFileStoreReadTest {
}
private TestFileStore createStore(
- RowType partitionType,
- RowType keyType,
- RowType valueType,
- MergeFunction mergeFunction) {
+ RowType partitionType, RowType keyType, RowType valueType, MergeFunction mergeFunction)
+ throws Exception {
+ SchemaManager schemaManager = new SchemaManager(new Path(tempDir.toUri()));
+ boolean valueCountMode = mergeFunction instanceof ValueCountMergeFunction;
+ schemaManager.commitNewVersion(
+ new UpdateSchema(
+ valueCountMode ? keyType : valueType,
+ partitionType.getFieldNames(),
+ valueCountMode
+ ? Collections.emptyList()
+ : Stream.concat(
+ keyType.getFieldNames().stream()
+ .map(field -> field.replace("key_", "")),
+ partitionType.getFieldNames().stream())
+ .collect(Collectors.toList()),
+ Collections.emptyMap(),
+ null));
return TestFileStore.create(
"avro", tempDir.toString(), 1, partitionType, keyType, valueType, mergeFunction);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
index 201eaab5..79ba448f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.file.operation;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.Snapshot;
@@ -27,6 +28,8 @@ import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
@@ -37,6 +40,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -57,7 +61,7 @@ public class KeyValueFileStoreScanTest {
private SnapshotManager snapshotManager;
@BeforeEach
- public void beforeEach() {
+ public void beforeEach() throws Exception {
gen = new TestKeyValueGenerator();
store =
TestFileStore.create(
@@ -69,6 +73,16 @@ public class KeyValueFileStoreScanTest {
TestKeyValueGenerator.DEFAULT_ROW_TYPE,
new DeduplicateMergeFunction());
snapshotManager = store.snapshotManager();
+
+ SchemaManager schemaManager = new SchemaManager(new Path(tempDir.toUri()));
+ schemaManager.commitNewVersion(
+ new UpdateSchema(
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+ TestKeyValueGenerator.getPrimaryKeys(
+ TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+ Collections.emptyMap(),
+ null));
}
@Test
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 9cc7ed32..620252bb 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -190,7 +190,6 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
List<Split> splits =
table.newScan().withFilter(partitionFilter).withBucket(bucket).plan().splits;
TableRead read = table.newRead();
- getResult(read, splits, binaryRow(partition), bucket, STREAMING_ROW_TO_STRING);
assertThat(getResult(read, splits, binaryRow(partition), bucket, STREAMING_ROW_TO_STRING))
.containsExactlyElementsOf(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 9b1495fe..8ff0b786 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -228,25 +228,46 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
write.write(GenericRowData.of(1, 60, 600L));
commit.commit("2", write.prepareCommit(true));
- write.close();
-
PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
List<Split> splits = table.newScan().plan().splits;
- TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
+ // push down key filter a = 30
+ TableRead read = table.newRead().withFilter(builder.equal(1, 30));
+ assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+ .hasSameElementsAs(Arrays.asList("1|30|300", "1|40|400"));
+
+ // push down value filter b = 300L
+ read = table.newRead().withFilter(builder.equal(2, 300L));
+ assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+ .hasSameElementsAs(Arrays.asList("1|30|300", "1|40|400"));
+
+ // push down both key filter and value filter
+ read =
+ table.newRead()
+ .withFilter(
+ PredicateBuilder.or(builder.equal(1, 10), builder.equal(2, 300L)));
+ assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+ .hasSameElementsAs(Arrays.asList("1|10|100", "1|20|200", "1|30|300", "1|40|400"));
+
+ // update pk 60, 10
+ write.write(GenericRowData.of(1, 60, 500L));
+ write.write(GenericRowData.of(1, 10, 10L));
+ commit.commit("3", write.prepareCommit(true));
+
+ write.close();
+
+ // cannot push down value filter b = 600L
+ splits = table.newScan().plan().splits;
+ read = table.newRead().withFilter(builder.equal(2, 600L));
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
.hasSameElementsAs(
Arrays.asList(
- "1|10|100",
+ "1|10|10",
"1|20|200",
"1|30|300",
"1|40|400",
"1|50|500",
- "1|60|600"));
-
- read = table.newRead().withFilter(builder.equal(1, 30));
- assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
- .hasSameElementsAs(Arrays.asList("1|30|300", "1|40|400"));
+ "1|60|500"));
}
@Override
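The new tail of the test shows why overlapped sections must not see value filters: after pk 60 is updated from 600 to 500, the old and new versions sit in files with overlapping key ranges, and pushing b = 600L into the newer file would filter away 1|60|500, letting the merge resurrect the stale row. Filters stay best-effort, so the read returns the full merged result (presumably re-filtered downstream by the engine). A sketch of the hazard:

    // file A (older): 1|60|600        file B (newer): 1|60|500
    // pushing b = 600 into B would drop 1|60|500, and merging A with an
    // emptied B would wrongly revive the stale 1|60|600.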
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
index 90618f1d..f908fae2 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
@@ -46,6 +46,8 @@ import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
@@ -65,7 +67,7 @@ public class AvroFileFormat extends FileFormat {
@Override
public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
- RowType type, int[][] projection, List<Predicate> filters) {
+ RowType type, int[][] projection, @Nullable List<Predicate> filters) {
// avro is a file format that keeps schemas in file headers,
// if the schema given to the reader is not equal to the schema in header,
// reader will automatically map the fields and give back records with our desired
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
index 880074c2..8b028210 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
@@ -38,6 +38,8 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.orc.TypeDescription;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -67,7 +69,7 @@ public class OrcFileFormat extends FileFormat {
@Override
public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
- RowType type, int[][] projection, List<Predicate> filters) {
+ RowType type, int[][] projection, @Nullable List<Predicate> filters) {
List<OrcFilters.Predicate> orcPredicates = new ArrayList<>();
if (filters != null) {