You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "JingsongLi (via GitHub)" <gi...@apache.org> on 2023/03/01 05:31:51 UTC

[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #550: [FLINK-31178] Public Writer API

JingsongLi commented on code in PR #550:
URL: https://github.com/apache/flink-table-store/pull/550#discussion_r1121169759


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java:
##########
@@ -18,121 +18,76 @@
 
 package org.apache.flink.table.store.table.sink;
 
-import org.apache.flink.table.store.file.manifest.ManifestCommittable;
-import org.apache.flink.table.store.file.operation.FileStoreCommit;
-import org.apache.flink.table.store.file.operation.FileStoreExpire;
-import org.apache.flink.table.store.file.operation.Lock;
-import org.apache.flink.table.store.file.operation.PartitionExpire;
-
-import javax.annotation.Nullable;
+import org.apache.flink.table.store.annotation.Experimental;
+import org.apache.flink.table.store.table.Table;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.Set;
 
 /**
- * An abstraction layer above {@link FileStoreCommit} and {@link FileStoreExpire} to provide
- * snapshot commit and expiration.
+ * Create and commit snapshots of a {@link Table}. Snapshots are produced from {@link
+ * CommitMessage}s, which themselves are generated by {@link TableWrite}. Also see {@link
+ * TableWrite#prepareCommit}.
+ *
+ * <ol>
+ *   <li>Before calling {@link TableCommit#commit}, if the user cannot determine whether this
+ *       commit has already been done, the user should first call {@link
+ *       TableCommit#filterCommitted}.
+ *   <li>Commit may fail and throw an exception. Before committing, it will first check for
+ *       conflicts by checking whether all files to be removed currently exist.
+ * </ol>
+ *
+ * @since 0.4.0
  */
-public class TableCommit implements AutoCloseable {
-
-    private final FileStoreCommit commit;
-    @Nullable private final FileStoreExpire expire;
-    @Nullable private final PartitionExpire partitionExpire;
-
-    @Nullable private List<Map<String, String>> overwritePartitions = null;
-    @Nullable private Lock lock;
-
-    public TableCommit(
-            FileStoreCommit commit,
-            @Nullable FileStoreExpire expire,
-            @Nullable PartitionExpire partitionExpire) {
-        this.commit = commit;
-        this.expire = expire;
-        this.partitionExpire = partitionExpire;
-    }
-
-    public TableCommit withOverwritePartition(@Nullable Map<String, String> overwritePartition) {
-        if (overwritePartition != null) {
-            this.overwritePartitions = Collections.singletonList(overwritePartition);
-        }
-        return this;
-    }
-
-    public TableCommit withOverwritePartitions(
-            @Nullable List<Map<String, String>> overwritePartitions) {
-        this.overwritePartitions = overwritePartitions;
-        return this;
-    }
-
-    public TableCommit withLock(Lock lock) {
-        commit.withLock(lock);
-
-        if (expire != null) {
-            expire.withLock(lock);
-        }
-
-        if (partitionExpire != null) {
-            partitionExpire.withLock(lock);
-        }
-
-        this.lock = lock;
-        return this;
-    }
-
-    public TableCommit withCreateEmptyCommit(boolean createEmptyCommit) {
-        commit.withCreateEmptyCommit(createEmptyCommit);
-        return this;
+@Experimental
+public interface TableCommit extends AutoCloseable {
+
+    /**
+     * Empty commits are ignored by default. If this is set to false, an empty commit will
+     * also be created when there is no new data.
+     *
+     * <p>NOTE: It is recommended to set 'ignoreEmptyCommit' to false in streaming write, in order
+     * to better remove duplicate commits (See {@link #filterCommitted}).
+     */
+    TableCommit ignoreEmptyCommit(boolean ignoreEmptyCommit);
+
+    /** Filter out commits that have already been committed. This method is used for failover cases. */

Review Comment:
   I will remove this method



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org