Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/18 09:23:53 UTC

[GitHub] [flink-table-store] tsreaper opened a new pull request #12: [FLINK-25689] Introduce atomic commit

tsreaper opened a new pull request #12:
URL: https://github.com/apache/flink-table-store/pull/12


   This PR introduces an atomic commit transaction for the file store.
   
   1. Before calling `FileStoreCommitImpl#commit`, the user should first call `FileStoreCommitImpl#filterCommitted` to make sure the commit has not already been done.
   2. Before committing, it first checks for conflicts by verifying that all files to be removed currently exist.
   3. After that, it uses the external `FileStoreCommitImpl#lock` (if provided) or the atomic rename of the file system to ensure atomicity.
   4. If the commit fails due to a conflict or an exception, it tries its best to clean up and then aborts.
   5. If the atomic rename fails, it tries again from step 2 after reading the latest snapshot.
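
   Steps 2 to 5 can be sketched roughly as follows. This is only an in-memory illustration under stated assumptions, not the code in this PR: `AtomicCommitSketch` and its methods are hypothetical names, a `ConcurrentHashMap` stands in for the snapshot directory, and `putIfAbsent` stands in for the file system's atomic rename (it succeeds only if the target snapshot id has not been published yet).

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the commit loop; not the PR's FileStoreCommitImpl. */
final class AtomicCommitSketch {

    // Stand-in for the snapshot directory: snapshot id -> files live in that snapshot.
    private final Map<Long, Set<String>> snapshots = new ConcurrentHashMap<>();

    /** Walks forward from id 1; returns 0 if no snapshot exists yet. */
    long latestSnapshotId() {
        long id = 0;
        while (snapshots.containsKey(id + 1)) {
            id++;
        }
        return id;
    }

    Set<String> liveFiles(long snapshotId) {
        Set<String> files = snapshots.get(snapshotId);
        return files == null ? Collections.<String>emptySet() : files;
    }

    /** Commits the change and returns the new snapshot id. */
    long commit(Set<String> filesToAdd, Set<String> filesToRemove) {
        while (true) {
            // Step 2: read the latest snapshot and check for conflicts.
            long latest = latestSnapshotId();
            Set<String> live = liveFiles(latest);
            if (!live.containsAll(filesToRemove)) {
                // Step 4: abort. There is nothing to clean up in this in-memory model.
                throw new IllegalStateException("Conflict: a file to remove no longer exists");
            }
            Set<String> next = new HashSet<>(live);
            next.removeAll(filesToRemove);
            next.addAll(filesToAdd);

            // Step 3: putIfAbsent plays the role of the atomic rename. It succeeds
            // only if nobody has published snapshot (latest + 1) in the meantime.
            if (snapshots.putIfAbsent(latest + 1, Collections.unmodifiableSet(next)) == null) {
                return latest + 1;
            }
            // Step 5: another committer won the race; retry from step 2.
        }
    }

    public static void main(String[] args) {
        AtomicCommitSketch store = new AtomicCommitSketch();
        Set<String> none = Collections.emptySet();
        store.commit(new HashSet<>(java.util.Arrays.asList("sst-1", "sst-2")), none);
        long id = store.commit(Collections.singleton("sst-3"), Collections.singleton("sst-1"));
        System.out.println("snapshot " + id + " -> " + store.liveFiles(id));
    }
}
```

   The retry loop re-runs the conflict check on every attempt, because a snapshot published by another committer may have removed one of the files this commit wants to delete.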


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-table-store] tsreaper commented on a change in pull request #12: [FLINK-25689] Introduce atomic commit

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #12:
URL: https://github.com/apache/flink-table-store/pull/12#discussion_r788532993



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
##########
@@ -38,6 +38,7 @@
     private final int bucket;
     private final int totalBuckets;
     private final SstFileMeta file;
+    private final Identifier identifier;

Review comment:
       `Identifier` is used in multiple classes so we can't.







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #12: [FLINK-25689] Introduce atomic commit

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #12:
URL: https://github.com/apache/flink-table-store/pull/12#discussion_r788374933



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/** This file is the entrance to all data committed at some specific time point. */
+public class Snapshot {
+
+    public static final long FIRST_SNAPSHOT_ID = 1;
+
+    private static final String FIELD_ID = "id";
+    private static final String FIELD_MANIFEST_LIST = "manifestList";
+    private static final String FIELD_COMMITTER = "committer";
+    private static final String FIELD_HASH = "hash";
+    private static final String FIELD_TYPE = "type";
+
+    @JsonProperty(FIELD_ID)
+    private final long id;
+
+    @JsonProperty(FIELD_MANIFEST_LIST)
+    private final String manifestList;
+
+    @JsonProperty(FIELD_COMMITTER)
+    private final String committer;
+
+    // for deduplication
+    @JsonProperty(FIELD_HASH)
+    private final String hash;
+
+    @JsonProperty(FIELD_TYPE)
+    private final Type type;

Review comment:
       I think we need a prefix here: what is the type of a snapshot?
   Another thing is that we don't use the name `type` because of the SQL data type...







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #12: [FLINK-25689] Introduce atomic commit

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #12:
URL: https://github.com/apache/flink-table-store/pull/12#discussion_r788377304



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link FileStoreCommit}.
+ *
+ * <p>This class provides an atomic commit method to the user.
+ *
+ * <ol>
+ *   <li>Before calling {@link FileStoreCommitImpl#commit}, user should first call {@link
+ *       FileStoreCommitImpl#filterCommitted} to make sure this commit is not done before.
+ *   <li>Before committing, it will first check for conflicts by checking if all files to be removed
+ *       currently exists.
+ *   <li>After that it use the external {@link FileStoreCommitImpl#lock} (if provided) or the atomic
+ *       rename of the file system to ensure atomicity.
+ *   <li>If commit fails due to conflicts or exception it tries its best to clean up and aborts.
+ *   <li>If atomic rename fails it tries again after reading the latest snapshot from step 2.
+ * </ol>
+ */
+public class FileStoreCommitImpl implements FileStoreCommit {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitImpl.class);
+
+    private final String committer;
+    private final ManifestCommittableSerializer committableSerializer;
+
+    private final FileStorePathFactory pathFactory;
+    private final ManifestFile manifestFile;
+    private final ManifestList manifestList;
+    private final FileStoreOptions fileStoreOptions;
+    private final FileStoreScan scan;
+
+    @Nullable private Lock lock;
+
+    public FileStoreCommitImpl(
+            String committer,
+            ManifestCommittableSerializer committableSerializer,
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList,
+            FileStoreOptions fileStoreOptions,
+            FileStoreScan scan) {
+        this.committer = committer;
+        this.committableSerializer = committableSerializer;
+
+        this.pathFactory = pathFactory;
+        this.manifestFile = manifestFile;
+        this.manifestList = manifestList;
+        this.fileStoreOptions = fileStoreOptions;
+        this.scan = scan;
+
+        this.lock = null;
+    }
+
+    @Override
+    public FileStoreCommit withLock(Lock lock) {
+        this.lock = lock;
+        return this;
+    }
+
+    @Override
+    public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committableList) {
+        committableList = new ArrayList<>(committableList);
+
+        // filter out commits with no new files
+        committableList.removeIf(committable -> committable.newFiles().isEmpty());

Review comment:
       Can we support compact-only changes in `filterCommitted` too?
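
For context, the deduplication that `filterCommitted` is meant to provide is not visible in the quoted hunk. A minimal sketch, assuming it matches committables against the `committer` and `hash` fields stored in existing snapshots (the classes and method below are hypothetical stand-ins, not the PR's types):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of commit deduplication; not the PR's filterCommitted. */
final class FilterCommittedSketch {

    /** Stand-in for ManifestCommittable: a hash plus the files the commit adds. */
    static final class Committable {
        final String hash;
        final List<String> newFiles;

        Committable(String hash, List<String> newFiles) {
            this.hash = hash;
            this.newFiles = newFiles;
        }
    }

    /** Stand-in for the committer and hash fields of Snapshot. */
    static final class SnapshotInfo {
        final String committer;
        final String hash;

        SnapshotInfo(String committer, String hash) {
            this.committer = committer;
            this.hash = hash;
        }
    }

    static List<Committable> filterCommitted(
            String committer, List<Committable> committables, List<SnapshotInfo> snapshots) {
        // Hashes this committer has already published.
        Set<String> committed = new HashSet<>();
        for (SnapshotInfo snapshot : snapshots) {
            if (snapshot.committer.equals(committer)) {
                committed.add(snapshot.hash);
            }
        }
        List<Committable> result = new ArrayList<>();
        for (Committable committable : committables) {
            // Mirrors the quoted removeIf: commits with no new files are dropped,
            // which is why compact-only changes are currently filtered out.
            if (committable.newFiles.isEmpty()) {
                continue;
            }
            if (!committed.contains(committable.hash)) {
                result.add(committable);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<SnapshotInfo> snapshots = Arrays.asList(new SnapshotInfo("job-1", "h1"));
        List<Committable> pending =
                Arrays.asList(
                        new Committable("h1", Arrays.asList("sst-1")),
                        new Committable("h2", Arrays.asList("sst-2")));
        for (Committable c : filterCommitted("job-1", pending, snapshots)) {
            System.out.println("still to commit: " + c.hash);
        }
    }
}
```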







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #12: [FLINK-25689] Introduce atomic commit

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #12:
URL: https://github.com/apache/flink-table-store/pull/12#discussion_r788375580



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/** This file is the entrance to all data committed at some specific time point. */
+public class Snapshot {
+
+    public static final long FIRST_SNAPSHOT_ID = 1;
+
+    private static final String FIELD_ID = "id";
+    private static final String FIELD_MANIFEST_LIST = "manifestList";
+    private static final String FIELD_COMMITTER = "committer";
+    private static final String FIELD_HASH = "hash";
+    private static final String FIELD_TYPE = "type";
+
+    @JsonProperty(FIELD_ID)

Review comment:
       What is `@JsonGetter` used for? The field name is the same as the class field name.







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #12: [FLINK-25689] Introduce atomic commit

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #12:
URL: https://github.com/apache/flink-table-store/pull/12#discussion_r788575696



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/** This file is the entrance to all data committed at some specific time point. */
+public class Snapshot {
+
+    public static final long FIRST_SNAPSHOT_ID = 1;
+
+    private static final String FIELD_ID = "id";
+    private static final String FIELD_MANIFEST_LIST = "manifestList";
+    private static final String FIELD_COMMITTER = "committer";
+    private static final String FIELD_HASH = "hash";
+    private static final String FIELD_TYPE = "type";
+
+    @JsonProperty(FIELD_ID)

Review comment:
       If you want Jackson to convert a class to a JSON string without any annotations, you'll need to provide both getter and setter methods. However, `Snapshot` is immutable, so I only want to provide getters. That's why we need annotations.







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #12: [FLINK-25689] Introduce atomic commit

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #12:
URL: https://github.com/apache/flink-table-store/pull/12#discussion_r788380094



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
##########
@@ -38,6 +38,7 @@
     private final int bucket;
     private final int totalBuckets;
     private final SstFileMeta file;
+    private final Identifier identifier;

Review comment:
       I mean you can extract this as a method-local variable. It doesn't need to be a class member here.







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #12: [FLINK-25689] Introduce atomic commit

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #12:
URL: https://github.com/apache/flink-table-store/pull/12#discussion_r788308005



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
##########
@@ -96,6 +108,31 @@ public String getPartitionString(BinaryRowData partition) {
                                 partition, "Partition row data is null. This is unexpected.")));
     }
 
+    @Nullable
+    public Long latestSnapshotId() {
+        try {
+            Path snapshotDir = new Path(root + "/snapshot");

Review comment:
       This is only a best-effort file. It is possible that the snapshot id in the `CURRENT` file is not the latest, but as a shortcut for most scenarios this file is useful.
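
One way to treat such a best-effort hint is to start from it and probe forward for newer snapshots. The sketch below assumes snapshot ids are contiguous and start at `FIRST_SNAPSHOT_ID = 1`; `LatestSnapshotSketch` and its parameters are hypothetical, and a `Set<Long>` stands in for "does this snapshot file exist" checks against the file system.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of using a best-effort hint; not the PR's latestSnapshotId. */
final class LatestSnapshotSketch {

    static final long FIRST_SNAPSHOT_ID = 1;

    /**
     * @param hint id read from the hint file, or null if the file is missing or unreadable
     * @param existing ids of the snapshots that currently exist
     * @return the latest snapshot id, or null if there is no snapshot at all
     */
    static Long latestSnapshotId(Long hint, Set<Long> existing) {
        long candidate;
        if (hint != null && existing.contains(hint)) {
            // The hint is a valid starting point, though possibly stale.
            candidate = hint;
        } else if (existing.contains(FIRST_SNAPSHOT_ID)) {
            // No usable hint: fall back to scanning from the first snapshot.
            candidate = FIRST_SNAPSHOT_ID;
        } else {
            return null;
        }
        // Probe forward, since the hint may lag behind the real latest snapshot.
        while (existing.contains(candidate + 1)) {
            candidate++;
        }
        return candidate;
    }

    public static void main(String[] args) {
        Set<Long> existing = new HashSet<>(Arrays.asList(1L, 2L, 3L, 4L));
        System.out.println("latest = " + latestSnapshotId(2L, existing));
    }
}
```

With a fresh hint the forward probe costs a single existence check; with a stale or missing hint the result is still correct, only slower to find.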








[GitHub] [flink-table-store] tsreaper commented on a change in pull request #12: [FLINK-25689] Introduce atomic commit

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #12:
URL: https://github.com/apache/flink-table-store/pull/12#discussion_r788315085



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
##########
@@ -118,4 +121,94 @@ public String toString() {
                 numDeletedFiles,
                 Arrays.toString(partitionStats));
     }
+
+    /**
+     * Merge several {@link ManifestFileMeta}s. {@link ManifestEntry}s representing first adding and
+     * then deleting the same sst file will cancel each other.
+     *
+     * <p>NOTE: This method is atomic.
+     */
+    public static List<ManifestFileMeta> merge(
+            List<ManifestFileMeta> metas, ManifestFile manifestFile, long suggestedMetaSize) {
+        List<ManifestFileMeta> result = new ArrayList<>();
+        // these are the newly created manifest files, clean them up if exception occurs
+        List<ManifestFileMeta> newMetas = new ArrayList<>();
+        List<ManifestFileMeta> candidate = new ArrayList<>();
+        long totalSize = 0;
+
+        try {
+            for (ManifestFileMeta manifest : metas) {
+                totalSize += manifest.fileSize;
+                candidate.add(manifest);
+                if (totalSize >= suggestedMetaSize) {
+                    // reach suggested file size, perform merging and produce new file
+                    merge(candidate, manifestFile, result, newMetas);
+                    candidate.clear();
+                    totalSize = 0;
+                }
+            }
+            if (!candidate.isEmpty()) {
+                // merge the last bit of metas
+                merge(candidate, manifestFile, result, newMetas);
+            }
+        } catch (Throwable e) {
+            // exception occurs, clean up and rethrow
+            for (ManifestFileMeta manifest : newMetas) {
+                manifestFile.delete(manifest.fileName);
+            }
+            throw e;
+        }
+
+        return result;
+    }
+
+    private static void merge(

Review comment:
       There are two calls to this method. Inlining will bring duplicated code fragments.







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #12: [FLINK-25689] Introduce atomic commit

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #12:
URL: https://github.com/apache/flink-table-store/pull/12#discussion_r788309671



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
##########
@@ -118,4 +121,94 @@ public String toString() {
                 numDeletedFiles,
                 Arrays.toString(partitionStats));
     }
+
+    /**
+     * Merge several {@link ManifestFileMeta}s. {@link ManifestEntry}s representing first adding and
+     * then deleting the same sst file will cancel each other.
+     *
+     * <p>NOTE: This method is atomic.
+     */
+    public static List<ManifestFileMeta> merge(
+            List<ManifestFileMeta> metas, ManifestFile manifestFile, long suggestedMetaSize) {
+        List<ManifestFileMeta> result = new ArrayList<>();
+        // these are the newly created manifest files, clean them up if exception occurs
+        List<ManifestFileMeta> newMetas = new ArrayList<>();
+        List<ManifestFileMeta> candidate = new ArrayList<>();
+        long totalSize = 0;
+
+        try {
+            for (ManifestFileMeta manifest : metas) {
+                totalSize += manifest.fileSize;
+                candidate.add(manifest);
+                if (totalSize >= suggestedMetaSize) {
+                    // reach suggested file size, perform merging and produce new file
+                    merge(candidate, manifestFile, result, newMetas);
+                    candidate.clear();
+                    totalSize = 0;
+                }
+            }
+            if (!candidate.isEmpty()) {
+                // merge the last bit of metas
+                merge(candidate, manifestFile, result, newMetas);
+            }
+        } catch (Throwable e) {
+            // exception occurs, clean up and rethrow
+            for (ManifestFileMeta manifest : newMetas) {
+                manifestFile.delete(manifest.fileName);
+            }
+            throw e;
+        }
+
+        return result;
+    }
+
+    private static void merge(
+            List<ManifestFileMeta> metas,
+            ManifestFile manifestFile,
+            List<ManifestFileMeta> result,
+            List<ManifestFileMeta> newMetas) {
+        if (metas.size() > 1) {
+            ManifestFileMeta newMeta = merge(metas, manifestFile);
+            result.add(newMeta);
+            newMetas.add(newMeta);
+        } else {
+            result.addAll(metas);
+        }
+    }
+
+    private static ManifestFileMeta merge(List<ManifestFileMeta> metas, ManifestFile manifestFile) {
+        Preconditions.checkArgument(
+                metas.size() > 1, "Number of ManifestFileMeta <= 1. This is a bug.");
+
+        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();

Review comment:
       Nice catch. So `Identifier` should also include level information.
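
To illustrate why the level may matter, here is a minimal sketch of the add/delete cancellation described in the Javadoc above, keyed by an identifier that includes the level. The classes are simplified, hypothetical stand-ins for `ManifestEntry` and `ManifestEntry.Identifier`, and the scenario (the same file name appearing at two levels) is an assumption about the case the review comment refers to.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch of manifest entry merging; not the PR's ManifestFileMeta#merge. */
final class ManifestMergeSketch {

    enum Kind { ADD, DELETE }

    /** Identifies a file by name and level, so the same name at two levels stays distinct. */
    static final class Identifier {
        final String fileName;
        final int level;

        Identifier(String fileName, int level) {
            this.fileName = fileName;
            this.level = level;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Identifier)) {
                return false;
            }
            Identifier that = (Identifier) o;
            return fileName.equals(that.fileName) && level == that.level;
        }

        @Override
        public int hashCode() {
            return Objects.hash(fileName, level);
        }
    }

    static final class Entry {
        final Kind kind;
        final Identifier id;

        Entry(Kind kind, String fileName, int level) {
            this.kind = kind;
            this.id = new Identifier(fileName, level);
        }
    }

    static List<Entry> merge(List<Entry> entries) {
        // LinkedHashMap keeps the surviving entries in their original order.
        Map<Identifier, Entry> map = new LinkedHashMap<>();
        for (Entry entry : entries) {
            Entry previous = map.get(entry.id);
            if (entry.kind == Kind.DELETE && previous != null && previous.kind == Kind.ADD) {
                // An ADD followed by a DELETE of the same identifier cancels out.
                map.remove(entry.id);
            } else {
                // A DELETE with no matching ADD refers to a file from an earlier
                // manifest and must be kept.
                map.put(entry.id, entry);
            }
        }
        return new ArrayList<>(map.values());
    }

    public static void main(String[] args) {
        List<Entry> merged =
                merge(
                        Arrays.asList(
                                new Entry(Kind.ADD, "sst-1", 0),
                                new Entry(Kind.ADD, "sst-1", 1),
                                new Entry(Kind.DELETE, "sst-1", 0)));
        for (Entry e : merged) {
            System.out.println(e.kind + " " + e.id.fileName + " @ level " + e.id.level);
        }
    }
}
```

If the identifier were the file name alone, the second ADD in this example would overwrite the first and the DELETE would then cancel it, losing the file at level 1.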







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #12: [FLINK-25689] Introduce atomic commit

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #12:
URL: https://github.com/apache/flink-table-store/pull/12#discussion_r787408231



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ReadableConfig;
+
+/** Options for {@link FileStore}. */
+public class FileStoreOptions {
+
+    public static final ConfigOption<Integer> BUCKET =
+            ConfigOptions.key("bucket")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "Bucket number for file store and partition number for Kafka.");

Review comment:
       Remove `and partition number for Kafka`; the log system is an abstraction.

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
##########
@@ -118,4 +121,94 @@ public String toString() {
                 numDeletedFiles,
                 Arrays.toString(partitionStats));
     }
+
+    /**
+     * Merge several {@link ManifestFileMeta}s. {@link ManifestEntry}s representing first adding and
+     * then deleting the same sst file will cancel each other.
+     *
+     * <p>NOTE: This method is atomic.
+     */
+    public static List<ManifestFileMeta> merge(
+            List<ManifestFileMeta> metas, ManifestFile manifestFile, long suggestedMetaSize) {
+        List<ManifestFileMeta> result = new ArrayList<>();
+        // these are the newly created manifest files, clean them up if exception occurs
+        List<ManifestFileMeta> newMetas = new ArrayList<>();
+        List<ManifestFileMeta> candidate = new ArrayList<>();
+        long totalSize = 0;
+
+        try {
+            for (ManifestFileMeta manifest : metas) {
+                totalSize += manifest.fileSize;
+                candidate.add(manifest);
+                if (totalSize >= suggestedMetaSize) {
+                    // reach suggested file size, perform merging and produce new file
+                    merge(candidate, manifestFile, result, newMetas);
+                    candidate.clear();
+                    totalSize = 0;
+                }
+            }
+            if (!candidate.isEmpty()) {
+                // merge the last bit of metas
+                merge(candidate, manifestFile, result, newMetas);
+            }
+        } catch (Throwable e) {
+            // exception occurs, clean up and rethrow
+            for (ManifestFileMeta manifest : newMetas) {
+                manifestFile.delete(manifest.fileName);
+            }
+            throw e;
+        }
+
+        return result;
+    }
+
+    private static void merge(

Review comment:
       Inline this method? It is so simple and has too many arguments.

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link FileStoreCommit}.
+ *
+ * <p>This class provides an atomic commit method to the user.
+ *
+ * <ol>
+ *   <li>Before calling {@link FileStoreCommitImpl#commit}, the user should first call
+ *       {@link FileStoreCommitImpl#filterCommitted} to make sure this commit has not been done
+ *       before.
+ *   <li>Before committing, it first checks for conflicts by verifying that all files to be
+ *       removed currently exist.
+ *   <li>After that, it uses the external {@link FileStoreCommitImpl#lock} (if provided) or the
+ *       atomic rename of the file system to ensure atomicity.
+ *   <li>If the commit fails due to conflicts or an exception, it tries its best to clean up and
+ *       aborts.
+ *   <li>If the atomic rename fails, it tries again after reading the latest snapshot from step 2.
+ * </ol>
+ */
+public class FileStoreCommitImpl implements FileStoreCommit {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitImpl.class);
+
+    private final String committer;
+    private final ManifestCommittableSerializer committableSerializer;
+
+    private final FileStorePathFactory pathFactory;
+    private final ManifestFile manifestFile;
+    private final ManifestList manifestList;
+    private final FileStoreOptions fileStoreOptions;
+    private final FileStoreScan scan;
+
+    @Nullable private Lock lock;
+
+    public FileStoreCommitImpl(
+            String committer,
+            ManifestCommittableSerializer committableSerializer,
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList,
+            FileStoreOptions fileStoreOptions,
+            FileStoreScan scan) {
+        this.committer = committer;
+        this.committableSerializer = committableSerializer;
+
+        this.pathFactory = pathFactory;
+        this.manifestFile = manifestFile;
+        this.manifestList = manifestList;
+        this.fileStoreOptions = fileStoreOptions;
+        this.scan = scan;
+
+        this.lock = null;
+    }
+
+    @Override
+    public FileStoreCommit withLock(Lock lock) {
+        this.lock = lock;
+        return this;
+    }
+
+    @Override
+    public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committableList) {
+        committableList = new ArrayList<>(committableList);
+
+        // filter out commits with no new files
+        committableList.removeIf(committable -> committable.newFiles().isEmpty());
+
+        // if there are no previous snapshots then nothing should be filtered
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return committableList;
+        }
+
+        // check if a committable is already committed by its hash
+        Map<String, ManifestCommittable> hashes = new LinkedHashMap<>();
+        for (ManifestCommittable committable : committableList) {
+            hashes.put(digestManifestCommittable(committable), committable);
+        }
+
+        for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
+            Path snapshotPath = pathFactory.toSnapshotPath(id);
+            Snapshot snapshot = Snapshot.fromPath(snapshotPath);
+            if (committer.equals(snapshot.committer())) {
+                if (hashes.containsKey(snapshot.hash())) {
+                    hashes.remove(snapshot.hash());
+                } else {
+                    // early exit, because committableList must be the latest commits by this
+                    // committer
+                    break;
+                }
+            }
+        }
+
+        return new ArrayList<>(hashes.values());
+    }
+
+    @Override
+    public void commit(ManifestCommittable committable, Map<String, String> properties) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Ready to commit\n" + committable.toString());
+        }
+
+        String hash = digestManifestCommittable(committable);
+
+        List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
+        if (!appendChanges.isEmpty()) {
+            tryCommit(appendChanges, hash, Snapshot.Type.APPEND);
+        }
+
+        List<ManifestEntry> compactChanges = new ArrayList<>();
+        compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
+        compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
+        if (!compactChanges.isEmpty()) {
+            tryCommit(compactChanges, hash, Snapshot.Type.COMPACT);
+        }
+    }
+
+    @Override
+    public void overwrite(
+            Map<String, String> partition,
+            ManifestCommittable committable,
+            Map<String, String> properties) {
+        throw new UnsupportedOperationException();
+    }
+
+    private String digestManifestCommittable(ManifestCommittable committable) {
+        try {
+            return new String(
+                    Base64.getEncoder()
+                            .encode(
+                                    MessageDigest.getInstance("MD5")
+                                            .digest(committableSerializer.serialize(committable))));
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException("MD5 algorithm not found. This is impossible.", e);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Failed to serialize ManifestCommittable. This is unexpected.", e);
+        }
+    }
+
+    private List<ManifestEntry> collectChanges(
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> map, ValueKind kind) {
+        List<ManifestEntry> changes = new ArrayList<>();
+        for (Map.Entry<BinaryRowData, Map<Integer, List<SstFileMeta>>> entryWithPartition :
+                map.entrySet()) {
+            for (Map.Entry<Integer, List<SstFileMeta>> entryWithBucket :
+                    entryWithPartition.getValue().entrySet()) {
+                changes.addAll(
+                        entryWithBucket.getValue().stream()
+                                .map(
+                                        file ->
+                                                new ManifestEntry(
+                                                        kind,
+                                                        entryWithPartition.getKey(),
+                                                        entryWithBucket.getKey(),
+                                                        fileStoreOptions.bucket,
+                                                        file))
+                                .collect(Collectors.toList()));
+            }
+        }
+        return changes;
+    }
+
+    private void tryCommit(List<ManifestEntry> changes, String hash, Snapshot.Type type) {
+        while (true) {
+            Long latestSnapshotId = pathFactory.latestSnapshotId();
+            long newSnapshotId =
+                    latestSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshotId + 1;
+            Path newSnapshotPath = pathFactory.toSnapshotPath(newSnapshotId);
+            Path tmpSnapshotPath =

Review comment:
       Add a `toTmpSnapshotPath` to pathFactory?
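
       A minimal sketch of what such a helper might look like, assuming it only
       needs to reproduce the inline `parent + "/." + name + UUID` expression that
       `tryCommit` builds in this PR. The class name `TmpSnapshotPaths` is
       hypothetical, and plain strings stand in for Flink's `Path` so the
       snippet has no dependencies; the real method would live on
       `FileStorePathFactory` and return a `Path`.

```java
import java.util.UUID;

/** Hypothetical stand-in for a helper on FileStorePathFactory; uses strings instead of Path. */
final class TmpSnapshotPaths {

    private TmpSnapshotPaths() {}

    /**
     * Builds a hidden temporary sibling of the given snapshot path, i.e.
     * "<parent>/.<name><random UUID>", so concurrent committers never share a temp file.
     */
    static String toTmpSnapshotPath(String snapshotPath) {
        int slash = snapshotPath.lastIndexOf('/');
        String name = snapshotPath.substring(slash + 1);
        String hiddenName = "." + name + UUID.randomUUID();
        // No parent directory in the input: return just the hidden file name.
        return slash < 0 ? hiddenName : snapshotPath.substring(0, slash) + "/" + hiddenName;
    }
}
```

       With this in place, the call site in `tryCommit` could shrink to a single
       call, and the naming scheme for temporary snapshot files would be defined
       in one place.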

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/** This file is the entrance to all data committed at some specific time point. */
+public class Snapshot {
+
+    public static final long FIRST_SNAPSHOT_ID = 1;
+
+    private static final String FIELD_ID = "id";
+    private static final String FIELD_MANIFEST_LIST = "manifestList";
+    private static final String FIELD_COMMITTER = "committer";
+    private static final String FIELD_HASH = "hash";
+    private static final String FIELD_TYPE = "type";
+
+    @JsonProperty(FIELD_ID)
+    private final long id;
+
+    @JsonProperty(FIELD_MANIFEST_LIST)
+    private final String manifestList;
+
+    @JsonProperty(FIELD_COMMITTER)
+    private final String committer;
+
+    // for deduplication
+    @JsonProperty(FIELD_HASH)
+    private final String hash;
+
+    @JsonProperty(FIELD_TYPE)
+    private final Type type;

Review comment:
       `CommitKind`?

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link FileStoreCommit}.
+ *
+ * <p>This class provides an atomic commit method to the user.
+ *
+ * <ol>
+ *   <li>Before calling {@link FileStoreCommitImpl#commit}, the user should first call
+ *       {@link FileStoreCommitImpl#filterCommitted} to make sure this commit has not been done
+ *       before.
+ *   <li>Before committing, it first checks for conflicts by verifying that all files to be
+ *       removed currently exist.
+ *   <li>After that, it uses the external {@link FileStoreCommitImpl#lock} (if provided) or the
+ *       atomic rename of the file system to ensure atomicity.
+ *   <li>If the commit fails due to conflicts or an exception, it tries its best to clean up and
+ *       aborts.
+ *   <li>If the atomic rename fails, it tries again after reading the latest snapshot from step 2.
+ * </ol>
+ */
+public class FileStoreCommitImpl implements FileStoreCommit {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitImpl.class);
+
+    private final String committer;
+    private final ManifestCommittableSerializer committableSerializer;
+
+    private final FileStorePathFactory pathFactory;
+    private final ManifestFile manifestFile;
+    private final ManifestList manifestList;
+    private final FileStoreOptions fileStoreOptions;
+    private final FileStoreScan scan;
+
+    @Nullable private Lock lock;
+
+    public FileStoreCommitImpl(
+            String committer,
+            ManifestCommittableSerializer committableSerializer,
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList,
+            FileStoreOptions fileStoreOptions,
+            FileStoreScan scan) {
+        this.committer = committer;
+        this.committableSerializer = committableSerializer;
+
+        this.pathFactory = pathFactory;
+        this.manifestFile = manifestFile;
+        this.manifestList = manifestList;
+        this.fileStoreOptions = fileStoreOptions;
+        this.scan = scan;
+
+        this.lock = null;
+    }
+
+    @Override
+    public FileStoreCommit withLock(Lock lock) {
+        this.lock = lock;
+        return this;
+    }
+
+    @Override
+    public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committableList) {
+        committableList = new ArrayList<>(committableList);
+
+        // filter out commits with no new files
+        committableList.removeIf(committable -> committable.newFiles().isEmpty());
+
+        // if there are no previous snapshots then nothing should be filtered
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return committableList;
+        }
+
+        // check if a committable is already committed by its hash
+        Map<String, ManifestCommittable> hashes = new LinkedHashMap<>();
+        for (ManifestCommittable committable : committableList) {
+            hashes.put(digestManifestCommittable(committable), committable);
+        }
+
+        for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
+            Path snapshotPath = pathFactory.toSnapshotPath(id);
+            Snapshot snapshot = Snapshot.fromPath(snapshotPath);
+            if (committer.equals(snapshot.committer())) {
+                if (hashes.containsKey(snapshot.hash())) {
+                    hashes.remove(snapshot.hash());
+                } else {
+                    // early exit, because committableList must be the latest commits by this
+                    // committer
+                    break;
+                }
+            }
+        }
+
+        return new ArrayList<>(hashes.values());
+    }
+
+    @Override
+    public void commit(ManifestCommittable committable, Map<String, String> properties) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Ready to commit\n" + committable.toString());
+        }
+
+        String hash = digestManifestCommittable(committable);
+
+        List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
+        if (!appendChanges.isEmpty()) {
+            tryCommit(appendChanges, hash, Snapshot.Type.APPEND);
+        }
+
+        List<ManifestEntry> compactChanges = new ArrayList<>();
+        compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
+        compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
+        if (!compactChanges.isEmpty()) {
+            tryCommit(compactChanges, hash, Snapshot.Type.COMPACT);
+        }
+    }
+
+    @Override
+    public void overwrite(
+            Map<String, String> partition,
+            ManifestCommittable committable,
+            Map<String, String> properties) {
+        throw new UnsupportedOperationException();
+    }
+
+    private String digestManifestCommittable(ManifestCommittable committable) {
+        try {
+            return new String(
+                    Base64.getEncoder()
+                            .encode(
+                                    MessageDigest.getInstance("MD5")
+                                            .digest(committableSerializer.serialize(committable))));
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException("MD5 algorithm not found. This is impossible.", e);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Failed to serialize ManifestCommittable. This is unexpected.", e);
+        }
+    }
+
+    private List<ManifestEntry> collectChanges(
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> map, ValueKind kind) {
+        List<ManifestEntry> changes = new ArrayList<>();
+        for (Map.Entry<BinaryRowData, Map<Integer, List<SstFileMeta>>> entryWithPartition :
+                map.entrySet()) {
+            for (Map.Entry<Integer, List<SstFileMeta>> entryWithBucket :
+                    entryWithPartition.getValue().entrySet()) {
+                changes.addAll(
+                        entryWithBucket.getValue().stream()
+                                .map(
+                                        file ->
+                                                new ManifestEntry(
+                                                        kind,
+                                                        entryWithPartition.getKey(),
+                                                        entryWithBucket.getKey(),
+                                                        fileStoreOptions.bucket,
+                                                        file))
+                                .collect(Collectors.toList()));
+            }
+        }
+        return changes;
+    }
+
+    private void tryCommit(List<ManifestEntry> changes, String hash, Snapshot.Type type) {
+        while (true) {
+            Long latestSnapshotId = pathFactory.latestSnapshotId();
+            long newSnapshotId =
+                    latestSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshotId + 1;
+            Path newSnapshotPath = pathFactory.toSnapshotPath(newSnapshotId);
+            Path tmpSnapshotPath =
+                    new Path(
+                            newSnapshotPath.getParent()
+                                    + "/."
+                                    + newSnapshotPath.getName()
+                                    + UUID.randomUUID());
+
+            Snapshot latestSnapshot = null;
+            if (latestSnapshotId != null) {
+                detectConflicts(latestSnapshotId, changes);

Review comment:
       Add a comment here noting that the commit fails if a conflict is detected.
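
       For illustration, a rough sketch of the fail-fast check described in the
       class Javadoc (all files to be removed must currently exist). This is not
       the PR's `detectConflicts` implementation: the class `ConflictCheck`, the
       method `failIfConflict`, and the use of plain file names instead of
       `ManifestEntry` are assumptions made to keep the example self-contained.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified conflict check; file names stand in for ManifestEntry. */
final class ConflictCheck {

    private ConflictCheck() {}

    /**
     * Fails if any file this commit wants to delete is not part of the latest snapshot,
     * which would mean another committer has already removed it.
     */
    static void failIfConflict(Set<String> currentFiles, List<String> filesToDelete) {
        Set<String> missing = new HashSet<>(filesToDelete);
        missing.removeAll(currentFiles);
        if (!missing.isEmpty()) {
            // Abort the commit rather than retrying: the conflict cannot resolve itself.
            throw new IllegalStateException(
                    "Conflict detected, files to delete no longer exist: " + missing);
        }
    }
}
```

       A one-line comment at the call site, such as "throws if the changes
       conflict with the latest snapshot", would make it clear that the call
       can abort the whole commit.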

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link FileStoreCommit}.
+ *
+ * <p>This class provides an atomic commit method to the user.
+ *
+ * <ol>
+ *   <li>Before calling {@link FileStoreCommitImpl#commit}, the user should first call
+ *       {@link FileStoreCommitImpl#filterCommitted} to make sure this commit has not been done
+ *       before.
+ *   <li>Before committing, it first checks for conflicts by verifying that all files to be
+ *       removed currently exist.
+ *   <li>After that, it uses the external {@link FileStoreCommitImpl#lock} (if provided) or the
+ *       atomic rename of the file system to ensure atomicity.
+ *   <li>If the commit fails due to conflicts or an exception, it tries its best to clean up and
+ *       aborts.
+ *   <li>If the atomic rename fails, it tries again after reading the latest snapshot from step 2.
+ * </ol>
+ */
+public class FileStoreCommitImpl implements FileStoreCommit {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitImpl.class);
+
+    private final String committer;
+    private final ManifestCommittableSerializer committableSerializer;
+
+    private final FileStorePathFactory pathFactory;
+    private final ManifestFile manifestFile;
+    private final ManifestList manifestList;
+    private final FileStoreOptions fileStoreOptions;
+    private final FileStoreScan scan;
+
+    @Nullable private Lock lock;
+
+    public FileStoreCommitImpl(
+            String committer,
+            ManifestCommittableSerializer committableSerializer,
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList,
+            FileStoreOptions fileStoreOptions,
+            FileStoreScan scan) {
+        this.committer = committer;
+        this.committableSerializer = committableSerializer;
+
+        this.pathFactory = pathFactory;
+        this.manifestFile = manifestFile;
+        this.manifestList = manifestList;
+        this.fileStoreOptions = fileStoreOptions;
+        this.scan = scan;
+
+        this.lock = null;
+    }
+
+    @Override
+    public FileStoreCommit withLock(Lock lock) {
+        this.lock = lock;
+        return this;
+    }
+
+    @Override
+    public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committableList) {
+        committableList = new ArrayList<>(committableList);
+
+        // filter out commits with no new files
+        committableList.removeIf(committable -> committable.newFiles().isEmpty());
+
+        // if there are no previous snapshots then nothing should be filtered
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return committableList;
+        }
+
+        // check if a committable is already committed by its hash
+        Map<String, ManifestCommittable> hashes = new LinkedHashMap<>();
+        for (ManifestCommittable committable : committableList) {
+            hashes.put(digestManifestCommittable(committable), committable);
+        }
+
+        for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
+            Path snapshotPath = pathFactory.toSnapshotPath(id);
+            Snapshot snapshot = Snapshot.fromPath(snapshotPath);
+            if (committer.equals(snapshot.committer())) {
+                if (hashes.containsKey(snapshot.hash())) {
+                    hashes.remove(snapshot.hash());
+                } else {
+                    // early exit, because committableList must be the latest commits by this
+                    // committer
+                    break;
+                }
+            }
+        }
+
+        return new ArrayList<>(hashes.values());
+    }
+
+    @Override
+    public void commit(ManifestCommittable committable, Map<String, String> properties) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Ready to commit\n" + committable.toString());
+        }
+
+        String hash = digestManifestCommittable(committable);
+
+        List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
+        if (!appendChanges.isEmpty()) {
+            tryCommit(appendChanges, hash, Snapshot.Type.APPEND);
+        }
+
+        List<ManifestEntry> compactChanges = new ArrayList<>();
+        compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
+        compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
+        if (!compactChanges.isEmpty()) {
+            tryCommit(compactChanges, hash, Snapshot.Type.COMPACT);
+        }
+    }
+
+    @Override
+    public void overwrite(
+            Map<String, String> partition,
+            ManifestCommittable committable,
+            Map<String, String> properties) {
+        throw new UnsupportedOperationException();
+    }
+
+    private String digestManifestCommittable(ManifestCommittable committable) {
+        try {
+            return new String(
+                    Base64.getEncoder()
+                            .encode(
+                                    MessageDigest.getInstance("MD5")
+                                            .digest(committableSerializer.serialize(committable))));
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException("MD5 algorithm not found. This is impossible.", e);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Failed to serialize ManifestCommittable. This is unexpected.", e);
+        }
+    }
+
+    private List<ManifestEntry> collectChanges(
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> map, ValueKind kind) {
+        List<ManifestEntry> changes = new ArrayList<>();
+        for (Map.Entry<BinaryRowData, Map<Integer, List<SstFileMeta>>> entryWithPartition :
+                map.entrySet()) {
+            for (Map.Entry<Integer, List<SstFileMeta>> entryWithBucket :
+                    entryWithPartition.getValue().entrySet()) {
+                changes.addAll(
+                        entryWithBucket.getValue().stream()
+                                .map(
+                                        file ->
+                                                new ManifestEntry(
+                                                        kind,
+                                                        entryWithPartition.getKey(),
+                                                        entryWithBucket.getKey(),
+                                                        fileStoreOptions.bucket,
+                                                        file))
+                                .collect(Collectors.toList()));
+            }
+        }
+        return changes;
+    }
+
+    private void tryCommit(List<ManifestEntry> changes, String hash, Snapshot.Type type) {
+        while (true) {
+            Long latestSnapshotId = pathFactory.latestSnapshotId();
+            long newSnapshotId =
+                    latestSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshotId + 1;
+            Path newSnapshotPath = pathFactory.toSnapshotPath(newSnapshotId);
+            Path tmpSnapshotPath =
+                    new Path(
+                            newSnapshotPath.getParent()
+                                    + "/."
+                                    + newSnapshotPath.getName()
+                                    + UUID.randomUUID());
+
+            Snapshot latestSnapshot = null;
+            if (latestSnapshotId != null) {
+                detectConflicts(latestSnapshotId, changes);
+                latestSnapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(latestSnapshotId));
+            }
+
+            Snapshot newSnapshot;
+            String manifestListName = null;
+            List<ManifestFileMeta> oldMetas = new ArrayList<>();
+            List<ManifestFileMeta> newMetas = new ArrayList<>();
+            try {
+                if (latestSnapshot != null) {
+                    // read all previous manifest files
+                    oldMetas.addAll(manifestList.read(latestSnapshot.manifestList()));
+                    // merge manifest files
+                    newMetas.addAll(
+                            ManifestFileMeta.merge(
+                                    oldMetas,
+                                    manifestFile,
+                                    fileStoreOptions.manifestSuggestedSize.getBytes()));
+                }
+                // write all changes to manifest file
+                newMetas.add(manifestFile.write(changes));
+                // prepare snapshot file
+                manifestListName = manifestList.write(newMetas);
+                newSnapshot = new Snapshot(newSnapshotId, manifestListName, committer, hash, type);
+                FileUtils.writeFileUtf8(tmpSnapshotPath, newSnapshot.toJson());
+            } catch (Throwable e) {
+                // fails when preparing for commit, we should clean up
+                cleanUpManifests(tmpSnapshotPath, manifestListName, oldMetas, newMetas);
+                throw new RuntimeException(
+                        String.format(
+                                "Exception occurs when preparing snapshot #%d (path %s) by committer %s "
+                                        + "with hash %s and type %s. Clean up.",
+                                newSnapshotId,
+                                newSnapshotPath.toString(),
+                                committer,
+                                hash,
+                                type.name()),
+                        e);
+            }
+
+            boolean success;
+            try {
+                FileSystem fs = tmpSnapshotPath.getFileSystem();
+                // atomic rename
+                if (lock != null) {
+                    success =
+                            lock.runWithLock(
+                                    () ->
+                                            // fs.rename may not return false if the target file
+                                            // already exists, and may not even be atomic;
+                                            // as we're relying on external locking, we can first
+                                            // check whether the file exists and then rename to
+                                            // work around this case
+                                            !fs.exists(newSnapshotPath)
+                                                    && fs.rename(tmpSnapshotPath, newSnapshotPath));
+                } else {
+                    success = fs.rename(tmpSnapshotPath, newSnapshotPath);
+                }
+            } catch (Throwable e) {
+                // exception when performing the atomic rename,
+                // we cannot clean up because we can't determine the success
+                throw new RuntimeException(
+                        String.format(
+                                "Exception occurs when committing snapshot #%d (path %s) by committer %s "
+                                        + "with hash %s and type %s. "
+                                        + "Cannot clean up because we can't determine the success.",
+                                newSnapshotId,
+                                newSnapshotPath.toString(),
+                                committer,
+                                hash,
+                                type.name()),
+                        e);
+            }
+
+            if (success) {
+                return;
+            }
+
+            // atomic rename fails, clean up and try again
+            LOG.warn(
+                    String.format(
+                            "Atomic rename failed for snapshot #%d (path %s) by committer %s "
+                                    + "with hash %s and type %s. "
+                                    + "Clean up and try again.",
+                            newSnapshotId,
+                            newSnapshotPath.toString(),
+                            committer,
+                            hash,
+                            type.name()));
+            cleanUpManifests(tmpSnapshotPath, manifestListName, oldMetas, newMetas);
+        }
+    }
+
+    private void detectConflicts(long snapshotId, List<ManifestEntry> changes) {

Review comment:
       validateConflicts?

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ReadableConfig;
+
+/** Options for {@link FileStore}. */
+public class FileStoreOptions {
+
+    public static final ConfigOption<Integer> BUCKET =
+            ConfigOptions.key("bucket")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "Bucket number for file store and partition number for Kafka.");
+
+    public static final ConfigOption<MemorySize> MANIFEST_SUGGESTED_SIZE =
+            ConfigOptions.key("manifest.suggested-size")

Review comment:
       manifest.target-file-size

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/** This file is the entrance to all data committed at some specific time point. */
+public class Snapshot {
+
+    public static final long FIRST_SNAPSHOT_ID = 1;
+
+    private static final String FIELD_ID = "id";
+    private static final String FIELD_MANIFEST_LIST = "manifestList";
+    private static final String FIELD_COMMITTER = "committer";
+    private static final String FIELD_HASH = "hash";
+    private static final String FIELD_TYPE = "type";
+
+    @JsonProperty(FIELD_ID)

Review comment:
       Why do we need to use `JsonProperty`? We need to serialize all fields anyway.

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/** This file is the entrance to all data committed at some specific time point. */
+public class Snapshot {
+
+    public static final long FIRST_SNAPSHOT_ID = 1;
+
+    private static final String FIELD_ID = "id";
+    private static final String FIELD_MANIFEST_LIST = "manifestList";
+    private static final String FIELD_COMMITTER = "committer";
+    private static final String FIELD_HASH = "hash";
+    private static final String FIELD_TYPE = "type";
+
+    @JsonProperty(FIELD_ID)
+    private final long id;
+
+    @JsonProperty(FIELD_MANIFEST_LIST)
+    private final String manifestList;
+
+    @JsonProperty(FIELD_COMMITTER)
+    private final String committer;

Review comment:
       `commitUser`? A committer is something that helps to commit.

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
##########
@@ -96,6 +108,31 @@ public String getPartitionString(BinaryRowData partition) {
                                 partition, "Partition row data is null. This is unexpected.")));
     }
 
+    @Nullable
+    public Long latestSnapshotId() {

Review comment:
       `currentSnapshotId`?

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
##########
@@ -118,4 +121,94 @@ public String toString() {
                 numDeletedFiles,
                 Arrays.toString(partitionStats));
     }
+
+    /**
+     * Merge several {@link ManifestFileMeta}s. {@link ManifestEntry}s representing first adding and
+     * then deleting the same sst file will cancel each other.
+     *
+     * <p>NOTE: This method is atomic.
+     */
+    public static List<ManifestFileMeta> merge(
+            List<ManifestFileMeta> metas, ManifestFile manifestFile, long suggestedMetaSize) {
+        List<ManifestFileMeta> result = new ArrayList<>();
+        // these are the newly created manifest files, clean them up if exception occurs
+        List<ManifestFileMeta> newMetas = new ArrayList<>();
+        List<ManifestFileMeta> candidate = new ArrayList<>();
+        long totalSize = 0;
+
+        try {
+            for (ManifestFileMeta manifest : metas) {
+                totalSize += manifest.fileSize;
+                candidate.add(manifest);
+                if (totalSize >= suggestedMetaSize) {
+                    // reach suggested file size, perform merging and produce new file
+                    merge(candidate, manifestFile, result, newMetas);
+                    candidate.clear();
+                    totalSize = 0;
+                }
+            }
+            if (!candidate.isEmpty()) {
+                // merge the last bit of metas
+                merge(candidate, manifestFile, result, newMetas);
+            }
+        } catch (Throwable e) {
+            // exception occurs, clean up and rethrow
+            for (ManifestFileMeta manifest : newMetas) {
+                manifestFile.delete(manifest.fileName);
+            }
+            throw e;
+        }
+
+        return result;
+    }
+
+    private static void merge(
+            List<ManifestFileMeta> metas,
+            ManifestFile manifestFile,
+            List<ManifestFileMeta> result,
+            List<ManifestFileMeta> newMetas) {
+        if (metas.size() > 1) {
+            ManifestFileMeta newMeta = merge(metas, manifestFile);
+            result.add(newMeta);
+            newMetas.add(newMeta);
+        } else {
+            result.addAll(metas);
+        }
+    }
+
+    private static ManifestFileMeta merge(List<ManifestFileMeta> metas, ManifestFile manifestFile) {
+        Preconditions.checkArgument(
+                metas.size() > 1, "Number of ManifestFileMeta <= 1. This is a bug.");
+
+        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
+        for (ManifestFileMeta manifest : metas) {
+            for (ManifestEntry entry : manifestFile.read(manifest.fileName)) {
+                switch (entry.kind()) {
+                    case ADD:
+                        Preconditions.checkState(
+                                !map.containsKey(entry.identifier()),
+                                "Trying to add file %s which is already added. Manifest might be corrupted.",
+                                entry.identifier());
+                        map.put(entry.identifier(), entry);

Review comment:
       extract `entry.identifier()` to a field

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
##########
@@ -118,4 +121,94 @@ public String toString() {
                 numDeletedFiles,
                 Arrays.toString(partitionStats));
     }
+
+    /**
+     * Merge several {@link ManifestFileMeta}s. {@link ManifestEntry}s representing first adding and
+     * then deleting the same sst file will cancel each other.
+     *
+     * <p>NOTE: This method is atomic.
+     */
+    public static List<ManifestFileMeta> merge(
+            List<ManifestFileMeta> metas, ManifestFile manifestFile, long suggestedMetaSize) {
+        List<ManifestFileMeta> result = new ArrayList<>();
+        // these are the newly created manifest files, clean them up if exception occurs
+        List<ManifestFileMeta> newMetas = new ArrayList<>();
+        List<ManifestFileMeta> candidate = new ArrayList<>();
+        long totalSize = 0;
+
+        try {
+            for (ManifestFileMeta manifest : metas) {
+                totalSize += manifest.fileSize;
+                candidate.add(manifest);
+                if (totalSize >= suggestedMetaSize) {
+                    // reach suggested file size, perform merging and produce new file
+                    merge(candidate, manifestFile, result, newMetas);
+                    candidate.clear();
+                    totalSize = 0;
+                }
+            }
+            if (!candidate.isEmpty()) {
+                // merge the last bit of metas
+                merge(candidate, manifestFile, result, newMetas);
+            }
+        } catch (Throwable e) {
+            // exception occurs, clean up and rethrow
+            for (ManifestFileMeta manifest : newMetas) {
+                manifestFile.delete(manifest.fileName);
+            }
+            throw e;
+        }
+
+        return result;
+    }
+
+    private static void merge(
+            List<ManifestFileMeta> metas,
+            ManifestFile manifestFile,
+            List<ManifestFileMeta> result,
+            List<ManifestFileMeta> newMetas) {
+        if (metas.size() > 1) {
+            ManifestFileMeta newMeta = merge(metas, manifestFile);
+            result.add(newMeta);
+            newMetas.add(newMeta);
+        } else {
+            result.addAll(metas);
+        }
+    }
+
+    private static ManifestFileMeta merge(List<ManifestFileMeta> metas, ManifestFile manifestFile) {
+        Preconditions.checkArgument(
+                metas.size() > 1, "Number of ManifestFileMeta <= 1. This is a bug.");
+
+        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();

Review comment:
       Have you considered the upgrade case,
   e.g. a file upgraded from level 1 to level 2?
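
       To make the upgrade question concrete, here is a minimal sketch of the merge loop. It is not the PR's code: `Entry`, the key functions and the ADD-then-DELETE ordering of an upgrade are assumptions for illustration, and the DELETE branch only follows the Javadoc ("adding and then deleting the same sst file will cancel each other"). If the identifier includes the level, the upgrade merges cleanly; if it is the file name alone, the second ADD trips the "already added" check.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class UpgradeMergeSketch {

    enum Kind { ADD, DELETE }

    /** Hypothetical, simplified stand-in for ManifestEntry. */
    static final class Entry {
        final Kind kind;
        final String fileName;
        final int level;

        Entry(Kind kind, String fileName, int level) {
            this.kind = kind;
            this.fileName = fileName;
            this.level = level;
        }
    }

    /** Merges entries in order; keyOf decides what counts as "the same file". */
    static List<Entry> merge(List<Entry> entries, Function<Entry, String> keyOf) {
        Map<String, Entry> map = new LinkedHashMap<>();
        for (Entry entry : entries) {
            String key = keyOf.apply(entry);
            if (entry.kind == Kind.ADD) {
                if (map.containsKey(key)) {
                    throw new IllegalStateException("File " + key + " is already added.");
                }
                map.put(key, entry);
            } else if (map.containsKey(key)) {
                // assumed behavior: a DELETE cancels the earlier entry with the same key
                map.remove(key);
            } else {
                // the matching ADD lives in a manifest outside this merge, keep the DELETE
                map.put(key, entry);
            }
        }
        return new ArrayList<>(map.values());
    }

    /** sst-1 is added at level 1, then upgraded: ADD at level 2, DELETE at level 1. */
    static List<Entry> upgradeScenario() {
        return Arrays.asList(
                new Entry(Kind.ADD, "sst-1", 1),
                new Entry(Kind.ADD, "sst-1", 2),
                new Entry(Kind.DELETE, "sst-1", 1));
    }

    public static void main(String[] args) {
        // key = file name + level: the upgrade leaves a single ADD at level 2
        List<Entry> merged = merge(upgradeScenario(), e -> e.fileName + "@L" + e.level);
        System.out.println(merged.size() + " entry left at level " + merged.get(0).level);
    }
}
```

       Whether an upgrade is actually written as ADD before DELETE is not visible in this hunk; with the opposite order a name-only key would also pass, so the sketch only shows why the level is worth considering as part of the identifier.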

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Default implementation of {@link FileStoreScan}. */
+public class FileStoreScanImpl implements FileStoreScan {
+
+    private final FileStorePathFactory pathFactory;
+    private final ManifestFile manifestFile;
+    private final ManifestList manifestList;
+
+    private Long snapshotId;
+    private List<ManifestFileMeta> manifests;
+
+    public FileStoreScanImpl(
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList) {
+        this.pathFactory = pathFactory;
+        this.manifestFile = manifestFile;
+        this.manifestList = manifestList;
+
+        this.snapshotId = null;
+        this.manifests = new ArrayList<>();
+    }
+
+    @Override
+    public FileStoreScan withPartitionFilter(Predicate predicate) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FileStoreScan withKeyFilter(Predicate predicate) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FileStoreScan withValueFilter(Predicate predicate) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FileStoreScan withBucket(int bucket) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FileStoreScan withSnapshot(long snapshotId) {
+        this.snapshotId = snapshotId;
+        Snapshot snapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(snapshotId));
+        this.manifests = manifestList.read(snapshot.manifestList());
+        return this;
+    }
+
+    @Override
+    public FileStoreScan withManifestList(List<ManifestFileMeta> manifests) {
+        this.manifests = manifests;
+        return this;
+    }
+
+    @Override
+    public Plan plan() {
+        List<ManifestEntry> files = scan();
+
+        return new Plan() {
+            @Nullable
+            @Override
+            public Long snapshotId() {
+                return snapshotId;
+            }
+
+            @Override
+            public List<ManifestEntry> files() {
+                return files;
+            }
+        };
+    }
+
+    private List<ManifestEntry> scan() {
+        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
+        for (ManifestFileMeta manifest : manifests) {
+            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {

Review comment:
       Add a TODO here: concurrent reading.
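
       One possible shape for that TODO, as a sketch only: read the manifest files on a small thread pool and concatenate the results in the original manifest order, since the ADD/DELETE merge that follows depends on entry order. `readAll`, the `reader` function and the fixed pool size are hypothetical and not part of this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class ConcurrentManifestReadSketch {

    /** Reads all manifests in parallel but returns the entries in manifest order. */
    static <M, E> List<E> readAll(
            List<M> manifests, Function<M, List<E>> reader, int parallelism) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<List<E>>> futures = new ArrayList<>();
            for (M manifest : manifests) {
                Callable<List<E>> task = () -> reader.apply(manifest);
                futures.add(pool.submit(task));
            }
            // futures are consumed in submission order, so the result order is deterministic
            List<E> entries = new ArrayList<>();
            for (Future<List<E>> future : futures) {
                entries.addAll(future.get());
            }
            return entries;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // the lambda stands in for a real manifest reader
        List<String> entries =
                readAll(
                        Arrays.asList("manifest-0", "manifest-1"),
                        name -> Arrays.asList(name + "/entry-a", name + "/entry-b"),
                        2);
        System.out.println(entries);
    }
}
```

       A shared executor would probably be preferable to creating a pool per scan; the per-call pool here just keeps the sketch self-contained.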

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/** This file is the entrance to all data committed at some specific time point. */
+public class Snapshot {
+
+    public static final long FIRST_SNAPSHOT_ID = 1;
+
+    private static final String FIELD_ID = "id";
+    private static final String FIELD_MANIFEST_LIST = "manifestList";
+    private static final String FIELD_COMMITTER = "committer";
+    private static final String FIELD_HASH = "hash";
+    private static final String FIELD_TYPE = "type";
+
+    @JsonProperty(FIELD_ID)
+    private final long id;
+
+    @JsonProperty(FIELD_MANIFEST_LIST)
+    private final String manifestList;
+
+    @JsonProperty(FIELD_COMMITTER)
+    private final String committer;
+
+    // for deduplication
+    @JsonProperty(FIELD_HASH)
+    private final String hash;
+
+    @JsonProperty(FIELD_TYPE)
+    private final Type type;
+
+    @JsonCreator
+    public Snapshot(
+            @JsonProperty(FIELD_ID) long id,
+            @JsonProperty(FIELD_MANIFEST_LIST) String manifestList,
+            @JsonProperty(FIELD_COMMITTER) String committer,
+            @JsonProperty(FIELD_HASH) String hash,
+            @JsonProperty(FIELD_TYPE) Type type) {
+        this.id = id;
+        this.manifestList = manifestList;
+        this.committer = committer;
+        this.hash = hash;
+        this.type = type;
+    }
+
+    @JsonGetter(FIELD_ID)
+    public long id() {
+        return id;
+    }
+
+    @JsonGetter(FIELD_MANIFEST_LIST)
+    public String manifestList() {
+        return manifestList;
+    }
+
+    @JsonGetter(FIELD_COMMITTER)
+    public String committer() {
+        return committer;
+    }
+
+    @JsonGetter(FIELD_HASH)
+    public String hash() {
+        return hash;

Review comment:
       commitDigest?

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link FileStoreCommit}.
+ *
+ * <p>This class provides an atomic commit method to the user.
+ *
+ * <ol>
+ *   <li>Before calling {@link FileStoreCommitImpl#commit}, user should first call {@link
+ *       FileStoreCommitImpl#filterCommitted} to make sure this commit is not done before.
+ *   <li>Before committing, it will first check for conflicts by checking if all files to be removed
+ *       currently exist.
+ *   <li>After that it uses the external {@link FileStoreCommitImpl#lock} (if provided) or the atomic
+ *       rename of the file system to ensure atomicity.
+ *   <li>If commit fails due to conflicts or exception it tries its best to clean up and aborts.
+ *   <li>If atomic rename fails it tries again after reading the latest snapshot from step 2.
+ * </ol>
+ */
+public class FileStoreCommitImpl implements FileStoreCommit {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitImpl.class);
+
+    private final String committer;
+    private final ManifestCommittableSerializer committableSerializer;
+
+    private final FileStorePathFactory pathFactory;
+    private final ManifestFile manifestFile;
+    private final ManifestList manifestList;
+    private final FileStoreOptions fileStoreOptions;
+    private final FileStoreScan scan;
+
+    @Nullable private Lock lock;
+
+    public FileStoreCommitImpl(
+            String committer,
+            ManifestCommittableSerializer committableSerializer,
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList,
+            FileStoreOptions fileStoreOptions,
+            FileStoreScan scan) {
+        this.committer = committer;
+        this.committableSerializer = committableSerializer;
+
+        this.pathFactory = pathFactory;
+        this.manifestFile = manifestFile;
+        this.manifestList = manifestList;
+        this.fileStoreOptions = fileStoreOptions;
+        this.scan = scan;
+
+        this.lock = null;
+    }
+
+    @Override
+    public FileStoreCommit withLock(Lock lock) {
+        this.lock = lock;
+        return this;
+    }
+
+    @Override
+    public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committableList) {
+        committableList = new ArrayList<>(committableList);
+
+        // filter out commits with no new files
+        committableList.removeIf(committable -> committable.newFiles().isEmpty());

Review comment:
       I am thinking maybe we need to support pure compaction too.
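
       A rough sketch of what the filter could look like if pure compaction is supported: a committable is dropped only when it carries no changes at all, not merely when it has no new files. The `Committable` class and its `compactBefore` / `compactAfter` lists are hypothetical; only `newFiles()` appears in this diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PureCompactionFilterSketch {

    /** Hypothetical committable; compactBefore and compactAfter are assumed fields. */
    static final class Committable {
        final List<String> newFiles;
        final List<String> compactBefore;
        final List<String> compactAfter;

        Committable(
                List<String> newFiles, List<String> compactBefore, List<String> compactAfter) {
            this.newFiles = newFiles;
            this.compactBefore = compactBefore;
            this.compactAfter = compactAfter;
        }

        boolean hasNoChanges() {
            return newFiles.isEmpty() && compactBefore.isEmpty() && compactAfter.isEmpty();
        }
    }

    /** Drops only committables that change nothing, keeping pure compactions. */
    static List<Committable> dropEmpty(List<Committable> committables) {
        List<Committable> result = new ArrayList<>(committables);
        result.removeIf(Committable::hasNoChanges);
        return result;
    }

    public static void main(String[] args) {
        List<String> none = Collections.emptyList();
        List<Committable> kept =
                dropEmpty(
                        Arrays.asList(
                                // append only
                                new Committable(Arrays.asList("sst-3"), none, none),
                                // pure compaction: no new files, but files are rewritten
                                new Committable(
                                        none,
                                        Arrays.asList("sst-1", "sst-2"),
                                        Arrays.asList("sst-4")),
                                // nothing to commit
                                new Committable(none, none, none)));
        System.out.println(kept.size() + " committables kept");
    }
}
```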

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
##########
@@ -96,6 +108,31 @@ public String getPartitionString(BinaryRowData partition) {
                                 partition, "Partition row data is null. This is unexpected.")));
     }
 
+    @Nullable
+    public Long latestSnapshotId() {
+        try {
+            Path snapshotDir = new Path(root + "/snapshot");

Review comment:
       Add a TODO here: we can write a `CURRENT` file for faster lookup in the future.
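
       A sketch of the `CURRENT` idea, using `java.nio.file` on a local directory rather than Flink's `FileSystem`: the hint file stores the last known snapshot id, the reader probes forward from it in case the hint is stale, and falls back to listing the directory when no hint exists. The `snapshot-<id>` file naming, the `writeHint` helper and treating the hint as best-effort (not atomic) are all assumptions for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public class CurrentHintSketch {

    // assumed naming scheme for snapshot files
    static final String PREFIX = "snapshot-";
    static final String HINT = "CURRENT";

    /** Best-effort hint; correctness must not depend on it being up to date. */
    static void writeHint(Path snapshotDir, long id) throws IOException {
        Files.write(
                snapshotDir.resolve(HINT), Long.toString(id).getBytes(StandardCharsets.UTF_8));
    }

    /** Returns the latest snapshot id, or null if there is no snapshot yet. */
    static Long latestSnapshotId(Path snapshotDir) throws IOException {
        Path hint = snapshotDir.resolve(HINT);
        if (Files.exists(hint)) {
            String text = new String(Files.readAllBytes(hint), StandardCharsets.UTF_8);
            long id = Long.parseLong(text.trim());
            // the hint may lag behind, so probe forward for newer snapshots
            while (Files.exists(snapshotDir.resolve(PREFIX + (id + 1)))) {
                id++;
            }
            return id;
        }
        // no hint: fall back to listing the whole directory
        try (Stream<Path> files = Files.list(snapshotDir)) {
            return files.map(path -> path.getFileName().toString())
                    .filter(name -> name.startsWith(PREFIX))
                    .map(name -> Long.parseLong(name.substring(PREFIX.length())))
                    .max(Long::compare)
                    .orElse(null);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("snapshot");
        for (long id = 1; id <= 3; id++) {
            Files.createFile(dir.resolve(PREFIX + id));
        }
        writeHint(dir, 2); // deliberately stale
        System.out.println(latestSnapshotId(dir));
    }
}
```

       The forward probe costs one `exists` call per missed snapshot, which should still be cheaper than listing a large directory on an object store.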

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link FileStoreCommit}.
+ *
+ * <p>This class provides an atomic commit method to the user.
+ *
+ * <ol>
+ *   <li>Before calling {@link FileStoreCommitImpl#commit}, user should first call {@link
+ *       FileStoreCommitImpl#filterCommitted} to make sure this commit is not done before.
+ *   <li>Before committing, it will first check for conflicts by checking if all files to be removed
+ *       currently exist.
+ *   <li>After that it uses the external {@link FileStoreCommitImpl#lock} (if provided) or the atomic
+ *       rename of the file system to ensure atomicity.
+ *   <li>If commit fails due to conflicts or exception it tries its best to clean up and aborts.
+ *   <li>If atomic rename fails it tries again after reading the latest snapshot from step 2.
+ * </ol>
+ */
+public class FileStoreCommitImpl implements FileStoreCommit {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitImpl.class);
+
+    private final String committer;
+    private final ManifestCommittableSerializer committableSerializer;
+
+    private final FileStorePathFactory pathFactory;
+    private final ManifestFile manifestFile;
+    private final ManifestList manifestList;
+    private final FileStoreOptions fileStoreOptions;
+    private final FileStoreScan scan;
+
+    @Nullable private Lock lock;
+
+    public FileStoreCommitImpl(
+            String committer,
+            ManifestCommittableSerializer committableSerializer,
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList,
+            FileStoreOptions fileStoreOptions,
+            FileStoreScan scan) {
+        this.committer = committer;
+        this.committableSerializer = committableSerializer;
+
+        this.pathFactory = pathFactory;
+        this.manifestFile = manifestFile;
+        this.manifestList = manifestList;
+        this.fileStoreOptions = fileStoreOptions;
+        this.scan = scan;
+
+        this.lock = null;
+    }
+
+    @Override
+    public FileStoreCommit withLock(Lock lock) {
+        this.lock = lock;
+        return this;
+    }
+
+    @Override
+    public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committableList) {
+        committableList = new ArrayList<>(committableList);
+
+        // filter out commits with no new files
+        committableList.removeIf(committable -> committable.newFiles().isEmpty());
+
+        // if there are no previous snapshots then nothing should be filtered
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return committableList;
+        }
+
+        // check if a committable is already committed by its hash
+        Map<String, ManifestCommittable> hashes = new LinkedHashMap<>();
+        for (ManifestCommittable committable : committableList) {
+            hashes.put(digestManifestCommittable(committable), committable);
+        }
+
+        for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
+            Path snapshotPath = pathFactory.toSnapshotPath(id);
+            Snapshot snapshot = Snapshot.fromPath(snapshotPath);
+            if (committer.equals(snapshot.committer())) {
+                if (hashes.containsKey(snapshot.hash())) {
+                    hashes.remove(snapshot.hash());
+                } else {
+                    // early exit, because committableList must be the latest commits by this
+                    // committer
+                    break;
+                }
+            }
+        }
+
+        return new ArrayList<>(hashes.values());
+    }
+
+    @Override
+    public void commit(ManifestCommittable committable, Map<String, String> properties) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Ready to commit\n" + committable.toString());
+        }
+
+        String hash = digestManifestCommittable(committable);
+
+        List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
+        if (!appendChanges.isEmpty()) {
+            tryCommit(appendChanges, hash, Snapshot.Type.APPEND);
+        }
+
+        List<ManifestEntry> compactChanges = new ArrayList<>();
+        compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
+        compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
+        if (!compactChanges.isEmpty()) {
+            tryCommit(compactChanges, hash, Snapshot.Type.COMPACT);
+        }
+    }
+
+    @Override
+    public void overwrite(
+            Map<String, String> partition,
+            ManifestCommittable committable,
+            Map<String, String> properties) {
+        throw new UnsupportedOperationException();
+    }
+
+    private String digestManifestCommittable(ManifestCommittable committable) {
+        try {
+            return new String(
+                    Base64.getEncoder()
+                            .encode(
+                                    MessageDigest.getInstance("MD5")
+                                            .digest(committableSerializer.serialize(committable))));
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException("MD5 algorithm not found. This is impossible.", e);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Failed to serialize ManifestCommittable. This is unexpected.", e);
+        }
+    }
+
+    private List<ManifestEntry> collectChanges(
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> map, ValueKind kind) {
+        List<ManifestEntry> changes = new ArrayList<>();
+        for (Map.Entry<BinaryRowData, Map<Integer, List<SstFileMeta>>> entryWithPartition :
+                map.entrySet()) {
+            for (Map.Entry<Integer, List<SstFileMeta>> entryWithBucket :
+                    entryWithPartition.getValue().entrySet()) {
+                changes.addAll(
+                        entryWithBucket.getValue().stream()
+                                .map(
+                                        file ->
+                                                new ManifestEntry(
+                                                        kind,
+                                                        entryWithPartition.getKey(),
+                                                        entryWithBucket.getKey(),
+                                                        fileStoreOptions.bucket,
+                                                        file))
+                                .collect(Collectors.toList()));
+            }
+        }
+        return changes;
+    }
+
+    private void tryCommit(List<ManifestEntry> changes, String hash, Snapshot.Type type) {
+        while (true) {
+            Long latestSnapshotId = pathFactory.latestSnapshotId();
+            long newSnapshotId =
+                    latestSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshotId + 1;
+            Path newSnapshotPath = pathFactory.toSnapshotPath(newSnapshotId);
+            Path tmpSnapshotPath =
+                    new Path(
+                            newSnapshotPath.getParent()
+                                    + "/."
+                                    + newSnapshotPath.getName()
+                                    + UUID.randomUUID());
+
+            Snapshot latestSnapshot = null;
+            if (latestSnapshotId != null) {
+                detectConflicts(latestSnapshotId, changes);
+                latestSnapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(latestSnapshotId));
+            }
+
+            Snapshot newSnapshot;
+            String manifestListName = null;
+            List<ManifestFileMeta> oldMetas = new ArrayList<>();
+            List<ManifestFileMeta> newMetas = new ArrayList<>();
+            try {
+                if (latestSnapshot != null) {
+                    // read all previous manifest files
+                    oldMetas.addAll(manifestList.read(latestSnapshot.manifestList()));
+                    // merge manifest files
+                    newMetas.addAll(
+                            ManifestFileMeta.merge(
+                                    oldMetas,
+                                    manifestFile,
+                                    fileStoreOptions.manifestSuggestedSize.getBytes()));
+                }
+                // write all changes to manifest file
+                newMetas.add(manifestFile.write(changes));
+                // prepare snapshot file
+                manifestListName = manifestList.write(newMetas);
+                newSnapshot = new Snapshot(newSnapshotId, manifestListName, committer, hash, type);
+                FileUtils.writeFileUtf8(tmpSnapshotPath, newSnapshot.toJson());
+            } catch (Throwable e) {
+                // fails when preparing for commit, we should clean up
+                cleanUpManifests(tmpSnapshotPath, manifestListName, oldMetas, newMetas);
+                throw new RuntimeException(
+                        String.format(
+                                "Exception occurs when preparing snapshot #%d (path %s) by committer %s "
+                                        + "with hash %s and type %s. Clean up.",
+                                newSnapshotId,
+                                newSnapshotPath.toString(),
+                                committer,
+                                hash,
+                                type.name()),
+                        e);
+            }
+
+            boolean success;
+            try {
+                FileSystem fs = tmpSnapshotPath.getFileSystem();
+                // atomic rename
+                if (lock != null) {
+                    success =
+                            lock.runWithLock(
+                                    () ->
+                                            // fs.rename may not return false if the target file
+                                            // already exists, or may not even be atomic;
+                                            // as we're relying on external locking, we can first
+                                            // check if the file exists and then rename to work
+                                            // around this case
+                                            !fs.exists(newSnapshotPath)
+                                                    && fs.rename(tmpSnapshotPath, newSnapshotPath));
+                } else {
+                    success = fs.rename(tmpSnapshotPath, newSnapshotPath);
+                }
+            } catch (Throwable e) {
+                // exception when performing the atomic rename,
+                // we cannot clean up because we can't determine the success
+                throw new RuntimeException(
+                        String.format(
+                                "Exception occurs when committing snapshot #%d (path %s) by committer %s "
+                                        + "with hash %s and type %s. "
+                                        + "Cannot clean up because we can't determine the success.",
+                                newSnapshotId,
+                                newSnapshotPath.toString(),
+                                committer,
+                                hash,
+                                type.name()),
+                        e);
+            }
+
+            if (success) {
+                return;
+            }
+
+            // atomic rename fails, clean up and try again
+            LOG.warn(
+                    String.format(
+                            "Atomic rename failed for snapshot #%d (path %s) by committer %s "
+                                    + "with hash %s and type %s. "
+                                    + "Clean up and try again.",
+                            newSnapshotId,
+                            newSnapshotPath.toString(),
+                            committer,
+                            hash,
+                            type.name()));
+            cleanUpManifests(tmpSnapshotPath, manifestListName, oldMetas, newMetas);
+        }
+    }
+
+    private void detectConflicts(long snapshotId, List<ManifestEntry> changes) {
+        Set<ManifestEntry.Identifier> removedFiles =
+                changes.stream()
+                        .filter(e -> e.kind().equals(ValueKind.DELETE))
+                        .map(ManifestEntry::identifier)
+                        .collect(Collectors.toSet());
+        if (removedFiles.isEmpty()) {
+            // early exit for append only changes
+            return;
+        }
+
+        try {
+            for (ManifestEntry entry : scan.withSnapshot(snapshotId).plan().files()) {
+                removedFiles.remove(entry.identifier());
+            }
+        } catch (Throwable e) {
+            throw new RuntimeException("Cannot determine if conflicts exist.", e);
+        }
+
+        if (!removedFiles.isEmpty()) {
+            throw new RuntimeException(
+                    "Conflicts detected on:\n"
+                            + removedFiles.stream()
+                                    .map(
+                                            i ->
+                                                    pathFactory.getPartitionString(i.partition)
+                                                            + ", bucket "
+                                                            + i.bucket
+                                                            + ", file "
+                                                            + i.fileName)
+                                    .collect(Collectors.joining("\n")));
+        }
+    }
+
+    private void cleanUpManifests(

Review comment:
       cleanUpTempSnapshot?
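
For context on the naming question, here is a minimal sketch of the shape of the commit loop in the quoted `tryCommit`, reduced to a local directory and `java.nio.file`. Everything in it is an assumption made for illustration and not the code in this PR: the class `RenameCommitSketch`, the `snapshot-<id>` file naming, and `Files.move` without `REPLACE_EXISTING` standing in for the file system rename. The manifest handling is left out entirely.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

/** Illustrative only: commit by renaming a temp file to the next snapshot name. */
final class RenameCommitSketch {

    static final long FIRST_SNAPSHOT_ID = 1;

    /** Hypothetical naming scheme: snapshot-1, snapshot-2, ... */
    static Path snapshotPath(Path dir, long id) {
        return dir.resolve("snapshot-" + id);
    }

    /** Probes ids upwards from the first one; returns null if no snapshot exists. */
    static Long latestSnapshotId(Path dir) {
        long id = FIRST_SNAPSHOT_ID;
        while (Files.exists(snapshotPath(dir, id))) {
            id++;
        }
        if (id == FIRST_SNAPSHOT_ID) {
            return null;
        }
        return id - 1;
    }

    /** Returns true if tmp was published as target, false if target already exists. */
    static boolean tryPublish(Path tmp, Path target) {
        try {
            // without REPLACE_EXISTING, Files.move fails if the target exists
            Files.move(tmp, target);
            return true;
        } catch (FileAlreadyExistsException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** The clean-up step under discussion: drop the unpublished temp file. */
    static void cleanUpTempSnapshot(Path tmp) {
        try {
            Files.deleteIfExists(tmp);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Retries until the content is published under a fresh id and returns that id. */
    static long commit(Path dir, String content) {
        while (true) {
            Long latest = latestSnapshotId(dir);
            long newId = latest == null ? FIRST_SNAPSHOT_ID : latest + 1;
            Path tmp = dir.resolve(".snapshot-" + newId + "-" + UUID.randomUUID());
            try {
                Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                cleanUpTempSnapshot(tmp);
                throw new UncheckedIOException(e);
            }
            if (tryPublish(tmp, snapshotPath(dir, newId))) {
                return newId;
            }
            // another committer took this id first: clean up and try the next one
            cleanUpTempSnapshot(tmp);
        }
    }

    /** Commits twice into a fresh temp directory and returns the assigned ids. */
    static long[] demo() {
        try {
            Path dir = Files.createTempDirectory("commit-sketch");
            return new long[] {commit(dir, "first"), commit(dir, "second")};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

`Files.move` without options is not guaranteed to be atomic, so this sketch only shows the shape of the retry and clean-up; it does not close the race window the way an atomic rename or an external lock would.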

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link FileStoreCommit}.
+ *
+ * <p>This class provides an atomic commit method to the user.
+ *
+ * <ol>
+ *   <li>Before calling {@link FileStoreCommitImpl#commit}, the user should first call {@link
+ *       FileStoreCommitImpl#filterCommitted} to make sure this commit has not been done before.
+ *   <li>Before committing, it first checks for conflicts by checking that all files to be removed
+ *       currently exist.
+ *   <li>After that it uses the external {@link FileStoreCommitImpl#lock} (if provided) or the
+ *       atomic rename of the file system to ensure atomicity.
+ *   <li>If the commit fails due to conflicts or an exception, it tries its best to clean up and
+ *       aborts.
+ *   <li>If the atomic rename fails, it tries again after reading the latest snapshot from step 2.
+ * </ol>
+ */
+public class FileStoreCommitImpl implements FileStoreCommit {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitImpl.class);
+
+    private final String committer;
+    private final ManifestCommittableSerializer committableSerializer;
+
+    private final FileStorePathFactory pathFactory;
+    private final ManifestFile manifestFile;
+    private final ManifestList manifestList;
+    private final FileStoreOptions fileStoreOptions;
+    private final FileStoreScan scan;
+
+    @Nullable private Lock lock;
+
+    public FileStoreCommitImpl(
+            String committer,
+            ManifestCommittableSerializer committableSerializer,
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList,
+            FileStoreOptions fileStoreOptions,
+            FileStoreScan scan) {
+        this.committer = committer;
+        this.committableSerializer = committableSerializer;
+
+        this.pathFactory = pathFactory;
+        this.manifestFile = manifestFile;
+        this.manifestList = manifestList;
+        this.fileStoreOptions = fileStoreOptions;
+        this.scan = scan;
+
+        this.lock = null;
+    }
+
+    @Override
+    public FileStoreCommit withLock(Lock lock) {
+        this.lock = lock;
+        return this;
+    }
+
+    @Override
+    public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committableList) {
+        committableList = new ArrayList<>(committableList);
+
+        // filter out commits with no new files
+        committableList.removeIf(committable -> committable.newFiles().isEmpty());
+
+        // if there are no previous snapshots then nothing should be filtered
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return committableList;
+        }
+
+        // check if a committable is already committed by its hash
+        Map<String, ManifestCommittable> hashes = new LinkedHashMap<>();
+        for (ManifestCommittable committable : committableList) {
+            hashes.put(digestManifestCommittable(committable), committable);
+        }
+
+        for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
+            Path snapshotPath = pathFactory.toSnapshotPath(id);
+            Snapshot snapshot = Snapshot.fromPath(snapshotPath);
+            if (committer.equals(snapshot.committer())) {
+                if (hashes.containsKey(snapshot.hash())) {
+                    hashes.remove(snapshot.hash());
+                } else {
+                    // early exit, because committableList must be the latest commits by this
+                    // committer
+                    break;
+                }
+            }
+        }
+
+        return new ArrayList<>(hashes.values());
+    }
+
+    @Override
+    public void commit(ManifestCommittable committable, Map<String, String> properties) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Ready to commit\n" + committable.toString());
+        }
+
+        String hash = digestManifestCommittable(committable);
+
+        List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
+        if (!appendChanges.isEmpty()) {
+            tryCommit(appendChanges, hash, Snapshot.Type.APPEND);
+        }
+
+        List<ManifestEntry> compactChanges = new ArrayList<>();
+        compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
+        compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
+        if (!compactChanges.isEmpty()) {
+            tryCommit(compactChanges, hash, Snapshot.Type.COMPACT);
+        }
+    }
+
+    @Override
+    public void overwrite(
+            Map<String, String> partition,
+            ManifestCommittable committable,
+            Map<String, String> properties) {
+        throw new UnsupportedOperationException();
+    }
+
+    private String digestManifestCommittable(ManifestCommittable committable) {
+        try {
+            return new String(
+                    Base64.getEncoder()
+                            .encode(
+                                    MessageDigest.getInstance("MD5")
+                                            .digest(committableSerializer.serialize(committable))));
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException("MD5 algorithm not found. This is impossible.", e);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Failed to serialize ManifestCommittable. This is unexpected.", e);
+        }
+    }
+
+    private List<ManifestEntry> collectChanges(
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> map, ValueKind kind) {
+        List<ManifestEntry> changes = new ArrayList<>();
+        for (Map.Entry<BinaryRowData, Map<Integer, List<SstFileMeta>>> entryWithPartition :
+                map.entrySet()) {
+            for (Map.Entry<Integer, List<SstFileMeta>> entryWithBucket :
+                    entryWithPartition.getValue().entrySet()) {
+                changes.addAll(
+                        entryWithBucket.getValue().stream()
+                                .map(
+                                        file ->
+                                                new ManifestEntry(
+                                                        kind,
+                                                        entryWithPartition.getKey(),
+                                                        entryWithBucket.getKey(),
+                                                        fileStoreOptions.bucket,
+                                                        file))
+                                .collect(Collectors.toList()));
+            }
+        }
+        return changes;
+    }
+
+    private void tryCommit(List<ManifestEntry> changes, String hash, Snapshot.Type type) {
+        while (true) {
+            Long latestSnapshotId = pathFactory.latestSnapshotId();
+            long newSnapshotId =
+                    latestSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshotId + 1;
+            Path newSnapshotPath = pathFactory.toSnapshotPath(newSnapshotId);
+            Path tmpSnapshotPath =
+                    new Path(
+                            newSnapshotPath.getParent()
+                                    + "/."
+                                    + newSnapshotPath.getName()
+                                    + UUID.randomUUID());
+
+            Snapshot latestSnapshot = null;
+            if (latestSnapshotId != null) {
+                detectConflicts(latestSnapshotId, changes);
+                latestSnapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(latestSnapshotId));
+            }
+
+            Snapshot newSnapshot;
+            String manifestListName = null;
+            List<ManifestFileMeta> oldMetas = new ArrayList<>();
+            List<ManifestFileMeta> newMetas = new ArrayList<>();
+            try {
+                if (latestSnapshot != null) {
+                    // read all previous manifest files
+                    oldMetas.addAll(manifestList.read(latestSnapshot.manifestList()));
+                    // merge manifest files
+                    newMetas.addAll(
+                            ManifestFileMeta.merge(
+                                    oldMetas,
+                                    manifestFile,
+                                    fileStoreOptions.manifestSuggestedSize.getBytes()));
+                }
+                // write all changes to manifest file
+                newMetas.add(manifestFile.write(changes));
+                // prepare snapshot file
+                manifestListName = manifestList.write(newMetas);
+                newSnapshot = new Snapshot(newSnapshotId, manifestListName, committer, hash, type);
+                FileUtils.writeFileUtf8(tmpSnapshotPath, newSnapshot.toJson());
+            } catch (Throwable e) {
+                // fails when preparing for commit, we should clean up
+                cleanUpManifests(tmpSnapshotPath, manifestListName, oldMetas, newMetas);
+                throw new RuntimeException(
+                        String.format(
+                                "Exception occurs when preparing snapshot #%d (path %s) by committer %s "
+                                        + "with hash %s and type %s. Clean up.",
+                                newSnapshotId,
+                                newSnapshotPath.toString(),
+                                committer,
+                                hash,
+                                type.name()),
+                        e);
+            }
+
+            boolean success;
+            try {
+                FileSystem fs = tmpSnapshotPath.getFileSystem();
+                // atomic rename
+                if (lock != null) {
+                    success =
+                            lock.runWithLock(
+                                    () ->
+                                            // fs.rename may not return false if the target file
+                                            // already exists, or may not even be atomic;
+                                            // as we're relying on external locking, we can first
+                                            // check if the file exists and then rename to work
+                                            // around this case
+                                            !fs.exists(newSnapshotPath)
+                                                    && fs.rename(tmpSnapshotPath, newSnapshotPath));
+                } else {
+                    success = fs.rename(tmpSnapshotPath, newSnapshotPath);
+                }
+            } catch (Throwable e) {
+                // exception when performing the atomic rename,
+                // we cannot clean up because we can't determine the success
+                throw new RuntimeException(
+                        String.format(
+                                "Exception occurs when committing snapshot #%d (path %s) by committer %s "
+                                        + "with hash %s and type %s. "
+                                        + "Cannot clean up because we can't determine the success.",
+                                newSnapshotId,
+                                newSnapshotPath.toString(),
+                                committer,
+                                hash,
+                                type.name()),
+                        e);
+            }
+
+            if (success) {
+                return;
+            }
+
+            // atomic rename fails, clean up and try again
+            LOG.warn(
+                    String.format(
+                            "Atomic rename failed for snapshot #%d (path %s) by committer %s "
+                                    + "with hash %s and type %s. "
+                                    + "Clean up and try again.",
+                            newSnapshotId,
+                            newSnapshotPath.toString(),
+                            committer,
+                            hash,
+                            type.name()));
+            cleanUpManifests(tmpSnapshotPath, manifestListName, oldMetas, newMetas);
+        }
+    }
+
+    private void detectConflicts(long snapshotId, List<ManifestEntry> changes) {
+        Set<ManifestEntry.Identifier> removedFiles =
+                changes.stream()
+                        .filter(e -> e.kind().equals(ValueKind.DELETE))
+                        .map(ManifestEntry::identifier)
+                        .collect(Collectors.toSet());
+        if (removedFiles.isEmpty()) {
+            // early exit for append only changes
+            return;
+        }
+
+        try {
+            for (ManifestEntry entry : scan.withSnapshot(snapshotId).plan().files()) {

Review comment:
       Add TODO here, partition filter is important.
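
To make the point about the partition filter concrete, here is a rough sketch of the same conflict check when only the partitions touched by the deletions are considered. `ConflictCheckSketch` and `FileId` are hypothetical stand-ins (the latter for `ManifestEntry.Identifier`), and the filter is applied in memory here; in the real scan it would presumably be pushed down so that manifests of unrelated partitions are never read, but how `FileStoreScan` would expose that is not shown in this thread.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/** Illustrative only: conflict detection restricted to the touched partitions. */
final class ConflictCheckSketch {

    /** Hypothetical stand-in for ManifestEntry.Identifier. */
    static final class FileId {
        final String partition;
        final int bucket;
        final String fileName;

        FileId(String partition, int bucket, String fileName) {
            this.partition = partition;
            this.bucket = bucket;
            this.fileName = fileName;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof FileId)) {
                return false;
            }
            FileId that = (FileId) o;
            return partition.equals(that.partition)
                    && bucket == that.bucket
                    && fileName.equals(that.fileName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(partition, bucket, fileName);
        }
    }

    /** Returns the files to delete that are missing from the current snapshot. */
    static Set<FileId> findConflicts(Set<FileId> toDelete, List<FileId> currentFiles) {
        Set<FileId> missing = new HashSet<>(toDelete);
        Set<String> touchedPartitions =
                toDelete.stream().map(id -> id.partition).collect(Collectors.toSet());
        for (FileId existing : currentFiles) {
            // the partition filter: entries of untouched partitions are skipped
            if (touchedPartitions.contains(existing.partition)) {
                missing.remove(existing);
            }
        }
        return missing;
    }
}
```

An empty result means every file to be removed still exists; anything left in the set is a conflict.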







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #12: [FLINK-25689] Introduce atomic commit

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #12:
URL: https://github.com/apache/flink-table-store/pull/12#discussion_r788375174



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/** This file is the entrance to all data committed at some specific time point. */
+public class Snapshot {
+
+    public static final long FIRST_SNAPSHOT_ID = 1;
+
+    private static final String FIELD_ID = "id";
+    private static final String FIELD_MANIFEST_LIST = "manifestList";
+    private static final String FIELD_COMMIT_USER = "commitUser";
+    private static final String FIELD_DIGEST = "digest";
+    private static final String FIELD_TYPE = "type";
+
+    @JsonProperty(FIELD_ID)
+    private final long id;
+
+    @JsonProperty(FIELD_MANIFEST_LIST)
+    private final String manifestList;
+
+    @JsonProperty(FIELD_COMMIT_USER)
+    private final String commitUser;
+
+    // for deduplication
+    @JsonProperty(FIELD_DIGEST)
+    private final String digest;

Review comment:
       commitDigest too?
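
As background on what this field is used for, here is a small sketch of digest-based deduplication along the lines of `filterCommitted` and `digestManifestCommittable` in this PR. The class `DigestDedupSketch`, the `SnapshotInfo` holder and the use of plain strings as committables are assumptions made for illustration; only the MD5-plus-Base64 digest and the latest-first walk mirror the quoted code.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: skip committables whose digest is already in a snapshot. */
final class DigestDedupSketch {

    /** Hypothetical holder for the two snapshot fields used by the filter. */
    static final class SnapshotInfo {
        final String commitUser;
        final String digest;

        SnapshotInfo(String commitUser, String digest) {
            this.commitUser = commitUser;
            this.digest = digest;
        }
    }

    /** Base64-encoded MD5 of the serialized committable. */
    static String digest(byte[] serialized) {
        try {
            return Base64.getEncoder()
                    .encodeToString(MessageDigest.getInstance("MD5").digest(serialized));
        } catch (NoSuchAlgorithmException e) {
            // every Java platform is required to provide MD5
            throw new IllegalStateException(e);
        }
    }

    /** Returns the pending committables that this user has not committed yet. */
    static List<String> filterCommitted(
            String commitUser, List<String> pending, List<SnapshotInfo> snapshotsLatestFirst) {
        Map<String, String> byDigest = new LinkedHashMap<>();
        for (String committable : pending) {
            byDigest.put(digest(committable.getBytes(StandardCharsets.UTF_8)), committable);
        }
        for (SnapshotInfo snapshot : snapshotsLatestFirst) {
            if (!commitUser.equals(snapshot.commitUser)) {
                continue;
            }
            if (byDigest.remove(snapshot.digest) == null) {
                // an older commit by the same user: nothing further back can match
                break;
            }
        }
        return new ArrayList<>(byDigest.values());
    }
}
```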







[GitHub] [flink-table-store] JingsongLi merged pull request #12: [FLINK-25689] Introduce atomic commit

Posted by GitBox <gi...@apache.org>.
JingsongLi merged pull request #12:
URL: https://github.com/apache/flink-table-store/pull/12


   








[GitHub] [flink-table-store] tsreaper commented on a change in pull request #12: [FLINK-25689] Introduce atomic commit

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #12:
URL: https://github.com/apache/flink-table-store/pull/12#discussion_r788308364



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
##########
@@ -96,6 +108,31 @@ public String getPartitionString(BinaryRowData partition) {
                                 partition, "Partition row data is null. This is unexpected.")));
     }
 
+    @Nullable
+    public Long latestSnapshotId() {

Review comment:
       `latestSnapshotId` is better: the name indicates that every call to this method looks through all snapshots and finds the latest one.
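
       A minimal sketch of what "look through all snapshots on every call" could mean, using only `java.nio.file`. The `snapshot-<id>` file naming, the `SNAPSHOT_PREFIX` constant and the `LatestSnapshotIdSketch` class are assumptions made for illustration; the real layout is owned by `FileStorePathFactory` and may differ.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

final class LatestSnapshotIdSketch {

    // Hypothetical file name prefix, not taken from the PR.
    static final String SNAPSHOT_PREFIX = "snapshot-";

    /** Lists the directory on every call and returns the largest id, or null if none exists. */
    static Long latestSnapshotId(Path snapshotDir) throws IOException {
        Long latest = null;
        try (DirectoryStream<Path> files =
                Files.newDirectoryStream(snapshotDir, SNAPSHOT_PREFIX + "*")) {
            for (Path file : files) {
                String suffix =
                        file.getFileName().toString().substring(SNAPSHOT_PREFIX.length());
                long id;
                try {
                    id = Long.parseLong(suffix);
                } catch (NumberFormatException e) {
                    // Not a snapshot file (for example a temporary file), skip it.
                    continue;
                }
                if (latest == null || id > latest) {
                    latest = id;
                }
            }
        }
        return latest;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("snapshots");
        System.out.println(latestSnapshotId(dir)); // no snapshot yet
        Files.createFile(dir.resolve(SNAPSHOT_PREFIX + 1));
        Files.createFile(dir.resolve(SNAPSHOT_PREFIX + 2));
        System.out.println(latestSnapshotId(dir));
    }
}
```

       Nothing is cached between calls, which is the behaviour the method name is meant to signal; the `@Nullable` return covers the case where no snapshot has been written yet.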







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #12: [FLINK-25689] Introduce atomic commit

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #12:
URL: https://github.com/apache/flink-table-store/pull/12#discussion_r788312849



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/** This file is the entrance to all data committed at some specific time point. */
+public class Snapshot {
+
+    public static final long FIRST_SNAPSHOT_ID = 1;
+
+    private static final String FIELD_ID = "id";
+    private static final String FIELD_MANIFEST_LIST = "manifestList";
+    private static final String FIELD_COMMITTER = "committer";
+    private static final String FIELD_HASH = "hash";
+    private static final String FIELD_TYPE = "type";
+
+    @JsonProperty(FIELD_ID)
+    private final long id;
+
+    @JsonProperty(FIELD_MANIFEST_LIST)
+    private final String manifestList;
+
+    @JsonProperty(FIELD_COMMITTER)
+    private final String committer;
+
+    // for deduplication
+    @JsonProperty(FIELD_HASH)
+    private final String hash;
+
+    @JsonProperty(FIELD_TYPE)
+    private final Type type;

Review comment:
       This is an inner enum class. No need to add a prefix.
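
       A small sketch of why the prefix is redundant: a nested enum is already qualified by its outer class at every use site. The `Snapshot` stand-in below is reduced to two fields, and the constant names `APPEND` and `COMPACT` are assumptions for illustration only.

```java
final class SnapshotTypeSketch {

    /** Hypothetical, reduced stand-in for Snapshot; only the nested enum matters here. */
    static final class Snapshot {

        // Constant names are assumed for this example.
        enum Type {
            APPEND,
            COMPACT
        }

        final long id;
        final Type type;

        Snapshot(long id, Type type) {
            this.id = id;
            this.type = type;
        }
    }

    public static void main(String[] args) {
        // Outside the class the enum reads as Snapshot.Type, so a name such as
        // Snapshot.SnapshotType would only repeat the outer class name.
        Snapshot snapshot = new Snapshot(1, Snapshot.Type.APPEND);
        System.out.println(snapshot.type);
    }
}
```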







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #12: [FLINK-25689] Introduce atomic commit

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #12:
URL: https://github.com/apache/flink-table-store/pull/12#discussion_r788313592



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/** This file is the entrance to all data committed at some specific time point. */
+public class Snapshot {
+
+    public static final long FIRST_SNAPSHOT_ID = 1;
+
+    private static final String FIELD_ID = "id";
+    private static final String FIELD_MANIFEST_LIST = "manifestList";
+    private static final String FIELD_COMMITTER = "committer";
+    private static final String FIELD_HASH = "hash";
+    private static final String FIELD_TYPE = "type";
+
+    @JsonProperty(FIELD_ID)

Review comment:
       We still need the `@JsonGetter`s and the field names, so there is no harm in also adding `@JsonProperty` here.







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #12: [FLINK-25689] Introduce atomic commit

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #12:
URL: https://github.com/apache/flink-table-store/pull/12#discussion_r788307495



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link FileStoreCommit}.
+ *
+ * <p>This class provides an atomic commit method to the user.
+ *
+ * <ol>
+ *   <li>Before calling {@link FileStoreCommitImpl#commit}, user should first call {@link
+ *       FileStoreCommitImpl#filterCommitted} to make sure this commit is not done before.
+ *   <li>Before committing, it will first check for conflicts by checking if all files to be removed
+ *       currently exists.
+ *   <li>After that it use the external {@link FileStoreCommitImpl#lock} (if provided) or the atomic
+ *       rename of the file system to ensure atomicity.
+ *   <li>If commit fails due to conflicts or exception it tries its best to clean up and aborts.
+ *   <li>If atomic rename fails it tries again after reading the latest snapshot from step 2.
+ * </ol>
+ */
+public class FileStoreCommitImpl implements FileStoreCommit {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitImpl.class);
+
+    private final String committer;
+    private final ManifestCommittableSerializer committableSerializer;
+
+    private final FileStorePathFactory pathFactory;
+    private final ManifestFile manifestFile;
+    private final ManifestList manifestList;
+    private final FileStoreOptions fileStoreOptions;
+    private final FileStoreScan scan;
+
+    @Nullable private Lock lock;
+
+    public FileStoreCommitImpl(
+            String committer,
+            ManifestCommittableSerializer committableSerializer,
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList,
+            FileStoreOptions fileStoreOptions,
+            FileStoreScan scan) {
+        this.committer = committer;
+        this.committableSerializer = committableSerializer;
+
+        this.pathFactory = pathFactory;
+        this.manifestFile = manifestFile;
+        this.manifestList = manifestList;
+        this.fileStoreOptions = fileStoreOptions;
+        this.scan = scan;
+
+        this.lock = null;
+    }
+
+    @Override
+    public FileStoreCommit withLock(Lock lock) {
+        this.lock = lock;
+        return this;
+    }
+
+    @Override
+    public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committableList) {
+        committableList = new ArrayList<>(committableList);
+
+        // filter out commits with no new files
+        committableList.removeIf(committable -> committable.newFiles().isEmpty());

Review comment:
       This class supports pure compaction. When a `committable` is committed for the first time, it should be passed directly to the `commit` method without going through `filterCommitted`. This piece of code only filters out empty changes or compaction-only changes inside `filterCommitted`.
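
       The recovery-time filter described above can be sketched as follows. `Committable` is a hypothetical stand-in for `ManifestCommittable`, and recognising an already committed change by a `digest` string is an assumption made for this example, not necessarily how the PR does it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

final class FilterCommittedSketch {

    /** Hypothetical stand-in for ManifestCommittable: a digest plus the files it adds. */
    static final class Committable {
        final String digest;
        final List<String> newFiles;

        Committable(String digest, List<String> newFiles) {
            this.digest = digest;
            this.newFiles = newFiles;
        }
    }

    /** Only used when retrying after a failure; first-time commits skip this method. */
    static List<Committable> filterCommitted(
            List<Committable> committables, Set<String> committedDigests) {
        // Copy first so the caller's list is left untouched.
        List<Committable> result = new ArrayList<>(committables);
        // Empty or compaction-only changes carry no new files and are dropped here.
        result.removeIf(c -> c.newFiles.isEmpty());
        // Assumption: a change that was already committed is recognised by its digest.
        result.removeIf(c -> committedDigests.contains(c.digest));
        return result;
    }

    public static void main(String[] args) {
        List<Committable> recovered =
                Arrays.asList(
                        new Committable("a", Arrays.asList("sst-1")),
                        new Committable("b", Collections.<String>emptyList()),
                        new Committable("c", Arrays.asList("sst-2")));
        List<Committable> toRetry = filterCommitted(recovered, Collections.singleton("a"));
        for (Committable c : toRetry) {
            System.out.println(c.digest);
        }
    }
}
```

       In this sketch a compaction-only change still reaches `commit` on its first attempt, because only the retry path calls `filterCommitted`.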







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #12: [FLINK-25689] Introduce atomic commit

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #12:
URL: https://github.com/apache/flink-table-store/pull/12#discussion_r788531241



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link FileStoreCommit}.
+ *
+ * <p>This class provides an atomic commit method to the user.
+ *
+ * <ol>
+ *   <li>Before calling {@link FileStoreCommitImpl#commit}, user should first call {@link
+ *       FileStoreCommitImpl#filterCommitted} to make sure this commit is not done before.
+ *   <li>Before committing, it will first check for conflicts by checking if all files to be removed
+ *       currently exists.
+ *   <li>After that it use the external {@link FileStoreCommitImpl#lock} (if provided) or the atomic
+ *       rename of the file system to ensure atomicity.
+ *   <li>If commit fails due to conflicts or exception it tries its best to clean up and aborts.
+ *   <li>If atomic rename fails it tries again after reading the latest snapshot from step 2.
+ * </ol>
+ */
+public class FileStoreCommitImpl implements FileStoreCommit {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitImpl.class);
+
+    private final String committer;
+    private final ManifestCommittableSerializer committableSerializer;
+
+    private final FileStorePathFactory pathFactory;
+    private final ManifestFile manifestFile;
+    private final ManifestList manifestList;
+    private final FileStoreOptions fileStoreOptions;
+    private final FileStoreScan scan;
+
+    @Nullable private Lock lock;
+
+    public FileStoreCommitImpl(
+            String committer,
+            ManifestCommittableSerializer committableSerializer,
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList,
+            FileStoreOptions fileStoreOptions,
+            FileStoreScan scan) {
+        this.committer = committer;
+        this.committableSerializer = committableSerializer;
+
+        this.pathFactory = pathFactory;
+        this.manifestFile = manifestFile;
+        this.manifestList = manifestList;
+        this.fileStoreOptions = fileStoreOptions;
+        this.scan = scan;
+
+        this.lock = null;
+    }
+
+    @Override
+    public FileStoreCommit withLock(Lock lock) {
+        this.lock = lock;
+        return this;
+    }
+
+    @Override
+    public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committableList) {
+        committableList = new ArrayList<>(committableList);
+
+        // filter out commits with no new files
+        committableList.removeIf(committable -> committable.newFiles().isEmpty());

Review comment:
       We can't, because these two cases are indistinguishable:
   1. This change hasn't been committed.
   2. There is a conflict and this change is aborted.



