You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/29 07:07:58 UTC

[incubator-paimon] branch master updated: [FLINK-31462] Supports full calculation from the specified snapshot in streaming mode (#624)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e8bb7d09e [FLINK-31462] Supports full calculation from the specified snapshot in streaming mode (#624)
e8bb7d09e is described below

commit e8bb7d09e83986b36fc99d26e03625f683002640
Author: liming30 <69...@users.noreply.github.com>
AuthorDate: Wed Mar 29 15:07:53 2023 +0800

    [FLINK-31462] Supports full calculation from the specified snapshot in streaming mode (#624)
---
 docs/content/how-to/querying-tables.md             |   5 +
 .../shortcodes/generated/core_configuration.html   |   4 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |   8 +-
 .../org/apache/paimon/schema/SchemaValidation.java |   6 +-
 .../paimon/table/AbstractFileStoreTable.java       |   1 +
 .../paimon/table/source/AbstractDataTableScan.java |   5 +-
 .../StaticFromSnapshotStartingScanner.java         |   5 +-
 .../paimon/table/source/StartupModeTest.java       | 264 +++++++++++++++++++++
 .../table/source/snapshot/ScannerTestBase.java     |   2 +-
 9 files changed, 289 insertions(+), 11 deletions(-)

diff --git a/docs/content/how-to/querying-tables.md b/docs/content/how-to/querying-tables.md
index 0d2fe6803..55863e705 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -80,6 +80,11 @@ Produces the snapshot after the latest compaction on the table upon first startu
 <td>Produces a snapshot specified by "scan.snapshot-id".</td>
 <td>Continuously reads changes starting from a snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning.</td>
 </tr>
+<tr>
+<td>from-snapshot-full</td>
+<td>Produces a snapshot specified by "scan.snapshot-id".</td>
+<td>Produces the snapshot specified by "scan.snapshot-id" on the table upon first startup, and continuously reads the following changes.</td>
+</tr>
 </tbody>
 </table>
 
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index ab6802a1b..ab7b648a6 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -270,7 +270,7 @@
             <td><h5>scan.mode</h5></td>
             <td style="word-wrap: break-word;">default</td>
             <td><p>Enum</p></td>
-            <td>Specify the scanning behavior of the source.<br /><br />Possible values:<ul><li>"default": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp", and if "scan.snapshot-id" is set the actual startup mode will be "from-snapshot". Otherwise the actual startup mode will be "latest-full".</li><li>"latest-full": For streaming sources, produces the latest snapshot on the table upon f [...]
+            <td>Specify the scanning behavior of the source.<br /><br />Possible values:<ul><li>"default": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp", and if "scan.snapshot-id" is set the actual startup mode will be "from-snapshot". Otherwise the actual startup mode will be "latest-full".</li><li>"latest-full": For streaming sources, produces the latest snapshot on the table upon f [...]
         </tr>
         <tr>
             <td><h5>scan.plan-sort-partition</h5></td>
@@ -282,7 +282,7 @@
             <td><h5>scan.snapshot-id</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Long</td>
-            <td>Optional snapshot id used in case of "from-snapshot" scan mode</td>
+            <td>Optional snapshot id used in case of "from-snapshot" or "from-snapshot-full" scan mode</td>
         </tr>
         <tr>
             <td><h5>scan.timestamp-millis</h5></td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 149ca6f24..b88c184c5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -355,7 +355,7 @@ public class CoreOptions implements Serializable {
                     .longType()
                     .noDefaultValue()
                     .withDescription(
-                            "Optional snapshot id used in case of \"from-snapshot\" scan mode");
+                            "Optional snapshot id used in case of \"from-snapshot\" or \"from-snapshot-full\" scan mode");
 
     public static final ConfigOption<Long> SCAN_BOUNDED_WATERMARK =
             key("scan.bounded.watermark")
@@ -854,6 +854,12 @@ public class CoreOptions implements Serializable {
                 "For streaming sources, continuously reads changes "
                         + "starting from snapshot specified by \"scan.snapshot-id\", "
                         + "without producing a snapshot at the beginning. For batch sources, "
+                        + "produces a snapshot specified by \"scan.snapshot-id\" but does not read new changes."),
+
+        FROM_SNAPSHOT_FULL(
+                "from-snapshot-full",
+                "For streaming sources, produces from snapshot specified by \"scan.snapshot-id\" "
+                        + "on the table upon first startup, and continuously reads changes. For batch sources, "
                         + "produces a snapshot specified by \"scan.snapshot-id\" but does not read new changes.");
 
         private final String value;
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index ab738197c..f531d3993 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -67,9 +67,9 @@ public class SchemaValidation {
             checkOptionExistInMode(
                     options, SCAN_TIMESTAMP_MILLIS, CoreOptions.StartupMode.FROM_TIMESTAMP);
             checkOptionsConflict(options, SCAN_SNAPSHOT_ID, SCAN_TIMESTAMP_MILLIS);
-        } else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT) {
-            checkOptionExistInMode(
-                    options, SCAN_SNAPSHOT_ID, CoreOptions.StartupMode.FROM_SNAPSHOT);
+        } else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT
+                || options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) {
+            checkOptionExistInMode(options, SCAN_SNAPSHOT_ID, options.startupMode());
             checkOptionsConflict(options, SCAN_TIMESTAMP_MILLIS, SCAN_SNAPSHOT_ID);
         } else {
             checkOptionNotExistInMode(options, SCAN_TIMESTAMP_MILLIS, options.startupMode());
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 507b4a385..aea57e2c8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -192,6 +192,7 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
 
         switch (coreOptions.startupMode()) {
             case FROM_SNAPSHOT:
+            case FROM_SNAPSHOT_FULL:
                 snapshotId = coreOptions.scanSnapshotId();
                 if (snapshotManager().snapshotExists(snapshotId)) {
                     long schemaId = snapshotManager().snapshot(snapshotId).schemaId();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index bc485b748..7c4f3bad7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -77,15 +77,16 @@ public abstract class AbstractDataTableScan implements DataTableScan {
                         ? new ContinuousFromTimestampStartingScanner(startupMillis)
                         : new StaticFromTimestampStartingScanner(startupMillis);
             case FROM_SNAPSHOT:
+            case FROM_SNAPSHOT_FULL:
                 Long snapshotId = options.scanSnapshotId();
                 Preconditions.checkNotNull(
                         snapshotId,
                         String.format(
                                 "%s can not be null when you use %s for %s",
                                 CoreOptions.SCAN_SNAPSHOT_ID.key(),
-                                CoreOptions.StartupMode.FROM_SNAPSHOT,
+                                startupMode,
                                 CoreOptions.SCAN_MODE.key()));
-                return isStreaming
+                return isStreaming && startupMode == CoreOptions.StartupMode.FROM_SNAPSHOT
                         ? new ContinuousFromSnapshotStartingScanner(snapshotId)
                         : new StaticFromSnapshotStartingScanner(snapshotId);
             default:
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
index a63475035..4fee6fafd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
@@ -25,8 +25,9 @@ import org.apache.paimon.utils.SnapshotManager;
 import javax.annotation.Nullable;
 
 /**
- * {@link StartingScanner} for the {@link CoreOptions.StartupMode#FROM_SNAPSHOT} startup mode of a
- * batch read.
+ * {@link StartingScanner} for the {@link CoreOptions.StartupMode#FROM_SNAPSHOT} startup mode of a
+ * batch read, and for the {@link CoreOptions.StartupMode#FROM_SNAPSHOT_FULL} startup mode of both
+ * batch and streaming reads.
  */
 public class StaticFromSnapshotStartingScanner implements StartingScanner {
     private final long snapshotId;
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
new file mode 100644
index 000000000..ace15b7c3
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.FileIOFinder;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.source.snapshot.ScannerTestBase;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.CoreOptions.StartupMode;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link StartupMode}. */
+public class StartupModeTest extends ScannerTestBase {
+
+    StreamTableWrite write;
+    StreamTableCommit commit;
+
+    @BeforeEach
+    @Override
+    public void before() throws Exception {
+        tablePath = new Path(TraceableFileIO.SCHEME + "://" + tempDir.toString());
+        fileIO = FileIOFinder.find(tablePath);
+        commitUser = UUID.randomUUID().toString();
+    }
+
+    @Test
+    public void testStartFromLatest() throws Exception {
+        initializeTable(StartupMode.LATEST);
+        initializeTestData(); // initialize 3 commits
+
+        // streaming Mode
+        StreamDataTableScan dataTableScan = table.newStreamScan();
+        DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
+        DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+
+        assertThat(firstPlan.splits).isEmpty();
+        assertThat(secondPlan.splits).isEmpty();
+
+        // write next data
+        writeAndCommit(4, rowData(1, 10, 103L));
+        DataTableScan.DataFilePlan thirdPlan = dataTableScan.plan();
+        assertThat(thirdPlan.splits)
+                .isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.DELTA).splits());
+
+        // batch mode
+        BatchDataTableScan batchScan = table.newScan();
+        DataTableScan.DataFilePlan plan = batchScan.plan();
+        assertThat(plan.splits)
+                .isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.ALL).splits());
+    }
+
+    @Test
+    public void testStartFromLatestFull() throws Exception {
+        initializeTable(StartupMode.LATEST_FULL);
+        initializeTestData(); // initialize 3 commits
+
+        // streaming Mode
+        StreamDataTableScan dataTableScan = table.newStreamScan();
+        DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
+        DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+
+        assertThat(firstPlan.splits)
+                .isEqualTo(snapshotSplitReader.withSnapshot(3).withKind(ScanKind.ALL).splits());
+        assertThat(secondPlan.splits).isEmpty();
+
+        // write next data
+        writeAndCommit(4, rowData(1, 10, 103L));
+        DataTableScan.DataFilePlan thirdPlan = dataTableScan.plan();
+        assertThat(thirdPlan.splits)
+                .isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.DELTA).splits());
+
+        // batch mode
+        BatchDataTableScan batchScan = table.newScan();
+        DataTableScan.DataFilePlan plan = batchScan.plan();
+        assertThat(plan.splits)
+                .isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.ALL).splits());
+    }
+
+    @Test
+    public void testStartFromTimestamp() throws Exception {
+        initializeTable(StartupMode.LATEST);
+        initializeTestData(); // initialize 3 commits
+
+        long timestamp = System.currentTimeMillis();
+        Thread.sleep(10L);
+
+        // write next data
+        writeAndCommit(4, rowData(1, 10, 103L));
+
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CoreOptions.SCAN_MODE.key(), StartupMode.FROM_TIMESTAMP.toString());
+        properties.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), String.valueOf(timestamp));
+        FileStoreTable readTable = table.copy(properties);
+
+        // streaming Mode
+        StreamDataTableScan dataTableScan = readTable.newStreamScan();
+        DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
+        DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+
+        assertThat(firstPlan.splits).isEmpty();
+        assertThat(secondPlan.splits)
+                .isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.DELTA).splits());
+
+        // batch mode
+        BatchDataTableScan batchScan = readTable.newScan();
+        DataTableScan.DataFilePlan plan = batchScan.plan();
+        assertThat(plan.splits)
+                .isEqualTo(snapshotSplitReader.withSnapshot(3).withKind(ScanKind.ALL).splits());
+    }
+
+    @Test
+    public void testStartFromCompactedFull() throws Exception {
+        initializeTable(StartupMode.COMPACTED_FULL);
+        initializeTestData(); // initialize 3 commits
+
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(4, write.prepareCommit(true, 4));
+        writeAndCommit(5, rowData(1, 10, 103L));
+
+        // streaming Mode
+        StreamDataTableScan dataTableScan = table.newStreamScan();
+        DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
+        DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+
+        assertThat(firstPlan.splits)
+                .isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.ALL).splits());
+        assertThat(secondPlan.splits)
+                .isEqualTo(snapshotSplitReader.withSnapshot(5).withKind(ScanKind.DELTA).splits());
+
+        // batch mode
+        BatchDataTableScan batchScan = table.newScan();
+        DataTableScan.DataFilePlan plan = batchScan.plan();
+        assertThat(plan.splits)
+                .isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.ALL).splits());
+    }
+
+    @Test
+    public void testStartFromSnapshot() throws Exception {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2");
+        initializeTable(StartupMode.FROM_SNAPSHOT, properties);
+        initializeTestData(); // initialize 3 commits
+
+        // streaming Mode
+        StreamDataTableScan dataTableScan = table.newStreamScan();
+        DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
+        DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+
+        assertThat(firstPlan.splits).isEmpty();
+        assertThat(secondPlan.splits)
+                .isEqualTo(snapshotSplitReader.withSnapshot(2).withKind(ScanKind.DELTA).splits());
+
+        // batch mode
+        BatchDataTableScan batchScan = table.newScan();
+        DataTableScan.DataFilePlan plan = batchScan.plan();
+        assertThat(plan.splits)
+                .isEqualTo(snapshotSplitReader.withSnapshot(2).withKind(ScanKind.ALL).splits());
+    }
+
+    @Test
+    public void testStartFromSnapshotFull() throws Exception {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2");
+        initializeTable(StartupMode.FROM_SNAPSHOT_FULL, properties);
+        initializeTestData(); // initialize 3 commits
+
+        StreamDataTableScan dataTableScan = table.newStreamScan();
+        DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
+        DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+
+        assertThat(firstPlan.splits)
+                .isEqualTo(snapshotSplitReader.withSnapshot(2).withKind(ScanKind.ALL).splits());
+        assertThat(secondPlan.splits)
+                .isEqualTo(snapshotSplitReader.withSnapshot(3).withKind(ScanKind.DELTA).splits());
+
+        // batch mode
+        BatchDataTableScan batchScan = table.newScan();
+        DataTableScan.DataFilePlan plan = batchScan.plan();
+        assertThat(plan.splits)
+                .isEqualTo(snapshotSplitReader.withSnapshot(2).withKind(ScanKind.ALL).splits());
+    }
+
+    private void initializeTable(CoreOptions.StartupMode startupMode) throws Exception {
+        initializeTable(startupMode, Collections.emptyMap());
+    }
+
+    private void initializeTable(
+            CoreOptions.StartupMode startupMode, Map<String, String> properties) throws Exception {
+        Options options = new Options();
+        options.set(PATH, tablePath.getPath());
+        options.set(CoreOptions.SCAN_MODE, startupMode);
+        for (Map.Entry<String, String> property : properties.entrySet()) {
+            options.set(property.getKey(), property.getValue());
+        }
+        table = createFileStoreTable(options);
+        snapshotSplitReader = table.newSnapshotSplitReader();
+        write = table.newWrite(commitUser);
+        commit = table.newCommit(commitUser);
+    }
+
+    private void initializeTestData() throws Exception {
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        write.write(rowData(1, 10, 102L));
+        write.write(rowData(1, 30, 400L));
+        commit.commit(3, write.prepareCommit(true, 3));
+    }
+
+    private void writeAndCommit(long commitIdentifier, GenericRow... rows) throws Exception {
+        for (GenericRow row : rows) {
+            write.write(row);
+        }
+        commit.commit(commitIdentifier, write.prepareCommit(true, commitIdentifier));
+    }
+
+    @AfterEach
+    public void afterEach() throws Exception {
+        IOUtils.closeAll(write, commit);
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
index 313585606..8d4f0b671 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
@@ -64,7 +64,7 @@ public abstract class ScannerTestBase {
                     new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
                     new String[] {"pt", "a", "b"});
 
-    @TempDir java.nio.file.Path tempDir;
+    protected @TempDir java.nio.file.Path tempDir;
 
     protected Path tablePath;
     protected FileIO fileIO;