Posted to issues@flink.apache.org by sihuazhou <gi...@git.apache.org> on 2018/05/10 05:22:20 UTC
[GitHub] flink pull request #5982: [FLINK-9325][checkpoint]generate the meta file for...
GitHub user sihuazhou opened a pull request:
https://github.com/apache/flink/pull/5982
[FLINK-9325][checkpoint]generate the meta file for checkpoint only when the writing is truly successful
## What is the purpose of the change
*This PR makes the checkpoint finalization a bit more robust: it writes the metadata file to a temp file first and then atomically renames it (with an equivalent workaround for S3).*
## Brief change log
- *write the metadata file first to a temp file and then atomically rename it*
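A minimal sketch of the temp-file-then-rename idea, using `java.nio.file` on a local file system rather than Flink's `FileSystem` abstraction; the class name `AtomicMetadataWriter` and the temp-file naming scheme are hypothetical:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

/** Hypothetical helper: write to a temp file, then atomically rename it to the target. */
final class AtomicMetadataWriter {

    private AtomicMetadataWriter() {}

    static void writeAtomically(Path target, byte[] data) throws IOException {
        // Same directory as the target, so the final move is a plain rename.
        Path temp = target.resolveSibling(target.getFileName() + "." + UUID.randomUUID() + ".tmp");
        try {
            Files.write(temp, data);
            // Either the complete file appears under the target name, or the move throws.
            Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
        } finally {
            // After a successful move the temp file is gone; after a failure it is cleaned up.
            Files.deleteIfExists(temp);
        }
    }
}
```

Readers of the target path see either no file or the complete file. Whether `ATOMIC_MOVE` replaces an already existing target is implementation specific per the `Files.move` contract, so this sketch assumes the target does not exist yet.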
## Verifying this change
This change is already covered by existing tests.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)
## Documentation
no
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/sihuazhou/flink generateTheMetaFileAtomaticly
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5982.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5982
----
commit ed37d6d55f3b133aed70e403a7c8b155787462dc
Author: sihuazhou <su...@...>
Date: 2018-05-10T05:17:09Z
generate the meta file only when the writing is truly successful.
----
---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5982
Hi @StephanEwen,
- I prefer introducing this as a mode of `create(Path, WriteMode)`, because I feel it is more extensible (it would allow us to introduce other `WriteMode`s in the future).
- For the other schemes, I would throw an `UnsupportedOperationException`; I think that makes the program's behavior more deterministic.
What do you think?
---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/5982
Thanks for preparing this.
I looked at the `TwoPhraseFSDatautputStream`; maybe we can make this simpler. Do we need the distinction between phases? Would it not be enough for it to behave as a regular stream and just override `close()` to do `super.close()` + `rename()`? When the stream is closed, all the writing methods fail with a "stream closed" exception anyway.
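The simplification suggested here could look roughly like the following sketch, with `java.nio.file` standing in for Flink's `FileSystem`; `RenameOnCloseOutputStream` is a hypothetical name, and its `close()` renames unconditionally:

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical sketch of "a regular stream whose close() is super.close() + rename()". */
class RenameOnCloseOutputStream extends FilterOutputStream {

    private final Path tempFile;
    private final Path targetFile;
    private boolean closed;

    RenameOnCloseOutputStream(Path tempFile, Path targetFile) throws IOException {
        super(Files.newOutputStream(tempFile));
        this.tempFile = tempFile;
        this.targetFile = targetFile;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len); // avoid FilterOutputStream's byte-by-byte default
    }

    @Override
    public void close() throws IOException {
        if (closed) {
            return;
        }
        closed = true;
        super.close(); // flushes and closes the temp-file stream; later writes fail
        // Publishes unconditionally: close() cannot tell success from cleanup.
        Files.move(tempFile, targetFile, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

Because `close()` cannot distinguish a successful finish from a cleanup call, a try-with-resources block that fails midway still publishes whatever was written so far.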
Also, we need this method to be implemented in all FileSystem subclasses.
Typos: "Phrase" --> "Phase"
---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5982
Hi @StephanEwen, I added more tests for `FileSystem#createAtomically()`. Concerning the `TwoPhaseFsDataoutputStream`, could we introduce a `commit_on_close` option to make it easier to use? If `commit_on_close=true`, we commit the write atomically in `close()`. If `commit_on_close=false`, the user needs to call `commit()` manually, which could be useful in try-with-resources situations. What do you think?
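A rough sketch of the proposed `commit_on_close` option, assuming local files via `java.nio.file`; the class name and the `commitOnClose` constructor flag are hypothetical:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical sketch of a commit_on_close option for a two-phase output stream. */
class CommitOnCloseOutputStream extends OutputStream {

    private final Path tempFile;
    private final Path targetFile;
    private final boolean commitOnClose;
    private final OutputStream out;
    private boolean closed;

    CommitOnCloseOutputStream(Path tempFile, Path targetFile, boolean commitOnClose) throws IOException {
        this.tempFile = tempFile;
        this.targetFile = targetFile;
        this.commitOnClose = commitOnClose;
        this.out = Files.newOutputStream(tempFile);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
    }

    /** Explicit commit: close the temp stream and atomically rename it to the target. */
    public void commit() throws IOException {
        if (closed) {
            throw new IOException("Stream is already closed");
        }
        closed = true;
        out.close();
        Files.move(tempFile, targetFile, StandardCopyOption.ATOMIC_MOVE);
    }

    @Override
    public void close() throws IOException {
        if (closed) {
            return; // already committed or closed
        }
        if (commitOnClose) {
            commit(); // commit_on_close=true: close() publishes
        } else {
            closed = true; // commit_on_close=false: closing without commit() discards
            out.close();
            Files.deleteIfExists(tempFile);
        }
    }
}
```

With `commitOnClose=true` this behaves like renaming in `close()`, so a failure inside a try-with-resources block still publishes a partial file; only the `false` mode separates success from cleanup.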
---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5982
Hi @StephanEwen, thank you very much for your reply. I'm not sure whether just overriding `close()` to do `super.close()` + `rename()` is enough. For example:
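```java
try (TwoPhraseFSDatautputStream outputStream = new TwoPhraseFSDatautputStream(...)) {
    outputStream.write("part1".getBytes(StandardCharsets.UTF_8));
    if (somethingGoesWrong) { // placeholder for any failure in the middle of writing
        throw new RuntimeException("xxx");
    }
    outputStream.write("part2".getBytes(StandardCharsets.UTF_8));
} // close() still runs on the exception path, so a rename() in close() publishes only "part1"
```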
This would still rename the `tmp file` to the `target file`, because we unconditionally `rename()` in `close()`. Also, the current `TwoPhraseFSDatautputStream` works as a wrapper, so it should already support all file systems. I'm not sure whether I misunderstood your meaning... please let me know if I misunderstood something, and what your opinion is. Thanks!
---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5982
Hi @StephanEwen Could you please have a look at this?
---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/5982
I think this fix here might not work for S3, because a rename() with the S3 file systems will actually trigger a copy (or even a download and upload), so it is not a cheap operation.
We can fix this by adding a `create(...)` method (or mode) to the FileSystem API that does not publish data in the file until `close()` is called. For hdfs:// and file://, this would use a temp file plus a rename; for S3 we don't write to a temp file, because S3 only makes the file visible on close() anyway.
---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/5982
How about adding the method `createAtomically` or so, with otherwise the same signature as the `create(Path, WriteMode)` method?
---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5982
@StephanEwen After thinking about your comments again, I realize I misunderstood `Also, we need this method to be implemented in all FileSystem subclasses.` 😭 I will address that. But the question related to `TwoPhaseFsDataoutputStream` is still a bit confusing to me...
---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5982
@StephanEwen I've addressed your comments, could you please have a look again?
---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/5982
Good point about the renaming on `close()` in case close is called for cleanup, rather than success.
We could follow the same semantics as in [CheckpointStateOutputStream](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java#L61)
There the semantics are:
- `close()` means "close on error / cleanup" and closes the stream and deletes the temp file.
- `closeAndPublish()` would mean "close on success" and close the stream and rename the file.
- After `closeAndPublish()` has been called, `close()` becomes a no-op.
The [FsCheckpointMetadataOutputStream](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java) implements that pattern, I think it worked well and is easy to use.
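A minimal local-file sketch of that contract, using `java.nio.file` instead of Flink's `FileSystem`; the class `PublishingFileOutputStream` is hypothetical and is not the actual `FsCheckpointMetadataOutputStream` code:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

/** Hypothetical sketch: close() = cleanup, closeAndPublish() = success. */
class PublishingFileOutputStream extends OutputStream {

    private final Path targetFile;
    private final Path tempFile;
    private final OutputStream out;
    private boolean closed;

    PublishingFileOutputStream(Path targetFile) throws IOException {
        this.targetFile = targetFile;
        this.tempFile = targetFile.resolveSibling(
                targetFile.getFileName() + "." + UUID.randomUUID() + ".inprogress");
        this.out = Files.newOutputStream(tempFile);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
    }

    /** "Close on success": close the stream and rename the temp file to the target. */
    public void closeAndPublish() throws IOException {
        if (closed) {
            throw new IOException("Stream is already closed");
        }
        closed = true;
        try {
            out.close();
            Files.move(tempFile, targetFile, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            Files.deleteIfExists(tempFile); // publishing failed: leave no temp file behind
            throw e;
        }
    }

    /** "Close on error / cleanup": close and delete the temp file; no-op after publishing. */
    @Override
    public void close() throws IOException {
        if (closed) {
            return;
        }
        closed = true;
        try {
            out.close();
        } finally {
            Files.deleteIfExists(tempFile);
        }
    }
}
```

Callers put `closeAndPublish()` as the last statement of a try-with-resources block; any exception skips it, and the implicit `close()` deletes the temp file instead of publishing it.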
---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5982
@StephanEwen Thank you very much for your great suggestion! I will address it that way.
---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5982
Could anybody have a look at this?
---
[GitHub] flink pull request #5982: [FLINK-9325][checkpoint]generate the meta file for...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5982#discussion_r192371034
--- Diff: flink-core/src/main/java/org/apache/flink/core/fs/TwoPhaseFSDataOutputStream.java ---
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * Operates the output stream in two phrases, any exception during the operation of {@link TwoPhaseFSDataOutputStream} will
+ * lead the {@link #targetFile} to be invisible.
+ *
+ * <p>PHASE 1, write the data into the {@link #preparingFile}.
+ * PHASE 2, close the {@link #preparingFile} and rename it to the {@link #targetFile}.
+ */
+@Internal
+public class TwoPhaseFSDataOutputStream extends AtomicCreatingFsDataOutputStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseFSDataOutputStream.class);
+
+ /**
+ * the target file system.
+ */
+ private final FileSystem fs;
+
+ /**
+ * the target file which the preparing file will be renamed to in the {@link #closeAndPublish()}.
+ */
+ private final Path targetFile;
+
+ /**
+ * the preparing file to store the on flying data.
+ */
+ private final Path preparingFile;
+
+ /**
+ * the output stream of the preparing file.
+ */
+ private final FSDataOutputStream preparedOutputStream;
+
+ private volatile boolean closed;
+
+ public TwoPhaseFSDataOutputStream(FileSystem fs, Path f, FileSystem.WriteMode writeMode) throws IOException {
+
+ Preconditions.checkArgument(FileSystem.WriteMode.OVERWRITE != writeMode, "WriteMode.OVERWRITE is unsupported yet.");
+
+ this.fs = fs;
+ this.targetFile = f;
+ this.preparingFile = generateTemporaryFilename(f);
+ this.closed = false;
+
+ if (writeMode == FileSystem.WriteMode.NO_OVERWRITE && fs.exists(targetFile)) {
+ throw new IOException("Target file " + targetFile + " is already exists.");
+ }
+
+ this.preparedOutputStream = fs.create(this.preparingFile, writeMode);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return this.preparedOutputStream.getPos();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ this.preparedOutputStream.write(b);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ this.preparedOutputStream.flush();
+ }
+
+ @Override
+ public void sync() throws IOException {
+ this.preparedOutputStream.sync();
+ }
+
+ /**
+ * Does the cleanup things, close the stream and delete the {@link #preparingFile}.
+ */
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
--- End diff --
Same here.
---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5982
@StephanEwen Thanks! Looking forward~
---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/5982
I think we need to have a special output stream type (`AtomicCreatingFsDataOutputStream` or similar) as the return type of `FileSystem.createAtomic()`. Otherwise, how can a user actually create a file? The `closeAndPublish()` method is not part of any API class.
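To illustrate why the return type matters, here is a hypothetical sketch in which the factory method returns an abstract stream type that declares `closeAndPublish()`. The names `AtomicCreatingOutputStream` and `InMemoryObjectStore` are made up, and the in-memory store only imitates the "visible on close" behavior described for S3:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical API type returned by createAtomically(), so callers can publish. */
abstract class AtomicCreatingOutputStream extends OutputStream {
    /** Makes the written data visible under the target name; close() afterwards is a no-op. */
    public abstract void closeAndPublish() throws IOException;
}

/** Hypothetical store whose objects only become visible when published. */
class InMemoryObjectStore {
    final Map<String, byte[]> objects = new ConcurrentHashMap<>();

    AtomicCreatingOutputStream createAtomically(String key) {
        return new AtomicCreatingOutputStream() {
            private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            private boolean closed;

            @Override
            public void write(int b) throws IOException {
                if (closed) {
                    throw new IOException("Stream closed");
                }
                buffer.write(b);
            }

            @Override
            public void closeAndPublish() throws IOException {
                if (closed) {
                    throw new IOException("Stream closed");
                }
                closed = true;
                objects.put(key, buffer.toByteArray()); // the single step that makes data visible
            }

            @Override
            public void close() {
                closed = true; // discard the buffer: nothing was ever visible
            }
        };
    }
}
```

Because `closeAndPublish()` is declared on the returned type, calling code can publish without knowing which backend created the stream.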
---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5982
Yes, @StephanEwen, thanks for the continuous suggestions; I will follow them.
---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/5982
@sihuazhou I got caught up in some other tasks - will try to get back to this here soon, I would like to have this feature in as a base for "search for completed checkpoint".
---
[GitHub] flink pull request #5982: [FLINK-9325][checkpoint]generate the meta file for...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5982#discussion_r192370481
--- Diff: flink-core/src/main/java/org/apache/flink/core/fs/ClosingAtomicCreatingFSDataOutputStream.java ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * This class is a {@link org.apache.flink.util.WrappingProxy} for {@link AtomicCreatingFsDataOutputStream} that is used to
+ * implement a safety net against unclosed streams.
+ *
+ * <p>See {@link SafetyNetCloseableRegistry} for more details on how this is utilized.
+ */
+@Internal
+public class ClosingAtomicCreatingFSDataOutputStream
+ extends AtomicCreatingFsDataOutputStream
+ implements WrappingProxyCloseable<AtomicCreatingFsDataOutputStream> {
+
+ private final SafetyNetCloseableRegistry registry;
+ private final String debugString;
+ private AtomicCreatingFsDataOutputStream outputStream;
+
+ private volatile boolean closed;
+
+ private ClosingAtomicCreatingFSDataOutputStream(
+ AtomicCreatingFsDataOutputStream delegate, SafetyNetCloseableRegistry registry, String debugString) throws IOException {
+ this.outputStream = delegate;
+ this.registry = Preconditions.checkNotNull(registry);
+ this.debugString = Preconditions.checkNotNull(debugString);
+ this.closed = false;
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return outputStream.getPos();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ outputStream.write(b);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ outputStream.flush();
+ }
+
+ @Override
+ public void sync() throws IOException {
+ outputStream.sync();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
--- End diff --
Either `closed` is not required to be `volatile` or this is not enough to prevent a race condition from happening. You could either use an `AtomicBoolean::compareAndSet` or in this particular case rely on the serialization from the registry, i.e. `if (registry.unregisterCloseable(this)) {...}`
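A minimal sketch of the `AtomicBoolean::compareAndSet` variant; the class `CloseOnceGuard` is hypothetical, and its counter stands in for unregistering from the registry and closing the delegate:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical wrapper showing a race-free "close exactly once" guard.
 * With a plain (even volatile) boolean, two threads can both read
 * closed == false before either writes true, and both run the cleanup.
 */
class CloseOnceGuard implements AutoCloseable {

    private final AtomicBoolean closed = new AtomicBoolean(false);

    // Counts how often the cleanup actually ran; stands in for closing a delegate.
    private final AtomicInteger cleanupRuns = new AtomicInteger();

    @Override
    public void close() {
        // compareAndSet(false, true) succeeds for exactly one caller.
        if (closed.compareAndSet(false, true)) {
            cleanupRuns.incrementAndGet();
        }
    }

    boolean isClosed() {
        return closed.get();
    }

    int cleanupRuns() {
        return cleanupRuns.get();
    }
}
```

The registry-based alternative gives the same guarantee as long as `unregisterCloseable(this)` returns `true` for only one caller.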
---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/5982
My gut feeling is that we don't need `WriteMode.OVERWRITE` in cases where one wants such an atomic file creation...
---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5982
Hi @StephanEwen, I have updated the PR according to the above comments; it's ready for another review.
---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5982
@StephanEwen Thanks for your good suggestions! I will update the PR accordingly. Regarding `WriteMode.OVERWRITE`: would you object if we don't support it in `createAtomically()`?
---
[GitHub] flink pull request #5982: [FLINK-9325][checkpoint]generate the meta file for...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/5982#discussion_r192374992
--- Diff: flink-core/src/main/java/org/apache/flink/core/fs/ClosingAtomicCreatingFSDataOutputStream.java ---
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
--- End diff --
You are right, nice catch! 👍
---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5982
@StephanEwen I think this PR is ready for another look now...
---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5982
Hi, I ran into a problem here: for the Hadoop FileSystem with `schema='hdfs'`, we can't rename a file onto an existing file. This means it is hard (I'm not sure whether it's even possible) to support `WriteMode.OVERWRITE` for `createAtomically()` with `schema='hdfs'`. If we work around this by first renaming the existing file to a tmp file and then renaming the pre-commit file to the target file, the two-step operation is not atomic, which may lead to an inconsistent result if the JVM crashes between the two steps. I'm running out of ideas on this... Do you have any suggestions for `createAtomically()` when `schema='hdfs'` and `writeMode='WriteMode.OVERWRITE'`? @StephanEwen
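The crash window can be made concrete with a local-file sketch. `TwoStepOverwrite` and its `betweenSteps` hook are hypothetical, `java.nio.file` stands in for the Hadoop FileSystem, and the HDFS restriction on renaming onto an existing file is imitated by always moving the target aside first:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/**
 * Hypothetical sketch of the two-rename OVERWRITE workaround on local files.
 * Each rename is atomic on its own, but the pair is not.
 */
final class TwoStepOverwrite {

    private TwoStepOverwrite() {}

    /** betweenSteps is a hypothetical test hook marking where a crash would hurt. */
    static void overwrite(Path preCommitFile, Path target, Runnable betweenSteps) throws IOException {
        Path aside = target.resolveSibling(target.getFileName() + ".old");
        // Step 1: move the existing target out of the way, because the rename
        // below must never replace an existing file (imitating HDFS).
        Files.move(target, aside, StandardCopyOption.ATOMIC_MOVE);
        // Window: if the JVM dies here, no version exists at the target path.
        betweenSteps.run();
        // Step 2: move the pre-commit file into place, then drop the old copy.
        Files.move(preCommitFile, target, StandardCopyOption.ATOMIC_MOVE);
        Files.delete(aside);
    }
}
```

A reader or a recovery process that looks at the target path during the window finds nothing, even though both the old and the new file still exist under other names.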
---