You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/20 02:49:36 UTC

[GitHub] [flink-table-store] tsreaper commented on a change in pull request #12: [FLINK-25689] Introduce atomic commit

tsreaper commented on a change in pull request #12:
URL: https://github.com/apache/flink-table-store/pull/12#discussion_r788307495



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link FileStoreCommit}.
+ *
+ * <p>This class provides an atomic commit method to the user.
+ *
+ * <ol>
+ *   <li>Before calling {@link FileStoreCommitImpl#commit}, user should first call {@link
+ *       FileStoreCommitImpl#filterCommitted} to make sure this commit is not done before.
+ *   <li>Before committing, it will first check for conflicts by checking if all files to be removed
+ *       currently exists.
+ *   <li>After that it use the external {@link FileStoreCommitImpl#lock} (if provided) or the atomic
+ *       rename of the file system to ensure atomicity.
+ *   <li>If commit fails due to conflicts or exception it tries its best to clean up and aborts.
+ *   <li>If atomic rename fails it tries again after reading the latest snapshot from step 2.
+ * </ol>
+ */
+public class FileStoreCommitImpl implements FileStoreCommit {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitImpl.class);
+
+    private final String committer;
+    private final ManifestCommittableSerializer committableSerializer;
+
+    private final FileStorePathFactory pathFactory;
+    private final ManifestFile manifestFile;
+    private final ManifestList manifestList;
+    private final FileStoreOptions fileStoreOptions;
+    private final FileStoreScan scan;
+
+    @Nullable private Lock lock;
+
+    public FileStoreCommitImpl(
+            String committer,
+            ManifestCommittableSerializer committableSerializer,
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList,
+            FileStoreOptions fileStoreOptions,
+            FileStoreScan scan) {
+        this.committer = committer;
+        this.committableSerializer = committableSerializer;
+
+        this.pathFactory = pathFactory;
+        this.manifestFile = manifestFile;
+        this.manifestList = manifestList;
+        this.fileStoreOptions = fileStoreOptions;
+        this.scan = scan;
+
+        this.lock = null;
+    }
+
+    @Override
+    public FileStoreCommit withLock(Lock lock) {
+        this.lock = lock;
+        return this;
+    }
+
+    @Override
+    public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committableList) {
+        committableList = new ArrayList<>(committableList);
+
+        // filter out commits with no new files
+        committableList.removeIf(committable -> committable.newFiles().isEmpty());

Review comment:
       This class supports pure compaction. When a `committable` is committed for the first time, it should be passed directly to the `commit` method without going through `filterCommitted`. This piece of code only filters out empty or compaction-only changes in `filterCommitted`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org