You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/11 04:02:03 UTC

[flink-table-store] branch master updated: [FLINK-26544] Apply key & value filter in FileStoreScanImpl

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new f69c037  [FLINK-26544] Apply key & value filter in FileStoreScanImpl
f69c037 is described below

commit f69c03757fb56af33020d21d226355c98f2867e3
Author: tsreaper <ts...@gmail.com>
AuthorDate: Fri Mar 11 12:01:58 2022 +0800

    [FLINK-26544] Apply key & value filter in FileStoreScanImpl
    
    This closes #38
---
 .../store/file/operation/FileStoreScanImpl.java    |  6 +-
 .../store/file/operation/FileStoreScanTest.java    | 72 +++++++++++++++++++---
 2 files changed, 69 insertions(+), 9 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
index b5d1370..ab1dd15 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
@@ -232,10 +232,12 @@ public class FileStoreScanImpl implements FileStoreScan {
     }
 
     private boolean filterManifestEntry(ManifestEntry entry) {
-        // TODO apply key & value filter after field stats are collected in
-        //  SstFile.RollingFile#finish
         return (partitionFilter == null
                         || partitionFilter.test(partitionConverter.convert(entry.partition())))
+                && (keyFilter == null
+                        || keyFilter.test(entry.file().rowCount(), entry.file().keyStats()))
+                && (valueFilter == null
+                        || valueFilter.test(entry.file().rowCount(), entry.file().valueStats()))
                 && (specifiedBucket == null || entry.bucket() == specifiedBucket);
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
index 1c5d1b6..4a71b75 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
@@ -27,7 +27,10 @@ import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.predicate.Equal;
+import org.apache.flink.table.store.file.predicate.Literal;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.types.logical.IntType;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -102,7 +105,47 @@ public class FileStoreScanTest {
                                                         wantedPartitions.contains(
                                                                 gen.getPartition(kv)))
                                         .collect(Collectors.toList()));
-        runTest(scan, snapshot.id(), expected);
+        runTestExactMatch(scan, snapshot.id(), expected);
+    }
+
+    @Test
+    public void testWithKeyFilter() throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        List<KeyValue> data = generateData(random.nextInt(1000) + 1);
+        Snapshot snapshot = writeData(data);
+
+        int wantedShopId = data.get(random.nextInt(data.size())).key().getInt(0);
+
+        FileStoreScan scan = store.newScan();
+        scan.withSnapshot(snapshot.id());
+        scan.withKeyFilter(new Equal(0, new Literal(new IntType(false), wantedShopId)));
+
+        Map<BinaryRowData, BinaryRowData> expected =
+                store.toKvMap(
+                        data.stream()
+                                .filter(kv -> kv.key().getInt(0) == wantedShopId)
+                                .collect(Collectors.toList()));
+        runTestContainsAll(scan, snapshot.id(), expected);
+    }
+
+    @Test
+    public void testWithValueFilter() throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        List<KeyValue> data = generateData(random.nextInt(1000) + 1);
+        Snapshot snapshot = writeData(data);
+
+        int wantedShopId = data.get(random.nextInt(data.size())).value().getInt(2);
+
+        FileStoreScan scan = store.newScan();
+        scan.withSnapshot(snapshot.id());
+        scan.withValueFilter(new Equal(2, new Literal(new IntType(false), wantedShopId)));
+
+        Map<BinaryRowData, BinaryRowData> expected =
+                store.toKvMap(
+                        data.stream()
+                                .filter(kv -> kv.value().getInt(2) == wantedShopId)
+                                .collect(Collectors.toList()));
+        runTestContainsAll(scan, snapshot.id(), expected);
     }
 
     @Test
@@ -122,7 +165,7 @@ public class FileStoreScanTest {
                         data.stream()
                                 .filter(kv -> getBucket(kv) == wantedBucket)
                                 .collect(Collectors.toList()));
-        runTest(scan, snapshot.id(), expected);
+        runTestExactMatch(scan, snapshot.id(), expected);
     }
 
     @Test
@@ -148,7 +191,7 @@ public class FileStoreScanTest {
                         allData.subList(0, wantedCommit + 1).stream()
                                 .flatMap(Collection::stream)
                                 .collect(Collectors.toList()));
-        runTest(scan, wantedSnapshot, expected);
+        runTestExactMatch(scan, wantedSnapshot, expected);
     }
 
     @Test
@@ -171,19 +214,34 @@ public class FileStoreScanTest {
         List<KeyValue> expectedKvs = store.readKvsFromSnapshot(wantedSnapshotId);
         gen.sort(expectedKvs);
         Map<BinaryRowData, BinaryRowData> expected = store.toKvMap(expectedKvs);
-        runTest(scan, null, expected);
+        runTestExactMatch(scan, null, expected);
     }
 
-    private void runTest(
+    private void runTestExactMatch(
             FileStoreScan scan, Long expectedSnapshotId, Map<BinaryRowData, BinaryRowData> expected)
             throws Exception {
+        Map<BinaryRowData, BinaryRowData> actual = getActualKvMap(scan, expectedSnapshotId);
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    private void runTestContainsAll(
+            FileStoreScan scan, Long expectedSnapshotId, Map<BinaryRowData, BinaryRowData> expected)
+            throws Exception {
+        Map<BinaryRowData, BinaryRowData> actual = getActualKvMap(scan, expectedSnapshotId);
+        for (Map.Entry<BinaryRowData, BinaryRowData> entry : expected.entrySet()) {
+            assertThat(actual).containsKey(entry.getKey());
+            assertThat(actual.get(entry.getKey())).isEqualTo(entry.getValue());
+        }
+    }
+
+    private Map<BinaryRowData, BinaryRowData> getActualKvMap(
+            FileStoreScan scan, Long expectedSnapshotId) throws Exception {
         FileStoreScan.Plan plan = scan.plan();
         assertThat(plan.snapshotId()).isEqualTo(expectedSnapshotId);
 
         List<KeyValue> actualKvs = store.readKvsFromManifestEntries(plan.files());
         gen.sort(actualKvs);
-        Map<BinaryRowData, BinaryRowData> actual = store.toKvMap(actualKvs);
-        assertThat(actual).isEqualTo(expected);
+        return store.toKvMap(actualKvs);
     }
 
     private List<KeyValue> generateData(int numRecords) {