You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/29 07:07:58 UTC
[incubator-paimon] branch master updated: [FLINK-31462] Supports full calculation from the specified snapshot in streaming mode (#624)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e8bb7d09e [FLINK-31462] Supports full calculation from the specified snapshot in streaming mode (#624)
e8bb7d09e is described below
commit e8bb7d09e83986b36fc99d26e03625f683002640
Author: liming30 <69...@users.noreply.github.com>
AuthorDate: Wed Mar 29 15:07:53 2023 +0800
[FLINK-31462] Supports full calculation from the specified snapshot in streaming mode (#624)
---
docs/content/how-to/querying-tables.md | 5 +
.../shortcodes/generated/core_configuration.html | 4 +-
.../main/java/org/apache/paimon/CoreOptions.java | 8 +-
.../org/apache/paimon/schema/SchemaValidation.java | 6 +-
.../paimon/table/AbstractFileStoreTable.java | 1 +
.../paimon/table/source/AbstractDataTableScan.java | 5 +-
.../StaticFromSnapshotStartingScanner.java | 5 +-
.../paimon/table/source/StartupModeTest.java | 264 +++++++++++++++++++++
.../table/source/snapshot/ScannerTestBase.java | 2 +-
9 files changed, 289 insertions(+), 11 deletions(-)
diff --git a/docs/content/how-to/querying-tables.md b/docs/content/how-to/querying-tables.md
index 0d2fe6803..55863e705 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -80,6 +80,11 @@ Produces the snapshot after the latest compaction on the table upon first startu
<td>Produces a snapshot specified by "scan.snapshot-id".</td>
<td>Continuously reads changes starting from a snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning.</td>
</tr>
+<tr>
+<td>from-snapshot-full</td>
+<td>Produces a snapshot specified by "scan.snapshot-id".</td>
+<td>Produces the snapshot specified by "scan.snapshot-id" on the table upon first startup, and continuously reads changes.</td>
+</tr>
</tbody>
</table>
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index ab6802a1b..ab7b648a6 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -270,7 +270,7 @@
<td><h5>scan.mode</h5></td>
<td style="word-wrap: break-word;">default</td>
<td><p>Enum</p></td>
- <td>Specify the scanning behavior of the source.<br /><br />Possible values:<ul><li>"default": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp", and if "scan.snapshot-id" is set the actual startup mode will be "from-snapshot". Otherwise the actual startup mode will be "latest-full".</li><li>"latest-full": For streaming sources, produces the latest snapshot on the table upon f [...]
+ <td>Specify the scanning behavior of the source.<br /><br />Possible values:<ul><li>"default": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp", and if "scan.snapshot-id" is set the actual startup mode will be "from-snapshot". Otherwise the actual startup mode will be "latest-full".</li><li>"latest-full": For streaming sources, produces the latest snapshot on the table upon f [...]
</tr>
<tr>
<td><h5>scan.plan-sort-partition</h5></td>
@@ -282,7 +282,7 @@
<td><h5>scan.snapshot-id</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
- <td>Optional snapshot id used in case of "from-snapshot" scan mode</td>
+ <td>Optional snapshot id used in case of "from-snapshot" or "from-snapshot-full" scan mode</td>
</tr>
<tr>
<td><h5>scan.timestamp-millis</h5></td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 149ca6f24..b88c184c5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -355,7 +355,7 @@ public class CoreOptions implements Serializable {
.longType()
.noDefaultValue()
.withDescription(
- "Optional snapshot id used in case of \"from-snapshot\" scan mode");
+ "Optional snapshot id used in case of \"from-snapshot\" or \"from-snapshot-full\" scan mode");
public static final ConfigOption<Long> SCAN_BOUNDED_WATERMARK =
key("scan.bounded.watermark")
@@ -854,6 +854,12 @@ public class CoreOptions implements Serializable {
"For streaming sources, continuously reads changes "
+ "starting from snapshot specified by \"scan.snapshot-id\", "
+ "without producing a snapshot at the beginning. For batch sources, "
+ + "produces a snapshot specified by \"scan.snapshot-id\" but does not read new changes."),
+
+ FROM_SNAPSHOT_FULL(
+ "from-snapshot-full",
+ "For streaming sources, produces from snapshot specified by \"scan.snapshot-id\" "
+ + "on the table upon first startup, and continuously reads changes. For batch sources, "
+ "produces a snapshot specified by \"scan.snapshot-id\" but does not read new changes.");
private final String value;
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index ab738197c..f531d3993 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -67,9 +67,9 @@ public class SchemaValidation {
checkOptionExistInMode(
options, SCAN_TIMESTAMP_MILLIS, CoreOptions.StartupMode.FROM_TIMESTAMP);
checkOptionsConflict(options, SCAN_SNAPSHOT_ID, SCAN_TIMESTAMP_MILLIS);
- } else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT) {
- checkOptionExistInMode(
- options, SCAN_SNAPSHOT_ID, CoreOptions.StartupMode.FROM_SNAPSHOT);
+ } else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT
+ || options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) {
+ checkOptionExistInMode(options, SCAN_SNAPSHOT_ID, options.startupMode());
checkOptionsConflict(options, SCAN_TIMESTAMP_MILLIS, SCAN_SNAPSHOT_ID);
} else {
checkOptionNotExistInMode(options, SCAN_TIMESTAMP_MILLIS, options.startupMode());
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 507b4a385..aea57e2c8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -192,6 +192,7 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
switch (coreOptions.startupMode()) {
case FROM_SNAPSHOT:
+ case FROM_SNAPSHOT_FULL:
snapshotId = coreOptions.scanSnapshotId();
if (snapshotManager().snapshotExists(snapshotId)) {
long schemaId = snapshotManager().snapshot(snapshotId).schemaId();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index bc485b748..7c4f3bad7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -77,15 +77,16 @@ public abstract class AbstractDataTableScan implements DataTableScan {
? new ContinuousFromTimestampStartingScanner(startupMillis)
: new StaticFromTimestampStartingScanner(startupMillis);
case FROM_SNAPSHOT:
+ case FROM_SNAPSHOT_FULL:
Long snapshotId = options.scanSnapshotId();
Preconditions.checkNotNull(
snapshotId,
String.format(
"%s can not be null when you use %s for %s",
CoreOptions.SCAN_SNAPSHOT_ID.key(),
- CoreOptions.StartupMode.FROM_SNAPSHOT,
+ startupMode,
CoreOptions.SCAN_MODE.key()));
- return isStreaming
+ return isStreaming && startupMode == CoreOptions.StartupMode.FROM_SNAPSHOT
? new ContinuousFromSnapshotStartingScanner(snapshotId)
: new StaticFromSnapshotStartingScanner(snapshotId);
default:
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
index a63475035..4fee6fafd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
@@ -25,8 +25,9 @@ import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
/**
- * {@link StartingScanner} for the {@link CoreOptions.StartupMode#FROM_SNAPSHOT} startup mode of a
- * batch read.
+ * {@link StartingScanner} for the {@link CoreOptions.StartupMode#FROM_SNAPSHOT} or {@link
+ * CoreOptions.StartupMode#FROM_SNAPSHOT_FULL} startup mode of a batch read, also used for the
+ * initial full plan of a streaming read in FROM_SNAPSHOT_FULL mode.
*/
public class StaticFromSnapshotStartingScanner implements StartingScanner {
private final long snapshotId;
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
new file mode 100644
index 000000000..ace15b7c3
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.FileIOFinder;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.source.snapshot.ScannerTestBase;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.CoreOptions.StartupMode;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests the streaming and batch scan plans produced under different {@link StartupMode}s. */
+public class StartupModeTest extends ScannerTestBase {
+
+ StreamTableWrite write; // created in initializeTable, closed in afterEach
+ StreamTableCommit commit; // created in initializeTable, closed in afterEach
+
+ @BeforeEach
+ @Override
+ public void before() throws Exception { // only sets up paths; tests call initializeTable
+ tablePath = new Path(TraceableFileIO.SCHEME + "://" + tempDir.toString());
+ fileIO = FileIOFinder.find(tablePath);
+ commitUser = UUID.randomUUID().toString();
+ }
+
+ @Test
+ public void testStartFromLatest() throws Exception {
+ initializeTable(StartupMode.LATEST);
+ initializeTestData(); // 3 commits -> snapshots 1, 2, 3
+
+ // streaming mode: nothing is emitted until a new commit arrives
+ StreamDataTableScan dataTableScan = table.newStreamScan();
+ DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
+ DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+
+ assertThat(firstPlan.splits).isEmpty();
+ assertThat(secondPlan.splits).isEmpty();
+
+ // a 4th commit is read back as the delta of snapshot 4
+ writeAndCommit(4, rowData(1, 10, 103L));
+ DataTableScan.DataFilePlan thirdPlan = dataTableScan.plan();
+ assertThat(thirdPlan.splits)
+ .isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.DELTA).splits());
+
+ // batch mode: full (ALL) content of the latest snapshot, 4
+ BatchDataTableScan batchScan = table.newScan();
+ DataTableScan.DataFilePlan plan = batchScan.plan();
+ assertThat(plan.splits)
+ .isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.ALL).splits());
+ }
+
+ @Test
+ public void testStartFromLatestFull() throws Exception {
+ initializeTable(StartupMode.LATEST_FULL);
+ initializeTestData(); // 3 commits -> snapshots 1, 2, 3
+
+ // streaming mode: full snapshot 3 first, then an empty plan
+ StreamDataTableScan dataTableScan = table.newStreamScan();
+ DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
+ DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+
+ assertThat(firstPlan.splits)
+ .isEqualTo(snapshotSplitReader.withSnapshot(3).withKind(ScanKind.ALL).splits());
+ assertThat(secondPlan.splits).isEmpty();
+
+ // a 4th commit is read back as the delta of snapshot 4
+ writeAndCommit(4, rowData(1, 10, 103L));
+ DataTableScan.DataFilePlan thirdPlan = dataTableScan.plan();
+ assertThat(thirdPlan.splits)
+ .isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.DELTA).splits());
+
+ // batch mode: full (ALL) content of the latest snapshot, 4
+ BatchDataTableScan batchScan = table.newScan();
+ DataTableScan.DataFilePlan plan = batchScan.plan();
+ assertThat(plan.splits)
+ .isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.ALL).splits());
+ }
+
+ @Test
+ public void testStartFromTimestamp() throws Exception {
+ initializeTable(StartupMode.LATEST);
+ initializeTestData(); // 3 commits -> snapshots 1, 2, 3
+
+ long timestamp = System.currentTimeMillis();
+ Thread.sleep(10L); // presumably keeps snapshot 4's commit time after timestamp
+
+ // snapshot 4 is committed after the captured timestamp
+ writeAndCommit(4, rowData(1, 10, 103L));
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put(CoreOptions.SCAN_MODE.key(), StartupMode.FROM_TIMESTAMP.toString());
+ properties.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), String.valueOf(timestamp));
+ FileStoreTable readTable = table.copy(properties);
+
+ // streaming mode: empty first plan, then the delta of snapshot 4
+ StreamDataTableScan dataTableScan = readTable.newStreamScan();
+ DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
+ DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+
+ assertThat(firstPlan.splits).isEmpty();
+ assertThat(secondPlan.splits)
+ .isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.DELTA).splits());
+
+ // batch mode: full content of snapshot 3, the last one before the timestamp
+ BatchDataTableScan batchScan = readTable.newScan();
+ DataTableScan.DataFilePlan plan = batchScan.plan();
+ assertThat(plan.splits)
+ .isEqualTo(snapshotSplitReader.withSnapshot(3).withKind(ScanKind.ALL).splits());
+ }
+
+ @Test
+ public void testStartFromCompactedFull() throws Exception {
+ initializeTable(StartupMode.COMPACTED_FULL);
+ initializeTestData(); // 3 commits -> snapshots 1, 2, 3
+ // commit 4 only compacts; commit 5 writes new data
+ write.compact(binaryRow(1), 0, true);
+ commit.commit(4, write.prepareCommit(true, 4));
+ writeAndCommit(5, rowData(1, 10, 103L));
+
+ // streaming mode: compacted snapshot 4 in full, then the delta of snapshot 5
+ StreamDataTableScan dataTableScan = table.newStreamScan();
+ DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
+ DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+
+ assertThat(firstPlan.splits)
+ .isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.ALL).splits());
+ assertThat(secondPlan.splits)
+ .isEqualTo(snapshotSplitReader.withSnapshot(5).withKind(ScanKind.DELTA).splits());
+
+ // batch mode: compacted snapshot 4, not the newer snapshot 5
+ BatchDataTableScan batchScan = table.newScan();
+ DataTableScan.DataFilePlan plan = batchScan.plan();
+ assertThat(plan.splits)
+ .isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.ALL).splits());
+ }
+
+ @Test
+ public void testStartFromSnapshot() throws Exception {
+ Map<String, String> properties = new HashMap<>();
+ properties.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2");
+ initializeTable(StartupMode.FROM_SNAPSHOT, properties);
+ initializeTestData(); // 3 commits -> snapshots 1, 2, 3
+
+ // streaming mode: empty first plan, then deltas starting at snapshot 2
+ StreamDataTableScan dataTableScan = table.newStreamScan();
+ DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
+ DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+
+ assertThat(firstPlan.splits).isEmpty();
+ assertThat(secondPlan.splits)
+ .isEqualTo(snapshotSplitReader.withSnapshot(2).withKind(ScanKind.DELTA).splits());
+
+ // batch mode: full content of snapshot 2
+ BatchDataTableScan batchScan = table.newScan();
+ DataTableScan.DataFilePlan plan = batchScan.plan();
+ assertThat(plan.splits)
+ .isEqualTo(snapshotSplitReader.withSnapshot(2).withKind(ScanKind.ALL).splits());
+ }
+
+ @Test
+ public void testStartFromSnapshotFull() throws Exception {
+ Map<String, String> properties = new HashMap<>();
+ properties.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2");
+ initializeTable(StartupMode.FROM_SNAPSHOT_FULL, properties);
+ initializeTestData(); // 3 commits -> snapshots 1, 2, 3
+ // streaming mode: full snapshot 2 first, then the delta of snapshot 3
+ StreamDataTableScan dataTableScan = table.newStreamScan();
+ DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
+ DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+
+ assertThat(firstPlan.splits)
+ .isEqualTo(snapshotSplitReader.withSnapshot(2).withKind(ScanKind.ALL).splits());
+ assertThat(secondPlan.splits)
+ .isEqualTo(snapshotSplitReader.withSnapshot(3).withKind(ScanKind.DELTA).splits());
+
+ // batch mode: same as FROM_SNAPSHOT, full content of snapshot 2
+ BatchDataTableScan batchScan = table.newScan();
+ DataTableScan.DataFilePlan plan = batchScan.plan();
+ assertThat(plan.splits)
+ .isEqualTo(snapshotSplitReader.withSnapshot(2).withKind(ScanKind.ALL).splits());
+ }
+
+ private void initializeTable(CoreOptions.StartupMode startupMode) throws Exception {
+ initializeTable(startupMode, Collections.emptyMap());
+ }
+
+ private void initializeTable(
+ CoreOptions.StartupMode startupMode, Map<String, String> properties) throws Exception {
+ Options options = new Options();
+ options.set(PATH, tablePath.getPath());
+ options.set(CoreOptions.SCAN_MODE, startupMode);
+ for (Map.Entry<String, String> property : properties.entrySet()) {
+ options.set(property.getKey(), property.getValue());
+ }
+ table = createFileStoreTable(options);
+ snapshotSplitReader = table.newSnapshotSplitReader();
+ write = table.newWrite(commitUser);
+ commit = table.newCommit(commitUser);
+ }
+
+ private void initializeTestData() throws Exception {
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(1, 20, 200L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(1, write.prepareCommit(true, 1));
+ // commit 2: rows (1, 10, 101), (1, 30, 300) and a DELETE of (1, 40, 400)
+ write.write(rowData(1, 10, 101L));
+ write.write(rowData(1, 30, 300L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ commit.commit(2, write.prepareCommit(true, 2));
+ // commit 3: rows (1, 10, 102) and (1, 30, 400)
+ write.write(rowData(1, 10, 102L));
+ write.write(rowData(1, 30, 400L));
+ commit.commit(3, write.prepareCommit(true, 3));
+ }
+
+ private void writeAndCommit(long commitIdentifier, GenericRow... rows) throws Exception {
+ for (GenericRow row : rows) {
+ write.write(row);
+ }
+ commit.commit(commitIdentifier, write.prepareCommit(true, commitIdentifier));
+ }
+
+ @AfterEach
+ public void afterEach() throws Exception {
+ IOUtils.closeAll(write, commit);
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
index 313585606..8d4f0b671 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
@@ -64,7 +64,7 @@ public abstract class ScannerTestBase {
new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
new String[] {"pt", "a", "b"});
- @TempDir java.nio.file.Path tempDir;
+ protected @TempDir java.nio.file.Path tempDir;
protected Path tablePath;
protected FileIO fileIO;