Posted to issues@flink.apache.org by sihuazhou <gi...@git.apache.org> on 2018/05/10 05:22:20 UTC

[GitHub] flink pull request #5982: [FLINK-9325][checkpoint]generate the meta file for...

GitHub user sihuazhou opened a pull request:

    https://github.com/apache/flink/pull/5982

    [FLINK-9325][checkpoint]generate the meta file for checkpoint only when the writing is truly successful

    ## What is the purpose of the change
    
    *This PR enhances the checkpoint finalization a bit: it writes the metadata file to a temp file first and then atomically renames it to the final name (with an equivalent workaround for S3).*
    
    
    ## Brief change log
    
      - *write the metadata file first to a temp file and then atomically rename it*
    
    ## Verifying this change
    
    This change is already covered by existing tests.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)
    
    ## Documentation
    
    no

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sihuazhou/flink generateTheMetaFileAtomaticly

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5982.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5982
    
----
commit ed37d6d55f3b133aed70e403a7c8b155787462dc
Author: sihuazhou <su...@...>
Date:   2018-05-10T05:17:09Z

    generate the meta file only when the writing is truly successful.

----


---

[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982
  
    Hi @StephanEwen,
    - I prefer to introduce `create(Path, WriteMode)`, because I feel it is more extensible (it would allow us to introduce other `WriteMode`s in the future).
    - I would choose to throw an `UnsupportedOperationException` for the other schemes; I think that makes the program more deterministic.
    
    What do you think?


---

[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5982
  
    Thanks for preparing this.
    
    I looked at the `TwoPhraseFSDatautputStream` - maybe we can make this simpler. Do we need the distinction between phases? Is it not enough to behave as a regular stream, just overriding `close()` to do `super.close()` + `rename()`? That may be enough. When the stream is closed, all the writing methods fail anyway with a "stream closed" exception.
    
    Also, we need this method to be implemented in all FileSystem subclasses.
    
    Typos: "Phrase" --> "Phase"


---

[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982
  
    Hi @StephanEwen, I added more tests for `FileSystem#createAtomically()`. Concerning the `TwoPhaseFsDataoutputStream`, can we introduce a `commit_on_close` option to make it easier to use? With `commit_on_close=true`, we commit the write atomically in `close()`. With `commit_on_close=false`, the user needs to call `commit()` manually, which could be useful in try-with-resources situations. What do you think?


---

[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982
  
    Hi @StephanEwen Thank you very much for your reply. I'm not sure whether just overriding `close()` to do `super.close()` + `rename()` is enough; for example:
    ```
    try (TwoPhraseFSDatautputStream outputStream = new TwoPhraseFSDatautputStream(...)) {
        outputStream.write("part1".getBytes());
        // some failure happens here, before "part2" is written:
        throw new RuntimeException("xxx");
        // outputStream.write("part2".getBytes()); // never reached
    }
    ```
    This would still rename the `tmp file` to the `target file`, because we simply `rename()` in `close()`. Also, the current `TwoPhraseFSDatautputStream` works as a wrapper, so it should already support all file systems. I'm not sure whether I misunderstood your meaning... please let me know if I got something wrong, and what your opinion is. Thanks!



---

[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982
  
    Hi @StephanEwen Could you please have a look at this?


---

[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5982
  
    I think this fix here might not work for S3, because a rename() with the S3 file systems will actually trigger a copy (or even a download and upload), so it is not a cheap operation.
    
    We can fix this by adding a `create(...)` method (or mode) to the FileSystem API that does not publish data in the file until `close()` is called. For hdfs:// and file://, this would use a temp file plus renaming; for S3 we don't write to a temp file, because S3 makes the file visible only on close() anyway.
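    
    Roughly, that idea could look like this for a rename-capable file system (a minimal sketch on plain `java.nio.file` rather than Flink's `FileSystem` API; the class name is made up for illustration):
    
    ```java
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;
    import java.util.UUID;
    
    /** Illustration only: data goes to a hidden temp file and becomes visible under the target name in close(). */
    final class RenameOnCloseOutputStream extends OutputStream {
    
        private final Path targetFile;
        private final Path tempFile;
        private final OutputStream out;
    
        RenameOnCloseOutputStream(Path targetFile) throws IOException {
            this.targetFile = targetFile;
            this.tempFile = targetFile.resolveSibling("." + targetFile.getFileName() + "." + UUID.randomUUID());
            this.out = Files.newOutputStream(tempFile);
        }
    
        @Override
        public void write(int b) throws IOException {
            out.write(b);
        }
    
        @Override
        public void close() throws IOException {
            out.close();
            // a single atomic move publishes the data; readers never observe a half-written target file
            Files.move(tempFile, targetFile, StandardCopyOption.ATOMIC_MOVE);
        }
    }
    ```
    
    Of course, renaming unconditionally in `close()` is exactly the weakness raised elsewhere in this thread, so this is only a starting point.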


---

[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5982
  
    How about adding the method `createAtomically` or so, with otherwise the same signature as the `create(Path, WriteMode)` method?


---

[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982
  
    @StephanEwen After thinking about your comments again, I think I misunderstood the `Also, we need this method to be implemented in all FileSystem subclasses.` 😭 I will address that. But the question related to `TwoPhaseFsDataoutputStream` is still a bit confusing to me...


---

[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982
  
    @StephanEwen I've addressed your comments, could you please have a look again?


---

[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5982
  
    Good point about the renaming on `close()` in case close is called for cleanup, rather than success.
    
    We could follow the same semantics as in [CheckpointStateOutputStream](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java#L61)
    
    There the semantics are:
      - `close()` means "close on error / cleanup" and closes the stream and deletes the temp file.
      - `closeAndPublish()` would mean "close on success" and closes the stream and renames the file.
      - After `closeAndPublish()` has been called, `close()` becomes a no-op.
    
    The [FsCheckpointMetadataOutputStream](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java) implements that pattern; I think it has worked well and is easy to use.
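    
    A rough sketch of those semantics, standalone on plain `java.nio.file` with made-up names (not the actual Flink classes):
    
    ```java
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;
    
    /** Illustration: close() = cleanup, closeAndPublish() = success, close() after publish = no-op. */
    final class PublishOnSuccessStreamSketch extends OutputStream {
    
        private final Path targetFile;
        private final Path tempFile;
        private final OutputStream out;
        private boolean published;
    
        PublishOnSuccessStreamSketch(Path targetFile, Path tempFile) throws IOException {
            this.targetFile = targetFile;
            this.tempFile = tempFile;
            this.out = Files.newOutputStream(tempFile);
        }
    
        @Override
        public void write(int b) throws IOException {
            out.write(b);
        }
    
        /** Close on success: close the stream and atomically rename the temp file to the target. */
        public void closeAndPublish() throws IOException {
            out.close();
            Files.move(tempFile, targetFile, StandardCopyOption.ATOMIC_MOVE);
            published = true;
        }
    
        /** Close on error / cleanup: discard the temp file; becomes a no-op once closeAndPublish() has run. */
        @Override
        public void close() throws IOException {
            if (!published) {
                out.close();
                Files.deleteIfExists(tempFile);
            }
        }
    }
    ```
    
    Used inside try-with-resources, any exception before `closeAndPublish()` only runs the cleanup path, while a successful run publishes exactly once.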
    



---

[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982
  
    @StephanEwen Thank you very much for your great suggestion! I will address this that way.


---

[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982
  
    Could anybody have a look at this?


---

[GitHub] flink pull request #5982: [FLINK-9325][checkpoint]generate the meta file for...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5982#discussion_r192371034
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/TwoPhaseFSDataOutputStream.java ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.core.fs;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.UUID;
    +
    +/**
    + * Operates the output stream in two phrases, any exception during the operation of {@link TwoPhaseFSDataOutputStream} will
    + * lead the {@link #targetFile} to be invisible.
    + *
    + * <p>PHASE 1, write the data into the {@link #preparingFile}.
    + * PHASE 2, close the {@link #preparingFile} and rename it to the {@link #targetFile}.
    + */
    +@Internal
    +public class TwoPhaseFSDataOutputStream extends AtomicCreatingFsDataOutputStream {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseFSDataOutputStream.class);
    +
    +	/**
    +	 * the target file system.
    +	 */
    +	private final FileSystem fs;
    +
    +	/**
    +	 * the target file which the preparing file will be renamed to in the {@link #closeAndPublish()}.
    +	 */
    +	private final Path targetFile;
    +
    +	/**
    +	 * the preparing file to store the on flying data.
    +	 */
    +	private final Path preparingFile;
    +
    +	/**
    +	 * the output stream of the preparing file.
    +	 */
    +	private final FSDataOutputStream preparedOutputStream;
    +
    +	private volatile boolean closed;
    +
    +	public TwoPhaseFSDataOutputStream(FileSystem fs, Path f, FileSystem.WriteMode writeMode) throws IOException {
    +
    +		Preconditions.checkArgument(FileSystem.WriteMode.OVERWRITE != writeMode, "WriteMode.OVERWRITE is unsupported yet.");
    +
    +		this.fs = fs;
    +		this.targetFile = f;
    +		this.preparingFile = generateTemporaryFilename(f);
    +		this.closed = false;
    +
    +		if (writeMode == FileSystem.WriteMode.NO_OVERWRITE && fs.exists(targetFile)) {
    +			throw new IOException("Target file " + targetFile + " is already exists.");
    +		}
    +
    +		this.preparedOutputStream = fs.create(this.preparingFile, writeMode);
    +	}
    +
    +	@Override
    +	public long getPos() throws IOException {
    +		return this.preparedOutputStream.getPos();
    +	}
    +
    +	@Override
    +	public void write(int b) throws IOException {
    +		this.preparedOutputStream.write(b);
    +	}
    +
    +	@Override
    +	public void flush() throws IOException {
    +		this.preparedOutputStream.flush();
    +	}
    +
    +	@Override
    +	public void sync() throws IOException {
    +		this.preparedOutputStream.sync();
    +	}
    +
    +	/**
    +	 * Does the cleanup things, close the stream and delete the {@link #preparingFile}.
    +	 */
    +	@Override
    +	public void close() throws IOException {
    +		if (!closed) {
    --- End diff --
    
    Same here.


---

[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982
  
    @StephanEwen Thanks! Looking forward~


---

[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5982
  
    I think we need to have a special output stream type (`AtomicCreatingFsDataOutputStream` or similar) as the return type of `FileSystem.createAtomic()`. Otherwise, how can a user actually create a file? The `closeAndPublish()` method is not part of any API class.
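    
    For illustration, caller-side usage could then roughly look like this (assuming a `createAtomic(Path)`-style factory on `FileSystem` and the stream type proposed in this PR; the exact name and signature are still under discussion, and none of this is in the released API):
    
    ```java
    import org.apache.flink.core.fs.AtomicCreatingFsDataOutputStream;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    
    import java.io.IOException;
    
    final class MetadataWriteSketch {
    
        /** Hypothetical usage of the API proposed in this PR. */
        static void writeMetadata(FileSystem fs, Path metadataFile, byte[] serializedMetadata) throws IOException {
            try (AtomicCreatingFsDataOutputStream out = fs.createAtomic(metadataFile)) {
                out.write(serializedMetadata);
                out.closeAndPublish(); // close on success: the target file becomes visible here
            }                          // close on error: try-with-resources only cleans up the temp file
        }
    }
    ```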


---

[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982
  
    Yes, @StephanEwen thanks for the continued suggestions, I will follow them.


---

[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5982
  
    @sihuazhou I got caught up in some other tasks - will try to get back to this here soon, I would like to have this feature in as a base for "search for completed checkpoint".


---

[GitHub] flink pull request #5982: [FLINK-9325][checkpoint]generate the meta file for...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5982#discussion_r192370481
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/ClosingAtomicCreatingFSDataOutputStream.java ---
    @@ -0,0 +1,129 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.core.fs;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +
    +/**
    + * This class is a {@link org.apache.flink.util.WrappingProxy} for {@link AtomicCreatingFsDataOutputStream} that is used to
    + * implement a safety net against unclosed streams.
    + *
    + * <p>See {@link SafetyNetCloseableRegistry} for more details on how this is utilized.
    + */
    +@Internal
    +public class ClosingAtomicCreatingFSDataOutputStream
    +		extends AtomicCreatingFsDataOutputStream
    +		implements WrappingProxyCloseable<AtomicCreatingFsDataOutputStream> {
    +
    +	private final SafetyNetCloseableRegistry registry;
    +	private final String debugString;
    +	private AtomicCreatingFsDataOutputStream outputStream;
    +
    +	private volatile boolean closed;
    +
    +	private ClosingAtomicCreatingFSDataOutputStream(
    +			AtomicCreatingFsDataOutputStream delegate, SafetyNetCloseableRegistry registry, String debugString) throws IOException {
    +		this.outputStream = delegate;
    +		this.registry = Preconditions.checkNotNull(registry);
    +		this.debugString = Preconditions.checkNotNull(debugString);
    +		this.closed = false;
    +	}
    +
    +	public boolean isClosed() {
    +		return closed;
    +	}
    +
    +	@Override
    +	public long getPos() throws IOException {
    +		return outputStream.getPos();
    +	}
    +
    +	@Override
    +	public void write(int b) throws IOException {
    +		outputStream.write(b);
    +	}
    +
    +	@Override
    +	public void flush() throws IOException {
    +		outputStream.flush();
    +	}
    +
    +	@Override
    +	public void sync() throws IOException {
    +		outputStream.sync();
    +	}
    +
    +	@Override
    +	public void close() throws IOException {
    +		if (!closed) {
    --- End diff --
    
    Either `closed` is not required to be `volatile` or this is not enough to prevent a race condition from happening. You could either use an `AtomicBoolean::compareAndSet` or in this particular case rely on the serialization from the registry, i.e. `if (registry.unregisterCloseable(this)) {...}` 
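    
    A minimal standalone sketch of the `AtomicBoolean::compareAndSet` variant (not this PR's class), where the cleanup body can run at most once even if `close()` races:
    
    ```java
    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    final class CloseOnceSketch implements Closeable {
    
        // the AtomicBoolean already provides the visibility, so no volatile flag is needed
        private final AtomicBoolean closed = new AtomicBoolean(false);
    
        @Override
        public void close() throws IOException {
            if (closed.compareAndSet(false, true)) {
                // the actual cleanup (closing the delegate stream, deleting the temp file,
                // unregistering from the safety-net registry) would go here, exactly once
            }
        }
    
        boolean isClosed() {
            return closed.get();
        }
    }
    ```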


---

[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5982
  
    My gut feeling is that we don't need `WriteMode.OVERWRITE` in cases where one wants such an atomic file creation...


---

[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982
  
    Hi @StephanEwen I have updated the PR according to the above comments; it's ready for another review.


---

[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982
  
    @StephanEwen Thanks for your good suggestions! I will update the PR accordingly. As for the problem related to WriteMode.OVERWRITE: do you object if we don't support it in `createAtomically()`?


---

[GitHub] flink pull request #5982: [FLINK-9325][checkpoint]generate the meta file for...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5982#discussion_r192374992
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/ClosingAtomicCreatingFSDataOutputStream.java ---
    @@ -0,0 +1,129 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.core.fs;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +
    +/**
    + * This class is a {@link org.apache.flink.util.WrappingProxy} for {@link AtomicCreatingFsDataOutputStream} that is used to
    + * implement a safety net against unclosed streams.
    + *
    + * <p>See {@link SafetyNetCloseableRegistry} for more details on how this is utilized.
    + */
    +@Internal
    +public class ClosingAtomicCreatingFSDataOutputStream
    +		extends AtomicCreatingFsDataOutputStream
    +		implements WrappingProxyCloseable<AtomicCreatingFsDataOutputStream> {
    +
    +	private final SafetyNetCloseableRegistry registry;
    +	private final String debugString;
    +	private AtomicCreatingFsDataOutputStream outputStream;
    +
    +	private volatile boolean closed;
    +
    +	private ClosingAtomicCreatingFSDataOutputStream(
    +			AtomicCreatingFsDataOutputStream delegate, SafetyNetCloseableRegistry registry, String debugString) throws IOException {
    +		this.outputStream = delegate;
    +		this.registry = Preconditions.checkNotNull(registry);
    +		this.debugString = Preconditions.checkNotNull(debugString);
    +		this.closed = false;
    +	}
    +
    +	public boolean isClosed() {
    +		return closed;
    +	}
    +
    +	@Override
    +	public long getPos() throws IOException {
    +		return outputStream.getPos();
    +	}
    +
    +	@Override
    +	public void write(int b) throws IOException {
    +		outputStream.write(b);
    +	}
    +
    +	@Override
    +	public void flush() throws IOException {
    +		outputStream.flush();
    +	}
    +
    +	@Override
    +	public void sync() throws IOException {
    +		outputStream.sync();
    +	}
    +
    +	@Override
    +	public void close() throws IOException {
    +		if (!closed) {
    --- End diff --
    
    You are right, nice catch! 👍


---

[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982
  
    @StephanEwen I guess this PR is ready for another look now...


---

[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982
  
    Hi, I met a problem here: for the Hadoop FileSystem with `schema='hdfs'`, we can't rename a file onto an existing file. This means it is hard (I'm not sure whether it's even possible) to support `WriteMode.OVERWRITE` for `createAtomically()` with `schema='hdfs'`. If we work around this by first renaming the existing file to a tmp file and then renaming the pre-commit file to the target file, these two steps are not atomic, which may lead to an inconsistent result if the JVM crashes between them. I'm running out of ideas on this now... Do you have any suggestions for `createAtomically()` when `schema='hdfs'` and `writeMode='WriteMode.OVERWRITE'`? @StephanEwen
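    
    To make the crash window concrete, here is a sketch of that two-step workaround on plain `java.nio.file` (the helper is hypothetical, just for illustration):
    
    ```java
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;
    
    final class NonAtomicOverwriteSketch {
    
        /** Workaround for file systems that cannot rename onto an existing file: two renames, NOT atomic. */
        static void overwriteViaTwoRenames(Path preCommitFile, Path targetFile, Path backupFile) throws IOException {
            // step 1: move the existing target out of the way
            Files.move(targetFile, backupFile, StandardCopyOption.ATOMIC_MOVE);
            // a crash right here leaves NO file at targetFile: neither the old nor the new metadata is visible
            // step 2: publish the new content
            Files.move(preCommitFile, targetFile, StandardCopyOption.ATOMIC_MOVE);
        }
    }
    ```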


---