Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/24 03:18:45 UTC

[flink-table-store] branch master updated: [FLINK-28812] Support to read compacted snapshot only

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 638c34b3 [FLINK-28812] Support to read compacted snapshot only
638c34b3 is described below

commit 638c34b39ab0b57b14cef9b050c749de7b1bf7d3
Author: shammon <zj...@gmail.com>
AuthorDate: Thu Nov 24 11:18:41 2022 +0800

    [FLINK-28812] Support to read compacted snapshot only
    
    This closes #351
---
 .../shortcodes/generated/core_configuration.html   |  6 ++
 .../store/connector/source/FileStoreSource.java    |  5 +-
 .../store/connector/BatchFileStoreITCase.java      |  6 ++
 .../connector/FullCompactionFileStoreITCase.java   | 75 ++++++++++++++++++++++
 .../org/apache/flink/table/store/CoreOptions.java  | 10 +++
 .../file/operation/AbstractFileStoreScan.java      | 12 +++-
 .../table/store/file/operation/FileStoreScan.java  |  2 +
 .../table/store/file/utils/SnapshotManager.java    | 19 ++++++
 .../table/store/table/source/DataTableScan.java    |  5 ++
 .../store/table/source/SnapshotEnumerator.java     |  5 +-
 .../table/store/table/FileStoreTableTestBase.java  | 55 ++++++++++++++++
 11 files changed, 197 insertions(+), 3 deletions(-)
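
In short: this commit adds a boolean table option 'read.compacted' (default
false). When set, a batch scan plans from the latest snapshot whose commit
kind is COMPACT instead of the latest snapshot overall, and the
full-compaction streaming startup does the same. A minimal usage sketch in
the style of the new ITCases below (tEnv as a Flink TableEnvironment and the
table name T are illustrative):

    // per-query, via a dynamic table option hint
    tEnv.executeSql(
            "SELECT * FROM T /*+ OPTIONS('read.compacted'='true') */");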

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index e6efa021..1f7401ad 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -200,6 +200,12 @@
             <td>String</td>
             <td>The default partition name in case the dynamic partition column value is null/empty string.</td>
         </tr>
+        <tr>
+            <td><h5>read.compacted</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to read compacted snapshot only.</td>
+        </tr>
         <tr>
             <td><h5>sequence.field</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
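
The option can also be fixed at table creation time instead of per query; a
hedged sketch, assuming the usual table store DDL with a WITH clause (the
schema is illustrative):

    tEnv.executeSql(
            "CREATE TABLE T (a INT, b INT, c INT)"
                    + " WITH ('read.compacted' = 'true')");
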
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index 573e5f98..33ead92a 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -78,7 +78,10 @@ public class FileStoreSource extends FlinkSource {
         Long snapshotId;
         Collection<FileStoreSourceSplit> splits;
         if (checkpoint == null) {
-            DataFilePlan plan = isContinuous ? SnapshotEnumerator.startup(scan) : scan.plan();
+            DataFilePlan plan =
+                    isContinuous
+                            ? SnapshotEnumerator.startup(scan)
+                            : scan.withReadCompacted(table.options().readCompacted()).plan();
             snapshotId = plan.snapshotId;
             splits = new FileStoreSourceSplitGenerator().createSplits(plan);
         } else {
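
Only the non-continuous (batch) branch threads the table option into the scan
here; the continuous branch goes through SnapshotEnumerator.startup, which is
adjusted further down. A hedged sketch of the equivalent batch planning call,
using the DataTableScan#withReadCompacted method this commit adds:

    DataTableScan.DataFilePlan plan =
            table.newScan()
                    .withReadCompacted(table.options().readCompacted())
                    .plan();
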
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
index 60a48760..6355859f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
@@ -43,4 +43,10 @@ public class BatchFileStoreITCase extends FileStoreTableITCase {
         batchSql("INSERT OVERWRITE T SELECT * FROM T WHERE 1 <> 1");
         assertThat(batchSql("SELECT * FROM T")).isEmpty();
     }
+
+    @Test
+    public void testReadCompactedEmpty() {
+        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
+        assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('read.compacted'='true') */")).isEmpty();
+    }
 }
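
The empty result is expected: plain INSERTs produce only APPEND snapshots, so
no COMPACT snapshot exists yet and the hinted read plans nothing. Without the
hint the same batch read sees both rows; a hedged sketch in the style of the
test above (INT column types assumed):

    assertThat(batchSql("SELECT * FROM T"))
            .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222));
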
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
new file mode 100644
index 00000000..b17f43c3
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** SQL ITCase for the full compaction file store. */
+public class FullCompactionFileStoreITCase extends FileStoreTableITCase {
+    private final String table = "T";
+
+    @Override
+    protected List<String> ddl() {
+        String options =
+                " WITH('changelog-producer'='full-compaction', 'changelog-producer.compaction-interval' = '1s')";
+        return Arrays.asList(
+                "CREATE TABLE IF NOT EXISTS T (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
+                        + options);
+    }
+
+    @Test
+    public void testStreamingRead() throws Exception {
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table));
+
+        batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
+
+        batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
+        assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", "8", "9"));
+    }
+
+    @Test
+    public void testReadCompacted() throws Exception {
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT * FROM %s /*+ OPTIONS('read.compacted'='true') */", table));
+
+        batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
+
+        batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
+        assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", "8", "9"));
+
+        assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('read.compacted'='true') */"))
+                .containsExactlyInAnyOrder(
+                        Row.of("1", "2", "3"), Row.of("4", "5", "6"), Row.of("7", "8", "9"));
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 584bc2af..096d0b18 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -169,6 +169,12 @@ public class CoreOptions implements Serializable {
                     .defaultValue(false)
                     .withDescription("Whether to skip compaction on write.");
 
+    public static final ConfigOption<Boolean> READ_COMPACTED =
+            ConfigOptions.key("read.compacted")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to read compacted snapshot only.");
+
     public static final ConfigOption<MemorySize> SOURCE_SPLIT_TARGET_SIZE =
             ConfigOptions.key("source.split.target-size")
                     .memoryType()
@@ -544,6 +550,10 @@ public class CoreOptions implements Serializable {
         return options.get(WRITE_COMPACTION_SKIP);
     }
 
+    public boolean readCompacted() {
+        return options.get(READ_COMPACTED);
+    }
+
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
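
Resolving the new option follows the existing CoreOptions pattern; a short
sketch, assuming the usual construction of CoreOptions from a Configuration:

    Configuration conf = new Configuration();
    conf.set(CoreOptions.READ_COMPACTED, true);
    boolean readCompacted = new CoreOptions(conf).readCompacted(); // true
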
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index bd8af8aa..551548bc 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -75,6 +75,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
     private List<ManifestFileMeta> specifiedManifests = null;
     private boolean isIncremental = false;
     private Integer specifiedLevel = null;
+    private boolean readCompacted = false;
 
     public AbstractFileStoreScan(
             RowType partitionType,
@@ -175,13 +176,22 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
         return this;
     }
 
+    @Override
+    public FileStoreScan withReadCompacted(boolean readCompacted) {
+        this.readCompacted = readCompacted;
+        return this;
+    }
+
     @Override
     public Plan plan() {
         List<ManifestFileMeta> manifests = specifiedManifests;
         Long snapshotId = specifiedSnapshotId;
         if (manifests == null) {
             if (snapshotId == null) {
-                snapshotId = snapshotManager.latestSnapshotId();
+                snapshotId =
+                        readCompacted
+                                ? snapshotManager.latestCompactedSnapshotId()
+                                : snapshotManager.latestSnapshotId();
             }
             if (snapshotId == null) {
                 manifests = Collections.emptyList();
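
Note the interaction with the null check that follows in plan(): when
read.compacted is set but no COMPACT snapshot exists yet,
latestCompactedSnapshotId() returns null, manifests falls back to an empty
list, and the resulting plan is empty. That is exactly what the new
testReadCompactedEmpty ITCase above asserts. A sketch of the resolution
(readManifests is a hypothetical helper, for illustration only):

    Long snapshotId =
            readCompacted
                    ? snapshotManager.latestCompactedSnapshotId() // or null
                    : snapshotManager.latestSnapshotId();
    List<ManifestFileMeta> manifests =
            snapshotId == null
                    ? Collections.emptyList() // nothing compacted yet
                    : readManifests(snapshotId);
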
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index df3289a0..820baec0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -51,6 +51,8 @@ public interface FileStoreScan {
 
     FileStoreScan withLevel(int level);
 
+    FileStoreScan withReadCompacted(boolean readCompacted);
+
     /** Produce a {@link Plan}. */
     Plan plan();
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
index 515c32ed..8400a0cb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
@@ -90,6 +90,25 @@ public class SnapshotManager {
         }
     }
 
+    public @Nullable Long latestCompactedSnapshotId() {
+        Long latestId = latestSnapshotId();
+        Long earliestId = earliestSnapshotId();
+        if (latestId == null || earliestId == null) {
+            return null;
+        }
+
+        for (long snapshotId = latestId; snapshotId >= earliestId; snapshotId--) {
+            if (snapshotExists(snapshotId)) {
+                Snapshot snapshot = snapshot(snapshotId);
+                if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+                    return snapshot.id();
+                }
+            }
+        }
+
+        return null;
+    }
+
     /**
      * Returns a snapshot earlier than the timestamp millis. A non-existent snapshot may be returned
      * if all snapshots are later than the timestamp millis.
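
latestCompactedSnapshotId() walks snapshots from latest to earliest, so the
lookup cost is linear in the number of retained snapshots. A usage sketch,
assuming a FileStoreTable with the snapshotManager() accessor that the new
test below relies on:

    SnapshotManager sm = table.snapshotManager();
    Long compactedId = sm.latestCompactedSnapshotId();
    if (compactedId != null) {
        // by construction the resolved snapshot is a compaction result
        assert sm.snapshot(compactedId).commitKind() == Snapshot.CommitKind.COMPACT;
    }
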
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
index e155ec1f..9cdda41b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
@@ -103,6 +103,11 @@ public abstract class DataTableScan implements TableScan {
         return this;
     }
 
+    public DataTableScan withReadCompacted(boolean readCompacted) {
+        scan.withReadCompacted(readCompacted);
+        return this;
+    }
+
     @VisibleForTesting
     public DataTableScan withBucket(int bucket) {
         scan.withBucket(bucket);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
index 067ef812..e42dcbda 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
@@ -122,7 +122,10 @@ public class SnapshotEnumerator implements Callable<SnapshotEnumerator.Enumerato
                 if (options.changelogProducer() == FULL_COMPACTION) {
                     // Read the results of the last full compaction.
                     // Only full compaction results will appear on the max level.
-                    plan = scan.withLevel(options.numLevels() - 1).plan();
+                    plan =
+                            scan.withReadCompacted(options.readCompacted())
+                                    .withLevel(options.numLevels() - 1)
+                                    .plan();
                 } else {
                     plan = scan.plan();
                 }
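
For the FULL_COMPACTION changelog producer the level filter already restricts
reads to full compaction output; withReadCompacted additionally pins the
starting snapshot to a COMPACT one. A hedged sketch of the startup decision
as a standalone method (names mirror the hunk above):

    private DataTableScan.DataFilePlan startupPlan(DataTableScan scan, CoreOptions options) {
        if (options.changelogProducer() == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
            // only full compaction results reach the max level
            return scan.withReadCompacted(options.readCompacted())
                    .withLevel(options.numLevels() - 1)
                    .plan();
        }
        return scan.plan();
    }
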
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 58c7d088..95a2ad2b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
@@ -42,6 +43,7 @@ import org.apache.flink.table.store.table.sink.FileCommittable;
 import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.DataSplit;
+import org.apache.flink.table.store.table.source.DataTableScan;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -58,6 +60,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -316,6 +319,58 @@ public abstract class FileStoreTableTestBase {
         }
     }
 
+    @Test
+    public void testReadCompactedSnapshot() throws Exception {
+        writeCompactData();
+        FileStoreTable table = createFileStoreTable();
+
+        DataTableScan.DataFilePlan plan = table.newScan().withReadCompacted(true).plan();
+        Snapshot compactedSnapshot = table.snapshotManager().snapshot(plan.snapshotId);
+        Iterator<Snapshot> snapshotIterator = table.snapshotManager().snapshots();
+        while (snapshotIterator.hasNext()) {
+            Snapshot snapshot = snapshotIterator.next();
+            if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+                assertThat(snapshot.id()).isLessThanOrEqualTo(compactedSnapshot.id());
+            }
+        }
+
+        assertThat(compactedSnapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+        List<Split> splits = plan.splits();
+        TableRead read = table.newRead();
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING).size())
+                .isGreaterThan(0);
+        assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING).size())
+                .isGreaterThan(0);
+
+        List<Split> compactedSplits =
+                table.newScan().withSnapshot(compactedSnapshot.id()).plan().splits();
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+                .isEqualTo(getResult(read, compactedSplits, binaryRow(1), 0, BATCH_ROW_TO_STRING));
+        assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
+                .isEqualTo(getResult(read, compactedSplits, binaryRow(2), 0, BATCH_ROW_TO_STRING));
+    }
+
+    private void writeCompactData() throws Exception {
+        final int batchCount = 10;
+        final int countPerBatch = 10000;
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf ->
+                                conf.set(CoreOptions.BUCKET, 1)
+                                        .set(CoreOptions.READ_COMPACTED, true)
+                                        .set(CoreOptions.COMPACTION_MAX_FILE_NUM, 10));
+        TableWrite write = table.newWrite("user");
+        TableCommit commit = table.newCommit("user");
+
+        for (int i = 0; i < batchCount; i++) {
+            for (int j = 0; j < countPerBatch; j++) {
+                write.write(rowData(j % 2 + 1, i * j, j * 10L));
+            }
+            commit.commit(i, write.prepareCommit(true, i));
+        }
+        write.close();
+    }
+
     protected List<String> getResult(
             TableRead read,
             List<Split> splits,