Posted to issues@paimon.apache.org by "JingsongLi (via GitHub)" <gi...@apache.org> on 2023/03/27 11:58:05 UTC

[GitHub] [incubator-paimon] JingsongLi commented on a diff in pull request #698: [core][bug] TableScan#plan should not return null

JingsongLi commented on code in PR #698:
URL: https://github.com/apache/incubator-paimon/pull/698#discussion_r1149206451


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java:
##########
@@ -75,19 +75,16 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
             PendingSplitsCheckpoint checkpoint) {
         SnapshotManager snapshotManager = table.snapshotManager();
 
-        Long snapshotId = null;
+        Long snapshotId;
         Collection<FileStoreSourceSplit> splits;
         if (checkpoint == null) {
-            splits = new ArrayList<>();
             FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
-
             // read all splits from scan
             DataTableScan.DataFilePlan plan =
                     scanFactory.create(table).withFilter(predicate).plan();
-            if (plan != null) {
-                snapshotId = plan.snapshotId;
-                splits.addAll(splitGenerator.createSplits(plan));
-            }
+            snapshotId =

Review Comment:
   `snapshotId` is only needed by `LogHybridSourceFactory`, so we can introduce a dedicated `FlinkSource` just for `LogHybridSourceFactory`:
   ```
       public static FlinkSource buildHybridSource(
               Table table,
               @Nullable int[][] projectedFields,
               @Nullable Predicate predicate) {
           if (!(table instanceof DataTable)) {
               throw new UnsupportedOperationException("Only support DataTable, unsupported table: " + table.getClass());
           }
   
           DataTable dataTable = (DataTable) table;
   
           return new FlinkHybridSource(
                   table.newReadBuilder().withProjection(projectedFields).withFilter(predicate),
                   dataTable.snapshotManager(),
                   dataTable.coreOptions().toConfiguration());
       }
   
       private static class FlinkHybridSource extends FlinkSource {
   
           private static final long serialVersionUID = 3L;
   
           private final SnapshotManager snapshotManager;
           private final Options options;
   
           public FlinkHybridSource(ReadBuilder readBuilder, SnapshotManager snapshotManager, Options options) {
               super(readBuilder, null);
               this.snapshotManager = snapshotManager;
               this.options = options;
           }
   
           @Override
           public Boundedness getBoundedness() {
               return Boundedness.BOUNDED;
           }
   
           @Override
           public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
                   SplitEnumeratorContext<FileStoreSourceSplit> context,
                   PendingSplitsCheckpoint checkpoint) {
               Long snapshotId = null;
               Collection<FileStoreSourceSplit> splits;
               if (checkpoint == null) {
                   splits = new ArrayList<>();
                   FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
   
                   // read all splits from scan
                   StreamTableScan scan = readBuilder.newStreamScan();
                   TableScan.Plan plan = scan.plan();
                   if (plan != null) {
                       snapshotId = scan.checkpoint();
                       splits.addAll(splitGenerator.createSplits(plan));
                   }
               } else {
                   // restore from checkpoint
                   snapshotId = checkpoint.currentSnapshotId();
                   splits = checkpoint.splits();
               }
   
               Snapshot snapshot = snapshotId == null ? null : snapshotManager.snapshot(snapshotId);
               return new StaticFileStoreSplitEnumerator(
                       context,
                       snapshot,
                       splits,
                       options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE));
           }
       }
   ```
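
   To make the two branches of `restoreEnumerator` above easier to check in isolation, here is a minimal stand-alone sketch. It assumes nothing from Paimon or Flink: `Checkpoint`, `Scan`, `Restored`, `fixedScan` and `restore` are hypothetical stand-ins (splits are plain strings), and it only mirrors the control flow of the suggestion, including the case where `snapshotId` stays `null` because the scan produced no plan.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collection;
   import java.util.List;

   /** Stand-alone sketch; none of these types are Paimon or Flink classes. */
   class HybridEnumeratorSketch {

       /** Hypothetical stand-in for PendingSplitsCheckpoint. */
       static final class Checkpoint {
           final Long currentSnapshotId;
           final Collection<String> splits;

           Checkpoint(Long currentSnapshotId, Collection<String> splits) {
               this.currentSnapshotId = currentSnapshotId;
               this.splits = splits;
           }
       }

       /** Hypothetical stand-in for a stream scan; plan() may return null here. */
       interface Scan {
           List<String> plan();

           Long checkpoint();
       }

       /** What the enumerator would be built from: a snapshot id (may be null) and splits. */
       static final class Restored {
           final Long snapshotId;
           final Collection<String> splits;

           Restored(Long snapshotId, Collection<String> splits) {
               this.snapshotId = snapshotId;
               this.splits = splits;
           }
       }

       /** Test helper: a scan that always returns the given plan and snapshot id. */
       static Scan fixedScan(final List<String> plan, final Long snapshotId) {
           return new Scan() {
               @Override
               public List<String> plan() {
                   return plan;
               }

               @Override
               public Long checkpoint() {
                   return snapshotId;
               }
           };
       }

       /** Mirrors the suggestion: restore from the checkpoint if present, else scan once. */
       static Restored restore(Scan scan, Checkpoint checkpoint) {
           if (checkpoint != null) {
               // restore from checkpoint
               return new Restored(checkpoint.currentSnapshotId, checkpoint.splits);
           }
           // read all splits from scan
           Long snapshotId = null;
           Collection<String> splits = new ArrayList<>();
           List<String> plan = scan.plan();
           if (plan != null) {
               snapshotId = scan.checkpoint();
               splits.addAll(plan);
           }
           return new Restored(snapshotId, splits);
       }

       public static void main(String[] args) {
           Restored fresh = restore(fixedScan(Arrays.asList("split-1", "split-2"), 7L), null);
           System.out.println(fresh.snapshotId + " " + fresh.splits);
       }
   }
   ```

   If `plan()` is changed to never return `null`, as the PR title proposes, the `plan != null` guard in `restore` could be dropped, but the `null` snapshot id for an empty table would then need to be handled some other way.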


