Posted to issues@paimon.apache.org by "JingsongLi (via GitHub)" <gi...@apache.org> on 2023/03/27 11:58:05 UTC
[GitHub] [incubator-paimon] JingsongLi commented on a diff in pull request #698: [core][bug] TableScan#plan should not return null
JingsongLi commented on code in PR #698:
URL: https://github.com/apache/incubator-paimon/pull/698#discussion_r1149206451
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java:
##########
@@ -75,19 +75,16 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
PendingSplitsCheckpoint checkpoint) {
SnapshotManager snapshotManager = table.snapshotManager();
- Long snapshotId = null;
+ Long snapshotId;
Collection<FileStoreSourceSplit> splits;
if (checkpoint == null) {
- splits = new ArrayList<>();
FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
-
// read all splits from scan
DataTableScan.DataFilePlan plan =
scanFactory.create(table).withFilter(predicate).plan();
- if (plan != null) {
- snapshotId = plan.snapshotId;
- splits.addAll(splitGenerator.createSplits(plan));
- }
+ snapshotId =
Review Comment:
`snapshotId` is only needed by `LogHybridSourceFactory`, so we can introduce a dedicated `FlinkSource` just for `LogHybridSourceFactory`:
```
public static FlinkSource buildHybridSource(
        Table table,
        @Nullable int[][] projectedFields,
        @Nullable Predicate predicate) {
    if (!(table instanceof DataTable)) {
        throw new UnsupportedOperationException("Only support DataTable, unsupported table: " + table.getClass());
    }
    DataTable dataTable = (DataTable) table;
    return new FlinkHybridSource(
            table.newReadBuilder().withProjection(projectedFields).withFilter(predicate),
            dataTable.snapshotManager(),
            dataTable.coreOptions().toConfiguration());
}
private static class FlinkHybridSource extends FlinkSource {
    private static final long serialVersionUID = 3L;
    private final SnapshotManager snapshotManager;
    private final Options options;
    public FlinkHybridSource(ReadBuilder readBuilder, SnapshotManager snapshotManager, Options options) {
        super(readBuilder, null);
        this.snapshotManager = snapshotManager;
        this.options = options;
    }
    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }
    @Override
    public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
            SplitEnumeratorContext<FileStoreSourceSplit> context,
            PendingSplitsCheckpoint checkpoint) {
        Long snapshotId = null;
        Collection<FileStoreSourceSplit> splits;
        if (checkpoint == null) {
            splits = new ArrayList<>();
            FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
            // read all splits from scan
            StreamTableScan scan = readBuilder.newStreamScan();
            TableScan.Plan plan = scan.plan();
            if (plan != null) {
                snapshotId = scan.checkpoint();
                splits.addAll(splitGenerator.createSplits(plan));
            }
        } else {
            // restore from checkpoint
            snapshotId = checkpoint.currentSnapshotId();
            splits = checkpoint.splits();
        }
        Snapshot snapshot = snapshotId == null ? null : snapshotManager.snapshot(snapshotId);
        return new StaticFileStoreSplitEnumerator(
                context,
                snapshot,
                splits,
                options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE));
    }
}
```
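For illustration only, here is a self-contained sketch of the same scan-or-restore branching once `plan()` is guaranteed to be non-null. `RestoreSketch`, `Plan`, `Checkpoint` and `State` are hypothetical stand-ins, not Paimon classes, and it assumes an empty table yields an empty plan with a null snapshot id:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class RestoreSketch {
    /** Hypothetical stand-in for a scan plan; never null, but may be empty. */
    public static final class Plan {
        final Long snapshotId; // assumed to be null when the table has no snapshot yet
        final List<String> splits;

        public Plan(Long snapshotId, List<String> splits) {
            this.snapshotId = snapshotId;
            this.splits = splits;
        }
    }

    /** Hypothetical stand-in for PendingSplitsCheckpoint. */
    public static final class Checkpoint {
        final Long currentSnapshotId;
        final List<String> splits;

        public Checkpoint(Long currentSnapshotId, List<String> splits) {
            this.currentSnapshotId = currentSnapshotId;
            this.splits = splits;
        }
    }

    /** What would be handed to the split enumerator. */
    public static final class State {
        public final Long snapshotId;
        public final List<String> splits;

        State(Long snapshotId, List<String> splits) {
            this.snapshotId = snapshotId;
            this.splits = splits;
        }
    }

    /** Scans only on a fresh start; on restore, the checkpoint is the single source of truth. */
    public static State restore(Checkpoint checkpoint, Supplier<Plan> scan) {
        if (checkpoint == null) {
            // fresh start: the plan is non-null, so no null check is needed
            Plan plan = scan.get();
            return new State(plan.snapshotId, new ArrayList<>(plan.splits));
        }
        // restore from checkpoint: the scan supplier is never invoked
        return new State(checkpoint.currentSnapshotId, checkpoint.splits);
    }
}
```

With a non-null plan, both branches assign `snapshotId` and `splits` unconditionally, so neither needs a default value.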