Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/17 08:08:35 UTC

[GitHub] [flink-table-store] JingsongLi opened a new pull request, #163: [FLINK-28066] Use FileSystem.createRecoverableWriter in FileStoreCommit

JingsongLi opened a new pull request, #163:
URL: https://github.com/apache/flink-table-store/pull/163

   Currently, FileStoreCommitImpl uses `rename` to commit snapshots atomically.
   This does not work for object stores such as S3, where rename is not atomic. We can use RecoverableWriter to support atomic commits on object stores.
   We can introduce `AtomicFileWriter`:
   - Use rename if createRecoverableWriter is not supported, or if the file system kind is FILE_SYSTEM
   - Otherwise, use the RecoverableWriter returned by createRecoverableWriter
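A minimal, dependency-free sketch of the selection rule in the two bullets above follows. It does not use Flink at runtime: `FsKind` only mirrors the two constants of `org.apache.flink.core.fs.FileSystemKind`, and the supplier stands in for `FileSystem#createRecoverableWriter()`, which throws `UnsupportedOperationException` on file systems that do not support recoverable writers. `AtomicWriterChoice`, `Strategy`, and `choose` are hypothetical names, not the PR's API.

```java
import java.util.function.Supplier;

/** Hypothetical sketch of choosing between rename and RecoverableWriter. */
final class AtomicWriterChoice {

    /** Mirrors org.apache.flink.core.fs.FileSystemKind (assumption: only these two kinds). */
    enum FsKind { FILE_SYSTEM, OBJECT_STORE }

    enum Strategy { RENAME, RECOVERABLE_WRITER }

    /**
     * @param kind the kind of the target file system
     * @param recoverableWriterProbe stands in for FileSystem#createRecoverableWriter(), which
     *     throws UnsupportedOperationException when recoverable writers are unsupported
     */
    static Strategy choose(FsKind kind, Supplier<Object> recoverableWriterProbe) {
        if (kind == FsKind.FILE_SYSTEM) {
            // A real file system offers an atomic rename, so keep the rename-based commit.
            return Strategy.RENAME;
        }
        try {
            recoverableWriterProbe.get();
            return Strategy.RECOVERABLE_WRITER;
        } catch (UnsupportedOperationException e) {
            // No recoverable writer available: fall back to rename.
            return Strategy.RENAME;
        }
    }
}
```

Checking the kind first means a FILE_SYSTEM-kind store never instantiates a recoverable writer, even when it could create one.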
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #163: [FLINK-28066] Use FileSystem.createRecoverableWriter in FileStoreCommit

tsreaper commented on code in PR #163:
URL: https://github.com/apache/flink-table-store/pull/163#discussion_r901465342


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecoverableAtomicFileWriter.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+
+import java.io.IOException;
+
+/** A {@link AtomicFileWriter} to wrap {@link RecoverableWriter}. */
+public class RecoverableAtomicFileWriter implements AtomicFileWriter {
+
+    private final RecoverableWriter recoverableWriter;
+
+    public RecoverableAtomicFileWriter(RecoverableWriter recoverableWriter) {
+        this.recoverableWriter = recoverableWriter;
+    }
+
+    @Override
+    public AtomicFsDataOutputStream open(Path path) throws IOException {
+        RecoverableFsDataOutputStream out = recoverableWriter.open(path);
+        return new RecoverableAtomicFsDataOutputStream(out);
+    }
+
+    private static class RecoverableAtomicFsDataOutputStream extends AtomicFsDataOutputStream {
+
+        private final RecoverableFsDataOutputStream out;
+
+        public RecoverableAtomicFsDataOutputStream(RecoverableFsDataOutputStream out) {
+            this.out = out;
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            this.out.write(b);
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            this.out.write(b, off, len);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            this.out.flush();
+        }
+
+        @Override
+        public void sync() throws IOException {
+            this.out.sync();
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            return this.out.getPos();
+        }
+
+        @Override
+        public boolean closeAndCommit() throws IOException {
+            RecoverableFsDataOutputStream.Committer committer = this.out.closeForCommit();
+            committer.commit();
+
+            /*
+             * When using RecoverableWriter, we can only return true. There are two
+             * cases in which the commit actually fails:
+             * 1. The commit throws an exception. We do not return true; the exception
+             *    propagates to the caller, which handles it.
+             * 2. The commit fails without an exception. In this case, the caller must
+             *    hold a lock to ensure that the current file is not written
+             *    concurrently.

Review Comment:
   This case happens for object stores. Why not read the file after commit and check if it is the actual file we've written?
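The reviewer's suggestion can be sketched as a read-back check after the commit. This is a hedged illustration only: it uses `java.nio.file` in place of Flink's `FileSystem`, and `CommitVerifier` / `committedContentMatches` are hypothetical names. The check is only meaningful if each writer's content is distinguishable (for example, content that embeds a per-committer identifier) and the file is never rewritten after the commit.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical read-back check; java.nio.file stands in for Flink's FileSystem. */
final class CommitVerifier {

    /** Returns true only if the file at {@code target} holds exactly {@code expected}. */
    static boolean committedContentMatches(Path target, String expected) throws IOException {
        if (!Files.exists(target)) {
            return false; // nothing was published at all
        }
        String actual = new String(Files.readAllBytes(target), StandardCharsets.UTF_8);
        // If a concurrent writer won the race, different content is left behind.
        return expected.equals(actual);
    }
}
```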



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -400,16 +396,24 @@ private boolean tryCommitOnce(
 
         boolean success;
         try {
-            FileSystem fs = tmpSnapshotPath.getFileSystem();
-            // atomic rename
-            // TODO rename is not work for object store, use recoverable writer
+            FileSystem fs = newSnapshotPath.getFileSystem();
             Callable<Boolean> callable =
                     () -> {
-                        boolean committed = fs.rename(tmpSnapshotPath, newSnapshotPath);
-                        if (committed) {
+                        if (fs.exists(newSnapshotPath)) {
+                            return false;
+                        }
+
+                        AtomicFsDataOutputStream out =
+                                AtomicFileWriter.create(fs).open(newSnapshotPath);
+                        try {
+                            FileUtils.writeFileUtf8(out, newSnapshot.toJson());
+                            boolean committed = out.closeAndCommit();
                             snapshotManager.commitLatestHint(newSnapshotId);

Review Comment:
   ```java
   if (committed) {
       snapshotManager.commitLatestHint(newSnapshotId);
   }
   ```
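To show the suggested fix in context, here is a hedged sketch of the whole commit step with the hint guarded by `committed`. It is not the PR's code: `java.nio.file` stands in for Flink's `FileSystem` and `AtomicFileWriter`, the `snapshot-<id>` and `LATEST` file names and the `SnapshotCommitSketch.tryCommit` method are hypothetical, and `StandardOpenOption.CREATE_NEW` only guarantees exclusive creation, not the all-or-nothing visibility an `AtomicFsDataOutputStream` is meant to provide.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical commit step: the LATEST hint advances only after a successful commit. */
final class SnapshotCommitSketch {

    static boolean tryCommit(Path dir, long snapshotId, String snapshotJson) throws IOException {
        Path snapshotPath = dir.resolve("snapshot-" + snapshotId);
        if (Files.exists(snapshotPath)) {
            return false; // another committer already claimed this snapshot id
        }
        boolean committed;
        try {
            // CREATE_NEW fails if the file appeared after the exists() check.
            Files.write(
                    snapshotPath,
                    snapshotJson.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE_NEW);
            committed = true;
        } catch (FileAlreadyExistsException e) {
            committed = false;
        }
        if (committed) {
            // Only a successful commit may move the hint forward.
            Files.write(dir.resolve("LATEST"), Long.toString(snapshotId).getBytes(StandardCharsets.UTF_8));
        }
        return committed;
    }
}
```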



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -461,13 +465,8 @@ private boolean tryCommitOnce(
                         "Atomic rename failed for snapshot #%d (path %s) by user %s "

Review Comment:
   Change the exception message from `rename` to `commit`?




[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #163: [FLINK-28066] Use FileSystem.createRecoverableWriter in FileStoreCommit

tsreaper commented on code in PR #163:
URL: https://github.com/apache/flink-table-store/pull/163#discussion_r902204598


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/AtomicFileWriter.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * The AtomicFileWriter creates {@link AtomicFsDataOutputStream}. The streams do not make the files
+ * they write to immediately visible, but instead write to temp files or other temporary storage. To
+ * publish the data atomically in the end, the stream offers the {@link
+ * RenamingAtomicFsDataOutputStream#closeAndCommit()} method to publish the result.
+ */
+public interface AtomicFileWriter {
+
+    /** Opens a new atomic stream to write to the given path. */
+    AtomicFsDataOutputStream open(Path path) throws IOException;
+
+    /**
+     * Writes a UTF-8 string to the file. In addition, it checks whether the committed file equals
+     * the input {@code content}; if not, the commit has failed.
+     *
+     * <p>Note that this does not handle the case where the file is overwritten by another commit.
+     *
+     * @return True if the committing was successful, False otherwise
+     */
+    default boolean writeFileSafety(Path path, String content) throws IOException {
+        AtomicFsDataOutputStream out = open(path);
+        try {
+            FileUtils.writeOutputStreamUtf8(out, content);
+            boolean success = out.closeAndCommit();
+            if (success) {
+                return content.equals(FileUtils.readFileUtf8(path));
+            } else {
+                return false;
+            }

Review Comment:
   Move this logic to its user, or change the interface name? `AtomicFileWriter` should only guarantee an atomic write. It should return true even if the file is overwritten afterwards. We can only perform this check if the new file is a read-only file.
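A rename-based writer that only promises an atomic write, in the spirit of this comment and of the `RenamingAtomicFsDataOutputStream` mentioned in the Javadoc, might look like the sketch below. It is a local illustration under stated assumptions: `java.nio.file` replaces Flink's API, and `RenamingAtomicWriterSketch` / `writeAtomically` are hypothetical names. It returns true once the move succeeds, even if the file is later overwritten, and leaves any content check to a caller that knows the file is read-only.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical rename-based writer that guarantees only an atomic publish. */
final class RenamingAtomicWriterSketch {

    static boolean writeAtomically(Path target, String content) throws IOException {
        Path dir = target.toAbsolutePath().getParent();
        // Temp file in the same directory, so the move stays within one file system.
        Path tmp = Files.createTempFile(dir, "." + target.getFileName(), ".tmp");
        try {
            Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
            // Readers see either no file (or the old one) or the complete new file.
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
            return true;
        } finally {
            Files.deleteIfExists(tmp); // no-op after a successful move
        }
    }
}
```

Per the `Files.move` Javadoc, whether `ATOMIC_MOVE` replaces an existing target is implementation specific, so callers that must never overwrite still need a lock or an existence check.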





[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #163: [FLINK-28066] Use FileSystem.createRecoverableWriter in FileStoreCommit

tsreaper commented on code in PR #163:
URL: https://github.com/apache/flink-table-store/pull/163#discussion_r902204598


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/AtomicFsDataOutputStream.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import java.io.IOException;
+
+/**
+ * A stream that initially writes to hidden or temporary files and only creates the target file once
+ * it is closed and "committed".
+ */
+public abstract class AtomicFsDataOutputStream extends FSDataOutputStream {
+
+    /**
+     * Closes the stream, ensuring persistence of all data (similar to {@link #sync()}), and commits
+     * the file, publishing (making visible) the file that the stream was writing to.
+     *
+     * @return True if the committing was successful, False otherwise

Review Comment:
   Returns true if the commit may be successful. Returns false if the commit definitely fails.
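Under the weaker contract proposed here, a caller can treat `false` as a definite failure and `true` only as "possibly committed". A minimal sketch of that interpretation follows; `CommitOutcomeSketch`, `Outcome`, and `interpret` are hypothetical names, and the supplier stands in for whatever confirmation the caller uses, such as a read-back of a write-once file.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper that interprets closeAndCommit() under the proposed contract. */
final class CommitOutcomeSketch {

    enum Outcome { DEFINITELY_FAILED, CONFIRMED, NOT_CONFIRMED }

    static Outcome interpret(boolean closeAndCommitResult, BooleanSupplier confirm) {
        if (!closeAndCommitResult) {
            // false is authoritative: the commit did not happen, no confirmation needed.
            return Outcome.DEFINITELY_FAILED;
        }
        // true only means "may have succeeded"; confirm before relying on it.
        return confirm.getAsBoolean() ? Outcome.CONFIRMED : Outcome.NOT_CONFIRMED;
    }
}
```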





[GitHub] [flink-table-store] JingsongLi merged pull request #163: [FLINK-28066] Use FileSystem.createRecoverableWriter in FileStoreCommit

JingsongLi merged PR #163:
URL: https://github.com/apache/flink-table-store/pull/163

