You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/11 04:02:03 UTC
[flink-table-store] branch master updated: [FLINK-26544] Apply key & value filter in FileStoreScanImpl
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new f69c037 [FLINK-26544] Apply key & value filter in FileStoreScanImpl
f69c037 is described below
commit f69c03757fb56af33020d21d226355c98f2867e3
Author: tsreaper <ts...@gmail.com>
AuthorDate: Fri Mar 11 12:01:58 2022 +0800
[FLINK-26544] Apply key & value filter in FileStoreScanImpl
This closes #38
---
.../store/file/operation/FileStoreScanImpl.java | 6 +-
.../store/file/operation/FileStoreScanTest.java | 72 +++++++++++++++++++---
2 files changed, 69 insertions(+), 9 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
index b5d1370..ab1dd15 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
@@ -232,10 +232,12 @@ public class FileStoreScanImpl implements FileStoreScan {
}
private boolean filterManifestEntry(ManifestEntry entry) {
- // TODO apply key & value filter after field stats are collected in
- // SstFile.RollingFile#finish
return (partitionFilter == null
|| partitionFilter.test(partitionConverter.convert(entry.partition())))
+ && (keyFilter == null
+ || keyFilter.test(entry.file().rowCount(), entry.file().keyStats()))
+ && (valueFilter == null
+ || valueFilter.test(entry.file().rowCount(), entry.file().valueStats()))
&& (specifiedBucket == null || entry.bucket() == specifiedBucket);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
index 1c5d1b6..4a71b75 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
@@ -27,7 +27,10 @@ import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.predicate.Equal;
+import org.apache.flink.table.store.file.predicate.Literal;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.types.logical.IntType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -102,7 +105,47 @@ public class FileStoreScanTest {
wantedPartitions.contains(
gen.getPartition(kv)))
.collect(Collectors.toList()));
- runTest(scan, snapshot.id(), expected);
+ runTestExactMatch(scan, snapshot.id(), expected);
+ }
+
+ @Test
+ public void testWithKeyFilter() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ List<KeyValue> data = generateData(random.nextInt(1000) + 1);
+ Snapshot snapshot = writeData(data);
+
+ int wantedShopId = data.get(random.nextInt(data.size())).key().getInt(0);
+
+ FileStoreScan scan = store.newScan();
+ scan.withSnapshot(snapshot.id());
+ scan.withKeyFilter(new Equal(0, new Literal(new IntType(false), wantedShopId)));
+
+ Map<BinaryRowData, BinaryRowData> expected =
+ store.toKvMap(
+ data.stream()
+ .filter(kv -> kv.key().getInt(0) == wantedShopId)
+ .collect(Collectors.toList()));
+ runTestContainsAll(scan, snapshot.id(), expected);
+ }
+
+ @Test
+ public void testWithValueFilter() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ List<KeyValue> data = generateData(random.nextInt(1000) + 1);
+ Snapshot snapshot = writeData(data);
+
+ int wantedShopId = data.get(random.nextInt(data.size())).value().getInt(2);
+
+ FileStoreScan scan = store.newScan();
+ scan.withSnapshot(snapshot.id());
+ scan.withValueFilter(new Equal(2, new Literal(new IntType(false), wantedShopId)));
+
+ Map<BinaryRowData, BinaryRowData> expected =
+ store.toKvMap(
+ data.stream()
+ .filter(kv -> kv.value().getInt(2) == wantedShopId)
+ .collect(Collectors.toList()));
+ runTestContainsAll(scan, snapshot.id(), expected);
}
@Test
@@ -122,7 +165,7 @@ public class FileStoreScanTest {
data.stream()
.filter(kv -> getBucket(kv) == wantedBucket)
.collect(Collectors.toList()));
- runTest(scan, snapshot.id(), expected);
+ runTestExactMatch(scan, snapshot.id(), expected);
}
@Test
@@ -148,7 +191,7 @@ public class FileStoreScanTest {
allData.subList(0, wantedCommit + 1).stream()
.flatMap(Collection::stream)
.collect(Collectors.toList()));
- runTest(scan, wantedSnapshot, expected);
+ runTestExactMatch(scan, wantedSnapshot, expected);
}
@Test
@@ -171,19 +214,34 @@ public class FileStoreScanTest {
List<KeyValue> expectedKvs = store.readKvsFromSnapshot(wantedSnapshotId);
gen.sort(expectedKvs);
Map<BinaryRowData, BinaryRowData> expected = store.toKvMap(expectedKvs);
- runTest(scan, null, expected);
+ runTestExactMatch(scan, null, expected);
}
- private void runTest(
+ private void runTestExactMatch(
FileStoreScan scan, Long expectedSnapshotId, Map<BinaryRowData, BinaryRowData> expected)
throws Exception {
+ Map<BinaryRowData, BinaryRowData> actual = getActualKvMap(scan, expectedSnapshotId);
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ private void runTestContainsAll(
+ FileStoreScan scan, Long expectedSnapshotId, Map<BinaryRowData, BinaryRowData> expected)
+ throws Exception {
+ Map<BinaryRowData, BinaryRowData> actual = getActualKvMap(scan, expectedSnapshotId);
+ for (Map.Entry<BinaryRowData, BinaryRowData> entry : expected.entrySet()) {
+ assertThat(actual).containsKey(entry.getKey());
+ assertThat(actual.get(entry.getKey())).isEqualTo(entry.getValue());
+ }
+ }
+
+ private Map<BinaryRowData, BinaryRowData> getActualKvMap(
+ FileStoreScan scan, Long expectedSnapshotId) throws Exception {
FileStoreScan.Plan plan = scan.plan();
assertThat(plan.snapshotId()).isEqualTo(expectedSnapshotId);
List<KeyValue> actualKvs = store.readKvsFromManifestEntries(plan.files());
gen.sort(actualKvs);
- Map<BinaryRowData, BinaryRowData> actual = store.toKvMap(actualKvs);
- assertThat(actual).isEqualTo(expected);
+ return store.toKvMap(actualKvs);
}
private List<KeyValue> generateData(int numRecords) {