You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/12/08 11:51:44 UTC
[flink-table-store] branch master updated: [FLINK-30293] Introduce StaticDataFileSnapshotEnumerator and change read.compacted as a scan mode
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new e7690d7c [FLINK-30293] Introduce StaticDataFileSnapshotEnumerator and change read.compacted as a scan mode
e7690d7c is described below
commit e7690d7c40c9a4371feafc170a147c5b03f70358
Author: tsreaper <ts...@gmail.com>
AuthorDate: Thu Dec 8 19:51:39 2022 +0800
[FLINK-30293] Introduce StaticDataFileSnapshotEnumerator and change read.compacted as a scan mode
This closes #425
---
.../shortcodes/generated/core_configuration.html | 8 +-
.../source/ContinuousFileStoreSource.java | 4 +-
.../connector/source/StaticFileStoreSource.java | 22 +-
.../store/connector/BatchFileStoreITCase.java | 4 +-
.../connector/FullCompactionFileStoreITCase.java | 6 +-
.../source/StaticFileStoreSplitEnumeratorTest.java | 108 ---------
.../org/apache/flink/table/store/CoreOptions.java | 23 +-
.../table/store/file/AppendOnlyFileStore.java | 3 +-
.../flink/table/store/file/KeyValueFileStore.java | 3 +-
.../file/operation/AbstractFileStoreScan.java | 10 +-
.../file/operation/AppendOnlyFileStoreScan.java | 6 +-
.../file/operation/KeyValueFileStoreScan.java | 6 +-
.../store/table/source/TableStreamingReader.java | 13 +-
.../source/snapshot/CompactedStartingScanner.java | 45 ++++
...ava => CompactionChangelogFollowUpScanner.java} | 32 +--
.../ContinuousDataFileSnapshotEnumerator.java | 147 ++++++++++++
.../ContinuousFromTimestampStartingScanner.java | 54 +++++
.../snapshot/ContinuousLatestStartingScanner.java | 47 ++++
.../snapshot/DataFileSnapshotEnumerator.java | 159 -------------
...otEnumerator.java => DeltaFollowUpScanner.java} | 27 +--
.../table/source/snapshot/FollowUpScanner.java | 30 +++
.../table/source/snapshot/FullStartingScanner.java | 45 ++++
...tor.java => InputChangelogFollowUpScanner.java} | 31 +--
.../table/source/snapshot/StartingScanner.java | 28 +++
.../snapshot/StaticDataFileSnapshotEnumerator.java | 77 +++++++
.../table/ChangelogWithKeyFileStoreTableTest.java | 12 +-
.../table/store/table/FileStoreTableTestBase.java | 56 -----
.../snapshot/CompactedStartingScannerTest.java | 99 ++++++++
.../CompactionChangelogFollowUpScannerTest.java | 111 +++++++++
... ContinuousDataFileSnapshotEnumeratorTest.java} | 129 +++--------
...ContinuousFromTimestampStartingScannerTest.java | 107 +++++++++
.../ContinuousLatestStartingScannerTest.java | 70 ++++++
.../source/snapshot/DeltaFollowUpScannerTest.java | 87 +++++++
...lCompactionChangelogSnapshotEnumeratorTest.java | 256 ---------------------
.../source/snapshot/FullStartingScannerTest.java | 73 ++++++
.../InputChangelogFollowUpScannerTest.java | 97 ++++++++
.../InputChangelogSnapshotEnumeratorTest.java | 239 -------------------
...stBase.java => SnapshotEnumeratorTestBase.java} | 11 +-
.../StaticDataFileSnapshotEnumeratorTest.java | 72 ++++++
39 files changed, 1313 insertions(+), 1044 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 14c721f3..0d7d205b 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -188,17 +188,11 @@
<td>String</td>
<td>The default partition name in case the dynamic partition column value is null/empty string.</td>
</tr>
- <tr>
- <td><h5>read.compacted</h5></td>
- <td style="word-wrap: break-word;">false</td>
- <td>Boolean</td>
- <td>Whether to read compacted snapshot only.</td>
- </tr>
<tr>
<td><h5>scan.mode</h5></td>
<td style="word-wrap: break-word;">deafult</td>
<td><p>Enum</p></td>
- <td>Specify the scanning behavior of the source.<br /><br />Possible values:<ul><li>"deafult": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp". Otherwise the actual startup mode will be "full".</li><li>"full": For streaming sources, produces a snapshot on the table upon first startup, and continue to read the latest changes. For batch sources, just produce a snapshot but doe [...]
+ <td>Specify the scanning behavior of the source.<br /><br />Possible values:<ul><li>"deafult": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp". Otherwise the actual startup mode will be "full".</li><li>"full": For streaming sources, produces the latest snapshot on the table upon first startup, and continue to read the latest changes. For batch sources, just produce the lates [...]
</tr>
<tr>
<td><h5>scan.timestamp-millis</h5></td>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java
index 581d746c..bf9dd90d 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.source.DataTableScan;
-import org.apache.flink.table.store.table.source.snapshot.DataFileSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
import javax.annotation.Nullable;
@@ -76,6 +76,6 @@ public class ContinuousFileStoreSource extends FlinkSource {
splits,
nextSnapshotId,
discoveryInterval,
- DataFileSnapshotEnumerator.create(table, scan, nextSnapshotId));
+ ContinuousDataFileSnapshotEnumerator.create(table, scan, nextSnapshotId));
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java
index 2223bbad..e3f67955 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java
@@ -26,9 +26,12 @@ import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.StaticDataFileSnapshotEnumerator;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.Collection;
/** Bounded {@link FlinkSource} for reading records. It does not monitor new snapshots. */
@@ -62,12 +65,23 @@ public class StaticFileStoreSource extends FlinkSource {
scan.withFilter(predicate);
}
- Long snapshotId;
+ Long snapshotId = null;
Collection<FileStoreSourceSplit> splits;
if (checkpoint == null) {
- DataTableScan.DataFilePlan plan = scan.plan();
- snapshotId = plan.snapshotId;
- splits = new FileStoreSourceSplitGenerator().createSplits(plan);
+ splits = new ArrayList<>();
+ FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
+
+ // read all splits from the enumerator in one go
+ SnapshotEnumerator snapshotEnumerator =
+ StaticDataFileSnapshotEnumerator.create(table, table.newScan());
+ while (true) {
+ DataTableScan.DataFilePlan plan = snapshotEnumerator.enumerate();
+ if (plan == null) {
+ break;
+ }
+ snapshotId = plan.snapshotId;
+ splits.addAll(splitGenerator.createSplits(plan));
+ }
} else {
// restore from checkpoint
snapshotId = checkpoint.currentSnapshotId();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
index 6355859f..adb21bb9 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
@@ -45,8 +45,8 @@ public class BatchFileStoreITCase extends FileStoreTableITCase {
}
@Test
- public void testReadCompactedEmpty() {
+ public void testCompactedScanModeEmpty() {
batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
- assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('read.compacted'='true') */")).isEmpty();
+ assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.mode'='compacted') */")).isEmpty();
}
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
index b17f43c3..81d7f3bc 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
@@ -55,11 +55,11 @@ public class FullCompactionFileStoreITCase extends FileStoreTableITCase {
}
@Test
- public void testReadCompacted() throws Exception {
+ public void testCompactedScanMode() throws Exception {
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(
streamSqlIter(
- "SELECT * FROM %s /*+ OPTIONS('read.compacted'='true') */", table));
+ "SELECT * FROM %s /*+ OPTIONS('scan.mode'='compacted') */", table));
batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
assertThat(iterator.collect(2))
@@ -68,7 +68,7 @@ public class FullCompactionFileStoreITCase extends FileStoreTableITCase {
batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", "8", "9"));
- assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('read.compacted'='true') */"))
+ assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.mode'='compacted') */"))
.containsExactlyInAnyOrder(
Row.of("1", "2", "3"), Row.of("4", "5", "6"), Row.of("7", "8", "9"));
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
deleted file mode 100644
index 8b28c155..00000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.source;
-
-import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-
-import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newFile;
-import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit;
-import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Unit tests for the {@link StaticFileStoreSplitEnumerator}. */
-public class StaticFileStoreSplitEnumeratorTest {
-
- @Test
- public void testCheckpointNoSplitRequested() {
- final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
- new TestingSplitEnumeratorContext<>(4);
- final FileStoreSourceSplit split = createRandomSplit();
- final StaticFileStoreSplitEnumerator enumerator = createEnumerator(context, split);
-
- final PendingSplitsCheckpoint checkpoint = enumerator.snapshotState(1L);
-
- assertThat(checkpoint.splits()).contains(split);
- }
-
- @Test
- public void testSplitRequestForRegisteredReader() {
- final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
- new TestingSplitEnumeratorContext<>(4);
- final FileStoreSourceSplit split = createRandomSplit();
- final StaticFileStoreSplitEnumerator enumerator = createEnumerator(context, split);
-
- context.registerReader(3, "somehost");
- enumerator.addReader(3);
- enumerator.handleSplitRequest(3, "somehost");
-
- assertThat(enumerator.snapshotState(1L).splits()).isEmpty();
- assertThat(context.getSplitAssignments().get(3).getAssignedSplits()).contains(split);
- }
-
- @Test
- public void testSplitRequestForNonRegisteredReader() {
- final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
- new TestingSplitEnumeratorContext<>(4);
- final FileStoreSourceSplit split = createRandomSplit();
- final StaticFileStoreSplitEnumerator enumerator = createEnumerator(context, split);
-
- enumerator.handleSplitRequest(3, "somehost");
-
- assertThat(context.getSplitAssignments().containsKey(3)).isFalse();
- assertThat(enumerator.snapshotState(1L).splits()).contains(split);
- }
-
- @Test
- public void testNoMoreSplits() {
- final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
- new TestingSplitEnumeratorContext<>(4);
- final FileStoreSourceSplit split = createRandomSplit();
- final StaticFileStoreSplitEnumerator enumerator = createEnumerator(context, split);
-
- // first split assignment
- context.registerReader(1, "somehost");
- enumerator.addReader(1);
- enumerator.handleSplitRequest(1, "somehost");
-
- // second request has no more split
- enumerator.handleSplitRequest(1, "somehost");
-
- assertThat(context.getSplitAssignments().get(1).getAssignedSplits()).contains(split);
- assertThat(context.getSplitAssignments().get(1).hasReceivedNoMoreSplitsSignal()).isTrue();
- }
-
- // ------------------------------------------------------------------------
- // test setup helpers
- // ------------------------------------------------------------------------
-
- private static FileStoreSourceSplit createRandomSplit() {
- return newSourceSplit("split", row(1), 2, Arrays.asList(newFile(0), newFile(1)));
- }
-
- private static StaticFileStoreSplitEnumerator createEnumerator(
- final SplitEnumeratorContext<FileStoreSourceSplit> context,
- final FileStoreSourceSplit... splits) {
- return new StaticFileStoreSplitEnumerator(context, null, Arrays.asList(splits));
- }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 7f43b0bb..744e7dee 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -173,12 +173,6 @@ public class CoreOptions implements Serializable {
.defaultValue(false)
.withDescription("Whether to skip compaction on write.");
- public static final ConfigOption<Boolean> READ_COMPACTED =
- ConfigOptions.key("read.compacted")
- .booleanType()
- .defaultValue(false)
- .withDescription("Whether to read compacted snapshot only.");
-
public static final ConfigOption<MemorySize> SOURCE_SPLIT_TARGET_SIZE =
ConfigOptions.key("source.split.target-size")
.memoryType()
@@ -569,10 +563,6 @@ public class CoreOptions implements Serializable {
return options.get(WRITE_COMPACTION_SKIP);
}
- public boolean readCompacted() {
- return options.get(READ_COMPACTED);
- }
-
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
@@ -610,9 +600,9 @@ public class CoreOptions implements Serializable {
FULL(
"full",
- "For streaming sources, produces a snapshot on the table upon first startup,"
- + " and continue to read the latest changes. "
- + "For batch sources, just produce a snapshot but does not read new changes."),
+ "For streaming sources, produces the latest snapshot on the table upon first startup, "
+ + "and continue to read the latest changes. "
+ + "For batch sources, just produce the latest snapshot but does not read new changes."),
LATEST(
"latest",
@@ -620,6 +610,13 @@ public class CoreOptions implements Serializable {
+ "without producing a snapshot at the beginning. "
+ "For batch sources, behaves the same as the \"full\" startup mode."),
+ COMPACTED(
+ "compacted",
+ "For streaming sources, produces a snapshot after the latest compaction on the table "
+ + "upon first startup, and continue to read the latest changes. "
+ + "For batch sources, just produce a snapshot after the latest compaction "
+ + "but does not read new changes."),
+
FROM_TIMESTAMP(
"from-timestamp",
"For streaming sources, continuously reads changes "
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
index d9359d4e..0d889213 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -81,8 +81,7 @@ public class AppendOnlyFileStore extends AbstractFileStore<RowData> {
manifestFileFactory(),
manifestListFactory(),
options.bucket(),
- checkNumOfBuckets,
- options.readCompacted());
+ checkNumOfBuckets);
}
@Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index 802c1944..780eabb4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -111,8 +111,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
manifestFileFactory(),
manifestListFactory(),
options.bucket(),
- checkNumOfBuckets,
- options.readCompacted());
+ checkNumOfBuckets);
}
@Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index e7e563e0..3aafa8d8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -60,7 +60,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
private final ManifestList manifestList;
private final int numOfBuckets;
private final boolean checkNumOfBuckets;
- private final boolean readCompacted;
private final ConcurrentMap<Long, TableSchema> tableSchemas;
private final SchemaManager schemaManager;
@@ -84,8 +83,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
int numOfBuckets,
- boolean checkNumOfBuckets,
- boolean readCompacted) {
+ boolean checkNumOfBuckets) {
this.partitionStatsConverter = new FieldStatsArraySerializer(partitionType);
this.partitionConverter = new RowDataToObjectArrayConverter(partitionType);
Preconditions.checkArgument(
@@ -98,7 +96,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
this.manifestList = manifestListFactory.create();
this.numOfBuckets = numOfBuckets;
this.checkNumOfBuckets = checkNumOfBuckets;
- this.readCompacted = readCompacted;
this.tableSchemas = new ConcurrentHashMap<>();
}
@@ -180,10 +177,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
Long snapshotId = specifiedSnapshotId;
if (manifests == null) {
if (snapshotId == null) {
- snapshotId =
- readCompacted
- ? snapshotManager.latestCompactedSnapshotId()
- : snapshotManager.latestSnapshotId();
+ snapshotId = snapshotManager.latestSnapshotId();
}
if (snapshotId == null) {
manifests = Collections.emptyList();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
index 86bcca3b..ac65b0ab 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
@@ -55,8 +55,7 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
int numOfBuckets,
- boolean checkNumOfBuckets,
- boolean readCompacted) {
+ boolean checkNumOfBuckets) {
super(
partitionType,
bucketKeyType,
@@ -66,8 +65,7 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
manifestFileFactory,
manifestListFactory,
numOfBuckets,
- checkNumOfBuckets,
- readCompacted);
+ checkNumOfBuckets);
this.schemaRowStatsConverters = new ConcurrentHashMap<>();
this.rowType = rowType;
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
index 68e44f0a..25391021 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
@@ -60,8 +60,7 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
int numOfBuckets,
- boolean checkNumOfBuckets,
- boolean readCompacted) {
+ boolean checkNumOfBuckets) {
super(
partitionType,
bucketKeyType,
@@ -71,8 +70,7 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
manifestFileFactory,
manifestListFactory,
numOfBuckets,
- checkNumOfBuckets,
- readCompacted);
+ checkNumOfBuckets);
this.keyValueFieldsExtractor = keyValueFieldsExtractor;
this.schemaKeyStatsConverters = new ConcurrentHashMap<>();
this.keyType = keyType;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
index b875d220..0a830780 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
@@ -19,13 +19,14 @@
package org.apache.flink.table.store.table.source;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateFilter;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.source.snapshot.DeltaSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.DeltaFollowUpScanner;
+import org.apache.flink.table.store.table.source.snapshot.FullStartingScanner;
import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
import org.apache.flink.table.store.utils.TypeUtils;
@@ -88,8 +89,12 @@ public class TableStreamingReader {
scan.withFilter(predicate);
}
enumerator =
- new DeltaSnapshotEnumerator(
- table.location(), scan, CoreOptions.StartupMode.FULL, null, null);
+ new ContinuousDataFileSnapshotEnumerator(
+ table.location(),
+ scan,
+ new FullStartingScanner(),
+ new DeltaFollowUpScanner(),
+ null);
}
@Nullable
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/CompactedStartingScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/CompactedStartingScanner.java
new file mode 100644
index 00000000..1eea117c
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/CompactedStartingScanner.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link StartingScanner} for the {@link
+ * org.apache.flink.table.store.CoreOptions.StartupMode#COMPACTED} startup mode.
+ */
+public class CompactedStartingScanner implements StartingScanner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompactedStartingScanner.class);
+
+ @Override
+ public DataTableScan.DataFilePlan getPlan(SnapshotManager snapshotManager, DataTableScan scan) {
+ Long startingSnapshotId = snapshotManager.latestCompactedSnapshotId();
+ if (startingSnapshotId == null) {
+ LOG.debug("There is currently no compact snapshot. Waiting for snapshot generation.");
+ return null;
+ }
+ return scan.withKind(ScanKind.ALL).withSnapshot(startingSnapshotId).plan();
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/CompactionChangelogFollowUpScanner.java
similarity index 54%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumerator.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/CompactionChangelogFollowUpScanner.java
index ed9e5f16..153bb59b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/CompactionChangelogFollowUpScanner.java
@@ -18,8 +18,6 @@
package org.apache.flink.table.store.table.source.snapshot;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.operation.ScanKind;
import org.apache.flink.table.store.table.source.DataTableScan;
@@ -27,33 +25,17 @@ import org.apache.flink.table.store.table.source.DataTableScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
/**
- * A {@link DataFileSnapshotEnumerator} which scans incremental changes in {@link
- * Snapshot#changelogManifestList()} for each newly created snapshots.
- *
- * <p>This enumerator looks for changelog files produced from full compaction. Can only be used when
- * {@link CoreOptions#CHANGELOG_PRODUCER} is set to {@link
- * CoreOptions.ChangelogProducer#FULL_COMPACTION}.
+ * {@link FollowUpScanner} for {@link
+ * org.apache.flink.table.store.CoreOptions.ChangelogProducer#FULL_COMPACTION} changelog producer.
*/
-public class FullCompactionChangelogSnapshotEnumerator extends DataFileSnapshotEnumerator {
+public class CompactionChangelogFollowUpScanner implements FollowUpScanner {
private static final Logger LOG =
- LoggerFactory.getLogger(FullCompactionChangelogSnapshotEnumerator.class);
-
- public FullCompactionChangelogSnapshotEnumerator(
- Path tablePath,
- DataTableScan scan,
- int maxLevel,
- CoreOptions.StartupMode startupMode,
- @Nullable Long startupMillis,
- @Nullable Long nextSnapshotId) {
- super(tablePath, scan.withLevel(maxLevel), startupMode, startupMillis, nextSnapshotId);
- }
+ LoggerFactory.getLogger(CompactionChangelogFollowUpScanner.class);
@Override
- protected boolean shouldReadSnapshot(Snapshot snapshot) {
+ public boolean shouldScanSnapshot(Snapshot snapshot) {
if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
return true;
}
@@ -66,7 +48,7 @@ public class FullCompactionChangelogSnapshotEnumerator extends DataFileSnapshotE
}
@Override
- protected DataTableScan.DataFilePlan getPlan(DataTableScan scan) {
- return scan.withKind(ScanKind.CHANGELOG).plan();
+ public DataTableScan.DataFilePlan getPlan(long snapshotId, DataTableScan scan) {
+ return scan.withKind(ScanKind.CHANGELOG).withSnapshot(snapshotId).plan();
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
new file mode 100644
index 00000000..326f7e62
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link SnapshotEnumerator} for streaming read.
+ *
+ * <p>While no next snapshot id is known, {@link #enumerate()} asks the {@link StartingScanner}
+ * for a plan; afterwards it walks forward one snapshot at a time using the {@link
+ * FollowUpScanner}.
+ */
+public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ContinuousDataFileSnapshotEnumerator.class);
+
+    private final SnapshotManager snapshotManager;
+    private final DataTableScan scan;
+    private final StartingScanner startingScanner;
+    private final FollowUpScanner followUpScanner;
+
+    /**
+     * Id of the next snapshot to inspect, or {@code null} if the starting scan has not produced a
+     * plan yet.
+     */
+    private @Nullable Long nextSnapshotId;
+
+    public ContinuousDataFileSnapshotEnumerator(
+            Path tablePath,
+            DataTableScan scan,
+            StartingScanner startingScanner,
+            FollowUpScanner followUpScanner,
+            @Nullable Long nextSnapshotId) {
+        this.snapshotManager = new SnapshotManager(tablePath);
+        this.scan = scan;
+        this.startingScanner = startingScanner;
+        this.followUpScanner = followUpScanner;
+
+        this.nextSnapshotId = nextSnapshotId;
+    }
+
+    /**
+     * Returns the next plan to read, or {@code null} if there is nothing to read yet (for
+     * example, the next snapshot does not exist yet).
+     */
+    @Nullable
+    @Override
+    public DataTableScan.DataFilePlan enumerate() {
+        if (nextSnapshotId == null) {
+            return tryFirstEnumerate();
+        } else {
+            return nextEnumerate();
+        }
+    }
+
+    /** Runs the starting scan and, on success, positions follow-up scans after its snapshot. */
+    @Nullable
+    private DataTableScan.DataFilePlan tryFirstEnumerate() {
+        DataTableScan.DataFilePlan plan = startingScanner.getPlan(snapshotManager, scan);
+        if (plan != null) {
+            nextSnapshotId = plan.snapshotId + 1;
+        }
+        return plan;
+    }
+
+    /**
+     * Returns the plan of the first snapshot, starting at {@link #nextSnapshotId}, that the
+     * follow-up scanner accepts; rejected snapshots are skipped. Returns {@code null} as soon as
+     * the next snapshot id does not exist yet.
+     */
+    @Nullable
+    private DataTableScan.DataFilePlan nextEnumerate() {
+        while (true) {
+            if (!snapshotManager.snapshotExists(nextSnapshotId)) {
+                LOG.debug(
+                        "Next snapshot id {} does not exist, wait for the snapshot generation.",
+                        nextSnapshotId);
+                return null;
+            }
+
+            Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
+
+            if (followUpScanner.shouldScanSnapshot(snapshot)) {
+                LOG.debug("Find snapshot id {}.", nextSnapshotId);
+                DataTableScan.DataFilePlan plan = followUpScanner.getPlan(nextSnapshotId, scan);
+                nextSnapshotId++;
+                return plan;
+            } else {
+                nextSnapshotId++;
+            }
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    // static create methods
+    // ------------------------------------------------------------------------
+
+    /**
+     * Creates an enumerator for {@code table}. The starting scanner is selected by the table's
+     * startup mode and the follow-up scanner by its changelog producer.
+     *
+     * @param table table to read
+     * @param scan scan shared by both scanners; for the full-compaction changelog producer it is
+     *     restricted to the max level first
+     * @param nextSnapshotId snapshot to resume follow-up scans from, or {@code null} to begin
+     *     with the starting scanner
+     * @throws UnsupportedOperationException if the startup mode or changelog producer is unknown
+     * @throws NullPointerException if the startup mode is {@code FROM_TIMESTAMP} and no scan
+     *     timestamp is configured
+     */
+    public static ContinuousDataFileSnapshotEnumerator create(
+            FileStoreTable table, DataTableScan scan, @Nullable Long nextSnapshotId) {
+        CoreOptions.StartupMode startupMode = table.options().startupMode();
+        Long startupMillis = table.options().logScanTimestampMills();
+        StartingScanner startingScanner;
+        switch (startupMode) {
+            case FULL:
+                startingScanner = new FullStartingScanner();
+                break;
+            case LATEST:
+                startingScanner = new ContinuousLatestStartingScanner();
+                break;
+            case COMPACTED:
+                startingScanner = new CompactedStartingScanner();
+                break;
+            case FROM_TIMESTAMP:
+                Preconditions.checkNotNull(
+                        startupMillis,
+                        String.format(
+                                "%s can not be null when you use %s for %s",
+                                CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
+                                CoreOptions.StartupMode.FROM_TIMESTAMP,
+                                CoreOptions.SCAN_MODE.key()));
+                startingScanner = new ContinuousFromTimestampStartingScanner(startupMillis);
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown startup mode " + startupMode.name());
+        }
+
+        CoreOptions.ChangelogProducer changelogProducer = table.options().changelogProducer();
+        FollowUpScanner followUpScanner;
+        switch (changelogProducer) {
+            case NONE:
+                followUpScanner = new DeltaFollowUpScanner();
+                break;
+            case INPUT:
+                followUpScanner = new InputChangelogFollowUpScanner();
+                break;
+            case FULL_COMPACTION:
+                // Restrict the scan to the max level (numLevels - 1); this affects both the
+                // starting and the follow-up scanner. Keep the returned scan instead of relying
+                // on withLevel() mutating its receiver.
+                scan = scan.withLevel(table.options().numLevels() - 1);
+                followUpScanner = new CompactionChangelogFollowUpScanner();
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown changelog producer " + changelogProducer.name());
+        }
+
+        return new ContinuousDataFileSnapshotEnumerator(
+                table.location(), scan, startingScanner, followUpScanner, nextSnapshotId);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousFromTimestampStartingScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
new file mode 100644
index 00000000..d598cb34
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+/**
+ * {@link StartingScanner} for the {@link
+ * org.apache.flink.table.store.CoreOptions.StartupMode#FROM_TIMESTAMP} startup mode of a streaming
+ * read.
+ *
+ * <p>No data files are planned here; the returned plan only carries a snapshot id.
+ */
+public class ContinuousFromTimestampStartingScanner implements StartingScanner {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ContinuousFromTimestampStartingScanner.class);
+
+    /** Startup timestamp, in milliseconds. */
+    private final long startupMillis;
+
+    public ContinuousFromTimestampStartingScanner(long startupMillis) {
+        this.startupMillis = startupMillis;
+    }
+
+    /**
+     * Returns an empty plan pinned to the snapshot found by {@link
+     * SnapshotManager#earlierThanTimeMills}, or {@code null} when that lookup yields nothing.
+     */
+    @Override
+    public DataTableScan.DataFilePlan getPlan(SnapshotManager snapshotManager, DataTableScan scan) {
+        Long anchorId = snapshotManager.earlierThanTimeMills(startupMillis);
+        if (anchorId != null) {
+            return new DataTableScan.DataFilePlan(anchorId, Collections.emptyList());
+        }
+        LOG.debug("There is currently no snapshot. Waiting for snapshot generation.");
+        return null;
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousLatestStartingScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousLatestStartingScanner.java
new file mode 100644
index 00000000..2a0a69d3
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousLatestStartingScanner.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+/**
+ * {@link StartingScanner} for the {@link
+ * org.apache.flink.table.store.CoreOptions.StartupMode#LATEST} startup mode of a streaming read.
+ *
+ * <p>No data files are planned here; the returned plan only carries the latest snapshot id.
+ */
+public class ContinuousLatestStartingScanner implements StartingScanner {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ContinuousLatestStartingScanner.class);
+
+    /**
+     * Returns an empty plan pinned to the latest snapshot, or {@code null} if there is no snapshot
+     * yet.
+     */
+    @Override
+    public DataTableScan.DataFilePlan getPlan(SnapshotManager snapshotManager, DataTableScan scan) {
+        Long latestId = snapshotManager.latestSnapshotId();
+        if (latestId != null) {
+            return new DataTableScan.DataFilePlan(latestId, Collections.emptyList());
+        }
+        LOG.debug("There is currently no snapshot. Wait for the snapshot generation.");
+        return null;
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java
deleted file mode 100644
index 54d201dd..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.table.source.snapshot;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.file.Snapshot;
-import org.apache.flink.table.store.file.operation.ScanKind;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.source.DataTableScan;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-
-/** Abstract class for all {@link SnapshotEnumerator}s which enumerate record related data files. */
-public abstract class DataFileSnapshotEnumerator implements SnapshotEnumerator {
-
- private static final Logger LOG = LoggerFactory.getLogger(DataFileSnapshotEnumerator.class);
-
- private final SnapshotManager snapshotManager;
- private final DataTableScan scan;
- private final CoreOptions.StartupMode startupMode;
- private @Nullable final Long startupMillis;
-
- private @Nullable Long nextSnapshotId;
-
- public DataFileSnapshotEnumerator(
- Path tablePath,
- DataTableScan scan,
- CoreOptions.StartupMode startupMode,
- @Nullable Long startupMillis,
- @Nullable Long nextSnapshotId) {
- this.snapshotManager = new SnapshotManager(tablePath);
- this.scan = scan;
- this.startupMode = startupMode;
- this.startupMillis = startupMillis;
-
- this.nextSnapshotId = nextSnapshotId;
- }
-
- @Override
- public DataTableScan.DataFilePlan enumerate() {
- if (nextSnapshotId == null) {
- return tryFirstEnumerate();
- } else {
- return nextEnumerate();
- }
- }
-
- private DataTableScan.DataFilePlan tryFirstEnumerate() {
- Long startingSnapshotId = snapshotManager.latestSnapshotId();
- if (startingSnapshotId == null) {
- LOG.debug("There is currently no snapshot. Wait for the snapshot generation.");
- return null;
- }
-
- DataTableScan.DataFilePlan plan;
- switch (startupMode) {
- case FULL:
- plan = scan.withKind(ScanKind.ALL).withSnapshot(startingSnapshotId).plan();
- break;
- case FROM_TIMESTAMP:
- Preconditions.checkNotNull(
- startupMillis,
- String.format(
- "%s can not be null when you use %s for %s",
- CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
- CoreOptions.StartupMode.FROM_TIMESTAMP,
- CoreOptions.SCAN_MODE.key()));
- startingSnapshotId = snapshotManager.earlierThanTimeMills(startupMillis);
- plan = new DataTableScan.DataFilePlan(startingSnapshotId, Collections.emptyList());
- break;
- case LATEST:
- plan = new DataTableScan.DataFilePlan(startingSnapshotId, Collections.emptyList());
- break;
- default:
- throw new UnsupportedOperationException(
- "Unknown log startup mode " + startupMode.name());
- }
-
- nextSnapshotId = startingSnapshotId + 1;
- return plan;
- }
-
- private DataTableScan.DataFilePlan nextEnumerate() {
- while (true) {
- if (!snapshotManager.snapshotExists(nextSnapshotId)) {
- LOG.debug(
- "Next snapshot id {} does not exist, wait for the snapshot generation.",
- nextSnapshotId);
- return null;
- }
-
- Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
-
- if (shouldReadSnapshot(snapshot)) {
- LOG.debug("Find snapshot id {}.", nextSnapshotId);
- DataTableScan.DataFilePlan plan = getPlan(scan.withSnapshot(nextSnapshotId));
- nextSnapshotId++;
- return plan;
- } else {
- nextSnapshotId++;
- }
- }
- }
-
- protected abstract boolean shouldReadSnapshot(Snapshot snapshot);
-
- protected abstract DataTableScan.DataFilePlan getPlan(DataTableScan scan);
-
- public static DataFileSnapshotEnumerator create(
- FileStoreTable table, DataTableScan scan, Long nextSnapshotId) {
- Path location = table.location();
- CoreOptions.StartupMode startupMode = table.options().startupMode();
- Long startupMillis = table.options().logScanTimestampMills();
-
- switch (table.options().changelogProducer()) {
- case NONE:
- return new DeltaSnapshotEnumerator(
- location, scan, startupMode, startupMillis, nextSnapshotId);
- case INPUT:
- return new InputChangelogSnapshotEnumerator(
- location, scan, startupMode, startupMillis, nextSnapshotId);
- case FULL_COMPACTION:
- return new FullCompactionChangelogSnapshotEnumerator(
- location,
- scan,
- table.options().numLevels() - 1,
- startupMode,
- startupMillis,
- nextSnapshotId);
- default:
- throw new UnsupportedOperationException(
- "Unknown changelog producer " + table.options().changelogProducer().name());
- }
- }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaFollowUpScanner.java
similarity index 63%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumerator.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaFollowUpScanner.java
index 36642ff8..4173aab0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaFollowUpScanner.java
@@ -18,8 +18,6 @@
package org.apache.flink.table.store.table.source.snapshot;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.operation.ScanKind;
import org.apache.flink.table.store.table.source.DataTableScan;
@@ -27,27 +25,16 @@ import org.apache.flink.table.store.table.source.DataTableScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
/**
- * A {@link DataFileSnapshotEnumerator} which scans incremental changes in {@link
- * Snapshot#deltaManifestList()} for each newly created snapshots.
+ * {@link FollowUpScanner} for {@link
+ * org.apache.flink.table.store.CoreOptions.ChangelogProducer#NONE} changelog producer.
*/
-public class DeltaSnapshotEnumerator extends DataFileSnapshotEnumerator {
-
- private static final Logger LOG = LoggerFactory.getLogger(DeltaSnapshotEnumerator.class);
+public class DeltaFollowUpScanner implements FollowUpScanner {
- public DeltaSnapshotEnumerator(
- Path tablePath,
- DataTableScan scan,
- CoreOptions.StartupMode startupMode,
- @Nullable Long startupMillis,
- @Nullable Long nextSnapshotId) {
- super(tablePath, scan, startupMode, startupMillis, nextSnapshotId);
- }
+ private static final Logger LOG = LoggerFactory.getLogger(DeltaFollowUpScanner.class);
@Override
- protected boolean shouldReadSnapshot(Snapshot snapshot) {
+ public boolean shouldScanSnapshot(Snapshot snapshot) {
if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
return true;
}
@@ -60,7 +47,7 @@ public class DeltaSnapshotEnumerator extends DataFileSnapshotEnumerator {
}
@Override
- protected DataTableScan.DataFilePlan getPlan(DataTableScan scan) {
- return scan.withKind(ScanKind.DELTA).plan();
+ public DataTableScan.DataFilePlan getPlan(long snapshotId, DataTableScan scan) {
+ return scan.withKind(ScanKind.DELTA).withSnapshot(snapshotId).plan();
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FollowUpScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FollowUpScanner.java
new file mode 100644
index 00000000..91d6da66
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FollowUpScanner.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+/**
+ * Helper interface for the follow-up enumeration of a {@link SnapshotEnumerator}, i.e. every
+ * enumeration after the starting one.
+ */
+public interface FollowUpScanner {
+
+    /**
+     * Returns whether {@code snapshot} should be scanned. Snapshots for which this returns {@code
+     * false} are skipped by the enumerator.
+     */
+    boolean shouldScanSnapshot(Snapshot snapshot);
+
+    /**
+     * Plans the data files to read from the snapshot with id {@code snapshotId}.
+     *
+     * @param snapshotId id of a snapshot accepted by {@link #shouldScanSnapshot(Snapshot)}
+     * @param scan scan to configure and plan with
+     * @return the plan of that snapshot
+     */
+    DataTableScan.DataFilePlan getPlan(long snapshotId, DataTableScan scan);
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullStartingScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullStartingScanner.java
new file mode 100644
index 00000000..ccb4a4b4
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullStartingScanner.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link StartingScanner} for the {@link org.apache.flink.table.store.CoreOptions.StartupMode#FULL}
+ * startup mode.
+ *
+ * <p>Plans the latest snapshot with {@link ScanKind#ALL}.
+ */
+public class FullStartingScanner implements StartingScanner {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FullStartingScanner.class);
+
+    /**
+     * Returns a {@link ScanKind#ALL} plan of the latest snapshot, or {@code null} if there is no
+     * snapshot yet.
+     */
+    @Override
+    public DataTableScan.DataFilePlan getPlan(SnapshotManager snapshotManager, DataTableScan scan) {
+        Long latestId = snapshotManager.latestSnapshotId();
+        if (latestId != null) {
+            return scan.withKind(ScanKind.ALL).withSnapshot(latestId).plan();
+        }
+        LOG.debug("There is currently no snapshot. Waiting for snapshot generation.");
+        return null;
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogFollowUpScanner.java
similarity index 55%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumerator.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogFollowUpScanner.java
index fffe71f4..d9b3a397 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogFollowUpScanner.java
@@ -18,8 +18,6 @@
package org.apache.flink.table.store.table.source.snapshot;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.operation.ScanKind;
import org.apache.flink.table.store.table.source.DataTableScan;
@@ -27,31 +25,16 @@ import org.apache.flink.table.store.table.source.DataTableScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
/**
- * A {@link DataFileSnapshotEnumerator} which scans incremental changes in {@link
- * Snapshot#changelogManifestList()} for each newly created snapshots.
- *
- * <p>This enumerator looks for changelog files produced directly from input. Can only be used when
- * {@link CoreOptions#CHANGELOG_PRODUCER} is set to {@link CoreOptions.ChangelogProducer#INPUT}.
+ * {@link FollowUpScanner} for {@link
+ * org.apache.flink.table.store.CoreOptions.ChangelogProducer#INPUT} changelog producer.
*/
-public class InputChangelogSnapshotEnumerator extends DataFileSnapshotEnumerator {
+public class InputChangelogFollowUpScanner implements FollowUpScanner {
- private static final Logger LOG =
- LoggerFactory.getLogger(InputChangelogSnapshotEnumerator.class);
-
- public InputChangelogSnapshotEnumerator(
- Path tablePath,
- DataTableScan scan,
- CoreOptions.StartupMode startupMode,
- @Nullable Long startupMillis,
- Long nextSnapshotId) {
- super(tablePath, scan, startupMode, startupMillis, nextSnapshotId);
- }
+ private static final Logger LOG = LoggerFactory.getLogger(InputChangelogFollowUpScanner.class);
@Override
- protected boolean shouldReadSnapshot(Snapshot snapshot) {
+ public boolean shouldScanSnapshot(Snapshot snapshot) {
if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
return true;
}
@@ -64,7 +47,7 @@ public class InputChangelogSnapshotEnumerator extends DataFileSnapshotEnumerator
}
@Override
- protected DataTableScan.DataFilePlan getPlan(DataTableScan scan) {
- return scan.withKind(ScanKind.CHANGELOG).plan();
+ public DataTableScan.DataFilePlan getPlan(long snapshotId, DataTableScan scan) {
+ return scan.withKind(ScanKind.CHANGELOG).withSnapshot(snapshotId).plan();
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StartingScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StartingScanner.java
new file mode 100644
index 00000000..7b658066
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StartingScanner.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import javax.annotation.Nullable;
+
+/** Helper interface for the first enumeration of a {@link SnapshotEnumerator}. */
+public interface StartingScanner {
+
+    /**
+     * Creates the plan of the first enumeration.
+     *
+     * @param snapshotManager gives access to the table's snapshots
+     * @param scan scan to configure and plan with
+     * @return the starting plan, or {@code null} if there is nothing to start from yet (for
+     *     example, the table has no snapshot)
+     */
+    @Nullable
+    DataTableScan.DataFilePlan getPlan(SnapshotManager snapshotManager, DataTableScan scan);
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
new file mode 100644
index 00000000..f12e9ac1
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link SnapshotEnumerator} for batch read.
+ *
+ * <p>Only the first call to {@link #enumerate()} consults the {@link StartingScanner}; every later
+ * call returns {@code null}.
+ */
+public class StaticDataFileSnapshotEnumerator implements SnapshotEnumerator {
+
+    private final SnapshotManager snapshotManager;
+    private final DataTableScan scan;
+    private final StartingScanner startingScanner;
+
+    /** Whether {@link #enumerate()} has not been called yet. */
+    private boolean hasNext;
+
+    private StaticDataFileSnapshotEnumerator(
+            Path tablePath, DataTableScan scan, StartingScanner startingScanner) {
+        this.snapshotManager = new SnapshotManager(tablePath);
+        this.scan = scan;
+        this.startingScanner = startingScanner;
+
+        this.hasNext = true;
+    }
+
+    /**
+     * Returns the starting plan on the first call (which may itself be {@code null}, e.g. when the
+     * table has no snapshot yet) and {@code null} on every later call.
+     */
+    @Nullable
+    @Override
+    public DataTableScan.DataFilePlan enumerate() {
+        if (!hasNext) {
+            return null;
+        }
+        hasNext = false;
+        return startingScanner.getPlan(snapshotManager, scan);
+    }
+
+    // ------------------------------------------------------------------------
+    // static create methods
+    // ------------------------------------------------------------------------
+
+    /**
+     * Creates a batch enumerator for {@code table}, choosing the starting scanner from the table's
+     * startup mode.
+     *
+     * @throws UnsupportedOperationException if the startup mode is not supported for batch read
+     */
+    public static StaticDataFileSnapshotEnumerator create(
+            FileStoreTable table, DataTableScan scan) {
+        CoreOptions.StartupMode startupMode = table.options().startupMode();
+        StartingScanner startingScanner;
+        switch (startupMode) {
+            case FULL:
+            case LATEST:
+                // batch read has no follow-up phase, so LATEST reads the full snapshot as well
+                startingScanner = new FullStartingScanner();
+                break;
+            case COMPACTED:
+                startingScanner = new CompactedStartingScanner();
+                break;
+            default:
+                // e.g. FROM_TIMESTAMP, which only streaming read supports
+                throw new UnsupportedOperationException(
+                        "Startup mode "
+                                + startupMode.name()
+                                + " is not supported for batch read");
+        }
+
+        return new StaticDataFileSnapshotEnumerator(table.location(), scan, startingScanner);
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index aae4a784..34ab1a17 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -38,7 +38,9 @@ import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
-import org.apache.flink.table.store.table.source.snapshot.InputChangelogSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.FullStartingScanner;
+import org.apache.flink.table.store.table.source.snapshot.InputChangelogFollowUpScanner;
import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
import org.apache.flink.table.store.utils.CompatibilityTestUtils;
import org.apache.flink.table.types.logical.LogicalType;
@@ -373,8 +375,12 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
Collections.singletonList("-D 2|10|301|binary|varbinary")));
SnapshotEnumerator enumerator =
- new InputChangelogSnapshotEnumerator(
- tablePath, table.newScan(), CoreOptions.StartupMode.FULL, null, 1L);
+ new ContinuousDataFileSnapshotEnumerator(
+ tablePath,
+ table.newScan(),
+ new FullStartingScanner(),
+ new InputChangelogFollowUpScanner(),
+ 1L);
FunctionWithException<Integer, Void, Exception> assertNextSnapshot =
i -> {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 3bde3586..8965e97f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -28,7 +28,6 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
-import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
@@ -43,7 +42,6 @@ import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.DataSplit;
-import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.types.logical.LogicalType;
@@ -60,7 +58,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -72,7 +69,6 @@ import java.util.stream.Collectors;
import static org.apache.flink.table.store.CoreOptions.BUCKET;
import static org.apache.flink.table.store.CoreOptions.BUCKET_KEY;
import static org.apache.flink.table.store.CoreOptions.COMPACTION_MAX_FILE_NUM;
-import static org.apache.flink.table.store.CoreOptions.READ_COMPACTED;
import static org.apache.flink.table.store.CoreOptions.WRITE_COMPACTION_SKIP;
import static org.assertj.core.api.Assertions.assertThat;
@@ -320,58 +316,6 @@ public abstract class FileStoreTableTestBase {
}
}
- @Test
- public void testReadCompactedSnapshot() throws Exception {
- writeCompactData();
- FileStoreTable table = createFileStoreTable(conf -> conf.set(READ_COMPACTED, true));
-
- DataTableScan.DataFilePlan plan = table.newScan().plan();
- Snapshot compactedSnapshot = table.snapshotManager().snapshot(plan.snapshotId);
- Iterator<Snapshot> snapshotIterator = table.snapshotManager().snapshots();
- while (snapshotIterator.hasNext()) {
- Snapshot snapshot = snapshotIterator.next();
- if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
- assertThat(snapshot.id()).isLessThanOrEqualTo(compactedSnapshot.id());
- }
- }
-
- assertThat(compactedSnapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
- List<Split> splits = plan.splits();
- TableRead read = table.newRead();
- assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING).size())
- .isGreaterThan(0);
- assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING).size())
- .isGreaterThan(0);
-
- List<Split> compactedSplits =
- table.newScan().withSnapshot(compactedSnapshot.id()).plan().splits();
- assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
- .isEqualTo(getResult(read, compactedSplits, binaryRow(1), 0, BATCH_ROW_TO_STRING));
- assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
- .isEqualTo(getResult(read, compactedSplits, binaryRow(2), 0, BATCH_ROW_TO_STRING));
- }
-
- private void writeCompactData() throws Exception {
- final int batchCount = 10;
- final int countPerBatch = 10000;
- FileStoreTable table =
- createFileStoreTable(
- conf ->
- conf.set(CoreOptions.BUCKET, 1)
- .set(CoreOptions.READ_COMPACTED, true)
- .set(CoreOptions.COMPACTION_MAX_FILE_NUM, 10));
- TableWrite write = table.newWrite("user");
- TableCommit commit = table.newCommit("user");
-
- for (int i = 0; i < batchCount; i++) {
- for (int j = 0; j < countPerBatch; j++) {
- write.write(rowData(j % 2 + 1, i * j, j * 10L));
- }
- commit.commit(i, write.prepareCommit(true, i));
- }
- write.close();
- }
-
protected List<String> getResult(
TableRead read,
List<Split> splits,
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/CompactedStartingScannerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/CompactedStartingScannerTest.java
new file mode 100644
index 00000000..b7bf76e3
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/CompactedStartingScannerTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CompactedStartingScanner}, which plans from a compacted snapshot or returns null. */
+public class CompactedStartingScannerTest extends SnapshotEnumeratorTestBase {
+
+ @Test
+ public void testGetPlan() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ SnapshotManager snapshotManager = table.snapshotManager();
+ TableWrite write = table.newWrite(commitUser);
+ TableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(1, 20, 200L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(1, 10, 101L));
+ write.write(rowData(1, 30, 300L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ write.compact(binaryRow(1), 0, true); // this commit yields snapshots 2 and 3 (the compacted one)
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ write.write(rowData(1, 10, 102L)); // snapshot 4 is newer than the compaction and must not be planned
+ write.write(rowData(1, 20, 201L));
+ commit.commit(2, write.prepareCommit(true, 2));
+
+ assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
+
+ CompactedStartingScanner scanner = new CompactedStartingScanner();
+ DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, table.newScan());
+ assertThat(plan.snapshotId).isEqualTo(3); // the compacted snapshot, not the latest one (4)
+ assertThat(getResult(table.newRead(), plan.splits()))
+ .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testNoSnapshot() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ SnapshotManager snapshotManager = table.snapshotManager();
+ CompactedStartingScanner scanner = new CompactedStartingScanner();
+ assertThat(scanner.getPlan(snapshotManager, table.newScan())).isNull(); // empty table
+ }
+
+ @Test
+ public void testNoCompactSnapshot() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ SnapshotManager snapshotManager = table.snapshotManager();
+ TableWrite write = table.newWrite(commitUser);
+ TableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(1, 20, 200L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ assertThat(snapshotManager.latestSnapshotId()).isEqualTo(1);
+
+ CompactedStartingScanner scanner = new CompactedStartingScanner();
+ assertThat(scanner.getPlan(snapshotManager, table.newScan())).isNull(); // no compaction ran yet
+
+ write.close();
+ commit.close();
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
new file mode 100644
index 00000000..10f22cb0
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CompactionChangelogFollowUpScanner}: only COMPACT snapshots are scanned. */
+public class CompactionChangelogFollowUpScannerTest extends SnapshotEnumeratorTestBase {
+
+ @Test
+ public void testGetPlan() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ SnapshotManager snapshotManager = table.snapshotManager();
+ TableWrite write = table.newWrite(commitUser);
+ TableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(1, 20, 200L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(1, 10, 101L));
+ write.write(rowData(1, 30, 300L));
+ write.write(rowData(1, 10, 102L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ write.compact(binaryRow(1), 0, true);
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ write.write(rowData(1, 10, 103L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 30, 300L));
+ write.write(rowData(1, 40, 401L));
+ write.compact(binaryRow(1), 0, true);
+ commit.commit(2, write.prepareCommit(true, 2)); // each commit gets a new, increasing identifier
+
+ assertThat(snapshotManager.latestSnapshotId()).isEqualTo(5);
+
+ DataTableScan scan = table.newScan().withLevel(table.options().numLevels() - 1); // top level only
+ TableRead read = table.newRead();
+ CompactionChangelogFollowUpScanner scanner = new CompactionChangelogFollowUpScanner();
+
+ Snapshot snapshot = snapshotManager.snapshot(1);
+ assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+ assertThat(scanner.shouldScanSnapshot(snapshot)).isFalse();
+
+ snapshot = snapshotManager.snapshot(2);
+ assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+ assertThat(scanner.shouldScanSnapshot(snapshot)).isFalse();
+
+ snapshot = snapshotManager.snapshot(3);
+ assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+ assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
+ DataTableScan.DataFilePlan plan = scanner.getPlan(3, scan);
+ assertThat(plan.snapshotId).isEqualTo(3);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|20|200", "+I 1|30|300"));
+
+ snapshot = snapshotManager.snapshot(4);
+ assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+ assertThat(scanner.shouldScanSnapshot(snapshot)).isFalse();
+
+ snapshot = snapshotManager.snapshot(5);
+ assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+ assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
+ plan = scanner.getPlan(5, scan);
+ assertThat(plan.snapshotId).isEqualTo(5);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(
+ Arrays.asList("-U 1|10|102", "+U 1|10|103", "-D 1|30|300", "+I 1|40|401"));
+
+ write.close();
+ commit.close();
+ }
+
+ @Override
+ protected FileStoreTable createFileStoreTable() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.FULL_COMPACTION);
+ return createFileStoreTable(conf);
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumeratorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumeratorTest.java
similarity index 59%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumeratorTest.java
rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumeratorTest.java
index 3252d022..666c22c3 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumeratorTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumeratorTest.java
@@ -33,18 +33,17 @@ import java.util.Arrays;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for {@link DeltaSnapshotEnumerator}. */
-public class DeltaSnapshotEnumeratorTest extends DataFileSnapshotEnumeratorTestBase {
+/** Tests for {@link ContinuousDataFileSnapshotEnumerator}. */
+public class ContinuousDataFileSnapshotEnumeratorTest extends SnapshotEnumeratorTestBase {
@Test
- public void testFullStartupMode() throws Exception {
+ public void testEnumerate() throws Exception {
FileStoreTable table = createFileStoreTable();
TableRead read = table.newRead();
TableWrite write = table.newWrite(commitUser);
TableCommit commit = table.newCommit(commitUser);
SnapshotEnumerator enumerator =
- new DeltaSnapshotEnumerator(
- tablePath, table.newScan(), CoreOptions.StartupMode.FULL, null, null);
+ ContinuousDataFileSnapshotEnumerator.create(table, table.newScan(), null);
// first call without any snapshot, should return null
assertThat(enumerator.enumerate()).isNull();
@@ -101,134 +100,76 @@ public class DeltaSnapshotEnumeratorTest extends DataFileSnapshotEnumeratorTestB
}
@Test
- public void testFromTimestampStartupMode() throws Exception {
- FileStoreTable table = createFileStoreTable();
+ public void testFullCompactionChangelog() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.FULL_COMPACTION);
+
+ FileStoreTable table = createFileStoreTable(conf);
TableRead read = table.newRead();
TableWrite write = table.newWrite(commitUser);
TableCommit commit = table.newCommit(commitUser);
+ SnapshotEnumerator enumerator =
+ ContinuousDataFileSnapshotEnumerator.create(table, table.newScan(), null);
+
+ // first call without any snapshot, should return null
+ assertThat(enumerator.enumerate()).isNull();
+ // write base data
write.write(rowData(1, 10, 100L));
write.write(rowData(1, 20, 200L));
write.write(rowData(1, 40, 400L));
commit.commit(0, write.prepareCommit(true, 0));
- // log current time millis, we'll start from here
- Thread.sleep(50);
- long startMillis = System.currentTimeMillis();
- Thread.sleep(50);
-
write.write(rowData(1, 10, 101L));
write.write(rowData(1, 30, 300L));
write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ write.compact(binaryRow(1), 0, true);
commit.commit(1, write.prepareCommit(true, 1));
+ // some more records
write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
write.write(rowData(1, 20, 201L));
write.write(rowData(1, 10, 102L));
write.write(rowData(1, 40, 400L));
commit.commit(2, write.prepareCommit(true, 2));
- // first call with snapshot, should return empty plan
- SnapshotEnumerator enumerator =
- new DeltaSnapshotEnumerator(
- tablePath,
- table.newScan(),
- CoreOptions.StartupMode.FROM_TIMESTAMP,
- startMillis,
- null);
-
+ // first call with snapshot: plans snapshot 4, but only sees data compacted by the 2nd commit
DataTableScan.DataFilePlan plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(1);
- assertThat(plan.splits()).isEmpty();
-
- // first incremental call, should return incremental records from 2nd commit
- plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(2);
- assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|30|300", "-D 1|40|400"));
-
- // second incremental call, should return incremental records from 3rd commit
- plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(3);
+ assertThat(plan.snapshotId).isEqualTo(4);
assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|20|201", "+I 1|40|400"));
+ .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
- // no new snapshots
+ // incremental call without new snapshots, should return null
assertThat(enumerator.enumerate()).isNull();
- // more incremental records
+ // write incremental data
write.write(rowData(1, 10, 103L));
write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
write.write(rowData(1, 50, 500L));
commit.commit(3, write.prepareCommit(true, 3));
- plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(4);
- assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400", "+I 1|50|500"));
-
- assertThat(enumerator.enumerate()).isNull();
-
- write.close();
- commit.close();
- }
-
- @Test
- public void testLatestStartupMode() throws Exception {
- FileStoreTable table = createFileStoreTable();
- TableRead read = table.newRead();
- TableWrite write = table.newWrite(commitUser);
- TableCommit commit = table.newCommit(commitUser);
-
- write.write(rowData(1, 10, 100L));
- write.write(rowData(1, 20, 200L));
- write.write(rowData(1, 40, 400L));
- commit.commit(0, write.prepareCommit(true, 0));
-
- write.write(rowData(1, 10, 101L));
- write.write(rowData(1, 30, 300L));
- write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
- commit.commit(1, write.prepareCommit(true, 1));
-
- SnapshotEnumerator enumerator =
- new DeltaSnapshotEnumerator(
- tablePath, table.newScan(), CoreOptions.StartupMode.LATEST, null, null);
-
- DataTableScan.DataFilePlan plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(2);
- assertThat(plan.splits()).isEmpty();
-
+ // no new compact snapshots, should return null
assertThat(enumerator.enumerate()).isNull();
- write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
- write.write(rowData(1, 20, 201L));
- write.write(rowData(1, 10, 102L));
- write.write(rowData(1, 40, 400L));
- commit.commit(2, write.prepareCommit(true, 2));
+ write.compact(binaryRow(1), 0, true);
+ commit.commit(4, write.prepareCommit(true, 4));
+ // full compaction done, read new changelog
plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(3);
+ assertThat(plan.snapshotId).isEqualTo(6);
assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|20|201", "+I 1|40|400"));
- assertThat(enumerator.enumerate()).isNull();
-
- write.write(rowData(1, 10, 103L));
- write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
- write.write(rowData(1, 50, 500L));
- commit.commit(3, write.prepareCommit(true, 3));
+ .hasSameElementsAs(
+ Arrays.asList(
+ "-U 1|10|101",
+ "+U 1|10|103",
+ "-U 1|20|200",
+ "+U 1|20|201",
+ "+I 1|50|500"));
- plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(4);
- assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400", "+I 1|50|500"));
+ // no more new snapshots, should return null
assertThat(enumerator.enumerate()).isNull();
write.close();
commit.close();
}
-
- @Override
- protected Configuration getConf() {
- return new Configuration();
- }
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
new file mode 100644
index 00000000..eb68eb6c
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ContinuousFromTimestampStartingScanner}, whose starting plan reads no data. */
+public class ContinuousFromTimestampStartingScannerTest extends SnapshotEnumeratorTestBase {
+
+ @Test
+ public void testGetPlan() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ SnapshotManager snapshotManager = table.snapshotManager();
+ TableWrite write = table.newWrite(commitUser);
+ TableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(1, 20, 200L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(1, 10, 101L));
+ write.write(rowData(1, 30, 300L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ // sleep so that snapshot 3 presumably gets a strictly later timeMillis than snapshot 2
+ Thread.sleep(50);
+
+ write.write(rowData(1, 10, 102L));
+ write.write(rowData(1, 20, 201L));
+ commit.commit(2, write.prepareCommit(true, 2));
+
+ assertThat(snapshotManager.latestSnapshotId()).isEqualTo(3);
+ // start from snapshot 3's timestamp; the starting plan must not include snapshot 3
+ long timestamp = snapshotManager.snapshot(3).timeMillis();
+
+ ContinuousFromTimestampStartingScanner scanner =
+ new ContinuousFromTimestampStartingScanner(timestamp);
+ DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, table.newScan());
+ assertThat(plan.snapshotId).isEqualTo(2); // last snapshot before the timestamp
+ assertThat(getResult(table.newRead(), plan.splits())).isEmpty();
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testNoSnapshot() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ SnapshotManager snapshotManager = table.snapshotManager();
+ ContinuousFromTimestampStartingScanner scanner =
+ new ContinuousFromTimestampStartingScanner(System.currentTimeMillis());
+ assertThat(scanner.getPlan(snapshotManager, table.newScan())).isNull(); // empty table
+ }
+
+ @Test
+ public void testNoSnapshotBeforeTimestamp() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ SnapshotManager snapshotManager = table.snapshotManager();
+ TableWrite write = table.newWrite(commitUser);
+ TableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(1, 20, 200L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ assertThat(snapshotManager.latestSnapshotId()).isEqualTo(1);
+ // start from snapshot 1's timestamp; the starting plan must not include snapshot 1
+ long timestamp = snapshotManager.snapshot(1).timeMillis();
+
+ ContinuousFromTimestampStartingScanner scanner =
+ new ContinuousFromTimestampStartingScanner(timestamp);
+ DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, table.newScan());
+ assertThat(plan.snapshotId).isEqualTo(0); // no snapshot exists before the timestamp
+ assertThat(getResult(table.newRead(), plan.splits())).isEmpty();
+
+ write.close();
+ commit.close();
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousLatestStartingScannerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousLatestStartingScannerTest.java
new file mode 100644
index 00000000..d4917792
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousLatestStartingScannerTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ContinuousLatestStartingScanner}, which starts at the latest snapshot without reading data. */
+public class ContinuousLatestStartingScannerTest extends SnapshotEnumeratorTestBase {
+
+ @Test
+ public void testGetPlan() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ SnapshotManager snapshotManager = table.snapshotManager();
+ TableWrite write = table.newWrite(commitUser);
+ TableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(1, 20, 200L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(1, 10, 101L));
+ write.write(rowData(1, 30, 300L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ assertThat(snapshotManager.latestSnapshotId()).isEqualTo(2);
+
+ ContinuousLatestStartingScanner scanner = new ContinuousLatestStartingScanner();
+ DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, table.newScan());
+ assertThat(plan.snapshotId).isEqualTo(2); // positioned at the latest snapshot
+ assertThat(getResult(table.newRead(), plan.splits())).isEmpty(); // existing data is not read
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testNoSnapshot() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ SnapshotManager snapshotManager = table.snapshotManager();
+ ContinuousLatestStartingScanner scanner = new ContinuousLatestStartingScanner();
+ assertThat(scanner.getPlan(snapshotManager, table.newScan())).isNull(); // empty table
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaFollowUpScannerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaFollowUpScannerTest.java
new file mode 100644
index 00000000..080812a2
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaFollowUpScannerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DeltaFollowUpScanner} (scans APPEND snapshots, skips COMPACT ones). */
+public class DeltaFollowUpScannerTest extends SnapshotEnumeratorTestBase {
+
+    @Test
+    public void testGetPlan() throws Exception {
+        FileStoreTable fileStoreTable = createFileStoreTable();
+        SnapshotManager snapshotMgr = fileStoreTable.snapshotManager();
+        TableWrite tableWrite = fileStoreTable.newWrite(commitUser);
+        TableCommit tableCommit = fileStoreTable.newCommit(commitUser);
+        // first commit -> snapshot 1
+        tableWrite.write(rowData(1, 10, 100L));
+        tableWrite.write(rowData(1, 20, 200L));
+        tableWrite.write(rowData(1, 40, 400L));
+        tableCommit.commit(0, tableWrite.prepareCommit(true, 0));
+        // second commit also runs a compaction -> snapshot 2 (APPEND) and 3 (COMPACT)
+        tableWrite.write(rowData(1, 10, 101L));
+        tableWrite.write(rowData(1, 30, 300L));
+        tableWrite.write(rowData(1, 10, 102L));
+        tableWrite.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        tableWrite.compact(binaryRow(1), 0, true);
+        tableCommit.commit(1, tableWrite.prepareCommit(true, 1));
+        assertThat(snapshotMgr.latestSnapshotId()).isEqualTo(3);
+
+        DataTableScan scan = fileStoreTable.newScan();
+        TableRead read = fileStoreTable.newRead();
+        DeltaFollowUpScanner deltaScanner = new DeltaFollowUpScanner();
+
+        // snapshot 1: APPEND, scanned, plan reads the rows of the first commit
+        Snapshot current = snapshotMgr.snapshot(1);
+        assertThat(current.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+        assertThat(deltaScanner.shouldScanSnapshot(current)).isTrue();
+        DataTableScan.DataFilePlan deltaPlan = deltaScanner.getPlan(1, scan);
+        assertThat(deltaPlan.snapshotId).isEqualTo(1);
+        assertThat(getResult(read, deltaPlan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|100", "+I 1|20|200", "+I 1|40|400"));
+        // snapshot 2: APPEND, scanned, plan reads only the changes of the second commit
+        current = snapshotMgr.snapshot(2);
+        assertThat(current.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+        assertThat(deltaScanner.shouldScanSnapshot(current)).isTrue();
+        deltaPlan = deltaScanner.getPlan(2, scan);
+        assertThat(deltaPlan.snapshotId).isEqualTo(2);
+        assertThat(getResult(read, deltaPlan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|30|300", "-D 1|40|400"));
+        // snapshot 3: COMPACT, skipped by the delta scanner
+        current = snapshotMgr.snapshot(3);
+        assertThat(current.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+        assertThat(deltaScanner.shouldScanSnapshot(current)).isFalse();
+
+        tableWrite.close();
+        tableCommit.close();
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumeratorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumeratorTest.java
deleted file mode 100644
index 9a3e890f..00000000
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumeratorTest.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.table.source.snapshot;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.TableCommit;
-import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.table.store.table.source.DataTableScan;
-import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.types.RowKind;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Tests for {@link FullCompactionChangelogSnapshotEnumerator}. */
-public class FullCompactionChangelogSnapshotEnumeratorTest
- extends DataFileSnapshotEnumeratorTestBase {
-
- @Test
- public void testFullStartupMode() throws Exception {
- FileStoreTable table = createFileStoreTable();
- TableRead read = table.newRead();
- TableWrite write = table.newWrite(commitUser);
- TableCommit commit = table.newCommit(commitUser);
- SnapshotEnumerator enumerator =
- new FullCompactionChangelogSnapshotEnumerator(
- tablePath,
- table.newScan(),
- table.options().numLevels() - 1,
- CoreOptions.StartupMode.FULL,
- null,
- null);
-
- // first call without any snapshot, should return null
- assertThat(enumerator.enumerate()).isNull();
-
- // write base data
- write.write(rowData(1, 10, 100L));
- write.write(rowData(1, 20, 200L));
- write.write(rowData(1, 40, 400L));
- commit.commit(0, write.prepareCommit(true, 0));
-
- write.write(rowData(1, 10, 101L));
- write.write(rowData(1, 30, 300L));
- write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
- write.compact(binaryRow(1), 0, true);
- commit.commit(1, write.prepareCommit(true, 1));
-
- // some more records
- write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
- write.write(rowData(1, 20, 201L));
- write.write(rowData(1, 10, 102L));
- write.write(rowData(1, 40, 400L));
- commit.commit(2, write.prepareCommit(true, 2));
-
- // first call with snapshot, should return full compacted records from 3rd commit
- DataTableScan.DataFilePlan plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(4);
- assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
-
- // incremental call without new snapshots, should return null
- assertThat(enumerator.enumerate()).isNull();
-
- // write incremental data
- write.write(rowData(1, 10, 103L));
- write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
- write.write(rowData(1, 50, 500L));
- commit.commit(3, write.prepareCommit(true, 3));
-
- // no new compact snapshots, should return null
- assertThat(enumerator.enumerate()).isNull();
-
- write.compact(binaryRow(1), 0, true);
- commit.commit(4, write.prepareCommit(true, 4));
-
- // full compaction done, read new changelog
- plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(6);
- assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(
- Arrays.asList(
- "-U 1|10|101",
- "+U 1|10|103",
- "-U 1|20|200",
- "+U 1|20|201",
- "+I 1|50|500"));
-
- // no more new snapshots, should return null
- assertThat(enumerator.enumerate()).isNull();
-
- write.close();
- commit.close();
- }
-
- @Test
- public void testFromTimestampStartupMode() throws Exception {
- FileStoreTable table = createFileStoreTable();
- TableRead read = table.newRead();
- TableWrite write = table.newWrite(commitUser);
- TableCommit commit = table.newCommit(commitUser);
-
- write.write(rowData(1, 10, 100L));
- write.write(rowData(1, 20, 200L));
- write.write(rowData(1, 40, 400L));
- commit.commit(0, write.prepareCommit(true, 0));
-
- write.write(rowData(1, 10, 101L));
- write.write(rowData(1, 30, 300L));
- write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
- write.compact(binaryRow(1), 0, true);
- commit.commit(1, write.prepareCommit(true, 1));
-
- // log current time millis, we'll start from here
- Thread.sleep(50);
- long startMillis = System.currentTimeMillis();
- Thread.sleep(50);
-
- write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
- write.write(rowData(1, 20, 201L));
- write.write(rowData(1, 10, 102L));
- write.write(rowData(1, 40, 400L));
- commit.commit(2, write.prepareCommit(true, 2));
-
- SnapshotEnumerator enumerator =
- new FullCompactionChangelogSnapshotEnumerator(
- tablePath,
- table.newScan(),
- table.options().numLevels() - 1,
- CoreOptions.StartupMode.FROM_TIMESTAMP,
- startMillis,
- null);
-
- // first call, should return empty plan
- DataTableScan.DataFilePlan plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(3);
- assertThat(plan.splits()).isEmpty();
-
- // first incremental call, no new compact snapshot, should be null
- assertThat(enumerator.enumerate()).isNull();
-
- write.write(rowData(1, 10, 103L));
- write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
- write.write(rowData(1, 50, 500L));
- write.compact(binaryRow(1), 0, true);
- commit.commit(3, write.prepareCommit(true, 3));
-
- plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(6);
- assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(
- Arrays.asList(
- "-U 1|10|101",
- "+U 1|10|103",
- "-U 1|20|200",
- "+U 1|20|201",
- "+I 1|50|500"));
-
- assertThat(enumerator.enumerate()).isNull();
-
- write.close();
- commit.close();
- }
-
- @Test
- public void testLatestStartupMode() throws Exception {
- FileStoreTable table = createFileStoreTable();
- TableRead read = table.newRead();
- TableWrite write = table.newWrite(commitUser);
- TableCommit commit = table.newCommit(commitUser);
-
- write.write(rowData(1, 10, 100L));
- write.write(rowData(1, 20, 200L));
- write.write(rowData(1, 40, 400L));
- commit.commit(0, write.prepareCommit(true, 0));
-
- write.write(rowData(1, 10, 101L));
- write.write(rowData(1, 30, 300L));
- write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
- write.compact(binaryRow(1), 0, true);
- commit.commit(1, write.prepareCommit(true, 1));
-
- SnapshotEnumerator enumerator =
- new FullCompactionChangelogSnapshotEnumerator(
- tablePath,
- table.newScan(),
- table.options().numLevels() - 1,
- CoreOptions.StartupMode.LATEST,
- null,
- null);
-
- // first call, should be empty plan
- DataTableScan.DataFilePlan plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(3);
- assertThat(plan.splits()).isEmpty();
-
- write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
- write.write(rowData(1, 20, 201L));
- write.write(rowData(1, 10, 102L));
- write.write(rowData(1, 40, 400L));
- commit.commit(2, write.prepareCommit(true, 2));
-
- // first incremental call, no new compact snapshot, should be null
- assertThat(enumerator.enumerate()).isNull();
-
- write.write(rowData(1, 10, 103L));
- write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
- write.write(rowData(1, 50, 500L));
- write.compact(binaryRow(1), 0, true);
- commit.commit(3, write.prepareCommit(true, 3));
-
- plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(6);
- assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(
- Arrays.asList(
- "-U 1|10|101",
- "+U 1|10|103",
- "-U 1|20|200",
- "+U 1|20|201",
- "+I 1|50|500"));
-
- assertThat(enumerator.enumerate()).isNull();
-
- write.close();
- commit.close();
- }
-
- @Override
- protected Configuration getConf() {
- Configuration conf = new Configuration();
- conf.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.FULL_COMPACTION);
- return conf;
- }
-}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullStartingScannerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullStartingScannerTest.java
new file mode 100644
index 00000000..c8ca7c56
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullStartingScannerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FullStartingScanner}, which starts by reading the full latest snapshot. */
+public class FullStartingScannerTest extends SnapshotEnumeratorTestBase {
+
+    @Test
+    public void testGetPlan() throws Exception {
+        FileStoreTable fileStoreTable = createFileStoreTable();
+        SnapshotManager snapshotMgr = fileStoreTable.snapshotManager();
+        TableWrite tableWrite = fileStoreTable.newWrite(commitUser);
+        TableCommit tableCommit = fileStoreTable.newCommit(commitUser);
+        // snapshot 1: three inserts
+        tableWrite.write(rowData(1, 10, 100L));
+        tableWrite.write(rowData(1, 20, 200L));
+        tableWrite.write(rowData(1, 40, 400L));
+        tableCommit.commit(0, tableWrite.prepareCommit(true, 0));
+        // snapshot 2: overwrites (1, 10), adds (1, 30) and deletes (1, 40)
+        tableWrite.write(rowData(1, 10, 101L));
+        tableWrite.write(rowData(1, 30, 300L));
+        tableWrite.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        tableCommit.commit(1, tableWrite.prepareCommit(true, 1));
+        assertThat(snapshotMgr.latestSnapshotId()).isEqualTo(2);
+
+        // the starting plan is the merged view of the whole table as of snapshot 2
+        FullStartingScanner fullScanner = new FullStartingScanner();
+        DataTableScan.DataFilePlan startingPlan =
+                fullScanner.getPlan(snapshotMgr, fileStoreTable.newScan());
+        assertThat(startingPlan.snapshotId).isEqualTo(2);
+        assertThat(getResult(fileStoreTable.newRead(), startingPlan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
+        tableWrite.close();
+        tableCommit.close();
+    }
+
+    @Test
+    public void testNoSnapshot() throws Exception {
+        FileStoreTable emptyTable = createFileStoreTable();
+        FullStartingScanner fullScanner = new FullStartingScanner();
+        assertThat(fullScanner.getPlan(emptyTable.snapshotManager(), emptyTable.newScan()))
+                .isNull();
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogFollowUpScannerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogFollowUpScannerTest.java
new file mode 100644
index 00000000..aec46df2
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogFollowUpScannerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link InputChangelogFollowUpScanner} on a table using the INPUT changelog producer. */
+public class InputChangelogFollowUpScannerTest extends SnapshotEnumeratorTestBase {
+
+    @Test
+    public void testGetPlan() throws Exception {
+        FileStoreTable fileStoreTable = createFileStoreTable();
+        SnapshotManager snapshotMgr = fileStoreTable.snapshotManager();
+        TableWrite tableWrite = fileStoreTable.newWrite(commitUser);
+        TableCommit tableCommit = fileStoreTable.newCommit(commitUser);
+        // first commit -> snapshot 1
+        tableWrite.write(rowData(1, 10, 100L));
+        tableWrite.write(rowData(1, 20, 200L));
+        tableWrite.write(rowData(1, 40, 400L));
+        tableCommit.commit(0, tableWrite.prepareCommit(true, 0));
+        // second commit also runs a compaction -> snapshot 2 (APPEND) and 3 (COMPACT)
+        tableWrite.write(rowData(1, 10, 101L));
+        tableWrite.write(rowData(1, 30, 300L));
+        tableWrite.write(rowData(1, 10, 102L));
+        tableWrite.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        tableWrite.compact(binaryRow(1), 0, true);
+        tableCommit.commit(1, tableWrite.prepareCommit(true, 1));
+        assertThat(snapshotMgr.latestSnapshotId()).isEqualTo(3);
+
+        DataTableScan scan = fileStoreTable.newScan();
+        TableRead read = fileStoreTable.newRead();
+        InputChangelogFollowUpScanner changelogScanner = new InputChangelogFollowUpScanner();
+
+        // snapshot 1: APPEND, scanned, changelog holds the three inserts
+        Snapshot current = snapshotMgr.snapshot(1);
+        assertThat(current.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+        assertThat(changelogScanner.shouldScanSnapshot(current)).isTrue();
+        DataTableScan.DataFilePlan changelogPlan = changelogScanner.getPlan(1, scan);
+        assertThat(changelogPlan.snapshotId).isEqualTo(1);
+        assertThat(getResult(read, changelogPlan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|100", "+I 1|20|200", "+I 1|40|400"));
+        // snapshot 2: APPEND, scanned, every input record is replayed, including 1|10|101
+        current = snapshotMgr.snapshot(2);
+        assertThat(current.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+        assertThat(changelogScanner.shouldScanSnapshot(current)).isTrue();
+        changelogPlan = changelogScanner.getPlan(2, scan);
+        assertThat(changelogPlan.snapshotId).isEqualTo(2);
+        assertThat(getResult(read, changelogPlan.splits()))
+                .hasSameElementsAs(
+                        Arrays.asList("+I 1|10|101", "+I 1|30|300", "+I 1|10|102", "-D 1|40|400"));
+        // snapshot 3: COMPACT, skipped by the changelog scanner
+        current = snapshotMgr.snapshot(3);
+        assertThat(current.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+        assertThat(changelogScanner.shouldScanSnapshot(current)).isFalse();
+        tableWrite.close();
+        tableCommit.close();
+    }
+
+    /** Creates every test table with {@link CoreOptions.ChangelogProducer#INPUT}. */
+    @Override
+    protected FileStoreTable createFileStoreTable() throws Exception {
+        Configuration options = new Configuration();
+        options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
+        return createFileStoreTable(options);
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumeratorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumeratorTest.java
deleted file mode 100644
index 2c2fc091..00000000
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumeratorTest.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.table.source.snapshot;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.TableCommit;
-import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.table.store.table.source.DataTableScan;
-import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.types.RowKind;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Tests for {@link InputChangelogSnapshotEnumerator}. */
-public class InputChangelogSnapshotEnumeratorTest extends DataFileSnapshotEnumeratorTestBase {
-
- @Test
- public void testFullStartupMode() throws Exception {
- FileStoreTable table = createFileStoreTable();
- TableRead read = table.newRead();
- TableWrite write = table.newWrite(commitUser);
- TableCommit commit = table.newCommit(commitUser);
- SnapshotEnumerator enumerator =
- new InputChangelogSnapshotEnumerator(
- tablePath, table.newScan(), CoreOptions.StartupMode.FULL, null, null);
-
- // first call without any snapshot, should return null
- assertThat(enumerator.enumerate()).isNull();
-
- // write base data
- write.write(rowData(1, 10, 100L));
- write.write(rowData(1, 20, 200L));
- write.write(rowData(1, 40, 400L));
- commit.commit(0, write.prepareCommit(true, 0));
-
- write.write(rowData(1, 10, 101L));
- write.write(rowData(1, 30, 300L));
- write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
- commit.commit(1, write.prepareCommit(true, 1));
-
- // first call with snapshot, should return complete records from 2nd commit
- DataTableScan.DataFilePlan plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(2);
- assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
-
- // incremental call without new snapshots, should return null
- assertThat(enumerator.enumerate()).isNull();
-
- // write incremental data
- write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
- write.write(rowData(1, 20, 201L));
- write.write(rowData(1, 10, 102L));
- write.write(rowData(1, 40, 400L));
- commit.commit(2, write.prepareCommit(true, 2));
-
- write.write(rowData(1, 10, 103L));
- write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
- write.write(rowData(1, 50, 500L));
- commit.commit(3, write.prepareCommit(true, 3));
-
- // first incremental call, should return incremental records from 3rd commit
- plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(3);
- assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(
- Arrays.asList("-D 1|10|101", "+I 1|10|102", "+I 1|20|201", "+I 1|40|400"));
-
- // second incremental call, should return incremental records from 4th commit
- plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(4);
- assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400", "+I 1|50|500"));
-
- // no more new snapshots, should return null
- assertThat(enumerator.enumerate()).isNull();
-
- write.close();
- commit.close();
- }
-
- @Test
- public void testFromTimestampStartupMode() throws Exception {
- FileStoreTable table = createFileStoreTable();
- TableRead read = table.newRead();
- TableWrite write = table.newWrite(commitUser);
- TableCommit commit = table.newCommit(commitUser);
-
- write.write(rowData(1, 10, 100L));
- write.write(rowData(1, 20, 200L));
- write.write(rowData(1, 40, 400L));
- commit.commit(0, write.prepareCommit(true, 0));
-
- // log current time millis, we'll start from here
- Thread.sleep(50);
- long startMillis = System.currentTimeMillis();
- Thread.sleep(50);
-
- write.write(rowData(1, 10, 101L));
- write.write(rowData(1, 30, 300L));
- write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
- commit.commit(1, write.prepareCommit(true, 1));
-
- write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
- write.write(rowData(1, 20, 201L));
- write.write(rowData(1, 10, 102L));
- write.write(rowData(1, 40, 400L));
- commit.commit(2, write.prepareCommit(true, 2));
-
- // first call with snapshot, should return empty plan
- SnapshotEnumerator enumerator =
- new InputChangelogSnapshotEnumerator(
- tablePath,
- table.newScan(),
- CoreOptions.StartupMode.FROM_TIMESTAMP,
- startMillis,
- null);
-
- DataTableScan.DataFilePlan plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(1);
- assertThat(plan.splits()).isEmpty();
-
- // first incremental call, should return incremental records from 2nd commit
- plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(2);
- assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|30|300", "-D 1|40|400"));
-
- // second incremental call, should return incremental records from 3rd commit
- plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(3);
- assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(
- Arrays.asList("-D 1|10|101", "+I 1|10|102", "+I 1|20|201", "+I 1|40|400"));
-
- // no new snapshots
- assertThat(enumerator.enumerate()).isNull();
-
- // more incremental records
- write.write(rowData(1, 10, 103L));
- write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
- write.write(rowData(1, 50, 500L));
- commit.commit(3, write.prepareCommit(true, 3));
-
- plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(4);
- assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400", "+I 1|50|500"));
-
- assertThat(enumerator.enumerate()).isNull();
-
- write.close();
- commit.close();
- }
-
- @Test
- public void testLatestStartupMode() throws Exception {
- FileStoreTable table = createFileStoreTable();
- TableRead read = table.newRead();
- TableWrite write = table.newWrite(commitUser);
- TableCommit commit = table.newCommit(commitUser);
-
- write.write(rowData(1, 10, 100L));
- write.write(rowData(1, 20, 200L));
- write.write(rowData(1, 40, 400L));
- commit.commit(0, write.prepareCommit(true, 0));
-
- write.write(rowData(1, 10, 101L));
- write.write(rowData(1, 30, 300L));
- write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
- commit.commit(1, write.prepareCommit(true, 1));
-
- SnapshotEnumerator enumerator =
- new InputChangelogSnapshotEnumerator(
- tablePath, table.newScan(), CoreOptions.StartupMode.LATEST, null, null);
-
- DataTableScan.DataFilePlan plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(2);
- assertThat(plan.splits()).isEmpty();
-
- assertThat(enumerator.enumerate()).isNull();
-
- write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
- write.write(rowData(1, 20, 201L));
- write.write(rowData(1, 10, 102L));
- write.write(rowData(1, 40, 400L));
- commit.commit(2, write.prepareCommit(true, 2));
-
- plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(3);
- assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(
- Arrays.asList("-D 1|10|101", "+I 1|10|102", "+I 1|20|201", "+I 1|40|400"));
- assertThat(enumerator.enumerate()).isNull();
-
- write.write(rowData(1, 10, 103L));
- write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
- write.write(rowData(1, 50, 500L));
- commit.commit(3, write.prepareCommit(true, 3));
-
- plan = enumerator.enumerate();
- assertThat(plan.snapshotId).isEqualTo(4);
- assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400", "+I 1|50|500"));
- assertThat(enumerator.enumerate()).isNull();
-
- write.close();
- commit.close();
- }
-
- @Override
- protected Configuration getConf() {
- Configuration conf = new Configuration();
- conf.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
- return conf;
- }
-}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumeratorTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumeratorTestBase.java
similarity index 94%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumeratorTestBase.java
rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumeratorTestBase.java
index 916df0cc..9a7f628b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumeratorTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumeratorTestBase.java
@@ -50,11 +50,11 @@ import java.util.List;
import java.util.UUID;
/**
- * Base test class for {@link DataFileSnapshotEnumerator}.
+ * Base test class for {@link SnapshotEnumerator} and related classes.
*
* <p>TODO: merge this class with FileStoreTableTestBase.
*/
-public abstract class DataFileSnapshotEnumeratorTestBase {
+public abstract class SnapshotEnumeratorTestBase {
private static final RowType ROW_TYPE =
RowType.of(
@@ -115,7 +115,10 @@ public abstract class DataFileSnapshotEnumeratorTestBase {
}
protected FileStoreTable createFileStoreTable() throws Exception {
- Configuration conf = getConf();
+ return createFileStoreTable(new Configuration());
+ }
+
+ protected FileStoreTable createFileStoreTable(Configuration conf) throws Exception {
SchemaManager schemaManager = new SchemaManager(tablePath);
TableSchema tableSchema =
schemaManager.commitNewVersion(
@@ -127,6 +130,4 @@ public abstract class DataFileSnapshotEnumeratorTestBase {
""));
return FileStoreTableFactory.create(tablePath, tableSchema, conf);
}
-
- protected abstract Configuration getConf();
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumeratorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumeratorTest.java
new file mode 100644
index 00000000..a56efc33
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumeratorTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link StaticDataFileSnapshotEnumerator}, which plans only once. */
+public class StaticDataFileSnapshotEnumeratorTest extends SnapshotEnumeratorTestBase {
+
+    @Test
+    public void testEnumerate() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+        SnapshotEnumerator enumerator =
+                StaticDataFileSnapshotEnumerator.create(table, table.newScan()); // before any commit
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0)); // snapshot 1
+
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L)); // deletes 1|40|400
+        commit.commit(1, write.prepareCommit(true, 1)); // snapshot 2
+
+        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(2);
+
+        DataTableScan.DataFilePlan plan = enumerator.enumerate(); // plans latest snapshot (2)
+        assertThat(plan.snapshotId).isEqualTo(2);
+        assertThat(getResult(table.newRead(), plan.splits())) // merged view of snapshot 2
+                .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
+
+        write.write(rowData(1, 10, 102L));
+        write.write(rowData(1, 30, 301L));
+        commit.commit(2, write.prepareCommit(true, 2)); // snapshot 3
+
+        assertThat(enumerator.enumerate()).isNull(); // planned once; snapshot 3 not picked up
+
+        write.close();
+        commit.close();
+    }
+}