Posted to issues@flink.apache.org by "tsreaper (via GitHub)" <gi...@apache.org> on 2023/03/15 02:20:25 UTC

[GitHub] [flink-table-store] tsreaper opened a new pull request, #605: [FLINK-31430] Support migrating states between different instances of TableWriteImpl and AbstractFileStoreWrite

tsreaper opened a new pull request, #605:
URL: https://github.com/apache/flink-table-store/pull/605

   Currently `Table` and `TableWrite` in Flink Table Store have a fixed schema. However, to consume schema changes, Flink Table Store CDC sinks must be able to change their schema during a streaming job.
   
   This requires us to pause a `TableWrite` and store its state, then create a `TableWrite` with the newer schema and recover that state in the new instance.
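   
   As a minimal sketch of the intended hand-off (the `extractStateAndClose` and `recoverFromState` names are taken from this PR's diff; the surrounding table plumbing is a hypothetical placeholder):
   
   ```java
   // Hypothetical migration flow. Only extractStateAndClose() and
   // recoverFromState() are introduced by this PR; how the CDC sink
   // obtains oldWrite/newWrite is placeholder plumbing.
   List<AbstractFileStoreWrite.State> states = oldWrite.extractStateAndClose();
   
   // Re-create the write against the same table, now carrying the new schema.
   TableWrite newWrite = tableWithNewSchema.newWrite();
   
   // Hand the buffered data files and compaction state to the new instance.
   newWrite.recoverFromState(states);
   ```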


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] JingsongLi merged pull request #605: [FLINK-31430] Support migrating states between different instances of TableWriteImpl and AbstractFileStoreWrite

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi merged PR #605:
URL: https://github.com/apache/incubator-paimon/pull/605




[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #605: [FLINK-31430] Support migrating states between different instances of TableWriteImpl and AbstractFileStoreWrite

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #605:
URL: https://github.com/apache/flink-table-store/pull/605#discussion_r1136499298


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java:
##########
@@ -232,50 +210,131 @@ public void close() throws Exception {
         }
     }
 
+    @Override
+    public List<State> extractStateAndClose() throws Exception {

Review Comment:
   Can we just have two methods: `checkpoint` and `close`?
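   
   For reference, a sketch of the suggested split (the interface shape here is assumed, not taken from this PR):
   
   ```java
   // Hypothetical two-method alternative to extractStateAndClose().
   public interface FileStoreWrite<T> {
   
       // Snapshot the per-partition, per-bucket writer states without
       // tearing the writers down.
       List<State> checkpoint();
   
       // Release writers and the compaction executor; callers that need
       // the state call checkpoint() first.
       void close() throws Exception;
   }
   ```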



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java:
##########
@@ -51,6 +50,9 @@
      */
     void addNewFiles(List<DataFileMeta> files);
 
+    /** Get all files maintained by this writer. */
+    List<DataFileMeta> allFiles();

Review Comment:
   allFiles -> dataFiles?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java:
##########
@@ -232,50 +210,131 @@ public void close() throws Exception {
         }
     }
 
+    @Override
+    public List<State> extractStateAndClose() throws Exception {
+        List<State> result = new ArrayList<>();
+        for (Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> partitionEntry :
+                writers.entrySet()) {
+            BinaryRow partition = partitionEntry.getKey();
+            for (Map.Entry<Integer, WriterContainer<T>> bucketEntry :
+                    partitionEntry.getValue().entrySet()) {
+                int bucket = bucketEntry.getKey();
+                WriterContainer<T> writerContainer = bucketEntry.getValue();
+                CommitIncrement increment = writerContainer.writer.extractStateAndClose();
+                // writer.allFiles() must be fetched after writer.extractStateAndClose(), because
+                // compaction result might be updated when closing a writer
+                List<DataFileMeta> allFiles = writerContainer.writer.allFiles();
+                result.add(
+                        new State(
+                                partition,
+                                bucket,
+                                writerContainer.baseSnapshotId,
+                                writerContainer.lastModifiedCommitIdentifier,
+                                allFiles,
+                                increment));
+            }
+        }
+        writers.clear();
+        if (lazyCompactExecutor != null) {
+            lazyCompactExecutor.shutdownNow();
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Extracted state " + result.toString());
+        }
+        return result;
+    }
+
+    @Override
+    public void recoverFromState(List<State> states) {

Review Comment:
   `restore`?





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #605: [FLINK-31430] Support migrating states between different instances of TableWriteImpl and AbstractFileStoreWrite

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #605:
URL: https://github.com/apache/flink-table-store/pull/605#discussion_r1136656614


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java:
##########
@@ -232,50 +210,131 @@ public void close() throws Exception {
         }
     }
 
+    @Override
+    public List<State> extractStateAndClose() throws Exception {
+        List<State> result = new ArrayList<>();
+        for (Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> partitionEntry :
+                writers.entrySet()) {
+            BinaryRow partition = partitionEntry.getKey();
+            for (Map.Entry<Integer, WriterContainer<T>> bucketEntry :
+                    partitionEntry.getValue().entrySet()) {
+                int bucket = bucketEntry.getKey();
+                WriterContainer<T> writerContainer = bucketEntry.getValue();
+                CommitIncrement increment = writerContainer.writer.extractStateAndClose();

Review Comment:
   We don't need to introduce `extractStateAndClose` to writer, we can just:
   ```
   1. CommitIncrement increment = writer.prepareCommit(...);
   2. List<DataFileMeta> dataFiles = writer.dataFiles();
   3. writer.close();
   ```
   We should get data files before close.
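   
   Spelled out in Java (the `prepareCommit` arguments are elided above and stay elided here; the `State` constructor follows the diff in this PR):
   
   ```java
   // The suggested sequence, inlined into the loop of extractStateAndClose().
   CommitIncrement increment = writer.prepareCommit(/* args elided */);
   // Per the comment above, fetch the file list before closing the writer.
   List<DataFileMeta> dataFiles = writer.dataFiles();
   writer.close();
   
   result.add(new State(partition, bucket,
           writerContainer.baseSnapshotId,
           writerContainer.lastModifiedCommitIdentifier,
           dataFiles, increment));
   ```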


