Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/09 12:58:29 UTC

[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #150: [FLINK-27973] Introduce TableWrite and TableCommit as an abstraction layer above FileStore for writing RowData

JingsongLi commented on code in PR #150:
URL: https://github.com/apache/flink-table-store/pull/150#discussion_r893469308


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java:
##########
@@ -40,6 +45,10 @@ public interface FileStoreTable extends Serializable {
 
     TableRead newRead(boolean incremental);
 
+    TableWrite newWrite(boolean overwrite);
+
+    TableCommit newCommit(@Nullable Map<String, String> overwritePartition);

Review Comment:
   I prefer a builder style:
   - Normal commit: `table.newCommit().commit(...)`
   - Overwrite commit: `table.newCommit().withOverwritePartition(part).commit(...)`
   
   Versus the current style:
   - Normal commit: `table.newCommit(null).commit(...)`
   - Overwrite commit: `table.newCommit(overwritePartition).commit(...)`
   
   This is a test-facing and user-facing API. Assuming that 90% of the use cases are not overwrites, it seems unfriendly to force every one of them to pass an explicit `null` argument.
   
   I think the same reasoning applies to the other methods on this interface.
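
A minimal sketch of the builder style proposed above, to make the comparison concrete. `SketchTable` and `SketchTableCommit` are hypothetical stand-ins, not the classes in the PR: the real `TableCommit` wraps the store's commit and expire objects, which are left out here, and `commit` takes a plain `String` only for illustration.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for TableCommit; the real class in the PR wraps
// the store's commit and expire objects, which are omitted here.
class SketchTableCommit {
    // null means "normal commit", mirroring the @Nullable parameter in the diff.
    private Map<String, String> overwritePartition;

    // Optional builder-style setter; returns this so calls can be chained.
    SketchTableCommit withOverwritePartition(Map<String, String> overwritePartition) {
        this.overwritePartition = overwritePartition;
        return this;
    }

    // Stand-in for commit(...): only reports which path would be taken.
    String commit(String identifier) {
        return overwritePartition == null
                ? "commit " + identifier
                : "overwrite " + overwritePartition + " " + identifier;
    }
}

// Hypothetical stand-in for FileStoreTable.
class SketchTable {
    // No parameter: the common, non-overwrite case needs no null argument.
    SketchTableCommit newCommit() {
        return new SketchTableCommit();
    }
}

class BuilderStyleDemo {
    public static void main(String[] args) {
        SketchTable table = new SketchTable();

        // Normal commit.
        System.out.println(table.newCommit().commit("c1"));

        // Overwrite commit: the partition is supplied only when needed.
        System.out.println(
                table.newCommit()
                        .withOverwritePartition(Collections.singletonMap("dt", "2022-06-09"))
                        .commit("c2"));
    }
}
```

With this shape the nullable state lives inside the commit object, so callers on the common path never see it.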



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java:
##########
@@ -126,6 +158,35 @@ protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
         };
     }
 
+    @Override
+    public TableWrite newWrite(boolean overwrite) {
+        SinkRecordConverter recordConverter =
+                new SinkRecordConverter(store.options().bucket(), schema);
+        return new TableWrite(store.newWrite(), recordConverter, overwrite) {
+            @Override
+            protected void writeImpl(SinkRecord record, RecordWriter writer) throws Exception {
+                switch (record.row().getRowKind()) {
+                    case INSERT:
+                    case UPDATE_AFTER:
+                        writer.write(ValueKind.ADD, record.primaryKey(), record.row());
+                        break;
+                    case UPDATE_BEFORE:
+                    case DELETE:
+                        writer.write(ValueKind.DELETE, record.primaryKey(), record.row());
+                        break;
+                    default:
+                        throw new UnsupportedOperationException(
+                                "Unknown row kind " + record.row().getRowKind());
+                }
+            }
+        };
+    }
+
+    @Override
+    public TableCommit newCommit(@Nullable Map<String, String> overwritePartition) {
+        return new TableCommit(store.newCommit(), store.newExpire(), overwritePartition);

Review Comment:
   What is the difference between the implementations? Could this just be provided in `AbstractFileStoreTable`?
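
A rough sketch of what moving the method into the abstract base class could look like. It assumes that the `newCommit` body is the same in every concrete table, which the question above implies but the diff excerpt does not show. All names are hypothetical stand-ins: `SketchAbstractTable` plays the role of `AbstractFileStoreTable`, and the store is reduced to a `String`.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for TableCommit; it only records its inputs.
class SketchCommit {
    private final String storeName;
    private final Map<String, String> overwritePartition; // null = normal commit

    SketchCommit(String storeName, Map<String, String> overwritePartition) {
        this.storeName = storeName;
        this.overwritePartition = overwritePartition;
    }

    String describe() {
        return storeName
                + (overwritePartition == null
                        ? ": normal commit"
                        : ": overwrite " + overwritePartition);
    }
}

// Plays the role of AbstractFileStoreTable in this sketch.
abstract class SketchAbstractTable {
    // Each concrete table exposes its own store (a String here for brevity).
    protected abstract String store();

    // Written once; subclasses inherit it instead of repeating the same body.
    public SketchCommit newCommit(Map<String, String> overwritePartition) {
        return new SketchCommit(store(), overwritePartition);
    }
}

// Two hypothetical concrete tables that differ only in their store.
class SketchKeyedTable extends SketchAbstractTable {
    @Override
    protected String store() {
        return "keyed-store";
    }
}

class SketchOtherTable extends SketchAbstractTable {
    @Override
    protected String store() {
        return "other-store";
    }
}

class SharedCommitDemo {
    public static void main(String[] args) {
        System.out.println(new SketchKeyedTable().newCommit(null).describe());
        System.out.println(
                new SketchOtherTable()
                        .newCommit(Collections.singletonMap("dt", "2022-06-09"))
                        .describe());
    }
}
```

Only the parts that genuinely differ per table type, such as the row-kind handling in `newWrite`, would then remain in the subclasses.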


