Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/18 03:49:48 UTC

[GitHub] [flink-table-store] JingsongLi opened a new pull request, #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

JingsongLi opened a new pull request, #388:
URL: https://github.com/apache/flink-table-store/pull/388

   The file store can find a suitable snapshot according to the start timestamp and read incremental data from it.
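   A rough usage sketch (assuming a streaming `TableEnvironment` named `tEnv`, an existing table store table `T1`, and an arbitrary example timestamp; the option names match the tests quoted below):

       // Start reading incrementally from the snapshot covering the given
       // timestamp; earlier data is not replayed.
       tEnv.executeSql(
               "SELECT * FROM T1 /*+ OPTIONS("
                       + "'log.scan'='from-timestamp', "
                       + "'log.scan.timestamp-millis'='1668744000000') */");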




[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1027644999


##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java:
##########
@@ -114,6 +121,55 @@ public void testContinuousLatest() throws TimeoutException {
                 .containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10", "11", "12"));
     }
 
+    @Test
+    public void testContinuousFromTimestamp() throws Exception {
+        String sql =
+                "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp', 'log.scan.timestamp-millis'='%s') */";
+
+        // empty table
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(streamSqlIter(sql, 0));
+        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
+        batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
+        iterator.close();
+
+        SnapshotManager snapshotManager =
+                new SnapshotManager(
+                        new Path(path, "default_catalog.catalog/default_database.db/T1"));
+        List<Snapshot> snapshots =
+                new ArrayList<>(ImmutableList.copyOf(snapshotManager.snapshots()));
+        snapshots.sort(Comparator.comparingLong(Snapshot::timeMillis));
+        Snapshot first = snapshots.get(0);
+        Snapshot second = snapshots.get(1);
+
+        // before second snapshot
+        iterator = BlockingIterator.of(streamSqlIter(sql, second.timeMillis() - 1));
+        batchSql("INSERT INTO T1 VALUES ('13', '14', '15')");
+        assertThat(iterator.collect(3))
+                .containsExactlyInAnyOrder(
+                        Row.of("7", "8", "9"), Row.of("10", "11", "12"), Row.of("13", "14", "15"));
+        iterator.close();
+
+        // from second snapshot
+        iterator = BlockingIterator.of(streamSqlIter(sql, second.timeMillis()));
+        assertThat(iterator.collect(3))
+                .containsExactlyInAnyOrder(
+                        Row.of("7", "8", "9"), Row.of("10", "11", "12"), Row.of("13", "14", "15"));
+        iterator.close();
+
+        // from start
+        iterator = BlockingIterator.of(streamSqlIter(sql, first.timeMillis() - 1));

Review Comment:
   Good~





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1027901067


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java:
##########
@@ -99,11 +103,48 @@ public static class EnumeratorResult {
 
         public final long snapshotId;
 
-        public final DataTableScan.DataFilePlan plan;
+        public final DataFilePlan plan;
 
-        private EnumeratorResult(long snapshotId, DataTableScan.DataFilePlan plan) {
+        private EnumeratorResult(long snapshotId, DataFilePlan plan) {
             this.snapshotId = snapshotId;
             this.plan = plan;
         }
     }
+
+    /** Startup snapshot enumerator, this is the first plan for continuous reading. */
+    public static DataFilePlan startup(DataTableScan scan) {
+        CoreOptions options = scan.options();
+        SnapshotManager snapshotManager = scan.snapshotManager();
+        CoreOptions.LogStartupMode startupMode = options.logStartupMode();
+        switch (startupMode) {
+            case FULL:
+                DataFilePlan plan;
+                if (options.changelogProducer() == FULL_COMPACTION) {
+                    // Read the results of the last full compaction.
+                    // Only full compaction results will appear on the max level.
+                    plan = scan.withLevel(options.numLevels() - 1).plan();
+                } else {
+                    plan = scan.plan();
+                }
+                return plan;
+            case LATEST:
+                return new DataFilePlan(
+                        snapshotManager.latestSnapshotId(), Collections.emptyList());
+            case FROM_TIMESTAMP:
+                Long timestampMills = options.logScanTimestampMills();
+                if (timestampMills == null) {

Review Comment:
   I will introduce `validateOptions` in `FileStoreTableFactory`.
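   A hypothetical sketch of that validation (method name and placement are assumptions, not the merged code):

       // Hypothetical: fail fast when from-timestamp mode lacks a timestamp.
       private static void validateOptions(CoreOptions options) {
           if (options.logStartupMode() == CoreOptions.LogStartupMode.FROM_TIMESTAMP
                   && options.logScanTimestampMills() == null) {
               throw new IllegalArgumentException(
                       "log.scan.timestamp-millis must be set when log.scan is from-timestamp.");
           }
       }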





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1027638918


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -130,4 +119,42 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
             return new StaticFileStoreSplitEnumerator(context, snapshot, splits);
         }
     }
+
+    private DataFilePlan batchScanSplits(DataTableScan scan) {
+        return scan.plan();
+    }
+
+    private DataFilePlan continuousFirstScanSplits(
+            DataTableScan scan, SnapshotManager snapshotManager) {
+        switch (startupMode) {

Review Comment:
   I think we can extract this logic into `SnapshotEnumerator.startup`.
   We don't need to introduce a `DataFilePlanBuilder`; it is just a simple call.





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1027644463


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -130,4 +119,42 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
             return new StaticFileStoreSplitEnumerator(context, snapshot, splits);
         }
     }
+
+    private DataFilePlan batchScanSplits(DataTableScan scan) {
+        return scan.plan();
+    }
+
+    private DataFilePlan continuousFirstScanSplits(
+            DataTableScan scan, SnapshotManager snapshotManager) {
+        switch (startupMode) {
+            case FULL:
+                DataFilePlan plan;
+                if (table.options().changelogProducer()
+                        == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
+                    // Read the results of the last full compaction.
+                    // Only full compaction results will appear on the max level.
+                    plan = scan.withLevel(table.options().numLevels() - 1).plan();
+                } else {
+                    plan = scan.plan();
+                }
+                return plan;
+            case LATEST:
+                return new DataFilePlan(
+                        snapshotManager.latestSnapshotId(), Collections.emptyList());
+            case FROM_TIMESTAMP:

Review Comment:
   Good point. I think we can improve this later; right now we cannot unset an option in Flink SQL, so we can wait for that.





[GitHub] [flink-table-store] JingsongLi merged pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
JingsongLi merged PR #388:
URL: https://github.com/apache/flink-table-store/pull/388




[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1026090480


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java:
##########
@@ -150,20 +153,20 @@ private FileStoreSource buildFileSource(boolean isContinuous, boolean continuous
 
             LogStartupMode startupMode = conf.get(LOG_SCAN);
             if (logSourceProvider == null) {
-                return buildFileSource(true, startupMode == LogStartupMode.LATEST);
+                return buildFileSource(true, startupMode, conf.get(LOG_SCAN_TIMESTAMP_MILLS));
             } else {
                 if (startupMode != LogStartupMode.FULL) {
                     return logSourceProvider.createSource(null);
                 }
                 return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
-                                buildFileSource(false, false))
+                                buildFileSource(false, startupMode, null))
                         .addSource(
                                 new LogHybridSourceFactory(logSourceProvider),
                                 Boundedness.CONTINUOUS_UNBOUNDED)
                         .build();
             }
         } else {
-            return buildFileSource(false, false);
+            return buildFileSource(false, LogStartupMode.FULL, null);

Review Comment:
   It should be ignored here; `log.scan` should not affect batch mode.





[GitHub] [flink-table-store] zjureel commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
zjureel commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1026084973


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java:
##########
@@ -90,6 +90,22 @@ public boolean snapshotExists(long snapshotId) {
         }
     }
 
+    public @Nullable Long earlierThanTimeMills(long timestampMills) {
+        Long earliest = earliestSnapshotId();
+        Long latest = latestSnapshotId();
+        if (earliest == null || latest == null) {
+            return null;
+        }
+
+        for (long i = latest; i >= earliest; i--) {
+            long commitTime = snapshot(i).timeMillis();
+            if (commitTime < timestampMills) {
+                return i;
+            }
+        }
+        return earliest - 1;

Review Comment:
   Why do we return `earliest - 1` here instead of `earliest`?





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1026091038


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java:
##########
@@ -90,6 +90,22 @@ public boolean snapshotExists(long snapshotId) {
         }
     }
 
+    public @Nullable Long earlierThanTimeMills(long timestampMills) {
+        Long earliest = earliestSnapshotId();
+        Long latest = latestSnapshotId();
+        if (earliest == null || latest == null) {
+            return null;
+        }
+
+        for (long i = latest; i >= earliest; i--) {
+            long commitTime = snapshot(i).timeMillis();
+            if (commitTime < timestampMills) {
+                return i;
+            }
+        }
+        return earliest - 1;

Review Comment:
   `earlierThanTimeMills` should return the snapshot previous to this time in millis.
   We then use `earlierSnapshot + 1` to read the incremental data.
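   A minimal sketch of the intended call pattern (variable names are illustrative, not the merged code):

       // earlierThanTimeMills returns the id of the latest snapshot committed
       // strictly before the timestamp, earliest - 1 if no snapshot qualifies,
       // or null for an empty table; reading starts from the next snapshot.
       Long earlier = snapshotManager.earlierThanTimeMills(timestampMills);
       if (earlier != null) {
           long startingSnapshotId = earlier + 1;
           // ... enumerate incremental data beginning at startingSnapshotId
       }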





[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1027877730


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java:
##########
@@ -99,11 +103,48 @@ public static class EnumeratorResult {
 
         public final long snapshotId;
 
-        public final DataTableScan.DataFilePlan plan;
+        public final DataFilePlan plan;
 
-        private EnumeratorResult(long snapshotId, DataTableScan.DataFilePlan plan) {
+        private EnumeratorResult(long snapshotId, DataFilePlan plan) {
             this.snapshotId = snapshotId;
             this.plan = plan;
         }
     }
+
+    /** Startup snapshot enumerator, this is the first plan for continuous reading. */
+    public static DataFilePlan startup(DataTableScan scan) {
+        CoreOptions options = scan.options();
+        SnapshotManager snapshotManager = scan.snapshotManager();
+        CoreOptions.LogStartupMode startupMode = options.logStartupMode();
+        switch (startupMode) {
+            case FULL:
+                DataFilePlan plan;
+                if (options.changelogProducer() == FULL_COMPACTION) {
+                    // Read the results of the last full compaction.
+                    // Only full compaction results will appear on the max level.
+                    plan = scan.withLevel(options.numLevels() - 1).plan();
+                } else {
+                    plan = scan.plan();
+                }
+                return plan;
+            case LATEST:
+                return new DataFilePlan(
+                        snapshotManager.latestSnapshotId(), Collections.emptyList());
+            case FROM_TIMESTAMP:
+                Long timestampMills = options.logScanTimestampMills();
+                if (timestampMills == null) {

Review Comment:
   Could the `LOG_SCAN_TIMESTAMP_MILLS` validation move to before creating the source?





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1027507379


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java:
##########
@@ -90,6 +90,22 @@ public boolean snapshotExists(long snapshotId) {
         }
     }
 
+    public @Nullable Long earlierThanTimeMills(long timestampMills) {
+        Long earliest = earliestSnapshotId();
+        Long latest = latestSnapshotId();
+        if (earliest == null || latest == null) {
+            return null;
+        }
+
+        for (long i = latest; i >= earliest; i--) {
+            long commitTime = snapshot(i).timeMillis();
+            if (commitTime < timestampMills) {
+                return i;
+            }
+        }
+        return earliest - 1;

Review Comment:
   The earliest snapshots here refer to those that have not expired;
   `earliest` changes after expiration.





[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1026074919


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java:
##########
@@ -150,20 +153,20 @@ private FileStoreSource buildFileSource(boolean isContinuous, boolean continuous
 
             LogStartupMode startupMode = conf.get(LOG_SCAN);
             if (logSourceProvider == null) {
-                return buildFileSource(true, startupMode == LogStartupMode.LATEST);
+                return buildFileSource(true, startupMode, conf.get(LOG_SCAN_TIMESTAMP_MILLS));
             } else {
                 if (startupMode != LogStartupMode.FULL) {
                     return logSourceProvider.createSource(null);
                 }
                 return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
-                                buildFileSource(false, false))
+                                buildFileSource(false, startupMode, null))
                         .addSource(
                                 new LogHybridSourceFactory(logSourceProvider),
                                 Boundedness.CONTINUOUS_UNBOUNDED)
                         .build();
             }
         } else {
-            return buildFileSource(false, false);
+            return buildFileSource(false, LogStartupMode.FULL, null);

Review Comment:
   Assert that the startup mode is `FULL`.



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java:
##########
@@ -150,20 +153,20 @@ private FileStoreSource buildFileSource(boolean isContinuous, boolean continuous
 
             LogStartupMode startupMode = conf.get(LOG_SCAN);
             if (logSourceProvider == null) {
-                return buildFileSource(true, startupMode == LogStartupMode.LATEST);
+                return buildFileSource(true, startupMode, conf.get(LOG_SCAN_TIMESTAMP_MILLS));

Review Comment:
   In previous table store versions, a table property starting with `log.` was meant to control the behavior of Kafka. Now you're using these properties to control both Kafka and the file store.
   
   You might need to clarify some concepts. What is a "log"? Is it only about Kafka? Should we also consider the changelog produced by the file store to be a log?





[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1027010517


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -130,4 +119,42 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
             return new StaticFileStoreSplitEnumerator(context, snapshot, splits);
         }
     }
+
+    private DataFilePlan batchScanSplits(DataTableScan scan) {
+        return scan.plan();
+    }
+
+    private DataFilePlan continuousFirstScanSplits(
+            DataTableScan scan, SnapshotManager snapshotManager) {
+        switch (startupMode) {
+            case FULL:
+                DataFilePlan plan;
+                if (table.options().changelogProducer()
+                        == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
+                    // Read the results of the last full compaction.
+                    // Only full compaction results will appear on the max level.
+                    plan = scan.withLevel(table.options().numLevels() - 1).plan();
+                } else {
+                    plan = scan.plan();
+                }
+                return plan;
+            case LATEST:
+                return new DataFilePlan(
+                        snapshotManager.latestSnapshotId(), Collections.emptyList());
+            case FROM_TIMESTAMP:

Review Comment:
   Could `log.scan` and `log.scan.timestamp-millis` be merged into a single `log.scan` option? If `log.scan` is configured with a certain timestamp, that would mean the startup mode is `FROM_TIMESTAMP`; this solution is similar to `read.start-commit` in Hudi.
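   A hypothetical illustration of the merged option (not the current API):

       // Hypothetical: a timestamp value for log.scan would imply FROM_TIMESTAMP,
       // similar to Hudi's read.start-commit.
       tEnv.executeSql("SELECT * FROM T1 /*+ OPTIONS('log.scan'='1668744000000') */");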





[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1027009308


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -130,4 +119,42 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
             return new StaticFileStoreSplitEnumerator(context, snapshot, splits);
         }
     }
+
+    private DataFilePlan batchScanSplits(DataTableScan scan) {
+        return scan.plan();
+    }
+
+    private DataFilePlan continuousFirstScanSplits(
+            DataTableScan scan, SnapshotManager snapshotManager) {
+        switch (startupMode) {
+            case FULL:
+                DataFilePlan plan;
+                if (table.options().changelogProducer()
+                        == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
+                    // Read the results of the last full compaction.
+                    // Only full compaction results will appear on the max level.
+                    plan = scan.withLevel(table.options().numLevels() - 1).plan();
+                } else {
+                    plan = scan.plan();
+                }
+                return plan;
+            case LATEST:

Review Comment:
   Could `LATEST` change to `INCREMENTAL`, or `FULL` change to `EARLIEST`? IMO, `LATEST` and `EARLIEST` are a pair, while `INCREMENTAL` and `FULL` are a pair. At present, the combination of `FULL` and `LATEST` is jarring for users.





[GitHub] [flink-table-store] zjureel commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
zjureel commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1026091394


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java:
##########
@@ -90,6 +90,22 @@ public boolean snapshotExists(long snapshotId) {
         }
     }
 
+    public @Nullable Long earlierThanTimeMills(long timestampMills) {
+        Long earliest = earliestSnapshotId();
+        Long latest = latestSnapshotId();
+        if (earliest == null || latest == null) {
+            return null;
+        }
+
+        for (long i = latest; i >= earliest; i--) {
+            long commitTime = snapshot(i).timeMillis();

Review Comment:
   Are snapshot ids guaranteed to be continuous? Otherwise we need to check whether each snapshot exists.





[GitHub] [flink-table-store] zjureel commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
zjureel commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1026103960


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java:
##########
@@ -90,6 +90,22 @@ public boolean snapshotExists(long snapshotId) {
         }
     }
 
+    public @Nullable Long earlierThanTimeMills(long timestampMills) {
+        Long earliest = earliestSnapshotId();
+        Long latest = latestSnapshotId();
+        if (earliest == null || latest == null) {
+            return null;
+        }
+
+        for (long i = latest; i >= earliest; i--) {
+            long commitTime = snapshot(i).timeMillis();
+            if (commitTime < timestampMills) {
+                return i;
+            }
+        }
+        return earliest - 1;

Review Comment:
   Nice. We should document it; otherwise it may cause misunderstanding for others.





[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1027005215


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -130,4 +119,42 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
             return new StaticFileStoreSplitEnumerator(context, snapshot, splits);
         }
     }
+
+    private DataFilePlan batchScanSplits(DataTableScan scan) {
+        return scan.plan();
+    }
+
+    private DataFilePlan continuousFirstScanSplits(
+            DataTableScan scan, SnapshotManager snapshotManager) {
+        switch (startupMode) {

Review Comment:
   Would it be better to introduce a `DataFilePlanBuilder` to create the `DataFilePlan` for the different startup modes?





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1026088012


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java:
##########
@@ -150,20 +153,20 @@ private FileStoreSource buildFileSource(boolean isContinuous, boolean continuous
 
             LogStartupMode startupMode = conf.get(LOG_SCAN);
             if (logSourceProvider == null) {
-                return buildFileSource(true, startupMode == LogStartupMode.LATEST);
+                return buildFileSource(true, startupMode, conf.get(LOG_SCAN_TIMESTAMP_MILLS));

Review Comment:
   > Should we also consider the changelog produced by the file store to be a log?
   
   Yes.
   
   We have been using `log` for incremental reading of files, such as `log.scan` for full and latest.
   
   You can think of the log as a changelog. It is not just for Kafka.





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1027654165


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -130,4 +119,42 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
             return new StaticFileStoreSplitEnumerator(context, snapshot, splits);
         }
     }
+
+    private DataFilePlan batchScanSplits(DataTableScan scan) {
+        return scan.plan();
+    }
+
+    private DataFilePlan continuousFirstScanSplits(
+            DataTableScan scan, SnapshotManager snapshotManager) {
+        switch (startupMode) {
+            case FULL:
+                DataFilePlan plan;
+                if (table.options().changelogProducer()
+                        == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
+                    // Read the results of the last full compaction.
+                    // Only full compaction results will appear on the max level.
+                    plan = scan.withLevel(table.options().numLevels() - 1).plan();
+                } else {
+                    plan = scan.plan();
+                }
+                return plan;
+            case LATEST:
+                return new DataFilePlan(
+                        snapshotManager.latestSnapshotId(), Collections.emptyList());
+            case FROM_TIMESTAMP:

Review Comment:
   I created https://issues.apache.org/jira/browse/FLINK-30110 for this.





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1026107900


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java:
##########
@@ -90,6 +90,22 @@ public boolean snapshotExists(long snapshotId) {
         }
     }
 
+    public @Nullable Long earlierThanTimeMills(long timestampMills) {
+        Long earliest = earliestSnapshotId();
+        Long latest = latestSnapshotId();
+        if (earliest == null || latest == null) {
+            return null;
+        }
+
+        for (long i = latest; i >= earliest; i--) {
+            long commitTime = snapshot(i).timeMillis();

Review Comment:
   It should be continuous; this is by design.





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1027639409


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -130,4 +119,42 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
             return new StaticFileStoreSplitEnumerator(context, snapshot, splits);
         }
     }
+
+    private DataFilePlan batchScanSplits(DataTableScan scan) {
+        return scan.plan();
+    }
+
+    private DataFilePlan continuousFirstScanSplits(
+            DataTableScan scan, SnapshotManager snapshotManager) {
+        switch (startupMode) {
+            case FULL:
+                DataFilePlan plan;
+                if (table.options().changelogProducer()
+                        == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
+                    // Read the results of the last full compaction.
+                    // Only full compaction results will appear on the max level.
+                    plan = scan.withLevel(table.options().numLevels() - 1).plan();
+                } else {
+                    plan = scan.plan();
+                }
+                return plan;
+            case LATEST:

Review Comment:
   This comes from the Kafka connector.
   I think `INCREMENTAL` would contain `LATEST`, `EARLIEST`, `FROM-TIMESTAMP`, and so on.





[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1027011255


##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java:
##########
@@ -114,6 +121,55 @@ public void testContinuousLatest() throws TimeoutException {
                 .containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10", "11", "12"));
     }
 
+    @Test
+    public void testContinuousFromTimestamp() throws Exception {
+        String sql =
+                "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp', 'log.scan.timestamp-millis'='%s') */";
+
+        // empty table
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(streamSqlIter(sql, 0));
+        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
+        batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
+        iterator.close();
+
+        SnapshotManager snapshotManager =
+                new SnapshotManager(
+                        new Path(path, "default_catalog.catalog/default_database.db/T1"));
+        List<Snapshot> snapshots =
+                new ArrayList<>(ImmutableList.copyOf(snapshotManager.snapshots()));
+        snapshots.sort(Comparator.comparingLong(Snapshot::timeMillis));
+        Snapshot first = snapshots.get(0);
+        Snapshot second = snapshots.get(1);
+
+        // before second snapshot
+        iterator = BlockingIterator.of(streamSqlIter(sql, second.timeMillis() - 1));
+        batchSql("INSERT INTO T1 VALUES ('13', '14', '15')");
+        assertThat(iterator.collect(3))
+                .containsExactlyInAnyOrder(
+                        Row.of("7", "8", "9"), Row.of("10", "11", "12"), Row.of("13", "14", "15"));
+        iterator.close();
+
+        // from second snapshot
+        iterator = BlockingIterator.of(streamSqlIter(sql, second.timeMillis()));
+        assertThat(iterator.collect(3))
+                .containsExactlyInAnyOrder(
+                        Row.of("7", "8", "9"), Row.of("10", "11", "12"), Row.of("13", "14", "15"));
+        iterator.close();
+
+        // from start
+        iterator = BlockingIterator.of(streamSqlIter(sql, first.timeMillis() - 1));

Review Comment:
   Could the test add a case that starts from the end?
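   For example, mirroring the test style above (a sketch; it assumes a scan positioned past the latest snapshot replays nothing and only sees later inserts):

       // from end: start past the latest existing snapshot, so only rows
       // inserted afterwards should be read
       long latestMillis =
               snapshotManager.snapshot(snapshotManager.latestSnapshotId()).timeMillis();
       iterator = BlockingIterator.of(streamSqlIter(sql, latestMillis + 1));
       batchSql("INSERT INTO T1 VALUES ('16', '17', '18')");
       assertThat(iterator.collect(1)).containsExactly(Row.of("16", "17", "18"));
       iterator.close();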





[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1027015305


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java:
##########
@@ -90,6 +90,22 @@ public boolean snapshotExists(long snapshotId) {
         }
     }
 
+    public @Nullable Long earlierThanTimeMills(long timestampMills) {
+        Long earliest = earliestSnapshotId();
+        Long latest = latestSnapshotId();
+        if (earliest == null || latest == null) {
+            return null;
+        }
+
+        for (long i = latest; i >= earliest; i--) {
+            long commitTime = snapshot(i).timeMillis();
+            if (commitTime < timestampMills) {
+                return i;
+            }
+        }
+        return earliest - 1;

Review Comment:
   @JingsongLi, could snapshots be archived? If a snapshot is archived, the earliest snapshot may not be readable.






[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #388: [FLINK-29232] File store continuous reading support from_timestamp scan mode

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #388:
URL: https://github.com/apache/flink-table-store/pull/388#discussion_r1027645758


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -130,4 +119,42 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
             return new StaticFileStoreSplitEnumerator(context, snapshot, splits);
         }
     }
+
+    private DataFilePlan batchScanSplits(DataTableScan scan) {
+        return scan.plan();
+    }
+
+    private DataFilePlan continuousFirstScanSplits(
+            DataTableScan scan, SnapshotManager snapshotManager) {
+        switch (startupMode) {
+            case FULL:
+                DataFilePlan plan;
+                if (table.options().changelogProducer()
+                        == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
+                    // Read the results of the last full compaction.
+                    // Only full compaction results will appear on the max level.
+                    plan = scan.withLevel(table.options().numLevels() - 1).plan();
+                } else {
+                    plan = scan.plan();
+                }
+                return plan;
+            case LATEST:
+                return new DataFilePlan(
+                        snapshotManager.latestSnapshotId(), Collections.emptyList());
+            case FROM_TIMESTAMP:

Review Comment:
   Maybe we can make the default value of `log.scan` none, and explain it in the documentation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org