You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/28 04:20:44 UTC
[incubator-paimon] branch master updated: [core][bug] TableScan#plan should not return null (#698)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7032eb869 [core][bug] TableScan#plan should not return null (#698)
7032eb869 is described below
commit 7032eb869d40f3ae242e1a9d1aa7ee61cba5f505
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Tue Mar 28 12:20:39 2023 +0800
[core][bug] TableScan#plan should not return null (#698)
---
.../paimon/table/AbstractFileStoreTable.java | 2 +-
.../paimon/table/source/AbstractDataTableScan.java | 27 ------
.../paimon/table/source/BatchDataTableScan.java | 15 +++
.../table/source/BatchDataTableScanImpl.java | 34 ++++++-
.../apache/paimon/table/source/DataTableScan.java | 10 +-
.../paimon/table/source/StreamDataTableScan.java | 15 +++
.../table/source/StreamDataTableScanImpl.java | 47 +++++++--
.../paimon/table/source/StreamTableScan.java | 2 +
.../paimon/table/source/TableStreamingReader.java | 4 +-
.../source/snapshot/CompactedStartingScanner.java | 9 +-
.../CompactionChangelogFollowUpScanner.java | 3 +-
.../ContinuousCompactorFollowUpScanner.java | 3 +-
.../ContinuousCompactorStartingScanner.java | 11 +--
.../ContinuousFromSnapshotStartingScanner.java | 12 +--
.../ContinuousFromTimestampStartingScanner.java | 9 +-
.../snapshot/ContinuousLatestStartingScanner.java | 9 +-
.../source/snapshot/DeltaFollowUpScanner.java | 3 +-
.../table/source/snapshot/FollowUpScanner.java | 4 +-
.../table/source/snapshot/FullStartingScanner.java | 9 +-
.../snapshot/InputChangelogFollowUpScanner.java | 3 +-
.../table/source/snapshot/StartingScanner.java | 32 ++++++-
.../StaticFromSnapshotStartingScanner.java | 9 +-
.../StaticFromTimestampStartingScanner.java | 11 +--
.../apache/paimon/table/system/AuditLogTable.java | 16 ++--
.../org/apache/paimon/table/system/FilesTable.java | 10 +-
.../table/ChangelogWithKeyFileStoreTableTest.java | 4 +-
.../table/source/BatchDataTableScanTest.java | 1 -
.../table/source/StreamDataTableScanTest.java | 40 ++++----
.../snapshot/CompactedStartingScannerTest.java | 13 ++-
.../CompactionChangelogFollowUpScannerTest.java | 8 +-
.../ContinuousCompactorFollowUpScannerTest.java | 8 +-
.../ContinuousCompactorStartingScannerTest.java | 11 +--
...ContinuousFromTimestampStartingScannerTest.java | 17 ++--
.../ContinuousLatestStartingScannerTest.java | 11 +--
.../source/snapshot/DeltaFollowUpScannerTest.java | 8 +-
.../source/snapshot/FullStartingScannerTest.java | 11 +--
.../InputChangelogFollowUpScannerTest.java | 8 +-
.../table/source/snapshot/ScannerTestBase.java | 5 +
.../flink/lookup/FileStoreLookupFunction.java | 2 +-
.../flink/source/CompactorSourceBuilder.java | 1 +
.../source/ContinuousFileSplitEnumerator.java | 16 ++--
.../flink/source/ContinuousFileStoreSource.java | 2 +-
.../paimon/flink/source/FlinkSourceBuilder.java | 3 +-
.../flink/source/LogHybridSourceFactory.java | 81 ++++++++++++++++
.../paimon/flink/source/StaticFileStoreSource.java | 17 +---
.../paimon/flink/action/CompactActionITCase.java | 1 -
.../source/ContinuousFileSplitEnumeratorTest.java | 106 ++++++++++++++++-----
.../source/FileStoreSourceSplitGeneratorTest.java | 2 +-
.../paimon/hive/mapred/PaimonInputFormat.java | 9 +-
.../java/org/apache/paimon/spark/SparkScan.java | 6 +-
50 files changed, 437 insertions(+), 263 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 4e83ed8e6..507b4a385 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -200,7 +200,7 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
return Optional.empty();
case FROM_TIMESTAMP:
Snapshot snapshot =
- StaticFromTimestampStartingScanner.getSnapshot(
+ StaticFromTimestampStartingScanner.timeTravelToTimestamp(
snapshotManager(), coreOptions.scanTimestampMills());
if (snapshot != null) {
long schemaId = snapshot.schemaId();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index eb5604209..bc485b748 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -21,8 +21,6 @@ package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.operation.FileStoreScan;
-import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousFromTimestampStartingScanner;
@@ -32,7 +30,6 @@ import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
-import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Preconditions;
/** An abstraction layer above {@link FileStoreScan} to provide input split generation. */
@@ -46,30 +43,6 @@ public abstract class AbstractDataTableScan implements DataTableScan {
this.snapshotSplitReader = snapshotSplitReader;
}
- @Override
- public AbstractDataTableScan withSnapshot(long snapshotId) {
- snapshotSplitReader.withSnapshot(snapshotId);
- return this;
- }
-
- @Override
- public AbstractDataTableScan withFilter(Predicate predicate) {
- snapshotSplitReader.withFilter(predicate);
- return this;
- }
-
- @Override
- public AbstractDataTableScan withKind(ScanKind scanKind) {
- snapshotSplitReader.withKind(scanKind);
- return this;
- }
-
- @Override
- public AbstractDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
- snapshotSplitReader.withLevelFilter(levelFilter);
- return this;
- }
-
@VisibleForTesting
public AbstractDataTableScan withBucket(int bucket) {
snapshotSplitReader.withBucket(bucket);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java
index 4a3e0bcf7..e2839253e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java
@@ -18,14 +18,29 @@
package org.apache.paimon.table.source;
+import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.source.snapshot.StartingScanner;
+import org.apache.paimon.utils.Filter;
import java.io.Serializable;
/** {@link DataTableScan} for batch planning. */
public interface BatchDataTableScan extends DataTableScan {
+ @Override
+ BatchDataTableScan withSnapshot(long snapshotId);
+
+ @Override
+ BatchDataTableScan withFilter(Predicate predicate);
+
+ @Override
+ BatchDataTableScan withKind(ScanKind scanKind);
+
+ @Override
+ BatchDataTableScan withLevelFilter(Filter<Integer> levelFilter);
+
BatchDataTableScan withStartingScanner(StartingScanner startingScanner);
// ------------------------------------------------------------------------
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScanImpl.java
index 2d180cb86..d0dead613 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScanImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScanImpl.java
@@ -19,12 +19,13 @@
package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
+import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.SnapshotManager;
-import javax.annotation.Nullable;
-
/** {@link DataTableScan} for batch planning. */
public class BatchDataTableScanImpl extends AbstractDataTableScan implements BatchDataTableScan {
@@ -43,6 +44,30 @@ public class BatchDataTableScanImpl extends AbstractDataTableScan implements Bat
this.hasNext = true;
}
+ @Override
+ public BatchDataTableScan withSnapshot(long snapshotId) {
+ snapshotSplitReader.withSnapshot(snapshotId);
+ return this;
+ }
+
+ @Override
+ public BatchDataTableScan withFilter(Predicate predicate) {
+ snapshotSplitReader.withFilter(predicate);
+ return this;
+ }
+
+ @Override
+ public BatchDataTableScan withKind(ScanKind scanKind) {
+ snapshotSplitReader.withKind(scanKind);
+ return this;
+ }
+
+ @Override
+ public BatchDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
+ snapshotSplitReader.withLevelFilter(levelFilter);
+ return this;
+ }
+
@Override
public BatchDataTableScan withStartingScanner(StartingScanner startingScanner) {
this.startingScanner = startingScanner;
@@ -50,7 +75,6 @@ public class BatchDataTableScanImpl extends AbstractDataTableScan implements Bat
}
@Override
- @Nullable
public DataFilePlan plan() {
if (startingScanner == null) {
startingScanner = createStartingScanner(false);
@@ -58,7 +82,9 @@ public class BatchDataTableScanImpl extends AbstractDataTableScan implements Bat
if (hasNext) {
hasNext = false;
- return startingScanner.getPlan(snapshotManager, snapshotSplitReader);
+ StartingScanner.Result result =
+ startingScanner.scan(snapshotManager, snapshotSplitReader);
+ return DataFilePlan.fromResult(result);
} else {
throw new EndOfScanException();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableScan.java
index 066f0fb1b..c5249c5bc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableScan.java
@@ -21,10 +21,12 @@ package org.apache.paimon.table.source;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.utils.Filter;
import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.List;
/** A {@link TableScan} for reading data. */
@@ -43,12 +45,10 @@ public interface DataTableScan extends InnerTableScan {
/** Scanning plan containing snapshot ID and input splits. */
class DataFilePlan implements Plan {
- @Nullable public final Long snapshotId;
public final List<DataSplit> splits;
@VisibleForTesting
- public DataFilePlan(@Nullable Long snapshotId, List<DataSplit> splits) {
- this.snapshotId = snapshotId;
+ public DataFilePlan(List<DataSplit> splits) {
this.splits = splits;
}
@@ -57,5 +57,9 @@ public interface DataTableScan extends InnerTableScan {
public List<Split> splits() {
return (List) splits;
}
+
+ public static DataFilePlan fromResult(@Nullable StartingScanner.Result result) {
+ return new DataFilePlan(result == null ? Collections.emptyList() : result.splits());
+ }
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
index d125aec90..471c8428d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
@@ -19,11 +19,14 @@
package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.source.snapshot.BoundedChecker;
import org.apache.paimon.table.source.snapshot.FollowUpScanner;
import org.apache.paimon.table.source.snapshot.StartingScanner;
+import org.apache.paimon.utils.Filter;
import javax.annotation.Nullable;
@@ -33,6 +36,18 @@ import java.util.HashMap;
/** {@link DataTableScan} for streaming planning. */
public interface StreamDataTableScan extends DataTableScan, InnerStreamTableScan {
+ @Override
+ StreamDataTableScan withSnapshot(long snapshotId);
+
+ @Override
+ StreamDataTableScan withFilter(Predicate predicate);
+
+ @Override
+ StreamDataTableScan withKind(ScanKind scanKind);
+
+ @Override
+ StreamDataTableScan withLevelFilter(Filter<Integer> levelFilter);
+
boolean supportStreamingReadOverwrite();
StreamDataTableScan withStartingScanner(StartingScanner startingScanner);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java
index bf5336c08..afa4a302b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java
@@ -20,6 +20,8 @@ package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.snapshot.BoundedChecker;
import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
import org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
@@ -29,6 +31,7 @@ import org.apache.paimon.table.source.snapshot.FullStartingScanner;
import org.apache.paimon.table.source.snapshot.InputChangelogFollowUpScanner;
import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
+import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
@@ -36,6 +39,8 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.util.Collections;
+
/** {@link DataTableScan} for streaming planning. */
public class StreamDataTableScanImpl extends AbstractDataTableScan implements StreamDataTableScan {
@@ -62,6 +67,30 @@ public class StreamDataTableScanImpl extends AbstractDataTableScan implements St
this.supportStreamingReadOverwrite = supportStreamingReadOverwrite;
}
+ @Override
+ public StreamDataTableScanImpl withSnapshot(long snapshotId) {
+ snapshotSplitReader.withSnapshot(snapshotId);
+ return this;
+ }
+
+ @Override
+ public StreamDataTableScanImpl withFilter(Predicate predicate) {
+ snapshotSplitReader.withFilter(predicate);
+ return this;
+ }
+
+ @Override
+ public StreamDataTableScanImpl withKind(ScanKind scanKind) {
+ snapshotSplitReader.withKind(scanKind);
+ return this;
+ }
+
+ @Override
+ public StreamDataTableScanImpl withLevelFilter(Filter<Integer> levelFilter) {
+ snapshotSplitReader.withLevelFilter(levelFilter);
+ return this;
+ }
+
@Override
public boolean supportStreamingReadOverwrite() {
return supportStreamingReadOverwrite;
@@ -94,7 +123,6 @@ public class StreamDataTableScanImpl extends AbstractDataTableScan implements St
return this;
}
- @Nullable
@Override
public DataFilePlan plan() {
if (startingScanner == null) {
@@ -115,16 +143,15 @@ public class StreamDataTableScanImpl extends AbstractDataTableScan implements St
}
private DataFilePlan tryFirstPlan() {
- DataTableScan.DataFilePlan plan =
- startingScanner.getPlan(snapshotManager, snapshotSplitReader);
- if (plan != null) {
- long snapshot = plan.snapshotId;
- nextSnapshotId = snapshot + 1;
- if (boundedChecker.shouldEndInput(snapshotManager.snapshot(snapshot))) {
+ StartingScanner.Result result = startingScanner.scan(snapshotManager, snapshotSplitReader);
+ if (result != null) {
+ long snapshotId = result.snapshotId();
+ nextSnapshotId = snapshotId + 1;
+ if (boundedChecker.shouldEndInput(snapshotManager.snapshot(snapshotId))) {
isEnd = true;
}
}
- return plan;
+ return DataFilePlan.fromResult(result);
}
private DataFilePlan nextPlan() {
@@ -137,7 +164,7 @@ public class StreamDataTableScanImpl extends AbstractDataTableScan implements St
LOG.debug(
"Next snapshot id {} does not exist, wait for the snapshot generation.",
nextSnapshotId);
- return null;
+ return new DataFilePlan(Collections.emptyList());
}
Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
@@ -158,7 +185,7 @@ public class StreamDataTableScanImpl extends AbstractDataTableScan implements St
} else if (followUpScanner.shouldScanSnapshot(snapshot)) {
LOG.debug("Find snapshot id {}.", nextSnapshotId);
DataTableScan.DataFilePlan plan =
- followUpScanner.getPlan(nextSnapshotId, snapshotSplitReader);
+ followUpScanner.scan(nextSnapshotId, snapshotSplitReader);
nextSnapshotId++;
return plan;
} else {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
index 82d4d7d38..44f0ce43a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
@@ -24,6 +24,8 @@ import org.apache.paimon.utils.Restorable;
/**
* {@link TableScan} for streaming, supports {@link #checkpoint} and {@link #restore}.
*
+ * <p>NOTE: {@link #checkpoint} will return the next snapshot id.
+ *
* @since 0.4.0
*/
@Public
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java
index e19eeef4f..28c2d9fed 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java
@@ -87,11 +87,9 @@ public class TableStreamingReader {
}
}
- @Nullable
public Iterator<InternalRow> nextBatch() throws Exception {
try {
- DataFilePlan plan = scan.plan();
- return plan == null ? null : read(plan);
+ return read(scan.plan());
} catch (EndOfScanException e) {
throw new IllegalArgumentException(
"TableStreamingReader does not support finished enumerator.", e);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
index fed53fd02..63f5f7578 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
@@ -20,26 +20,27 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
/** {@link StartingScanner} for the {@link CoreOptions.StartupMode#COMPACTED_FULL} startup mode. */
public class CompactedStartingScanner implements StartingScanner {
private static final Logger LOG = LoggerFactory.getLogger(CompactedStartingScanner.class);
@Override
- public DataTableScan.DataFilePlan getPlan(
- SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
+ @Nullable
+ public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
Long startingSnapshotId = snapshotManager.latestCompactedSnapshotId();
if (startingSnapshotId == null) {
LOG.debug("There is currently no compact snapshot. Waiting for snapshot generation.");
return null;
}
- return new DataTableScan.DataFilePlan(
+ return new Result(
startingSnapshotId,
snapshotSplitReader
.withKind(ScanKind.ALL)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
index aeb5e5fb5..ac957587a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
@@ -49,10 +49,9 @@ public class CompactionChangelogFollowUpScanner implements FollowUpScanner {
}
@Override
- public DataTableScan.DataFilePlan getPlan(
+ public DataTableScan.DataFilePlan scan(
long snapshotId, SnapshotSplitReader snapshotSplitReader) {
return new DataTableScan.DataFilePlan(
- snapshotId,
snapshotSplitReader.withKind(ScanKind.CHANGELOG).withSnapshot(snapshotId).splits());
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
index ccba3eef4..aa9e83e26 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
@@ -45,10 +45,9 @@ public class ContinuousCompactorFollowUpScanner implements FollowUpScanner {
}
@Override
- public DataTableScan.DataFilePlan getPlan(
+ public DataTableScan.DataFilePlan scan(
long snapshotId, SnapshotSplitReader snapshotSplitReader) {
return new DataTableScan.DataFilePlan(
- snapshotId,
snapshotSplitReader.withKind(ScanKind.DELTA).withSnapshot(snapshotId).splits());
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
index 0900d9ecf..e92291e5b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
@@ -19,13 +19,12 @@
package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
+import javax.annotation.Nullable;
/** {@link StartingScanner} used internally for stand-alone streaming compact job sources. */
public class ContinuousCompactorStartingScanner implements StartingScanner {
@@ -34,8 +33,8 @@ public class ContinuousCompactorStartingScanner implements StartingScanner {
LoggerFactory.getLogger(ContinuousCompactorStartingScanner.class);
@Override
- public DataTableScan.DataFilePlan getPlan(
- SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
+ @Nullable
+ public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
Long latestSnapshotId = snapshotManager.latestSnapshotId();
Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
if (latestSnapshotId == null || earliestSnapshotId == null) {
@@ -47,13 +46,13 @@ public class ContinuousCompactorStartingScanner implements StartingScanner {
Snapshot snapshot = snapshotManager.snapshot(id);
if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
LOG.debug("Found latest compact snapshot {}, reading from the next snapshot.", id);
- return new DataTableScan.DataFilePlan(id, Collections.emptyList());
+ return new Result(id);
}
}
LOG.debug(
"No compact snapshot found, reading from the earliest snapshot {}.",
earliestSnapshotId);
- return new DataTableScan.DataFilePlan(earliestSnapshotId - 1, Collections.emptyList());
+ return new Result(earliestSnapshotId - 1);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
index 8b6b88add..355bd6d6f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
@@ -19,10 +19,9 @@
package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.utils.SnapshotManager;
-import java.util.Collections;
+import javax.annotation.Nullable;
/**
* {@link StartingScanner} for the {@link CoreOptions.StartupMode#FROM_SNAPSHOT} startup mode of a
@@ -37,16 +36,15 @@ public class ContinuousFromSnapshotStartingScanner implements StartingScanner {
}
@Override
- public DataTableScan.DataFilePlan getPlan(
- SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
+ @Nullable
+ public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
if (earliestSnapshotId == null) {
return null;
}
// We should use `snapshotId - 1` here to start to scan delta data from specific snapshot
// id. If the snapshotId < earliestSnapshotId, start to scan from the earliest.
- return new DataTableScan.DataFilePlan(
- snapshotId >= earliestSnapshotId ? snapshotId - 1 : earliestSnapshotId - 1,
- Collections.emptyList());
+ return new Result(
+ snapshotId >= earliestSnapshotId ? snapshotId - 1 : earliestSnapshotId - 1);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
index 43087ca77..4f6f318e1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
@@ -19,13 +19,12 @@
package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
+import javax.annotation.Nullable;
/**
* {@link StartingScanner} for the {@link CoreOptions.StartupMode#FROM_TIMESTAMP} startup mode of a
@@ -43,13 +42,13 @@ public class ContinuousFromTimestampStartingScanner implements StartingScanner {
}
@Override
- public DataTableScan.DataFilePlan getPlan(
- SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
+ @Nullable
+ public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
Long startingSnapshotId = snapshotManager.earlierThanTimeMills(startupMillis);
if (startingSnapshotId == null) {
LOG.debug("There is currently no snapshot. Waiting for snapshot generation.");
return null;
}
- return new DataTableScan.DataFilePlan(startingSnapshotId, Collections.emptyList());
+ return new Result(startingSnapshotId);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
index 11c1441c7..f0c5c6fe1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
@@ -19,13 +19,12 @@
package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
+import javax.annotation.Nullable;
/**
* {@link StartingScanner} for the {@link CoreOptions.StartupMode#LATEST} startup mode of a
@@ -37,13 +36,13 @@ public class ContinuousLatestStartingScanner implements StartingScanner {
LoggerFactory.getLogger(ContinuousLatestStartingScanner.class);
@Override
- public DataTableScan.DataFilePlan getPlan(
- SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
+ @Nullable
+ public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
Long startingSnapshotId = snapshotManager.latestSnapshotId();
if (startingSnapshotId == null) {
LOG.debug("There is currently no snapshot. Wait for the snapshot generation.");
return null;
}
- return new DataTableScan.DataFilePlan(startingSnapshotId, Collections.emptyList());
+ return new Result(startingSnapshotId);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
index 10a698f5b..97e30d141 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
@@ -45,10 +45,9 @@ public class DeltaFollowUpScanner implements FollowUpScanner {
}
@Override
- public DataTableScan.DataFilePlan getPlan(
+ public DataTableScan.DataFilePlan scan(
long snapshotId, SnapshotSplitReader snapshotSplitReader) {
return new DataTableScan.DataFilePlan(
- snapshotId,
snapshotSplitReader.withKind(ScanKind.DELTA).withSnapshot(snapshotId).splits());
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
index b34871598..33886e5a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
@@ -27,11 +27,11 @@ public interface FollowUpScanner {
boolean shouldScanSnapshot(Snapshot snapshot);
- DataTableScan.DataFilePlan getPlan(long snapshotId, SnapshotSplitReader snapshotSplitReader);
+ DataTableScan.DataFilePlan scan(long snapshotId, SnapshotSplitReader snapshotSplitReader);
default DataTableScan.DataFilePlan getOverwriteChangesPlan(
long snapshotId, SnapshotSplitReader snapshotSplitReader) {
return new DataTableScan.DataFilePlan(
- snapshotId, snapshotSplitReader.withSnapshot(snapshotId).overwriteSplits());
+ snapshotSplitReader.withSnapshot(snapshotId).overwriteSplits());
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
index c73ede04b..fdc93ab5d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
@@ -20,26 +20,27 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
/** {@link StartingScanner} for the {@link CoreOptions.StartupMode#LATEST_FULL} startup mode. */
public class FullStartingScanner implements StartingScanner {
private static final Logger LOG = LoggerFactory.getLogger(FullStartingScanner.class);
@Override
- public DataTableScan.DataFilePlan getPlan(
- SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
+ @Nullable
+ public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
Long startingSnapshotId = snapshotManager.latestSnapshotId();
if (startingSnapshotId == null) {
LOG.debug("There is currently no snapshot. Waiting for snapshot generation.");
return null;
}
- return new DataTableScan.DataFilePlan(
+ return new Result(
startingSnapshotId,
snapshotSplitReader
.withKind(ScanKind.ALL)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
index 73187c245..c901ef528 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
@@ -45,10 +45,9 @@ public class InputChangelogFollowUpScanner implements FollowUpScanner {
}
@Override
- public DataTableScan.DataFilePlan getPlan(
+ public DataTableScan.DataFilePlan scan(
long snapshotId, SnapshotSplitReader snapshotSplitReader) {
return new DataTableScan.DataFilePlan(
- snapshotId,
snapshotSplitReader.withKind(ScanKind.CHANGELOG).withSnapshot(snapshotId).splits());
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
index 39c73521e..2bedd924a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
@@ -19,16 +19,42 @@
package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.table.source.BatchDataTableScan;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.utils.SnapshotManager;
+import java.util.Collections;
+import java.util.List;
+
/**
* Helper class for the first planning of {@link BatchDataTableScan} and {@link
* StreamDataTableScan}.
*/
public interface StartingScanner {
- DataTableScan.DataFilePlan getPlan(
- SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader);
+ Result scan(SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader);
+
+ /** Scan result of {@link #scan}. */
+ class Result {
+
+ private final long snapshotId;
+ private final List<DataSplit> splits;
+
+ public Result(long snapshotId) {
+ this(snapshotId, Collections.emptyList());
+ }
+
+ public Result(long snapshotId, List<DataSplit> splits) {
+ this.snapshotId = snapshotId;
+ this.splits = splits;
+ }
+
+ public long snapshotId() {
+ return snapshotId;
+ }
+
+ public List<DataSplit> splits() {
+ return splits;
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
index b84b3cbe3..a63475035 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
@@ -20,9 +20,10 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.utils.SnapshotManager;
+import javax.annotation.Nullable;
+
/**
* {@link StartingScanner} for the {@link CoreOptions.StartupMode#FROM_SNAPSHOT} startup mode of a
* batch read.
@@ -35,13 +36,13 @@ public class StaticFromSnapshotStartingScanner implements StartingScanner {
}
@Override
- public DataTableScan.DataFilePlan getPlan(
- SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
+ @Nullable
+ public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
if (snapshotManager.earliestSnapshotId() == null
|| snapshotId < snapshotManager.earliestSnapshotId()) {
return null;
}
- return new DataTableScan.DataFilePlan(
+ return new Result(
snapshotId,
snapshotSplitReader.withKind(ScanKind.ALL).withSnapshot(snapshotId).splits());
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
index 037a9ce5e..f4437e41b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
@@ -21,7 +21,6 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
@@ -45,16 +44,16 @@ public class StaticFromTimestampStartingScanner implements StartingScanner {
}
@Override
- public DataTableScan.DataFilePlan getPlan(
- SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
- Snapshot startingSnapshot = getSnapshot(snapshotManager, startupMillis);
+ @Nullable
+ public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
+ Snapshot startingSnapshot = timeTravelToTimestamp(snapshotManager, startupMillis);
if (startingSnapshot == null) {
LOG.debug(
"There is currently no snapshot earlier than or equal to timestamp[{}]",
startupMillis);
return null;
}
- return new DataTableScan.DataFilePlan(
+ return new Result(
startingSnapshot.id(),
snapshotSplitReader
.withKind(ScanKind.ALL)
@@ -63,7 +62,7 @@ public class StaticFromTimestampStartingScanner implements StartingScanner {
}
@Nullable
- public static Snapshot getSnapshot(SnapshotManager snapshotManager, long timestamp) {
+ public static Snapshot timeTravelToTimestamp(SnapshotManager snapshotManager, long timestamp) {
return snapshotManager.earlierOrEqualTimeMills(timestamp);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 9f1202d40..78536c951 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -231,25 +231,25 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
}
@Override
- public DataTableScan withFilter(Predicate predicate) {
+ public BatchDataTableScan withFilter(Predicate predicate) {
convert(predicate).ifPresent(batchScan::withFilter);
return this;
}
@Override
- public DataTableScan withKind(ScanKind kind) {
+ public BatchDataTableScan withKind(ScanKind kind) {
batchScan.withKind(kind);
return this;
}
@Override
- public DataTableScan withSnapshot(long snapshotId) {
+ public BatchDataTableScan withSnapshot(long snapshotId) {
batchScan.withSnapshot(snapshotId);
return this;
}
@Override
- public DataTableScan withLevelFilter(Filter<Integer> levelFilter) {
+ public BatchDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
batchScan.withLevelFilter(levelFilter);
return this;
}
@@ -274,25 +274,25 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
}
@Override
- public DataTableScan withFilter(Predicate predicate) {
+ public StreamDataTableScan withFilter(Predicate predicate) {
convert(predicate).ifPresent(streamScan::withFilter);
return this;
}
@Override
- public DataTableScan withKind(ScanKind kind) {
+ public StreamDataTableScan withKind(ScanKind kind) {
streamScan.withKind(kind);
return this;
}
@Override
- public DataTableScan withSnapshot(long snapshotId) {
+ public StreamDataTableScan withSnapshot(long snapshotId) {
streamScan.withSnapshot(snapshotId);
return this;
}
@Override
- public DataTableScan withLevelFilter(Filter<Integer> levelFilter) {
+ public StreamDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
streamScan.withLevelFilter(levelFilter);
return this;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 2d2eaad7e..3c213b04c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -52,8 +52,6 @@ import org.apache.paimon.utils.SerializationUtils;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -175,7 +173,6 @@ public class FilesTable implements ReadonlyTable {
return plan.splits.stream().mapToLong(s -> s.files().size()).sum();
}
- @Nullable
private DataTableScan.DataFilePlan dataFilePlan() {
return storeTable.newScan().plan();
}
@@ -237,12 +234,7 @@ public class FilesTable implements ReadonlyTable {
// schema id directly
FieldStatsConverters fieldStatsConverters =
new FieldStatsConverters(
- sid -> schemaManager.schema(sid).fields(),
- dataFilePlan.snapshotId == null
- ? table.schema().id()
- : table.snapshotManager()
- .snapshot(dataFilePlan.snapshotId)
- .schemaId());
+ sid -> schemaManager.schema(sid).fields(), table.schema().id());
RowDataToObjectArrayConverter partitionConverter =
new RowDataToObjectArrayConverter(table.schema().logicalPartitionType());
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
index fac1be471..61ece3833 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
@@ -457,7 +457,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
}
// no more changelog
- assertThat(scan.plan()).isNull();
+ assertThat(scan.plan().splits()).isEmpty();
// write another commit
StreamTableWrite write = table.newWrite(commitUser);
@@ -472,7 +472,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
assertNextSnapshot.apply(2);
// no more changelog
- assertThat(scan.plan()).isNull();
+ assertThat(scan.plan().splits()).isEmpty();
}
private void writeData() throws Exception {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/BatchDataTableScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/BatchDataTableScanTest.java
index 14067842c..61e9236ca 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/BatchDataTableScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/BatchDataTableScanTest.java
@@ -54,7 +54,6 @@ public class BatchDataTableScanTest extends ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(2);
DataTableScan.DataFilePlan plan = scan.plan();
- assertThat(plan.snapshotId).isEqualTo(2);
assertThat(getResult(table.newRead(), plan.splits()))
.hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java
index 95adb855d..062a50856 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java
@@ -48,8 +48,8 @@ public class StreamDataTableScanTest extends ScannerTestBase {
StreamTableCommit commit = table.newCommit(commitUser);
StreamDataTableScan scan = table.newStreamScan();
- // first call without any snapshot, should return null
- assertThat(scan.plan()).isNull();
+ // first call without any snapshot, should return empty plan
+ assertThat(scan.plan().splits()).isEmpty();
// write base data
write.write(rowData(1, 10, 100L));
@@ -64,12 +64,11 @@ public class StreamDataTableScanTest extends ScannerTestBase {
// first call with snapshot, should return complete records from 2nd commit
DataTableScan.DataFilePlan plan = scan.plan();
- assertThat(plan.snapshotId).isEqualTo(2);
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
- // incremental call without new snapshots, should return null
- assertThat(scan.plan()).isNull();
+ // incremental call without new snapshots, should return empty plan
+ assertThat(scan.plan().splits()).isEmpty();
// write incremental data
write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
@@ -85,18 +84,16 @@ public class StreamDataTableScanTest extends ScannerTestBase {
// first incremental call, should return incremental records from 3rd commit
plan = scan.plan();
- assertThat(plan.snapshotId).isEqualTo(3);
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|20|201", "+I 1|40|400"));
// second incremental call, should return incremental records from 4th commit
plan = scan.plan();
- assertThat(plan.snapshotId).isEqualTo(4);
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400", "+I 1|50|500"));
- // no more new snapshots, should return null
- assertThat(scan.plan()).isNull();
+ // no more new snapshots, should return empty plan
+ assertThat(scan.plan().splits()).isEmpty();
write.close();
commit.close();
@@ -113,8 +110,8 @@ public class StreamDataTableScanTest extends ScannerTestBase {
StreamTableCommit commit = table.newCommit(commitUser);
StreamDataTableScan scan = table.newStreamScan();
- // first call without any snapshot, should return null
- assertThat(scan.plan()).isNull();
+ // first call without any snapshot, should return empty plan
+ assertThat(scan.plan().splits()).isEmpty();
// write base data
write.write(rowData(1, 10, 100L));
@@ -137,12 +134,11 @@ public class StreamDataTableScanTest extends ScannerTestBase {
// first call with snapshot, should return full compacted records from 3rd commit
DataTableScan.DataFilePlan plan = scan.plan();
- assertThat(plan.snapshotId).isEqualTo(4);
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
- // incremental call without new snapshots, should return null
- assertThat(scan.plan()).isNull();
+ // incremental call without new snapshots, should return empty plan
+ assertThat(scan.plan().splits()).isEmpty();
// write incremental data
write.write(rowData(1, 10, 103L));
@@ -150,15 +146,14 @@ public class StreamDataTableScanTest extends ScannerTestBase {
write.write(rowData(1, 50, 500L));
commit.commit(3, write.prepareCommit(true, 3));
- // no new compact snapshots, should return null
- assertThat(scan.plan()).isNull();
+ // no new compact snapshots, should return empty plan
+ assertThat(scan.plan().splits()).isEmpty();
write.compact(binaryRow(1), 0, true);
commit.commit(4, write.prepareCommit(true, 4));
// full compaction done, read new changelog
plan = scan.plan();
- assertThat(plan.snapshotId).isEqualTo(6);
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(
Arrays.asList(
@@ -168,8 +163,8 @@ public class StreamDataTableScanTest extends ScannerTestBase {
"+U 1|20|201",
"+I 1|50|500"));
- // no more new snapshots, should return null
- assertThat(scan.plan()).isNull();
+ // no more new snapshots, should return empty plan
+ assertThat(scan.plan().splits()).isEmpty();
write.close();
commit.close();
@@ -191,7 +186,6 @@ public class StreamDataTableScanTest extends ScannerTestBase {
commit.commit(committable);
DataTableScan.DataFilePlan plan = scan.plan();
- assertThat(plan.snapshotId).isEqualTo(1);
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(Collections.singletonList("+I 1|10|100"));
@@ -217,10 +211,9 @@ public class StreamDataTableScanTest extends ScannerTestBase {
commit.commit(committable);
DataTableScan.DataFilePlan plan = scan.plan();
- assertThat(plan.snapshotId).isEqualTo(1);
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(Collections.singletonList("+I 1|10|100"));
- assertThat(scan.plan()).isNull();
+ assertThat(scan.plan().splits()).isEmpty();
write.write(rowData(2, 20, 200L));
committable = new ManifestCommittable(0, 7L);
@@ -228,10 +221,9 @@ public class StreamDataTableScanTest extends ScannerTestBase {
commit.commit(committable);
plan = scan.plan();
- assertThat(plan.snapshotId).isEqualTo(2);
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(Collections.singletonList("+I 2|20|200"));
- assertThat(scan.plan()).isNull();
+ assertThat(scan.plan().splits()).isEmpty();
write.write(rowData(3, 30, 300L));
committable = new ManifestCommittable(0, 9L);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
index 245fb4c77..42ec5dc76 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.SnapshotManager;
@@ -34,7 +33,7 @@ import static org.assertj.core.api.Assertions.assertThat;
public class CompactedStartingScannerTest extends ScannerTestBase {
@Test
- public void testGetPlan() throws Exception {
+ public void testScan() throws Exception {
SnapshotManager snapshotManager = table.snapshotManager();
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
@@ -57,9 +56,9 @@ public class CompactedStartingScannerTest extends ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
CompactedStartingScanner scanner = new CompactedStartingScanner();
- DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, snapshotSplitReader);
- assertThat(plan.snapshotId).isEqualTo(3);
- assertThat(getResult(table.newRead(), plan.splits()))
+ StartingScanner.Result result = scanner.scan(snapshotManager, snapshotSplitReader);
+ assertThat(result.snapshotId()).isEqualTo(3);
+ assertThat(getResult(table.newRead(), toSplits(result.splits())))
.hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
write.close();
@@ -70,7 +69,7 @@ public class CompactedStartingScannerTest extends ScannerTestBase {
public void testNoSnapshot() {
SnapshotManager snapshotManager = table.snapshotManager();
CompactedStartingScanner scanner = new CompactedStartingScanner();
- assertThat(scanner.getPlan(snapshotManager, snapshotSplitReader)).isNull();
+ assertThat(scanner.scan(snapshotManager, snapshotSplitReader)).isNull();
}
@Test
@@ -87,7 +86,7 @@ public class CompactedStartingScannerTest extends ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(1);
CompactedStartingScanner scanner = new CompactedStartingScanner();
- assertThat(scanner.getPlan(snapshotManager, snapshotSplitReader)).isNull();
+ assertThat(scanner.scan(snapshotManager, snapshotSplitReader)).isNull();
write.close();
commit.close();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
index 43568f72e..bc3ed34d3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
@@ -39,7 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat;
public class CompactionChangelogFollowUpScannerTest extends ScannerTestBase {
@Test
- public void testGetPlan() throws Exception {
+ public void testScan() throws Exception {
SnapshotManager snapshotManager = table.snapshotManager();
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
@@ -79,8 +79,7 @@ public class CompactionChangelogFollowUpScannerTest extends ScannerTestBase {
snapshot = snapshotManager.snapshot(3);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
- DataTableScan.DataFilePlan plan = scanner.getPlan(3, snapshotSplitReader);
- assertThat(plan.snapshotId).isEqualTo(3);
+ DataTableScan.DataFilePlan plan = scanner.scan(3, snapshotSplitReader);
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|20|200", "+I 1|30|300"));
@@ -91,8 +90,7 @@ public class CompactionChangelogFollowUpScannerTest extends ScannerTestBase {
snapshot = snapshotManager.snapshot(5);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
- plan = scanner.getPlan(5, snapshotSplitReader);
- assertThat(plan.snapshotId).isEqualTo(5);
+ plan = scanner.scan(5, snapshotSplitReader);
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(
Arrays.asList("-U 1|10|102", "+U 1|10|103", "-D 1|30|300", "+I 1|40|401"));
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
index 1063d17de..3ee8d0d67 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
@@ -46,7 +46,7 @@ public class ContinuousCompactorFollowUpScannerTest extends ScannerTestBase {
private final DataFileMetaSerializer dataFileMetaSerializer = new DataFileMetaSerializer();
@Test
- public void testGetPlan() throws Exception {
+ public void testScan() throws Exception {
SnapshotManager snapshotManager = table.snapshotManager();
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
@@ -84,16 +84,14 @@ public class ContinuousCompactorFollowUpScannerTest extends ScannerTestBase {
Snapshot snapshot = snapshotManager.snapshot(1);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
- DataTableScan.DataFilePlan plan = scanner.getPlan(1, snapshotSplitReader);
- assertThat(plan.snapshotId).isEqualTo(1);
+ DataTableScan.DataFilePlan plan = scanner.scan(1, snapshotSplitReader);
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(Arrays.asList("+I 1|1|0|1", "+I 1|2|0|1"));
snapshot = snapshotManager.snapshot(2);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
- plan = scanner.getPlan(2, snapshotSplitReader);
- assertThat(plan.snapshotId).isEqualTo(2);
+ plan = scanner.scan(2, snapshotSplitReader);
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(Collections.singletonList("+I 2|2|0|1"));
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
index 7f3402c0b..c62fa0514 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.SnapshotManager;
@@ -32,7 +31,7 @@ import static org.assertj.core.api.Assertions.assertThat;
public class ContinuousCompactorStartingScannerTest extends ScannerTestBase {
@Test
- public void testGetPlan() throws Exception {
+ public void testScan() throws Exception {
SnapshotManager snapshotManager = table.snapshotManager();
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
@@ -60,9 +59,9 @@ public class ContinuousCompactorStartingScannerTest extends ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(5);
ContinuousCompactorStartingScanner scanner = new ContinuousCompactorStartingScanner();
- DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, snapshotSplitReader);
- assertThat(plan.snapshotId).isEqualTo(3);
- assertThat(plan.splits()).isEmpty();
+ StartingScanner.Result result = scanner.scan(snapshotManager, snapshotSplitReader);
+ assertThat(result.snapshotId()).isEqualTo(3);
+ assertThat(result.splits()).isEmpty();
write.close();
commit.close();
@@ -72,6 +71,6 @@ public class ContinuousCompactorStartingScannerTest extends ScannerTestBase {
public void testNoSnapshot() {
SnapshotManager snapshotManager = table.snapshotManager();
ContinuousCompactorStartingScanner scanner = new ContinuousCompactorStartingScanner();
- assertThat(scanner.getPlan(snapshotManager, snapshotSplitReader)).isNull();
+ assertThat(scanner.scan(snapshotManager, snapshotSplitReader)).isNull();
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
index 85aedecc5..69dcb55c4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.SnapshotManager;
@@ -32,7 +31,7 @@ import static org.assertj.core.api.Assertions.assertThat;
public class ContinuousFromTimestampStartingScannerTest extends ScannerTestBase {
@Test
- public void testGetPlan() throws Exception {
+ public void testScan() throws Exception {
SnapshotManager snapshotManager = table.snapshotManager();
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
@@ -60,9 +59,9 @@ public class ContinuousFromTimestampStartingScannerTest extends ScannerTestBase
ContinuousFromTimestampStartingScanner scanner =
new ContinuousFromTimestampStartingScanner(timestamp);
- DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, snapshotSplitReader);
- assertThat(plan.snapshotId).isEqualTo(2);
- assertThat(getResult(table.newRead(), plan.splits())).isEmpty();
+ StartingScanner.Result result = scanner.scan(snapshotManager, snapshotSplitReader);
+ assertThat(result.snapshotId()).isEqualTo(2);
+ assertThat(getResult(table.newRead(), toSplits(result.splits()))).isEmpty();
write.close();
commit.close();
@@ -73,7 +72,7 @@ public class ContinuousFromTimestampStartingScannerTest extends ScannerTestBase
SnapshotManager snapshotManager = table.snapshotManager();
ContinuousFromTimestampStartingScanner scanner =
new ContinuousFromTimestampStartingScanner(System.currentTimeMillis());
- assertThat(scanner.getPlan(snapshotManager, snapshotSplitReader)).isNull();
+ assertThat(scanner.scan(snapshotManager, snapshotSplitReader)).isNull();
}
@Test
@@ -93,9 +92,9 @@ public class ContinuousFromTimestampStartingScannerTest extends ScannerTestBase
ContinuousFromTimestampStartingScanner scanner =
new ContinuousFromTimestampStartingScanner(timestamp);
- DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, snapshotSplitReader);
- assertThat(plan.snapshotId).isEqualTo(0);
- assertThat(getResult(table.newRead(), plan.splits())).isEmpty();
+ StartingScanner.Result result = scanner.scan(snapshotManager, snapshotSplitReader);
+ assertThat(result.snapshotId()).isEqualTo(0);
+ assertThat(getResult(table.newRead(), toSplits(result.splits()))).isEmpty();
write.close();
commit.close();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
index ba218410c..33ea201b7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.SnapshotManager;
@@ -32,7 +31,7 @@ import static org.assertj.core.api.Assertions.assertThat;
public class ContinuousLatestStartingScannerTest extends ScannerTestBase {
@Test
- public void testGetPlan() throws Exception {
+ public void testScan() throws Exception {
SnapshotManager snapshotManager = table.snapshotManager();
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
@@ -50,9 +49,9 @@ public class ContinuousLatestStartingScannerTest extends ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(2);
ContinuousLatestStartingScanner scanner = new ContinuousLatestStartingScanner();
- DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, snapshotSplitReader);
- assertThat(plan.snapshotId).isEqualTo(2);
- assertThat(getResult(table.newRead(), plan.splits())).isEmpty();
+ StartingScanner.Result result = scanner.scan(snapshotManager, snapshotSplitReader);
+ assertThat(result.snapshotId()).isEqualTo(2);
+ assertThat(getResult(table.newRead(), toSplits(result.splits()))).isEmpty();
write.close();
commit.close();
@@ -62,6 +61,6 @@ public class ContinuousLatestStartingScannerTest extends ScannerTestBase {
public void testNoSnapshot() {
SnapshotManager snapshotManager = table.snapshotManager();
ContinuousLatestStartingScanner scanner = new ContinuousLatestStartingScanner();
- assertThat(scanner.getPlan(snapshotManager, snapshotSplitReader)).isNull();
+ assertThat(scanner.scan(snapshotManager, snapshotSplitReader)).isNull();
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScannerTest.java
index 6edc68669..116b17702 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScannerTest.java
@@ -36,7 +36,7 @@ import static org.assertj.core.api.Assertions.assertThat;
public class DeltaFollowUpScannerTest extends ScannerTestBase {
@Test
- public void testGetPlan() throws Exception {
+ public void testScan() throws Exception {
SnapshotManager snapshotManager = table.snapshotManager();
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
@@ -61,16 +61,14 @@ public class DeltaFollowUpScannerTest extends ScannerTestBase {
Snapshot snapshot = snapshotManager.snapshot(1);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
- DataTableScan.DataFilePlan plan = scanner.getPlan(1, snapshotSplitReader);
- assertThat(plan.snapshotId).isEqualTo(1);
+ DataTableScan.DataFilePlan plan = scanner.scan(1, snapshotSplitReader);
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(Arrays.asList("+I 1|10|100", "+I 1|20|200", "+I 1|40|400"));
snapshot = snapshotManager.snapshot(2);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
- plan = scanner.getPlan(2, snapshotSplitReader);
- assertThat(plan.snapshotId).isEqualTo(2);
+ plan = scanner.scan(2, snapshotSplitReader);
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|30|300", "-D 1|40|400"));
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
index 0ecc7260d..7ce631d20 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.SnapshotManager;
@@ -34,7 +33,7 @@ import static org.assertj.core.api.Assertions.assertThat;
public class FullStartingScannerTest extends ScannerTestBase {
@Test
- public void testGetPlan() throws Exception {
+ public void testScan() throws Exception {
SnapshotManager snapshotManager = table.snapshotManager();
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
@@ -52,9 +51,9 @@ public class FullStartingScannerTest extends ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(2);
FullStartingScanner scanner = new FullStartingScanner();
- DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, snapshotSplitReader);
- assertThat(plan.snapshotId).isEqualTo(2);
- assertThat(getResult(table.newRead(), plan.splits()))
+ StartingScanner.Result result = scanner.scan(snapshotManager, snapshotSplitReader);
+ assertThat(result.snapshotId()).isEqualTo(2);
+ assertThat(getResult(table.newRead(), toSplits(result.splits())))
.hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
write.close();
@@ -65,6 +64,6 @@ public class FullStartingScannerTest extends ScannerTestBase {
public void testNoSnapshot() {
SnapshotManager snapshotManager = table.snapshotManager();
FullStartingScanner scanner = new FullStartingScanner();
- assertThat(scanner.getPlan(snapshotManager, snapshotSplitReader)).isNull();
+ assertThat(scanner.scan(snapshotManager, snapshotSplitReader)).isNull();
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
index 69fac80f8..98a0bd20b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
@@ -39,7 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat;
public class InputChangelogFollowUpScannerTest extends ScannerTestBase {
@Test
- public void testGetPlan() throws Exception {
+ public void testScan() throws Exception {
SnapshotManager snapshotManager = table.snapshotManager();
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
@@ -64,16 +64,14 @@ public class InputChangelogFollowUpScannerTest extends ScannerTestBase {
Snapshot snapshot = snapshotManager.snapshot(1);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
- DataTableScan.DataFilePlan plan = scanner.getPlan(1, snapshotSplitReader);
- assertThat(plan.snapshotId).isEqualTo(1);
+ DataTableScan.DataFilePlan plan = scanner.scan(1, snapshotSplitReader);
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(Arrays.asList("+I 1|10|100", "+I 1|20|200", "+I 1|40|400"));
snapshot = snapshotManager.snapshot(2);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
- plan = scanner.getPlan(2, snapshotSplitReader);
- assertThat(plan.snapshotId).isEqualTo(2);
+ plan = scanner.scan(2, snapshotSplitReader);
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(
Arrays.asList("+I 1|10|101", "+I 1|30|300", "+I 1|10|102", "-D 1|40|400"));
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
index c265141f2..313585606 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
@@ -34,6 +34,7 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataType;
@@ -137,4 +138,8 @@ public abstract class ScannerTestBase {
""));
return FileStoreTableFactory.create(fileIO, tablePath, tableSchema, conf);
}
+
+ protected List<Split> toSplits(List<DataSplit> dataSplits) {
+ return new ArrayList<>(dataSplits);
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index c6ed73628..748850284 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -194,7 +194,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
private void refresh() throws Exception {
while (true) {
Iterator<InternalRow> batch = streamingReader.nextBatch();
- if (batch == null) {
+ if (!batch.hasNext()) {
return;
}
this.lookupTable.refresh(batch);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index da68d09b9..731d2d787 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -116,6 +116,7 @@ public class CompactorSourceBuilder {
null,
partitionPredicate,
null,
+ // static compactor source will compact all current files
table -> table.newScan().withStartingScanner(new FullStartingScanner()));
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index c18458908..4bdf75f7f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DataTableScan.DataFilePlan;
import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
@@ -41,7 +42,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -62,18 +62,18 @@ public class ContinuousFileSplitEnumerator
private final FileStoreSourceSplitGenerator splitGenerator;
- private final Callable<DataFilePlan> callable;
+ private final StreamDataTableScan scan;
- private Long nextSnapshotId;
+ @Nullable private Long nextSnapshotId;
private boolean finished = false;
public ContinuousFileSplitEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
Collection<FileStoreSourceSplit> remainSplits,
- Long nextSnapshotId,
+ @Nullable Long nextSnapshotId,
long discoveryInterval,
- Callable<DataFilePlan> callable) {
+ StreamDataTableScan scan) {
checkArgument(discoveryInterval > 0L);
this.context = checkNotNull(context);
this.bucketSplits = new HashMap<>();
@@ -82,7 +82,7 @@ public class ContinuousFileSplitEnumerator
this.discoveryInterval = discoveryInterval;
this.readersAwaitingSplit = new HashSet<>();
this.splitGenerator = new FileStoreSourceSplitGenerator();
- this.callable = callable;
+ this.scan = scan;
}
private void addSplits(Collection<FileStoreSourceSplit> splits) {
@@ -107,7 +107,7 @@ public class ContinuousFileSplitEnumerator
@Override
public void start() {
- context.callAsync(callable, this::processDiscoveredSplits, 0, discoveryInterval);
+ context.callAsync(scan::plan, this::processDiscoveredSplits, 0, discoveryInterval);
}
@Override
@@ -167,7 +167,7 @@ public class ContinuousFileSplitEnumerator
return;
}
- nextSnapshotId = plan.snapshotId + 1;
+ nextSnapshotId = scan.checkpoint();
addSplits(splitGenerator.createSplits(plan));
assignSplits();
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index 719477542..909a736fe 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -85,7 +85,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
splits,
nextSnapshotId,
table.coreOptions().continuousDiscoveryInterval().toMillis(),
- scanFactory.create(table, nextSnapshotId).withFilter(predicate)::plan);
+ scanFactory.create(table, nextSnapshotId).withFilter(predicate));
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index d9bdee83a..0876926a5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -132,7 +132,8 @@ public class FlinkSourceBuilder {
return logSourceProvider.createSource(null);
}
return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
- buildStaticFileSource())
+ LogHybridSourceFactory.buildHybridFirstSource(
+ table, projectedFields, predicate))
.addSource(
new LogHybridSourceFactory(logSourceProvider),
Boundedness.CONTINUOUS_UNBOUNDED)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
index bb49967e2..ef2dac7c1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
@@ -19,13 +19,27 @@
package org.apache.paimon.flink.source;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.log.LogSourceProvider;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.DataTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.base.source.hybrid.HybridSource.SourceFactory;
import org.apache.flink.table.data.RowData;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
import java.util.Map;
/** Log {@link SourceFactory} from {@link StaticFileStoreSplitEnumerator}. */
@@ -49,4 +63,71 @@ public class LogHybridSourceFactory
}
return provider.createSource(logOffsets);
}
+
+ public static FlinkSource buildHybridFirstSource(
+ Table table, @Nullable int[][] projectedFields, @Nullable Predicate predicate) {
+ if (!(table instanceof DataTable)) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "FlinkHybridFirstSource only accepts DataTable. Unsupported table type: '%s'.",
+ table.getClass().getSimpleName()));
+ }
+
+ DataTable dataTable = (DataTable) table;
+
+ return new FlinkHybridFirstSource(
+ table.newReadBuilder().withProjection(projectedFields).withFilter(predicate),
+ dataTable.snapshotManager(),
+ dataTable.coreOptions().toConfiguration());
+ }
+
+ /** The first source of a log {@link HybridSource}. */
+ private static class FlinkHybridFirstSource extends FlinkSource {
+
+ private static final long serialVersionUID = 3L;
+
+ private final SnapshotManager snapshotManager;
+ private final Options options;
+
+ public FlinkHybridFirstSource(
+ ReadBuilder readBuilder, SnapshotManager snapshotManager, Options options) {
+ super(readBuilder, null);
+ this.snapshotManager = snapshotManager;
+ this.options = options;
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
+ SplitEnumeratorContext<FileStoreSourceSplit> context,
+ PendingSplitsCheckpoint checkpoint) {
+ Long snapshotId = null;
+ Collection<FileStoreSourceSplit> splits;
+ if (checkpoint == null) {
+ FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
+ // get snapshot id and splits from scan
+ StreamTableScan scan = readBuilder.newStreamScan();
+ splits = splitGenerator.createSplits(scan.plan());
+ Long nextSnapshotId = scan.checkpoint();
+ if (nextSnapshotId != null) {
+ snapshotId = nextSnapshotId - 1;
+ }
+ } else {
+ // restore from checkpoint
+ snapshotId = checkpoint.currentSnapshotId();
+ splits = checkpoint.splits();
+ }
+
+ Snapshot snapshot = snapshotId == null ? null : snapshotManager.snapshot(snapshotId);
+ return new StaticFileStoreSplitEnumerator(
+ context,
+ snapshot,
+ splits,
+ options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE));
+ }
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
index ae601717a..3af455a67 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
@@ -18,13 +18,11 @@
package org.apache.paimon.flink.source;
-import org.apache.paimon.Snapshot;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.source.BatchDataTableScan;
import org.apache.paimon.table.source.DataTableScan;
-import org.apache.paimon.utils.SnapshotManager;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
@@ -32,7 +30,6 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import javax.annotation.Nullable;
-import java.util.ArrayList;
import java.util.Collection;
/** Bounded {@link FlinkSource} for reading records. It does not monitor new snapshots. */
@@ -73,31 +70,21 @@ public class StaticFileStoreSource extends FlinkSource {
public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
PendingSplitsCheckpoint checkpoint) {
- SnapshotManager snapshotManager = table.snapshotManager();
-
- Long snapshotId = null;
Collection<FileStoreSourceSplit> splits;
if (checkpoint == null) {
- splits = new ArrayList<>();
FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
-
// read all splits from scan
DataTableScan.DataFilePlan plan =
scanFactory.create(table).withFilter(predicate).plan();
- if (plan != null) {
- snapshotId = plan.snapshotId;
- splits.addAll(splitGenerator.createSplits(plan));
- }
+ splits = splitGenerator.createSplits(plan);
} else {
// restore from checkpoint
- snapshotId = checkpoint.currentSnapshotId();
splits = checkpoint.splits();
}
- Snapshot snapshot = snapshotId == null ? null : snapshotManager.snapshot(snapshotId);
return new StaticFileStoreSplitEnumerator(
context,
- snapshot,
+ null,
splits,
table.coreOptions()
.toConfiguration()
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index 1cc40d216..e6f2e85a9 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -148,7 +148,6 @@ public class CompactActionITCase extends ActionITCaseBase {
// no full compaction has happened, so plan should be empty
StreamDataTableScan scan = table.newStreamScan();
DataTableScan.DataFilePlan plan = scan.plan();
- Assertions.assertEquals(1, (long) plan.snapshotId);
Assertions.assertTrue(plan.splits().isEmpty());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index 742621f31..a06cd198e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -19,9 +19,16 @@
package org.apache.paimon.flink.source;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DataTableScan.DataFilePlan;
import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.snapshot.BoundedChecker;
+import org.apache.paimon.table.source.snapshot.FollowUpScanner;
+import org.apache.paimon.table.source.snapshot.StartingScanner;
+import org.apache.paimon.utils.Filter;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
@@ -34,7 +41,6 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
-import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
@@ -149,21 +155,13 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(1, "test-host");
Queue<DataFilePlan> results = new LinkedBlockingQueue<>();
- Callable<DataFilePlan> callable =
- () -> {
- DataFilePlan plan = results.poll();
- if (plan == null) {
- throw new EndOfScanException();
- }
- return plan;
- };
-
+ StreamDataTableScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
.setSplitEnumeratorContext(context)
.setInitialSplits(Collections.emptyList())
.setDiscoveryInterval(1)
- .setCallable(callable)
+ .setScan(scan)
.build();
enumerator.start();
@@ -172,7 +170,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 0; i < 4; i++) {
splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
}
- results.add(new DataFilePlan(snapshot, splits));
+ results.add(new DataFilePlan(splits));
context.triggerAllActions();
// assign to task 0
@@ -228,9 +226,8 @@ public class ContinuousFileSplitEnumeratorTest {
private static class Builder {
private SplitEnumeratorContext<FileStoreSourceSplit> context;
private Collection<FileStoreSourceSplit> initialSplits = Collections.emptyList();
- private Long nextSnapshotId;
private long discoveryInterval = Long.MAX_VALUE;
- private Callable<DataFilePlan> callable;
+ private StreamDataTableScan scan;
public Builder setSplitEnumeratorContext(
SplitEnumeratorContext<FileStoreSourceSplit> context) {
@@ -243,24 +240,89 @@ public class ContinuousFileSplitEnumeratorTest {
return this;
}
- public Builder setNextSnapshotId(Long nextSnapshotId) {
- this.nextSnapshotId = nextSnapshotId;
- return this;
- }
-
public Builder setDiscoveryInterval(long discoveryInterval) {
this.discoveryInterval = discoveryInterval;
return this;
}
- public Builder setCallable(Callable<DataFilePlan> callable) {
- this.callable = callable;
+ public Builder setScan(StreamDataTableScan scan) {
+ this.scan = scan;
return this;
}
public ContinuousFileSplitEnumerator build() {
return new ContinuousFileSplitEnumerator(
- context, initialSplits, nextSnapshotId, discoveryInterval, callable);
+ context, initialSplits, null, discoveryInterval, scan);
+ }
+ }
+
+ private static class MockScan implements StreamDataTableScan {
+ private final Queue<DataFilePlan> results;
+
+ public MockScan(Queue<DataFilePlan> results) {
+ this.results = results;
+ }
+
+ @Override
+ public StreamDataTableScan withKind(ScanKind kind) {
+ return null;
+ }
+
+ @Override
+ public StreamDataTableScan withSnapshot(long snapshotId) {
+ return null;
+ }
+
+ @Override
+ public StreamDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
+ return null;
+ }
+
+ @Override
+ public StreamDataTableScan withFilter(Predicate predicate) {
+ return null;
}
+
+ @Override
+ public DataFilePlan plan() {
+ DataFilePlan plan = results.poll();
+ if (plan == null) {
+ throw new EndOfScanException();
+ }
+ return plan;
+ }
+
+ @Override
+ public boolean supportStreamingReadOverwrite() {
+ return false;
+ }
+
+ @Override
+ public StreamDataTableScan withStartingScanner(StartingScanner startingScanner) {
+ return null;
+ }
+
+ @Override
+ public StreamDataTableScan withFollowUpScanner(FollowUpScanner followUpScanner) {
+ return null;
+ }
+
+ @Override
+ public StreamDataTableScan withBoundedChecker(BoundedChecker boundedChecker) {
+ return null;
+ }
+
+ @Override
+ public StreamDataTableScan withSnapshotStarting() {
+ return null;
+ }
+
+ @Override
+ public Long checkpoint() {
+ return null;
+ }
+
+ @Override
+ public void restore(Long state) {}
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index 4863bb66a..45c53d18c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -80,7 +80,7 @@ public class FileStoreSourceSplitGeneratorTest {
false,
Collections::singletonList,
FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.ADD)));
- DataTableScan.DataFilePlan tableScanPlan = new DataTableScan.DataFilePlan(1L, scanSplits);
+ DataTableScan.DataFilePlan tableScanPlan = new DataTableScan.DataFilePlan(scanSplits);
List<FileStoreSourceSplit> splits =
new FileStoreSourceSplitGenerator().createSplits(tableScanPlan);
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
index 98ca11288..f707ba843 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
@@ -28,7 +28,6 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.ReadBuilder;
@@ -43,8 +42,6 @@ import org.apache.hadoop.mapred.Reporter;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
import java.util.Optional;
/**
@@ -58,11 +55,7 @@ public class PaimonInputFormat implements InputFormat<Void, RowDataContainer> {
FileStoreTable table = createFileStoreTable(jobConf);
DataTableScan scan = table.newScan();
createPredicate(table.schema(), jobConf).ifPresent(scan::withFilter);
-
- // TODO: Roll back modification after refactoring scan interface
- DataTableScan.DataFilePlan plan = scan.plan();
- List<DataSplit> splits = plan == null ? Collections.emptyList() : plan.splits;
- return splits.stream()
+ return scan.plan().splits.stream()
.map(split -> new PaimonInputSplit(table.location().toString(), split))
.toArray(PaimonInputSplit[]::new);
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
index ecf0c6de2..1c5c7b50a 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
@@ -20,7 +20,6 @@ package org.apache.paimon.spark;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.TableScan;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.InputPartition;
@@ -30,7 +29,6 @@ import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsReportStatistics;
import org.apache.spark.sql.types.StructType;
-import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
@@ -79,9 +77,7 @@ public class SparkScan implements Scan, SupportsReportStatistics {
protected List<Split> splits() {
if (splits == null) {
- // TODO: Roll back modification after refactoring scan interface
- TableScan.Plan plan = readBuilder.newScan().plan();
- splits = plan == null ? Collections.emptyList() : plan.splits();
+ splits = readBuilder.newScan().plan().splits();
}
return splits;
}