You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/07/15 22:23:43 UTC

[1/6] flink git commit: [FLINK-9751] [filesystem] Add PersistentResumableWriter interface.

Repository: flink
Updated Branches:
  refs/heads/master 695bc56a9 -> 7a912e603


[FLINK-9751] [filesystem] Add PersistentResumableWriter interface.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e2960949
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e2960949
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e2960949

Branch: refs/heads/master
Commit: e2960949bf84534978cb0be9f1a53cd783b43001
Parents: 66e0a27
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 13 13:53:51 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 15 23:18:56 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/core/fs/FileSystem.java    |  18 ++
 .../core/fs/RecoverableFsDataOutputStream.java  |  90 ++++++
 .../apache/flink/core/fs/RecoverableWriter.java | 173 +++++++++++
 .../flink/core/fs/local/LocalFileSystem.java    |  22 +-
 .../flink/core/fs/local/LocalRecoverable.java   |  74 +++++
 .../LocalRecoverableFsDataOutputStream.java     | 190 ++++++++++++
 .../fs/local/LocalRecoverableSerializer.java    |  99 ++++++
 .../core/fs/local/LocalRecoverableWriter.java   | 124 ++++++++
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java |  11 +-
 .../runtime/fs/hdfs/HadoopFsRecoverable.java    |  73 +++++
 .../HadoopRecoverableFsDataOutputStream.java    | 303 +++++++++++++++++++
 .../fs/hdfs/HadoopRecoverableSerializer.java    | 100 ++++++
 .../fs/hdfs/HadoopRecoverableWriter.java        | 134 ++++++++
 .../apache/flink/runtime/util/HadoopUtils.java  |  20 ++
 14 files changed, 1425 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 27255f9..d451109 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -494,6 +494,24 @@ public abstract class FileSystem {
 	public abstract FSDataInputStream open(Path f) throws IOException;
 
 	/**
+	 * Creates a new {@link RecoverableWriter}, whose streams can persist and recover their
+	 * intermediate state. Persisting and recovering intermediate state is a core building
+	 * block for writing to files that span multiple checkpoints.
+	 *
+	 * <p>The returned object can act as a shared factory to open and recover multiple streams.
+	 *
+	 * <p>This method is optional: the default implementation always throws an
+	 * {@code UnsupportedOperationException}; file systems that support it override this method.
+	 *
+	 * @return A RecoverableWriter for this file system.
+	 * @throws IOException Thrown, if the recoverable writer cannot be instantiated.
+	 * @throws UnsupportedOperationException Thrown, if recoverable writers are not supported.
+	 */
+	public RecoverableWriter createRecoverableWriter() throws IOException {
+		throw new UnsupportedOperationException("This file system does not support recoverable writers.");
+	}
+
+	/**
 	 * Return the number of bytes that large input files should be optimally be split into to minimize I/O time.
 	 *
 	 * @return the number of bytes that large input files should be optimally be split into to minimize I/O time

http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.java
new file mode 100644
index 0000000..8108670
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
+import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
+
+import java.io.IOException;
+
+/**
+ * An output stream to a file system that can be recovered at well-defined points.
+ * The stream initially writes to hidden files or temp files and only makes the target
+ * file visible once the stream is closed for commit and the {@link Committer} commits it.
+ */
+public abstract class RecoverableFsDataOutputStream extends FSDataOutputStream {
+
+	/**
+	 * Ensures all data so far is persistent (similar to {@link #sync()}) and returns
+	 * a handle that can recover the stream at the current position.
+	 */
+	public abstract ResumeRecoverable persist() throws IOException;
+
+	/**
+	 * Closes the stream, ensuring persistence of all data (similar to {@link #sync()}).
+	 * This returns a Committer that can be used to publish (make visible) the file
+	 * that the stream was writing to.
+	 */
+	public abstract Committer closeForCommit() throws IOException;
+
+	/**
+	 * Closes the stream, releasing all local resources, but without finalizing the
+	 * file that the stream writes to.
+	 *
+	 * <p>This method should be understood as "close to dispose on failure".
+	 */
+	@Override
+	public abstract void close() throws IOException;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A committer publishes the file of a stream that was closed for commit.
+	 * A Committer can be recovered from its {@link CommitRecoverable}.
+	 */
+	public interface Committer {
+
+		/**
+		 * Commits the file, making it visible. The file will contain the exact data
+		 * as when the committer was created.
+		 *
+		 * @throws IOException Thrown if committing fails.
+		 */
+		void commit() throws IOException;
+
+		/**
+		 * Commits the file, making it visible. The file will contain the exact data
+		 * as when the committer was created.
+		 *
+		 * <p>This method tolerates situations where the file was already committed and
+		 * will not raise an exception in that case. This is important for idempotent
+		 * commit retries, which need to happen after recovery.
+		 *
+		 * @throws IOException Thrown if committing fails.
+		 */
+		void commitAfterRecovery() throws IOException;
+
+		/**
+		 * Gets a recoverable object to recover the committer. The recovered committer
+		 * will commit the file with exactly the same data that this committer would
+		 * commit.
+		 */
+		CommitRecoverable getRecoverable();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
new file mode 100644
index 0000000..bbba793
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+
+/**
+ * The RecoverableWriter creates and recovers {@link RecoverableFsDataOutputStream}.
+ * It can be used to write data to a file system in a way that the writing can be
+ * resumed consistently after a failure and recovery without loss of data or possible
+ * duplication of bytes.
+ *
+ * <p>The streams do not make the files they write to immediately visible, but instead write
+ * to temp files or other temporary storage. To publish the data atomically in the
+ * end, the stream offers the {@link RecoverableFsDataOutputStream#closeForCommit()} method
+ * to create a committer that publishes the result.
+ *
+ * <p>These writers are useful in the context of checkpointing. The example below illustrates
+ * how to use them:
+ *
+ * <pre>{@code
+ * // --------- initial run --------
+ * RecoverableWriter writer = fileSystem.createRecoverableWriter();
+ * RecoverableFsDataOutputStream out = writer.open(path);
+ * out.write(...);
+ *
+ * // persist intermediate state
+ * ResumeRecoverable intermediateState = out.persist();
+ * storeInCheckpoint(intermediateState);
+ *
+ * // --------- recovery --------
+ * ResumeRecoverable lastCheckpointState = ...; // get state from checkpoint
+ * RecoverableWriter writer = fileSystem.createRecoverableWriter();
+ * RecoverableFsDataOutputStream out = writer.recover(lastCheckpointState);
+ *
+ * out.write(...); // append more data
+ *
+ * out.closeForCommit().commit(); // close stream and publish all the data
+ *
+ * // --------- recovery without appending --------
+ * ResumeRecoverable lastCheckpointState = ...; // get state from checkpoint
+ * RecoverableWriter writer = fileSystem.createRecoverableWriter();
+ * Committer committer = writer.recoverForCommit(lastCheckpointState);
+ * committer.commit(); // publish the state as of the last checkpoint
+ * }</pre>
+ *
+ * <h3>Recovery</h3>
+ *
+ * <p>Recovery relies on data persistence in the target file system or object store. While the
+ * code itself works with the specific primitives that the target storage offers, recovery will
+ * fail if the data written so far was deleted by an external factor.
+ * For example, some implementations stage data in temp files or object parts. If these
+ * were deleted by someone or by an automated cleanup policy, then resuming
+ * may fail. This is not surprising and should be expected, but we want to explicitly point
+ * this out here.
+ *
+ * <p>Specific care is needed for systems like S3, where the implementation uses Multipart Uploads
+ * to incrementally upload and persist parts of the result. Timeouts for Multipart Uploads
+ * and the lifetime of Parts in unfinished Multipart Uploads need to be set in the bucket policy
+ * high enough to accommodate the recovery. These values are typically on the order of days, so regular
+ * recovery is typically not a problem. What can become an issue are situations where a Flink
+ * application is hard killed (all processes or containers removed) and then one tries to
+ * manually recover the application from an externalized checkpoint some days later. In that
+ * case, systems like S3 may have removed uncommitted parts and recovery will not succeed.
+ *
+ * <h3>Implementer's Note</h3>
+ *
+ * <p>From the perspective of the implementer, it would be desirable to make this class
+ * generic with respect to the concrete types of 'CommitRecoverable' and 'ResumeRecoverable'.
+ * However, we found that this makes the code more clumsy to use and we hence dropped the
+ * generics at the cost of doing some explicit casts in the implementation that would
+ * otherwise have been implicitly generated by the generics compiler.
+ */
+public interface RecoverableWriter {
+
+	/**
+	 * Opens a new recoverable stream to write to the given path.
+	 * Whether existing files will be overwritten is implementation specific and should
+	 * not be relied upon.
+	 *
+	 * @param path The path of the file/object to write to.
+	 * @return A new RecoverableFsDataOutputStream writing a new file/object.
+	 *
+	 * @throws IOException Thrown if the stream could not be opened/initialized.
+	 */
+	RecoverableFsDataOutputStream open(Path path) throws IOException;
+
+	/**
+	 * Resumes a recoverable stream consistently at the point indicated by the given ResumeRecoverable.
+	 * Future writes to the stream will continue / append the file as of that point.
+	 *
+	 * <p>This method is optional and whether it is supported is indicated through the
+	 * {@link #supportsResume()} method.
+	 *
+	 * @param resumable The opaque handle with the recovery information.
+	 * @return A recoverable stream writing to the file/object as it was at the point when the
+	 *         ResumeRecoverable was created.
+	 *
+	 * @throws IOException Thrown, if resuming fails.
+	 * @throws UnsupportedOperationException Thrown if this optional method is not supported.
+	 */
+	RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException;
+
+	/**
+	 * Recovers a recoverable stream consistently at the point indicated by the given CommitRecoverable
+	 * for finalizing and committing. This will publish the target file with exactly the data
+	 * that was written up to the point when the CommitRecoverable was created.
+	 *
+	 * @param resumable The opaque CommitRecoverable handle with the recovery information.
+	 * @return A committer that publishes the target file.
+	 *
+	 * @throws IOException Thrown, if recovery fails.
+	 */
+	RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable) throws IOException;
+
+	/**
+	 * The serializer for the CommitRecoverable types created by this writer.
+	 * This serializer should be used to store the CommitRecoverable in checkpoint
+	 * state or other forms of persistent state.
+	 */
+	SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer();
+
+	/**
+	 * The serializer for the ResumeRecoverable types created by this writer.
+	 * This serializer should be used to store the ResumeRecoverable in checkpoint
+	 * state or other forms of persistent state.
+	 */
+	SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer();
+
+	/**
+	 * Checks whether the writer and its streams support resuming (appending to) files after
+	 * recovery (via the {@link #recover(ResumeRecoverable)} method).
+	 *
+	 * @return True, if this writer supports the {@link #recover(ResumeRecoverable)} method.
+	 *         False, if that method may not be supported and streams can only be recovered via
+	 *         {@link #recoverForCommit(CommitRecoverable)}.
+	 */
+	boolean supportsResume();
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A handle to an in-progress stream with a defined and persistent amount of data.
+	 * The handle can be used to recover the stream as of exactly that point and
+	 * publish the result file.
+	 */
+	interface CommitRecoverable {}
+
+	/**
+	 * A handle to an in-progress stream with a defined and persistent amount of data.
+	 * The handle can be used to recover the stream exactly as of that point and either
+	 * publish the result file or keep appending data to the stream.
+	 */
+	interface ResumeRecoverable extends CommitRecoverable {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index 9ebec6e..7b99e1b 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -142,11 +142,9 @@ public class LocalFileSystem extends FileSystem {
 		return new LocalDataInputStream(file);
 	}
 
-	private File pathToFile(Path path) {
-		if (!path.isAbsolute()) {
-			path = new Path(getWorkingDirectory(), path);
-		}
-		return new File(path.toUri().getPath());
+	@Override
+	public LocalRecoverableWriter createRecoverableWriter() throws IOException {
+		return new LocalRecoverableWriter(this); // the writer resolves target paths via this.pathToFile()
 	}
 
 	@Override
@@ -309,6 +307,20 @@ public class LocalFileSystem extends FileSystem {
 	// ------------------------------------------------------------------------
 
 	/**
+	 * Resolves the given Path to a {@link File} of the local file system.
+	 *
+	 * <p>Relative paths are first resolved against this FileSystem's working directory.
+	 */
+	public File pathToFile(Path path) {
+		final Path absolutePath = path.isAbsolute()
+				? path
+				: new Path(getWorkingDirectory(), path);
+		return new File(absolutePath.toUri().getPath());
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
 	 * Gets the URI that represents the local file system.
 	 * That URI is {@code "file:/"} on Windows platforms and {@code "file:///"} on other
 	 * UNIX family platforms.

http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverable.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverable.java
new file mode 100644
index 0000000..8454c3a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverable.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.local;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
+import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
+
+import java.io.File;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Describes a local recoverable stream: its staging file, its final target file, and the offset of valid data.
+ */
+@Internal
+class LocalRecoverable implements CommitRecoverable, ResumeRecoverable {
+
+	/** The final file that the staging file is published to on commit. */
+	private final File targetFile;
+
+	/** The file path of the staging file. */
+	private final File tempFile;
+
+	/** The length of valid data in the staging file, i.e. the position to resume from. */
+	private final long offset;
+
+	/**
+	 * Creates a recoverable for the given staging file at the given position.
+	 * @param targetFile The final file that the staging file is published to on commit.
+	 * @param tempFile The staging file that holds the data written so far.
+	 * @param offset The position to resume from; must be non-negative.
+	 */
+	LocalRecoverable(File targetFile, File tempFile, long offset) {
+		checkArgument(offset >= 0, "offset must be >= 0");
+		this.targetFile = checkNotNull(targetFile, "targetFile");
+		this.tempFile = checkNotNull(tempFile, "tempFile");
+		this.offset = offset;
+	}
+
+	public File targetFile() {
+		return targetFile;
+	}
+
+	public File tempFile() {
+		return tempFile;
+	}
+
+	public long offset() {
+		return offset;
+	}
+
+	@Override
+	public String toString() {
+		return "LocalRecoverable " + tempFile + " @ " + offset + " -> " + targetFile;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
new file mode 100644
index 0000000..6c6a554
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.local;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
+import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.AtomicMoveNotSupportedException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link RecoverableFsDataOutputStream} for the {@link LocalFileSystem}.
+ *
+ * <p>All bytes are written to a staging (temp) file. Committing moves the staging
+ * file to the target file.
+ */
+@Internal
+class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
+
+	/** The final file that the staging file is moved to on commit. */
+	private final File targetFile;
+
+	/** The staging file that receives all written bytes. */
+	private final File tempFile;
+
+	/** The stream writing to the staging file. */
+	private final FileOutputStream fos;
+
+	/**
+	 * Creates a stream writing to a new staging file. Existing content of that file is discarded.
+	 */
+	LocalRecoverableFsDataOutputStream(File targetFile, File tempFile) throws IOException {
+		this.targetFile = checkNotNull(targetFile);
+		this.tempFile = checkNotNull(tempFile);
+		this.fos = new FileOutputStream(tempFile);
+	}
+
+	/**
+	 * Re-opens the staging file of the given recoverable and truncates it to the
+	 * recoverable's offset, discarding any bytes written after that point.
+	 *
+	 * @throws FileNotFoundException Thrown if the staging file does not exist anymore.
+	 * @throws IOException Thrown if the staging file is shorter than the recorded offset,
+	 *                     or if re-opening or truncating the file fails.
+	 */
+	LocalRecoverableFsDataOutputStream(LocalRecoverable resumable) throws IOException {
+		this.targetFile = checkNotNull(resumable.targetFile());
+		this.tempFile = checkNotNull(resumable.tempFile());
+
+		if (!tempFile.exists()) {
+			throw new FileNotFoundException("File Not Found: " + tempFile.getName());
+		}
+
+		if (tempFile.length() < resumable.offset()) {
+			throw new IOException("Missing data in tmp file: " + tempFile.getName());
+		}
+
+		final FileOutputStream stream = new FileOutputStream(this.tempFile, true);
+		try {
+			stream.getChannel().truncate(resumable.offset());
+		}
+		catch (IOException | RuntimeException e) {
+			// do not leak the file handle if truncation fails
+			try {
+				stream.close();
+			}
+			catch (IOException closeException) {
+				e.addSuppressed(closeException);
+			}
+			throw e;
+		}
+		this.fos = stream;
+	}
+
+	@Override
+	public void write(int b) throws IOException {
+		fos.write(b);
+	}
+
+	@Override
+	public void write(byte[] b, int off, int len) throws IOException {
+		fos.write(b, off, len);
+	}
+
+	@Override
+	public void flush() throws IOException {
+		fos.flush();
+	}
+
+	@Override
+	public void sync() throws IOException {
+		fos.getFD().sync();
+	}
+
+	@Override
+	public long getPos() throws IOException {
+		return fos.getChannel().position();
+	}
+
+	@Override
+	public ResumeRecoverable persist() throws IOException {
+		// we call both flush and sync in order to ensure persistence on mounted
+		// file systems, like NFS, EBS, EFS, ...
+		flush();
+		sync();
+
+		return new LocalRecoverable(targetFile, tempFile, getPos());
+	}
+
+	@Override
+	public Committer closeForCommit() throws IOException {
+		final long pos = getPos();
+		close();
+		return new LocalCommitter(new LocalRecoverable(targetFile, tempFile, pos));
+	}
+
+	@Override
+	public void close() throws IOException {
+		fos.close();
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Publishes a staging file by moving it to its target file.
+	 */
+	static class LocalCommitter implements Committer {
+
+		private final LocalRecoverable recoverable;
+
+		LocalCommitter(LocalRecoverable recoverable) {
+			this.recoverable = checkNotNull(recoverable);
+		}
+
+		@Override
+		public void commit() throws IOException {
+			final File src = recoverable.tempFile();
+			final File dest = recoverable.targetFile();
+			final long expectedLength = recoverable.offset();
+
+			// sanity checks. File#length() returns 0 for a missing file, so check
+			// existence first in order to report the actual problem
+			if (!src.exists()) {
+				throw new FileNotFoundException("Cannot commit: staging file does not exist: " + src);
+			}
+			final long actualLength = src.length();
+			if (actualLength > expectedLength) {
+				// something was done to this file since the committer was created.
+				// this is not the "clean" case
+				throw new IOException("Cannot clean commit: File has trailing junk data.");
+			}
+			if (actualLength < expectedLength) {
+				throw new IOException("Cannot commit: File is missing data. Expected " + expectedLength
+						+ " bytes but found " + actualLength + " bytes in " + src);
+			}
+
+			// rather than fall into default recovery, handle errors explicitly
+			// in order to improve error messages
+			try {
+				moveToTarget(src, dest);
+			}
+			catch (FileAlreadyExistsException e) {
+				throw new IOException("Committing file failed. Target file already exists: " + dest, e);
+			}
+		}
+
+		@Override
+		public void commitAfterRecovery() throws IOException {
+			final File src = recoverable.tempFile();
+			final File dest = recoverable.targetFile();
+			final long expectedLength = recoverable.offset();
+
+			if (src.exists()) {
+				final long actualLength = src.length();
+				if (actualLength > expectedLength) {
+					// can happen if we go from persist to recovering for commit directly
+					// truncate the trailing junk away
+					try (FileOutputStream fos = new FileOutputStream(src, true)) {
+						fos.getChannel().truncate(expectedLength);
+					}
+				}
+				else if (actualLength < expectedLength) {
+					// publishing now would silently drop data the recoverable claims is persisted
+					throw new IOException("Cannot commit after recovery: File is missing data. Expected "
+							+ expectedLength + " bytes but found " + actualLength + " bytes in " + src);
+				}
+
+				// source still exists, so no renaming happened yet. do it!
+				moveToTarget(src, dest);
+			}
+			else if (!dest.exists()) {
+				// neither exists - that can be a sign of
+				//   - (1) a serious problem (file system loss of data)
+				//   - (2) a recovery of a savepoint that is some time old and the users
+				//         removed the files in the meantime.
+
+				// TODO how to handle this?
+				// We probably need an option for users whether this should log,
+				// or result in an exception or unrecoverable exception
+			}
+		}
+
+		@Override
+		public CommitRecoverable getRecoverable() {
+			return recoverable;
+		}
+
+		/**
+		 * Moves {@code src} to {@code dest}, atomically if the file system supports it and
+		 * via {@link File#renameTo(File)} otherwise.
+		 *
+		 * @throws IOException Thrown if the move fails. The atomic move may report an existing
+		 *                     target as a {@link FileAlreadyExistsException}.
+		 */
+		private static void moveToTarget(File src, File dest) throws IOException {
+			try {
+				Files.move(src.toPath(), dest.toPath(), StandardCopyOption.ATOMIC_MOVE);
+			}
+			catch (UnsupportedOperationException | AtomicMoveNotSupportedException e) {
+				if (!src.renameTo(dest)) {
+					throw new IOException("Committing file failed, could not rename " + src + " -> " + dest, e);
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableSerializer.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableSerializer.java
new file mode 100644
index 0000000..685f7c1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableSerializer.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.local;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Simple serializer for the {@link LocalRecoverable}.
+ *
+ * <p>Version 1 layout (little endian): magic number (int), offset (long), length of the
+ * target path (int), length of the temp path (int), target path bytes, temp path bytes.
+ * Paths are stored as absolute paths in UTF-8.
+ */
+@Internal
+class LocalRecoverableSerializer implements SimpleVersionedSerializer<LocalRecoverable> {
+
+	static final LocalRecoverableSerializer INSTANCE = new LocalRecoverableSerializer();
+
+	private static final Charset CHARSET = StandardCharsets.UTF_8;
+
+	private static final int MAGIC_NUMBER = 0x1e744b57;
+
+	/** Size of the fixed-length header: magic number, offset, and the two path lengths. */
+	private static final int HEADER_SIZE = Integer.BYTES + Long.BYTES + 2 * Integer.BYTES;
+
+	/**
+	 * Do not instantiate, use reusable {@link #INSTANCE} instead.
+	 */
+	private LocalRecoverableSerializer() {}
+
+	@Override
+	public int getVersion() {
+		return 1;
+	}
+
+	@Override
+	public byte[] serialize(LocalRecoverable obj) throws IOException {
+		final byte[] targetFileBytes = obj.targetFile().getAbsolutePath().getBytes(CHARSET);
+		final byte[] tempFileBytes = obj.tempFile().getAbsolutePath().getBytes(CHARSET);
+		final byte[] targetBytes = new byte[HEADER_SIZE + targetFileBytes.length + tempFileBytes.length];
+
+		ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN);
+		bb.putInt(MAGIC_NUMBER);
+		bb.putLong(obj.offset());
+		bb.putInt(targetFileBytes.length);
+		bb.putInt(tempFileBytes.length);
+		bb.put(targetFileBytes);
+		bb.put(tempFileBytes);
+
+		return targetBytes;
+	}
+
+	@Override
+	public LocalRecoverable deserialize(int version, byte[] serialized) throws IOException {
+		switch (version) {
+			case 1:
+				return deserializeV1(serialized);
+			default:
+				throw new IOException("Unrecognized version or corrupt state: " + version);
+		}
+	}
+
+	/**
+	 * Reads the version 1 layout. Malformed input is reported as an {@link IOException}
+	 * rather than as an unchecked buffer or array exception.
+	 */
+	private static LocalRecoverable deserializeV1(byte[] serialized) throws IOException {
+		final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+
+		if (bb.remaining() < HEADER_SIZE) {
+			throw new IOException("Corrupt data: Serialized data is shorter than the header.");
+		}
+		if (bb.getInt() != MAGIC_NUMBER) {
+			throw new IOException("Corrupt data: Unexpected magic number.");
+		}
+
+		final long offset = bb.getLong();
+		final int targetFileLength = bb.getInt();
+		final int tempFileLength = bb.getInt();
+
+		if (offset < 0) {
+			throw new IOException("Corrupt data: Negative offset " + offset);
+		}
+		if (targetFileLength < 0 || tempFileLength < 0
+				|| (long) targetFileLength + tempFileLength > bb.remaining()) {
+			throw new IOException("Corrupt data: Invalid path lengths " + targetFileLength + " / " + tempFileLength);
+		}
+
+		final byte[] targetFileBytes = new byte[targetFileLength];
+		final byte[] tempFileBytes = new byte[tempFileLength];
+		bb.get(targetFileBytes);
+		bb.get(tempFileBytes);
+
+		final String targetPath = new String(targetFileBytes, CHARSET);
+		final String tempPath = new String(tempFileBytes, CHARSET);
+
+		return new LocalRecoverable(new File(targetPath), new File(tempPath), offset);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
new file mode 100644
index 0000000..a2f0485
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.local;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link RecoverableWriter} for the {@link LocalFileSystem}.
+ *
+ * <p>Streams opened by this writer write to a staging file that sits in the same directory
+ * as the target file. Both commit and resume state are represented by {@link LocalRecoverable}
+ * and handled by the single {@link LocalRecoverableSerializer}.
+ */
+@Internal
+public class LocalRecoverableWriter implements RecoverableWriter {
+
+	/** The local file system used to resolve Flink paths to {@link File}s. */
+	private final LocalFileSystem fs;
+
+	public LocalRecoverableWriter(LocalFileSystem fs) {
+		this.fs = checkNotNull(fs);
+	}
+
+	@Override
+	public RecoverableFsDataOutputStream open(Path filePath) throws IOException {
+		final File target = fs.pathToFile(filePath);
+		final File staging = generateStagingTempFilePath(target);
+
+		// the staging file lives next to the target, so its directory must exist first
+		final File stagingDir = staging.getParentFile();
+		if (stagingDir != null) {
+			final boolean dirReady = stagingDir.mkdirs() || stagingDir.exists();
+			if (!dirReady) {
+				throw new IOException("Failed to create the parent directory: " + stagingDir);
+			}
+		}
+
+		return new LocalRecoverableFsDataOutputStream(target, staging);
+	}
+
+	@Override
+	public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException {
+		if (!(recoverable instanceof LocalRecoverable)) {
+			throw new IllegalArgumentException(
+					"LocalFileSystem cannot recover recoverable for other file system: " + recoverable);
+		}
+		return new LocalRecoverableFsDataOutputStream((LocalRecoverable) recoverable);
+	}
+
+	@Override
+	public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException {
+		if (!(recoverable instanceof LocalRecoverable)) {
+			throw new IllegalArgumentException(
+					"LocalFileSystem cannot recover recoverable for other file system: " + recoverable);
+		}
+		return new LocalRecoverableFsDataOutputStream.LocalCommitter((LocalRecoverable) recoverable);
+	}
+
+	@Override
+	public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer() {
+		return typedSerializer();
+	}
+
+	@Override
+	public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer() {
+		return typedSerializer();
+	}
+
+	@Override
+	public boolean supportsResume() {
+		return true;
+	}
+
+	/**
+	 * Exposes the {@link LocalRecoverableSerializer} under the requested recoverable type.
+	 * The unchecked cast is safe here because every recoverable this writer hands out is a
+	 * {@link LocalRecoverable}.
+	 */
+	@SuppressWarnings("unchecked")
+	private static <T> SimpleVersionedSerializer<T> typedSerializer() {
+		return (SimpleVersionedSerializer<T>) (SimpleVersionedSerializer<?>) LocalRecoverableSerializer.INSTANCE;
+	}
+
+	/**
+	 * Picks a not-yet-existing, hidden staging file in the directory of the given target file.
+	 * The name has the form {@code .<target name>.inprogress.<random UUID>}.
+	 */
+	@VisibleForTesting
+	static File generateStagingTempFilePath(File targetFile) {
+		checkArgument(targetFile.isAbsolute(), "targetFile must be absolute");
+		checkArgument(!targetFile.isDirectory(), "targetFile must not be a directory");
+
+		final File dir = targetFile.getParentFile();
+		checkArgument(dir != null, "targetFile must not be the root directory");
+
+		final String prefix = "." + targetFile.getName() + ".inprogress.";
+		File candidate;
+		do {
+			candidate = new File(dir, prefix + UUID.randomUUID().toString());
+		}
+		while (candidate.exists());
+
+		return candidate;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index f17d7e1..065ba5a 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
 
 import java.io.IOException;
 import java.net.URI;
@@ -193,11 +194,19 @@ public class HadoopFileSystem extends FileSystem {
 		return fsKind;
 	}
 
+	/**
+	 * Creates a recoverable writer on top of the wrapped Hadoop file system.
+	 * The writer validates the file system scheme and the Hadoop version in its constructor,
+	 * so unsupported setups fail right here with a descriptive error.
+	 */
+	@Override
+	public RecoverableWriter createRecoverableWriter() throws IOException {
+		final HadoopRecoverableWriter writer = new HadoopRecoverableWriter(fs);
+		return writer;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private static org.apache.hadoop.fs.Path toHadoopPath(Path path) {
+	static org.apache.hadoop.fs.Path toHadoopPath(Path path) {
 		return new org.apache.hadoop.fs.Path(path.toUri());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsRecoverable.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsRecoverable.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsRecoverable.java
new file mode 100644
index 0000000..1eaa8f4
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsRecoverable.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
+import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
+
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the resume and commit descriptor objects for Hadoop's
+ * file system abstraction.
+ *
+ * <p>It records the final target file, the staging file that holds the written data,
+ * and the position in the staging file up to which the data is valid.
+ */
+class HadoopFsRecoverable implements CommitRecoverable, ResumeRecoverable {
+
+	/** The file path for the final result file. */
+	private final Path targetFile;
+
+	/** The file path of the staging file. */
+	private final Path tempFile;
+
+	/** The position to resume from (the valid length of the staging file). */
+	private final long offset;
+
+	/**
+	 * Creates a recoverable for the given files at the given position.
+	 *
+	 * @param targetFile The final file that the data is committed to.
+	 * @param tempFile The staging file that holds the written data.
+	 * @param offset The position to resume from; must be non-negative.
+	 * @throws IllegalArgumentException Thrown if the offset is negative.
+	 * @throws NullPointerException Thrown if one of the paths is null.
+	 */
+	HadoopFsRecoverable(Path targetFile, Path tempFile, long offset) {
+		checkArgument(offset >= 0, "offset must be >= 0");
+		this.targetFile = checkNotNull(targetFile, "targetFile");
+		this.tempFile = checkNotNull(tempFile, "tempFile");
+		this.offset = offset;
+	}
+
+	/** Returns the path of the final result file. */
+	public Path targetFile() {
+		return targetFile;
+	}
+
+	/** Returns the path of the staging file. */
+	public Path tempFile() {
+		return tempFile;
+	}
+
+	/** Returns the position in the staging file to resume from. */
+	public long offset() {
+		return offset;
+	}
+
+	@Override
+	public String toString() {
+		return "HadoopFsRecoverable " + tempFile + " @ " + offset + " -> " + targetFile;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
new file mode 100644
index 0000000..f944dc5
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
+import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link RecoverableFsDataOutputStream} for Hadoop's file system abstraction.
+ *
+ * <p>Data is written to a staging (temp) file. {@link #persist()} syncs the data and returns a
+ * {@link HadoopFsRecoverable} holding the temp file, the target file and the current position.
+ * Committing renames the temp file to the target file. Resuming truncates the temp file back to
+ * the persisted position and appends to it, which requires Hadoop's truncate call (Hadoop 2.7+).
+ */
+@Internal
+class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
+
+	/** Maximum time (in milliseconds) to wait for the lease of a file to be revoked. */
+	private static final long LEASE_TIMEOUT = 100000L;
+
+	/** Reflective handle to {@code FileSystem#truncate(Path, long)}, resolved lazily. */
+	private static Method truncateHandle;
+
+	private final FileSystem fs;
+
+	private final Path targetFile;
+
+	private final Path tempFile;
+
+	private final FSDataOutputStream out;
+
+	/**
+	 * Creates a stream that writes to a new staging file.
+	 *
+	 * @param fs The Hadoop file system to write to.
+	 * @param targetFile The file that the data is eventually committed to.
+	 * @param tempFile The staging file that receives the data until commit.
+	 * @throws IOException Thrown if the staging file cannot be created.
+	 */
+	HadoopRecoverableFsDataOutputStream(
+			FileSystem fs,
+			Path targetFile,
+			Path tempFile) throws IOException {
+
+		ensureTruncateInitialized();
+
+		this.fs = checkNotNull(fs);
+		this.targetFile = checkNotNull(targetFile);
+		this.tempFile = checkNotNull(tempFile);
+		this.out = fs.create(tempFile);
+	}
+
+	/**
+	 * Resumes writing to the staging file described by the given recoverable.
+	 * The staging file is truncated back to the persisted position and reopened for appending.
+	 *
+	 * @param fs The Hadoop file system to write to.
+	 * @param recoverable The persisted state to resume from.
+	 * @throws IOException Thrown if the staging file is missing, shorter than the persisted
+	 *                     position, or cannot be truncated and reopened.
+	 */
+	HadoopRecoverableFsDataOutputStream(
+			FileSystem fs,
+			HadoopFsRecoverable recoverable) throws IOException {
+
+		ensureTruncateInitialized();
+
+		this.fs = checkNotNull(fs);
+		this.targetFile = checkNotNull(recoverable.targetFile());
+		this.tempFile = checkNotNull(recoverable.tempFile());
+
+		// The process that failed may still hold the lease of the temp file. Recover it first:
+		// HDFS rejects truncation while another client holds the lease, and the length of a
+		// file that is still open for writing may not yet include all written data.
+		// Throws a FileNotFoundException if the temp file is not there.
+		revokeLeaseOrFail(fs, tempFile);
+
+		final FileStatus tmpFileStatus = fs.getFileStatus(tempFile);
+		if (tmpFileStatus.getLen() < recoverable.offset()) {
+			throw new IOException("Missing data in tmp file: " + tempFile.getName());
+		}
+
+		// truncate back to the persisted position and continue appending from there
+		safelyTruncateFile(fs, tempFile, recoverable.offset());
+
+		final FSDataOutputStream appendStream = fs.append(tempFile);
+		try {
+			// sanity check: appending must continue exactly at the persisted position
+			final long pos = appendStream.getPos();
+			if (pos != recoverable.offset()) {
+				throw new IOException("Truncate failed: " + tempFile +
+						" (requested=" + recoverable.offset() + ", size=" + pos + ')');
+			}
+		}
+		catch (IOException e) {
+			try {
+				appendStream.close();
+			}
+			catch (IOException closeException) {
+				e.addSuppressed(closeException);
+			}
+			throw e;
+		}
+		this.out = appendStream;
+	}
+
+	@Override
+	public void write(int b) throws IOException {
+		out.write(b);
+	}
+
+	@Override
+	public void write(byte[] b, int off, int len) throws IOException {
+		out.write(b, off, len);
+	}
+
+	@Override
+	public void flush() throws IOException {
+		out.hflush();
+	}
+
+	@Override
+	public void sync() throws IOException {
+		out.hsync();
+	}
+
+	@Override
+	public long getPos() throws IOException {
+		return out.getPos();
+	}
+
+	@Override
+	public ResumeRecoverable persist() throws IOException {
+		sync();
+		return new HadoopFsRecoverable(targetFile, tempFile, getPos());
+	}
+
+	@Override
+	public Committer closeForCommit() throws IOException {
+		final long pos = getPos();
+		close();
+		return new HadoopFsCommitter(fs, new HadoopFsRecoverable(targetFile, tempFile, pos));
+	}
+
+	@Override
+	public void close() throws IOException {
+		out.close();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Reflection utils for truncation
+	//    These are needed to compile against Hadoop versions before
+	//    Hadoop 2.7, which have no truncation calls for HDFS.
+	// ------------------------------------------------------------------------
+
+	private static void ensureTruncateInitialized() throws FlinkRuntimeException {
+		if (truncateHandle == null) {
+			Method truncateMethod;
+			try {
+				truncateMethod = FileSystem.class.getMethod("truncate", Path.class, long.class);
+			}
+			catch (NoSuchMethodException e) {
+				throw new FlinkRuntimeException("Could not find a public truncate method on the Hadoop File System.", e);
+			}
+
+			if (!Modifier.isPublic(truncateMethod.getModifiers())) {
+				throw new FlinkRuntimeException("Could not find a public truncate method on the Hadoop File System.");
+			}
+
+			truncateHandle = truncateMethod;
+		}
+	}
+
+	/**
+	 * Truncates the given file to the given length via Hadoop's truncate call.
+	 *
+	 * @return The result of Hadoop's truncate call: true if the file is truncated and immediately
+	 *         usable, false if the truncation finishes asynchronously and callers must wait
+	 *         before modifying the file further.
+	 * @throws IOException Thrown if the truncation fails.
+	 * @throws IllegalStateException Thrown if {@link #ensureTruncateInitialized()} has not run.
+	 */
+	static boolean truncate(FileSystem hadoopFs, Path file, long length) throws IOException {
+		if (truncateHandle == null) {
+			throw new IllegalStateException("Truncation handle has not been initialized");
+		}
+
+		try {
+			return (Boolean) truncateHandle.invoke(hadoopFs, file, length);
+		}
+		catch (InvocationTargetException e) {
+			ExceptionUtils.rethrowIOException(e.getTargetException());
+			// defensive: only reached if the helper above did not throw
+			throw new IOException("Truncation of file " + file + " failed.", e.getTargetException());
+		}
+		catch (Throwable t) {
+			throw new IOException(
+					"Truncation of file failed because of access/linking problems with Hadoop's truncate call. " +
+							"This is most likely a dependency conflict or class loading problem.", t);
+		}
+	}
+
+	/**
+	 * Truncates the given file to the given length. The lease is recovered before truncating,
+	 * and again afterwards if the truncation completes asynchronously.
+	 *
+	 * @throws IOException Thrown if the lease cannot be recovered in time or truncation fails.
+	 */
+	private static void safelyTruncateFile(FileSystem fs, Path path, long length) throws IOException {
+		// HDFS rejects truncation while another client still holds the lease of the file
+		revokeLeaseOrFail(fs, path);
+
+		if (!truncate(fs, path, length)) {
+			// the last block is adjusted in the background; wait until the file is closed again
+			revokeLeaseOrFail(fs, path);
+		}
+	}
+
+	/**
+	 * Waits until the lease of the given file is revoked and fails if that does not happen
+	 * within {@link #LEASE_TIMEOUT}.
+	 */
+	private static void revokeLeaseOrFail(FileSystem fs, Path path) throws IOException {
+		if (!waitUntilLeaseIsRevoked(fs, path)) {
+			throw new IOException("Timed out waiting for the lease of " + path + " to be revoked.");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Committer
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Implementation of a committer for the Hadoop File System abstraction.
+	 * This implementation commits by renaming the temp file to the final file path.
+	 * The temp file is truncated before renaming in case there is trailing garbage data.
+	 */
+	static class HadoopFsCommitter implements Committer {
+
+		private final FileSystem fs;
+		private final HadoopFsRecoverable recoverable;
+
+		HadoopFsCommitter(FileSystem fs, HadoopFsRecoverable recoverable) {
+			this.fs = checkNotNull(fs);
+			this.recoverable = checkNotNull(recoverable);
+		}
+
+		@Override
+		public void commit() throws IOException {
+			final Path src = recoverable.tempFile();
+			final Path dest = recoverable.targetFile();
+			final long expectedLength = recoverable.offset();
+
+			final FileStatus srcStatus;
+			try {
+				srcStatus = fs.getFileStatus(src);
+			}
+			catch (IOException e) {
+				throw new IOException("Cannot clean commit: Staging file does not exist.", e);
+			}
+
+			if (srcStatus.getLen() != expectedLength) {
+				// something was done to this file since the committer was created.
+				// this is not the "clean" case
+				throw new IOException("Cannot clean commit: File has trailing junk data.");
+			}
+
+			renameStagingFile(fs, src, dest);
+		}
+
+		@Override
+		public void commitAfterRecovery() throws IOException {
+			final Path src = recoverable.tempFile();
+			final Path dest = recoverable.targetFile();
+			final long expectedLength = recoverable.offset();
+
+			FileStatus srcStatus = null;
+			try {
+				srcStatus = fs.getFileStatus(src);
+			}
+			catch (FileNotFoundException e) {
+				// status remains null
+			}
+			catch (IOException e) {
+				throw new IOException("Committing during recovery failed: Could not access status of source file.", e);
+			}
+
+			if (srcStatus != null) {
+				// The staging file still exists, so the commit has not happened yet.
+				// The failed writer may have left it open: recover the lease so that the file
+				// is closed and its reported length is final before inspecting it.
+				revokeLeaseOrFail(fs, src);
+				final long actualLength = fs.getFileStatus(src).getLen();
+
+				if (actualLength > expectedLength) {
+					// can happen if we go from persist to recovering for commit directly
+					// truncate the trailing junk away
+					safelyTruncateFile(fs, src, expectedLength);
+				}
+
+				renameStagingFile(fs, src, dest);
+			}
+			else if (!fs.exists(dest)) {
+				// neither exists - that can be a sign of
+				//   - (1) a serious problem (file system loss of data)
+				//   - (2) a recovery of a savepoint that is some time old and the users
+				//         removed the files in the meantime.
+
+				// TODO how to handle this?
+				// We probably need an option for users whether this should log,
+				// or result in an exception or unrecoverable exception
+			}
+		}
+
+		@Override
+		public CommitRecoverable getRecoverable() {
+			return recoverable;
+		}
+
+		/**
+		 * Renames the staging file to the target file. Hadoop's rename signals some failures
+		 * only through its boolean result, so a {@code false} result is turned into an exception.
+		 */
+		private static void renameStagingFile(FileSystem fs, Path src, Path dest) throws IOException {
+			final boolean renamed;
+			try {
+				renamed = fs.rename(src, dest);
+			}
+			catch (IOException e) {
+				throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
+			}
+
+			if (!renamed) {
+				throw new IOException("Committing file by rename failed: " + src + " to " + dest);
+			}
+		}
+	}
+
+	/**
+	 * Called when resuming execution after a failure and waits until the lease
+	 * of the file we are resuming is free.
+	 *
+	 * <p>The lease of the file we are resuming writing/committing to may still
+	 * belong to the process that failed previously and whose state we are
+	 * recovering.
+	 *
+	 * @param fs The file system; must be a {@link DistributedFileSystem}.
+	 * @param path The path to the file we want to resume writing to.
+	 * @return True if the file is closed, false if it was still open after {@link #LEASE_TIMEOUT}.
+	 * @throws IOException Thrown on file system errors or if the waiting thread is interrupted.
+	 */
+	private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path path) throws IOException {
+		Preconditions.checkState(fs instanceof DistributedFileSystem);
+
+		final DistributedFileSystem dfs = (DistributedFileSystem) fs;
+		dfs.recoverLease(path);
+		boolean isClosed = dfs.isFileClosed(path);
+
+		final StopWatch sw = new StopWatch();
+		sw.start();
+
+		while (!isClosed && sw.getTime() <= LEASE_TIMEOUT) {
+			try {
+				Thread.sleep(500L);
+			}
+			catch (InterruptedException e) {
+				// keep the interruption visible to callers instead of silently swallowing it
+				Thread.currentThread().interrupt();
+				throw new IOException("Interrupted while waiting for the lease of " + path + " to be revoked.", e);
+			}
+			isClosed = dfs.isFileClosed(path);
+		}
+		return isClosed;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableSerializer.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableSerializer.java
new file mode 100644
index 0000000..941e96a
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableSerializer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Simple serializer for the {@link HadoopFsRecoverable}.
+ */
+@Internal
+class HadoopRecoverableSerializer implements SimpleVersionedSerializer<HadoopFsRecoverable> {
+
+	static final HadoopRecoverableSerializer INSTANCE = new HadoopRecoverableSerializer();
+
+	private static final Charset CHARSET = StandardCharsets.UTF_8;
+
+	private static final int MAGIC_NUMBER = 0xd7436c5e;
+
+	/**
+	 * Do not instantiate, use reusable {@link #INSTANCE} instead.
+	 */
+	private HadoopRecoverableSerializer() {}
+
+	@Override
+	public int getVersion() {
+		return 1;
+	}
+
+	@Override
+	public byte[] serialize(HadoopFsRecoverable obj) throws IOException {
+		final byte[] targetFileBytes = obj.targetFile().toString().getBytes(CHARSET);
+		final byte[] tempFileBytes = obj.tempFile().toString().getBytes(CHARSET);
+		final byte[] targetBytes = new byte[20 + targetFileBytes.length + tempFileBytes.length];
+
+		ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN);
+		bb.putInt(MAGIC_NUMBER);
+		bb.putLong(obj.offset());
+		bb.putInt(targetFileBytes.length);
+		bb.putInt(tempFileBytes.length);
+		bb.put(targetFileBytes);
+		bb.put(tempFileBytes);
+
+		return targetBytes;
+	}
+
+	@Override
+	public HadoopFsRecoverable deserialize(int version, byte[] serialized) throws IOException {
+		switch (version) {
+			case 1:
+				return deserializeV1(serialized);
+			default:
+				throw new IOException("Unrecognized version or corrupt state: " + version);
+		}
+	}
+
+	private static HadoopFsRecoverable deserializeV1(byte[] serialized) throws IOException {
+		final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+
+		if (bb.getInt() != MAGIC_NUMBER) {
+			throw new IOException("Corrupt data: Unexpected magic number.");
+		}
+
+		final long offset = bb.getLong();
+		final byte[] targetFileBytes = new byte[bb.getInt()];
+		final byte[] tempFileBytes = new byte[bb.getInt()];
+		bb.get(targetFileBytes);
+		bb.get(tempFileBytes);
+
+		final String targetPath = new String(targetFileBytes, CHARSET);
+		final String tempPath = new String(tempFileBytes, CHARSET);
+
+		return new HadoopFsRecoverable(new Path(targetPath), new Path(tempPath), offset);
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
new file mode 100644
index 0000000..305f8ee
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableWriter} for
+ * Hadoop's file system abstraction.
+ *
+ * <p>Only HDFS with Hadoop 2.7 or newer is supported; the constructor rejects other setups.
+ */
+@Internal
+public class HadoopRecoverableWriter implements RecoverableWriter {
+
+	/** The Hadoop file system on which the writer operates. */
+	private final org.apache.hadoop.fs.FileSystem fs;
+
+	/**
+	 * Creates a new Recoverable writer.
+	 *
+	 * @param fs The Hadoop file system on which the writer operates.
+	 * @throws UnsupportedOperationException Thrown if the file system is not HDFS or the
+	 *                                       Hadoop version is older than 2.7.
+	 */
+	public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {
+		this.fs = checkNotNull(fs);
+
+		// This writer is only supported on a subset of file systems, and on
+		// specific versions. We check these schemes and versions eagerly for
+		// better error messages.
+		// The scheme is taken from the URI because FileSystem#getScheme() throws for
+		// implementations that do not override it, which would hide the message below.
+		final String scheme = fs.getUri() == null ? null : fs.getUri().getScheme();
+		if (!"hdfs".equalsIgnoreCase(scheme) || !HadoopUtils.isMinHadoopVersion(2, 7)) {
+			throw new UnsupportedOperationException(
+					"Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer");
+		}
+	}
+
+	@Override
+	public RecoverableFsDataOutputStream open(Path filePath) throws IOException {
+		final org.apache.hadoop.fs.Path targetFile = HadoopFileSystem.toHadoopPath(filePath);
+		final org.apache.hadoop.fs.Path tempFile = generateStagingTempFilePath(fs, targetFile);
+		return new HadoopRecoverableFsDataOutputStream(fs, targetFile, tempFile);
+	}
+
+	@Override
+	public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException {
+		if (recoverable instanceof HadoopFsRecoverable) {
+			return new HadoopRecoverableFsDataOutputStream(fs, (HadoopFsRecoverable) recoverable);
+		}
+		else {
+			throw new IllegalArgumentException(
+					"Hadoop File System cannot recover a recoverable for another file system: " + recoverable);
+		}
+	}
+
+	@Override
+	public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException {
+		if (recoverable instanceof HadoopFsRecoverable) {
+			return new HadoopRecoverableFsDataOutputStream.HadoopFsCommitter(fs, (HadoopFsRecoverable) recoverable);
+		}
+		else {
+			throw new IllegalArgumentException(
+					"Hadoop File System cannot recover a recoverable for another file system: " + recoverable);
+		}
+	}
+
+	@Override
+	public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer() {
+		@SuppressWarnings("unchecked")
+		SimpleVersionedSerializer<CommitRecoverable> typedSerializer = (SimpleVersionedSerializer<CommitRecoverable>)
+				(SimpleVersionedSerializer<?>) HadoopRecoverableSerializer.INSTANCE;
+
+		return typedSerializer;
+	}
+
+	@Override
+	public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer() {
+		@SuppressWarnings("unchecked")
+		SimpleVersionedSerializer<ResumeRecoverable> typedSerializer = (SimpleVersionedSerializer<ResumeRecoverable>)
+				(SimpleVersionedSerializer<?>) HadoopRecoverableSerializer.INSTANCE;
+
+		return typedSerializer;
+	}
+
+	@Override
+	public boolean supportsResume() {
+		return true;
+	}
+
+	/**
+	 * Picks a not-yet-existing, hidden staging file in the directory of the given target file.
+	 * The name has the form {@code .<target name>.inprogress.<random UUID>}.
+	 */
+	@VisibleForTesting
+	static org.apache.hadoop.fs.Path generateStagingTempFilePath(
+			org.apache.hadoop.fs.FileSystem fs,
+			org.apache.hadoop.fs.Path targetFile) throws IOException {
+
+		checkArgument(targetFile.isAbsolute(), "targetFile must be absolute");
+
+		final org.apache.hadoop.fs.Path parent = targetFile.getParent();
+		final String name = targetFile.getName();
+
+		checkArgument(parent != null, "targetFile must not be the root directory");
+
+		while (true) {
+			org.apache.hadoop.fs.Path candidate = new org.apache.hadoop.fs.Path(
+					parent, "." + name + ".inprogress." + UUID.randomUUID().toString());
+			if (!fs.exists(candidate)) {
+				return candidate;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
index 5b14f43..09a9e54 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -20,12 +20,14 @@ package org.apache.flink.runtime.util;
 
 import org.apache.flink.configuration.ConfigConstants;
 
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.VersionInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -122,4 +124,22 @@ public class HadoopUtils {
 		}
 		return false;
 	}
+
+	/**
+	 * Checks if the Hadoop dependency is at least of the given version.
+	 *
+	 * <p>Only the major and minor components of {@link VersionInfo#getVersion()} are compared.
+	 * Non-numeric suffixes of those components (e.g. {@code "7-SNAPSHOT"}) are ignored.
+	 *
+	 * @param major The minimum required major version.
+	 * @param minor The minimum required minor version (for the given major version).
+	 * @return True if the Hadoop version is {@code major.minor} or newer.
+	 * @throws FlinkRuntimeException Thrown if the Hadoop version string cannot be parsed.
+	 */
+	public static boolean isMinHadoopVersion(int major, int minor) throws FlinkRuntimeException {
+		final String versionString = VersionInfo.getVersion();
+		final String[] versionParts = versionString.split("\\.");
+
+		if (versionParts.length < 2) {
+			throw new FlinkRuntimeException(
+					"Cannot determine version of Hadoop, unexpected version string: " + versionString);
+		}
+
+		final int maj = parseLeadingNumber(versionParts[0], versionString);
+		final int min = parseLeadingNumber(versionParts[1], versionString);
+
+		return maj > major || (maj == major && min >= minor);
+	}
+
+	/**
+	 * Parses the leading decimal digits of one version component, e.g. {@code 7} from {@code "7-SNAPSHOT"}.
+	 *
+	 * @throws FlinkRuntimeException Thrown if the component does not start with a parsable number.
+	 */
+	private static int parseLeadingNumber(String component, String versionString) {
+		int end = 0;
+		while (end < component.length() && Character.isDigit(component.charAt(end))) {
+			end++;
+		}
+
+		try {
+			return Integer.parseInt(component.substring(0, end));
+		}
+		catch (NumberFormatException e) {
+			throw new FlinkRuntimeException(
+					"Cannot determine version of Hadoop, unexpected version string: " + versionString, e);
+		}
+	}
 }


[4/6] flink git commit: [hotfix] [core] Fix checkstyle workaround in FileSystem

Posted by se...@apache.org.
[hotfix] [core] Fix checkstyle workaround in FileSystem


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2475989
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2475989
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2475989

Branch: refs/heads/master
Commit: f247598982fd82aeb2c609434cf875daef3ffabb
Parents: 548e9ed
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 13 12:33:29 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 15 23:18:56 2018 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/core/fs/FileSystem.java    | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f2475989/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 8698595..27255f9 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -229,9 +229,7 @@ public abstract class FileSystem {
 
 	/** The default filesystem scheme to be used, configured during process-wide initialization.
 	 * This value defaults to the local file systems scheme {@code 'file:///'} or {@code 'file:/'}. */
-	//CHECKSTYLE.OFF: StaticVariableName
-	private static URI DEFAULT_SCHEME;
-	//CHECKSTYLE.ON: StaticVariableName
+	private static URI defaultScheme;
 
 	// ------------------------------------------------------------------------
 	//  Initialization
@@ -275,11 +273,11 @@ public abstract class FileSystem {
 			// also read the default file system scheme
 			final String stringifiedUri = config.getString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, null);
 			if (stringifiedUri == null) {
-				DEFAULT_SCHEME = null;
+				defaultScheme = null;
 			}
 			else {
 				try {
-					DEFAULT_SCHEME = new URI(stringifiedUri);
+					defaultScheme = new URI(stringifiedUri);
 				}
 				catch (URISyntaxException e) {
 					throw new IllegalConfigurationException("The default file system scheme ('" +
@@ -427,7 +425,7 @@ public abstract class FileSystem {
 	 * @return The default file system URI
 	 */
 	public static URI getDefaultFsUri() {
-		return DEFAULT_SCHEME != null ? DEFAULT_SCHEME : LocalFileSystem.getLocalFsURI();
+		return defaultScheme != null ? defaultScheme : LocalFileSystem.getLocalFsURI();
 	}
 
 	// ------------------------------------------------------------------------


[6/6] flink git commit: [FLINK-9751] [filesystem] Use FileChannel directly in LocalRecoverableFsDataOutputStream

Posted by se...@apache.org.
[FLINK-9751] [filesystem] Use FileChannel directly in LocalRecoverableFsDataOutputStream


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a912e60
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a912e60
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a912e60

Branch: refs/heads/master
Commit: 7a912e603a040acfc46d394e86794b2c031298cb
Parents: 9d238e1
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 13 15:13:21 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 15 23:20:44 2018 +0200

----------------------------------------------------------------------
 .../LocalRecoverableFsDataOutputStream.java     | 28 +++++++++++++-------
 1 file changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7a912e60/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
index fd8e8fe..df9288e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
@@ -27,10 +27,14 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
 import java.nio.file.AtomicMoveNotSupportedException;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -44,12 +48,16 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
 
 	private final File tempFile;
 
-	private final FileOutputStream fos;
+	private final FileChannel fileChannel;
+
+	private final OutputStream fos;
 
 	LocalRecoverableFsDataOutputStream(File targetFile, File tempFile) throws IOException {
 		this.targetFile = checkNotNull(targetFile);
 		this.tempFile = checkNotNull(tempFile);
-		this.fos = new FileOutputStream(tempFile);
+
+		this.fileChannel = FileChannel.open(tempFile.toPath(), StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
+		this.fos = Channels.newOutputStream(fileChannel);
 	}
 
 	LocalRecoverableFsDataOutputStream(LocalRecoverable resumable) throws IOException {
@@ -57,15 +65,15 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
 		this.tempFile = checkNotNull(resumable.tempFile());
 
 		if (!tempFile.exists()) {
-			throw new FileNotFoundException("File Not Found: " + tempFile);
+			throw new FileNotFoundException("File Not Found: " + tempFile.getAbsolutePath());
 		}
 
-		if (tempFile.length() < resumable.offset()) {
-			throw new IOException("Missing data in tmp file: " + tempFile);
+		this.fileChannel = FileChannel.open(tempFile.toPath(), StandardOpenOption.WRITE, StandardOpenOption.APPEND);
+		if (this.fileChannel.position() < resumable.offset()) {
+			throw new IOException("Missing data in tmp file: " + tempFile.getAbsolutePath());
 		}
-
-		this.fos = new FileOutputStream(this.tempFile, true);
-		this.fos.getChannel().truncate(resumable.offset());
+		this.fileChannel.truncate(resumable.offset());
+		this.fos = Channels.newOutputStream(fileChannel);
 	}
 
 	@Override
@@ -85,12 +93,12 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
 
 	@Override
 	public void sync() throws IOException {
-		fos.getFD().sync();
+		fileChannel.force(true);
 	}
 
 	@Override
 	public long getPos() throws IOException {
-		return fos.getChannel().position();
+		return fileChannel.position();
 	}
 
 	@Override


[2/6] flink git commit: [hotfix] [core] Fix checkstyle in flink-core:'org.apache.flink.api.common.io.compression'

Posted by se...@apache.org.
[hotfix] [core] Fix checkstyle in flink-core:'org.apache.flink.api.common.io.compression'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/548e9ed5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/548e9ed5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/548e9ed5

Branch: refs/heads/master
Commit: 548e9ed55efaf9eb22c5f3031558f0371df0d08c
Parents: 695bc56
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 13 15:23:38 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 15 23:18:56 2018 +0200

----------------------------------------------------------------------
 .../common/io/compression/Bzip2InputStreamFactory.java | 13 +++++++------
 .../compression/DeflateInflaterInputStreamFactory.java |  6 ++----
 .../io/compression/GzipInflaterInputStreamFactory.java |  7 +++----
 .../common/io/compression/XZInputStreamFactory.java    | 12 +++++++-----
 4 files changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/548e9ed5/flink-core/src/main/java/org/apache/flink/api/common/io/compression/Bzip2InputStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/Bzip2InputStreamFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/Bzip2InputStreamFactory.java
index d204907..a1692a8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/Bzip2InputStreamFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/Bzip2InputStreamFactory.java
@@ -15,26 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.common.io.compression;
 
 import org.apache.flink.annotation.Internal;
 
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collection;
 import java.util.Collections;
 
-import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
-
+/**
+ * Factory for Bzip2 decompressors.
+ */
 @Internal
 public class Bzip2InputStreamFactory implements InflaterInputStreamFactory<BZip2CompressorInputStream> {
 
-	private static Bzip2InputStreamFactory INSTANCE = null;
+	private static final Bzip2InputStreamFactory INSTANCE = new Bzip2InputStreamFactory();
 
 	public static Bzip2InputStreamFactory getInstance() {
-		if (INSTANCE == null) {
-			INSTANCE = new Bzip2InputStreamFactory();
-		}
 		return INSTANCE;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/548e9ed5/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java
index b5051e6..33f7d51 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.common.io.compression;
 
 import org.apache.flink.annotation.Internal;
@@ -31,12 +32,9 @@ import java.util.zip.InflaterInputStream;
 @Internal
 public class DeflateInflaterInputStreamFactory implements InflaterInputStreamFactory<InflaterInputStream> {
 
-	private static DeflateInflaterInputStreamFactory INSTANCE = null;
+	private static final DeflateInflaterInputStreamFactory INSTANCE = new DeflateInflaterInputStreamFactory();
 
 	public static DeflateInflaterInputStreamFactory getInstance() {
-		if (INSTANCE == null) {
-			INSTANCE = new DeflateInflaterInputStreamFactory();
-		}
 		return INSTANCE;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/548e9ed5/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java
index 478eb2d..335e365 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.common.io.compression;
 
 import org.apache.flink.annotation.Internal;
@@ -31,14 +32,12 @@ import java.util.zip.GZIPInputStream;
 @Internal
 public class GzipInflaterInputStreamFactory implements InflaterInputStreamFactory<GZIPInputStream> {
 
-	private static GzipInflaterInputStreamFactory INSTANCE = null;
+	private static final GzipInflaterInputStreamFactory INSTANCE = new GzipInflaterInputStreamFactory();
 
 	public static GzipInflaterInputStreamFactory getInstance() {
-		if (INSTANCE == null) {
-			INSTANCE = new GzipInflaterInputStreamFactory();
-		}
 		return INSTANCE;
 	}
+
 	@Override
 	public GZIPInputStream create(InputStream in) throws IOException {
 		return new GZIPInputStream(in);

http://git-wip-us.apache.org/repos/asf/flink/blob/548e9ed5/flink-core/src/main/java/org/apache/flink/api/common/io/compression/XZInputStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/XZInputStreamFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/XZInputStreamFactory.java
index c80de40..0802ab6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/XZInputStreamFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/XZInputStreamFactory.java
@@ -15,25 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.common.io.compression;
 
-import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
 import org.apache.flink.annotation.Internal;
 
+import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collection;
 import java.util.Collections;
 
+/**
+ * Factory for XZ decompressors.
+ */
 @Internal
 public class XZInputStreamFactory implements InflaterInputStreamFactory<XZCompressorInputStream> {
 
-	private static XZInputStreamFactory INSTANCE = null;
+	private static final  XZInputStreamFactory INSTANCE = new XZInputStreamFactory();
 
 	public static XZInputStreamFactory getInstance() {
-		if (INSTANCE == null) {
-			INSTANCE = new XZInputStreamFactory();
-		}
 		return INSTANCE;
 	}
 


[3/6] flink git commit: [hotfix] [core] Remove unused class AbstractMultiFSDataInputStream

Posted by se...@apache.org.
[hotfix] [core] Remove unused class AbstractMultiFSDataInputStream


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/66e0a271
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/66e0a271
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/66e0a271

Branch: refs/heads/master
Commit: 66e0a271ad24277330f1de54db7798a2389a671a
Parents: f247598
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jun 29 19:27:58 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 15 23:18:56 2018 +0200

----------------------------------------------------------------------
 .../core/fs/AbstractMultiFSDataInputStream.java | 115 -------------------
 1 file changed, 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/66e0a271/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java
deleted file mode 100644
index e01ac2e..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.fs;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
-
-import java.io.EOFException;
-import java.io.IOException;
-
-/**
- * Abstract base class for wrappers over multiple {@link FSDataInputStream}, which gives a contiguous view on all inner
- * streams and makes them look like a single stream, in which we can read, seek, etc.
- *
- * <p>Subclasses locate segments via {@link #getSeekedStreamForOffset(long)}. This class tracks the position in the
- * virtual stream and replaces the delegate on seek or when the current segment is exhausted.
- */
-@Internal
-public abstract class AbstractMultiFSDataInputStream extends FSDataInputStream {
-
-	/** Inner stream for the currently accessed segment of the virtual global stream. */
-	protected FSDataInputStream delegate;
-
-	/** Position in the virtual global stream. */
-	protected long totalPos;
-
-	/** Total available bytes in the virtual global stream. */
-	protected long totalAvailable;
-
-	/**
-	 * Starts at position 0 of the virtual stream. No delegate is opened and {@code totalAvailable} is not set here
-	 * (presumably subclasses initialize both - confirm).
-	 */
-	public AbstractMultiFSDataInputStream() {
-		this.totalPos = 0L;
-	}
-
-	/**
-	 * Moves to the given offset of the virtual stream by closing the current delegate and obtaining a sub-stream
-	 * positioned at that offset. Throws {@link EOFException} if the offset is past {@code totalAvailable}.
-	 */
-	@Override
-	public void seek(long desired) throws IOException {
-
-		// seeking to the current position is a no-op and keeps the current delegate
-		if (desired == totalPos) {
-			return;
-		}
-
-		Preconditions.checkArgument(desired >= 0L);
-
-		if (desired > totalAvailable) {
-			throw new EOFException();
-		}
-
-		IOUtils.closeQuietly(delegate);
-		delegate = getSeekedStreamForOffset(desired);
-
-		this.totalPos = desired;
-	}
-
-	/** Returns the position in the virtual global stream. */
-	@Override
-	public long getPos() throws IOException {
-		return totalPos;
-	}
-
-	/**
-	 * Reads one byte from the current segment. When the segment is exhausted, the next segment is fetched for the
-	 * current global position and the read is retried; returns -1 once no delegate remains.
-	 */
-	@Override
-	public int read() throws IOException {
-
-		if (null == delegate) {
-			return -1;
-		}
-
-		int val = delegate.read();
-
-		if (-1 == val) {
-			// current segment exhausted: switch to the stream covering totalPos, or mark end of the virtual stream
-			IOUtils.closeQuietly(delegate);
-			if (totalPos < totalAvailable) {
-				delegate = getSeekedStreamForOffset(totalPos);
-			} else {
-				delegate = null;
-			}
-			// NOTE(review): if the fetched stream is immediately at EOF while totalPos < totalAvailable,
-			// this recursion repeats without advancing totalPos
-			return read();
-		}
-
-		++totalPos;
-		return val;
-	}
-
-	/** Closes the current delegate, if any. NOTE(review): {@code delegate} is not reset to null here. */
-	@Override
-	public void close() throws IOException {
-		IOUtils.closeQuietly(delegate);
-	}
-
-	/**
-	 * Skips by seeking to {@code totalPos + n}. NOTE(review): always returns {@code n}; the seek throws
-	 * {@link EOFException} past {@code totalAvailable} and rejects a negative target via checkArgument.
-	 */
-	@Override
-	public long skip(long n) throws IOException {
-		seek(totalPos + n);
-		return n;
-	}
-
-	/**
-	 * Delivers the right stream for the given global stream offset. The returned stream is already seeked to the
-	 * right local offset that correctly reflects the global offset.
-	 *
-	 * @param globalStreamOffset the global offset to which we seek
-	 * @return a sub-stream, seeked to the correct local offset w.r.t. the global offset.
-	 * @throws IOException thrown by the implementation when obtaining or positioning the sub-stream
-	 */
-	protected abstract FSDataInputStream getSeekedStreamForOffset(long globalStreamOffset) throws IOException;
-}


[5/6] flink git commit: [FLINK-9751] [filesystem] Add fixes and tests for Persistent Resumable Writers

Posted by se...@apache.org.
[FLINK-9751] [filesystem] Add fixes and tests for Persistent Resumable Writers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d238e1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d238e1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d238e1a

Branch: refs/heads/master
Commit: 9d238e1a170a96c311b0dafa92f572c2d97bbcad
Parents: e296094
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 15 23:20:37 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 15 23:20:37 2018 +0200

----------------------------------------------------------------------
 .../LocalRecoverableFsDataOutputStream.java     |   6 +-
 .../core/io/SimpleVersionedSerializer.java      |   3 +
 .../core/fs/AbstractResumableWriterTest.java    | 380 +++++++++++++++++++
 .../LocalFileSystemResumableWriterTest.java     |  45 +++
 .../HadoopRecoverableFsDataOutputStream.java    |  62 ++-
 .../apache/flink/runtime/util/HadoopUtils.java  |   2 +-
 .../fs/hdfs/HadoopResumableWriterTest.java      |  95 +++++
 7 files changed, 572 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
index 6c6a554..fd8e8fe 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
@@ -57,11 +57,11 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
 		this.tempFile = checkNotNull(resumable.tempFile());
 
 		if (!tempFile.exists()) {
-			throw new FileNotFoundException("File Not Found: " + tempFile.getName());
+			throw new FileNotFoundException("File Not Found: " + tempFile);
 		}
 
 		if (tempFile.length() < resumable.offset()) {
-			throw new IOException("Missing data in tmp file: " + tempFile.getName());
+			throw new IOException("Missing data in tmp file: " + tempFile);
 		}
 
 		this.fos = new FileOutputStream(this.tempFile, true);
@@ -165,6 +165,8 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
 					try (FileOutputStream fos = new FileOutputStream(src, true)) {
 						fos.getChannel().truncate(expectedLength);
 					}
+				} else if (src.length() < expectedLength) {
+					throw new IOException("Missing data in tmp file: " + src);
 				}
 
 				// source still exists, so no renaming happened yet. do it!

http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
index 6c061a5..4dfeea2 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.core.io;
 
+import org.apache.flink.annotation.Internal;
+
 import java.io.IOException;
 
 /**
@@ -44,6 +46,7 @@ import java.io.IOException;
  * 
  * @param <E> The data type serialized / deserialized by this serializer.
  */
+@Internal
 public interface SimpleVersionedSerializer<E> extends Versioned {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java
new file mode 100644
index 0000000..8077305
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.fail;
+
+/**
+ * A base test-suite for the {@link RecoverableWriter}.
+ * This should be subclassed to test each filesystem specific writer.
+ */
+public abstract class AbstractResumableWriterTest extends TestLogger {
+
+	private static final Random RND = new Random();
+
+	private static final String testData1 = "THIS IS A TEST 1.";
+	private static final String testData2 = "THIS IS A TEST 2.";
+	private static final String testData3 = "THIS IS A TEST 3.";
+
+	private Path basePathForTest;
+
+	private static FileSystem fileSystem;
+
+	public abstract Path getBasePath() throws Exception;
+
+	public abstract FileSystem initializeFileSystem();
+
+	public Path getBasePathForTest() {
+		return basePathForTest;
+	}
+
+	private FileSystem getFileSystem() {
+		if (fileSystem == null) {
+			fileSystem = initializeFileSystem();
+		}
+		return fileSystem;
+	}
+
+	private RecoverableWriter getNewFileSystemWriter() throws IOException {
+		return getFileSystem().createRecoverableWriter();
+	}
+
+	@Before
+	public void prepare() throws Exception {
+		basePathForTest = new Path(getBasePath(), randomName());
+		getFileSystem().mkdirs(basePathForTest);
+	}
+
+	@After
+	public void cleanup() throws Exception {
+		getFileSystem().delete(basePathForTest, true);
+	}
+
+	@Test
+	public void testCloseWithNoData() throws Exception {
+		final RecoverableWriter writer = getNewFileSystemWriter();
+
+		final Path testDir = getBasePathForTest();
+
+		final Path path = new Path(testDir, "part-0");
+
+		final RecoverableFsDataOutputStream stream = writer.open(path);
+		for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) {
+			Assert.assertTrue(fileContents.getKey().getName().startsWith(".part-0.inprogress."));
+			Assert.assertTrue(fileContents.getValue().isEmpty());
+		}
+
+		stream.closeForCommit().commit();
+
+		for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) {
+			Assert.assertEquals("part-0", fileContents.getKey().getName());
+			Assert.assertTrue(fileContents.getValue().isEmpty());
+		}
+	}
+
+	@Test
+	public void testCommitAfterNormalClose() throws Exception {
+		final RecoverableWriter writer = getNewFileSystemWriter();
+
+		final Path testDir = getBasePathForTest();
+
+		final Path path = new Path(testDir, "part-0");
+
+		try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+			stream.closeForCommit().commit();
+
+			for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) {
+				Assert.assertEquals("part-0", fileContents.getKey().getName());
+				Assert.assertEquals(testData1, fileContents.getValue());
+			}
+		}
+	}
+
+	@Test
+	public void testCommitAfterPersist() throws Exception {
+		final RecoverableWriter writer = getNewFileSystemWriter();
+
+		final Path testDir = getBasePathForTest();
+
+		final Path path = new Path(testDir, "part-0");
+
+		try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+			stream.persist();
+
+			stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+			stream.closeForCommit().commit();
+
+			for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) {
+				Assert.assertEquals("part-0", fileContents.getKey().getName());
+				Assert.assertEquals(testData1 + testData2, fileContents.getValue());
+			}
+		}
+	}
+
+	// TESTS FOR RECOVERY
+
+	private static final String INIT_EMPTY_PERSIST = "EMPTY";
+	private static final String INTERM_WITH_STATE_PERSIST = "INTERM-STATE";
+	private static final String INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST = "INTERM-IMEDIATE";
+	private static final String FINAL_WITH_EXTRA_STATE = "FINAL";
+
+	@Test
+	public void testRecoverWithEmptyState() throws Exception {
+		testResumeAfterMultiplePersist(
+				INIT_EMPTY_PERSIST,
+				"",
+				testData3);
+	}
+
+	@Test
+	public void testRecoverWithState() throws Exception {
+		testResumeAfterMultiplePersist(
+				INTERM_WITH_STATE_PERSIST,
+				testData1,
+				testData1 + testData3);
+	}
+
+	@Test
+	public void testRecoverFromIntermWithoutAdditionalState() throws Exception {
+		testResumeAfterMultiplePersist(
+				INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST,
+				testData1,
+				testData1 + testData3);
+	}
+
+	@Test
+	public void testRecoverAfterMultiplePersistsState() throws Exception {
+		testResumeAfterMultiplePersist(
+				FINAL_WITH_EXTRA_STATE,
+				testData1 + testData2,
+				testData1 + testData2 + testData3);
+	}
+
+	private void testResumeAfterMultiplePersist(
+			final String persistName,
+			final String expectedPostRecoveryContents,
+			final String expectedFinalContents) throws Exception {
+
+		final Path testDir = getBasePathForTest();
+		final Path path = new Path(testDir, "part-0");
+
+		final RecoverableWriter initWriter = getNewFileSystemWriter();
+
+		final Map<String, RecoverableWriter.ResumeRecoverable> recoverables = new HashMap<>(4);
+		try (final RecoverableFsDataOutputStream stream = initWriter.open(path)) {
+			recoverables.put(INIT_EMPTY_PERSIST, stream.persist());
+
+			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+
+			recoverables.put(INTERM_WITH_STATE_PERSIST, stream.persist());
+			recoverables.put(INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST, stream.persist());
+
+			// and write some more data
+			stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+
+			recoverables.put(FINAL_WITH_EXTRA_STATE, stream.persist());
+		}
+
+		final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> serializer = initWriter.getResumeRecoverableSerializer();
+		final byte[] serializedRecoverable = serializer.serialize(recoverables.get(persistName));
+
+		// get a new serializer from a new writer to make sure that no pre-initialized state leaks in.
+		final RecoverableWriter newWriter = getNewFileSystemWriter();
+		final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> deserializer = newWriter.getResumeRecoverableSerializer();
+		final RecoverableWriter.ResumeRecoverable recoveredRecoverable =
+				deserializer.deserialize(serializer.getVersion(), serializedRecoverable);
+
+		try (final RecoverableFsDataOutputStream recoveredStream = newWriter.recover(recoveredRecoverable)) {
+
+			// we expect the data to be truncated
+			Map<Path, String> files = getFileContentByPath(testDir);
+			Assert.assertEquals(1L, files.size());
+
+			for (Map.Entry<Path, String> fileContents : files.entrySet()) {
+				Assert.assertTrue(fileContents.getKey().getName().startsWith(".part-0.inprogress."));
+				Assert.assertEquals(expectedPostRecoveryContents, fileContents.getValue());
+			}
+
+			recoveredStream.write(testData3.getBytes(StandardCharsets.UTF_8));
+			recoveredStream.closeForCommit().commit();
+
+			files = getFileContentByPath(testDir);
+			Assert.assertEquals(1L, files.size());
+
+			for (Map.Entry<Path, String> fileContents : files.entrySet()) {
+				Assert.assertEquals("part-0", fileContents.getKey().getName());
+				Assert.assertEquals(expectedFinalContents, fileContents.getValue());
+			}
+		}
+	}
+
+	/**
+	 * Writes two chunks (persisting twice in between), closes for commit, round-trips the
+	 * commit recoverable through its serializer, and commits it from a fresh writer.
+	 */
+	@Test
+	public void testCommitAfterRecovery() throws Exception {
+		final Path baseDir = getBasePathForTest();
+		final Path target = new Path(baseDir, "part-0");
+
+		// phase 1: produce a commit recoverable with the first writer
+		final RecoverableWriter originalWriter = getNewFileSystemWriter();
+		final RecoverableWriter.CommitRecoverable commitHandle;
+		try (final RecoverableFsDataOutputStream out = originalWriter.open(target)) {
+			out.write(testData1.getBytes(StandardCharsets.UTF_8));
+			out.persist();
+			out.persist();
+			out.write(testData2.getBytes(StandardCharsets.UTF_8));
+			commitHandle = out.closeForCommit().getRecoverable();
+		}
+
+		// phase 2: serialize the handle, then restore it via a brand-new writer's serializer
+		// so that no pre-initialized state can leak from the first writer
+		final byte[] bytes = originalWriter.getCommitRecoverableSerializer().serialize(commitHandle);
+		final RecoverableWriter recoveringWriter = getNewFileSystemWriter();
+		final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> serde =
+				recoveringWriter.getCommitRecoverableSerializer();
+		final RecoverableWriter.CommitRecoverable restoredHandle =
+				serde.deserialize(serde.getVersion(), bytes);
+
+		// phase 3: commit from the restored handle
+		recoveringWriter.recoverForCommit(restoredHandle).commitAfterRecovery();
+
+		// exactly one committed file containing both chunks must remain
+		final Map<Path, String> contentByPath = getFileContentByPath(baseDir);
+		Assert.assertEquals(1L, contentByPath.size());
+		for (Map.Entry<Path, String> entry : contentByPath.entrySet()) {
+			Assert.assertEquals("part-0", entry.getKey().getName());
+			Assert.assertEquals(testData1 + testData2, entry.getValue());
+		}
+	}
+
+	// TESTS FOR EXCEPTIONS
+
+	/**
+	 * Verifies that writing to a stream after {@code closeForCommit()} fails with an IOException.
+	 */
+	@Test(expected = IOException.class)
+	public void testExceptionWritingAfterCloseForCommit() throws Exception {
+		final Path baseDir = getBasePathForTest();
+
+		final RecoverableWriter fsWriter = getNewFileSystemWriter();
+		final Path target = new Path(baseDir, "part-0");
+
+		try (final RecoverableFsDataOutputStream out = fsWriter.open(target)) {
+			final byte[] firstChunk = testData1.getBytes(StandardCharsets.UTF_8);
+			out.write(firstChunk);
+
+			out.closeForCommit().getRecoverable();
+
+			// the stream is closed for commit now, so this write is expected to throw
+			final byte[] secondChunk = testData2.getBytes(StandardCharsets.UTF_8);
+			out.write(secondChunk);
+			fail();
+		}
+	}
+
+	/**
+	 * Verifies that a resume recoverable taken before a commit can no longer be recovered
+	 * once the file has been committed.
+	 */
+	@Test(expected = IOException.class)
+	public void testResumeAfterCommit() throws Exception {
+		final Path baseDir = getBasePathForTest();
+
+		final RecoverableWriter fsWriter = getNewFileSystemWriter();
+		final Path target = new Path(baseDir, "part-0");
+
+		final RecoverableWriter.ResumeRecoverable resumeHandle;
+		try (final RecoverableFsDataOutputStream out = fsWriter.open(target)) {
+			out.write(testData1.getBytes(StandardCharsets.UTF_8));
+			resumeHandle = out.persist();
+
+			out.write(testData2.getBytes(StandardCharsets.UTF_8));
+			out.closeForCommit().commit();
+		}
+
+		// the file is already committed, so recovering from the old handle must throw
+		fsWriter.recover(resumeHandle);
+		fail();
+	}
+
+	/**
+	 * Resumes from an earlier recoverable (which truncates the in-progress file) and then
+	 * checks that resuming from a later recoverable, whose data is now missing, fails.
+	 */
+	@Test
+	public void testResumeWithWrongOffset() throws Exception {
+		// this is a rather unrealistic scenario, but it is to trigger
+		// truncation of the file and try to resume with missing data.
+
+		final Path testDir = getBasePathForTest();
+
+		final RecoverableWriter writer = getNewFileSystemWriter();
+		final Path path = new Path(testDir, "part-0");
+
+		final RecoverableWriter.ResumeRecoverable recoverable1;
+		final RecoverableWriter.ResumeRecoverable recoverable2;
+		try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+
+			recoverable1 = stream.persist();
+			stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+
+			recoverable2 = stream.persist();
+			stream.write(testData3.getBytes(StandardCharsets.UTF_8));
+		}
+
+		// resuming from the first recoverable must succeed; any exception propagates
+		// with its original cause so that a failure here is diagnosable
+		try (RecoverableFsDataOutputStream ignored = writer.recover(recoverable1)) {
+			// nothing to do: we only check that recovery works
+		}
+
+		// the data covered by recoverable2 is gone after the truncation above,
+		// so resuming from it must fail with an IOException
+		try (RecoverableFsDataOutputStream ignored = writer.recover(recoverable2)) {
+			fail("Resuming from a recoverable whose data is missing should have failed.");
+		} catch (IOException expected) {
+			// expected
+		}
+	}
+
+	/**
+	 * Reads all files listed directly under the given directory and returns their contents,
+	 * decoded as UTF-8, keyed by file path.
+	 *
+	 * @param directory the directory whose entries are read
+	 * @return a map from each listed file's path to its complete content
+	 * @throws IOException if listing, opening or reading fails, or if a file ends before
+	 *                     its reported length
+	 */
+	private Map<Path, String> getFileContentByPath(Path directory) throws IOException {
+		final Map<Path, String> contents = new HashMap<>();
+
+		final FileStatus[] filesInBucket = getFileSystem().listStatus(directory);
+		for (FileStatus file : filesInBucket) {
+			final byte[] serContents = new byte[(int) file.getLen()];
+
+			// close every stream to avoid leaking file handles, and loop until the buffer
+			// is full: a single read() call may return fewer bytes than requested
+			try (FSDataInputStream stream = getFileSystem().open(file.getPath())) {
+				int offset = 0;
+				while (offset < serContents.length) {
+					final int bytesRead = stream.read(serContents, offset, serContents.length - offset);
+					if (bytesRead < 0) {
+						throw new IOException("Unexpected end of file " + file.getPath() +
+								" after " + offset + " of " + serContents.length + " bytes.");
+					}
+					offset += bytesRead;
+				}
+			}
+
+			contents.put(file.getPath(), new String(serContents, StandardCharsets.UTF_8));
+		}
+		return contents;
+	}
+
+	private static String randomName() {
+		return StringUtils.getRandomString(RND, 16, 16, 'a', 'z');
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java
new file mode 100644
index 0000000..d347609
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.local;
+
+import org.apache.flink.core.fs.AbstractResumableWriterTest;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests for the {@link LocalRecoverableWriter}: runs the shared
+ * {@link AbstractResumableWriterTest} cases against the local file system.
+ */
+public class LocalFileSystemResumableWriterTest extends AbstractResumableWriterTest {
+
+	/** Class-wide scratch space; each call to {@link #getBasePath()} creates a new sub-folder in it. */
+	@ClassRule
+	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+	@Override
+	public Path getBasePath() throws Exception {
+		final java.net.URI folderUri = TEMP_FOLDER.newFolder().toURI();
+		return new Path(folderUri);
+	}
+
+	@Override
+	public FileSystem initializeFileSystem() {
+		final FileSystem localFs = FileSystem.getLocalFileSystem();
+		return localFs;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
index f944dc5..c688b32 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
@@ -19,11 +19,13 @@
 package org.apache.flink.runtime.fs.hdfs;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
 import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.lang3.time.StopWatch;
@@ -38,13 +40,18 @@ import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
+import java.time.Duration;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for Hadoop's
+ * file system abstraction.
+ */
 @Internal
 class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
 
-	private static final long LEASE_TIMEOUT = 100000L;
+	private static final long LEASE_TIMEOUT = 100_000L;
 
 	private static Method truncateHandle;
 
@@ -79,16 +86,23 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
 		this.targetFile = checkNotNull(recoverable.targetFile());
 		this.tempFile = checkNotNull(recoverable.tempFile());
 
-		// the getFileStatus will throw a FileNotFound exception if the file is not there.
-		final FileStatus tmpFileStatus = fs.getFileStatus(tempFile);
-		if (tmpFileStatus.getLen() < recoverable.offset()) {
-			throw new IOException("Missing data in tmp file: " + tempFile.getName());
+		// truncate back and append
+		try {
+			truncate(fs, tempFile, recoverable.offset());
+		} catch (Exception e) {
+			throw new IOException("Missing data in tmp file: " + tempFile, e);
 		}
 
-		// truncate back and append
-		truncate(fs, tempFile, recoverable.offset());
 		waitUntilLeaseIsRevoked(tempFile);
 		out = fs.append(tempFile);
+
+		// sanity check
+		long pos = out.getPos();
+		if (pos != recoverable.offset()) {
+			IOUtils.closeQuietly(out);
+			throw new IOException("Truncate failed: " + tempFile +
+					" (requested=" + recoverable.offset() + " ,size=" + pos + ')');
+		}
 	}
 
 	@Override
@@ -108,6 +122,7 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
 
 	@Override
 	public void sync() throws IOException {
+		out.hflush();
 		out.hsync();
 	}
 
@@ -243,9 +258,22 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
 
 			if (srcStatus != null) {
 				if (srcStatus.getLen() > expectedLength) {
-					// can happen if we co from persist to recovering for commit directly
+					// can happen if we go from persist to recovering for commit directly
 					// truncate the trailing junk away
-					truncate(fs, src, expectedLength);
+					try {
+						truncate(fs, src, expectedLength);
+					} catch (Exception e) {
+						// this can happen if the file is smaller than  expected
+						throw new IOException("Problem while truncating file: " + src, e);
+					}
+				}
+
+				// rename to final location (if it exists, overwrite it)
+				try {
+					fs.rename(src, dest);
+				}
+				catch (IOException e) {
+					throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
 				}
 			}
 			else if (!fs.exists(dest)) {
@@ -281,23 +309,21 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
 
 		final DistributedFileSystem dfs = (DistributedFileSystem) fs;
 		dfs.recoverLease(path);
-		boolean isclosed = dfs.isFileClosed(path);
+
+		final Deadline deadline = Deadline.now().plus(Duration.ofMillis(LEASE_TIMEOUT));
 
 		final StopWatch sw = new StopWatch();
 		sw.start();
 
-		while (!isclosed) {
-			if (sw.getTime() > LEASE_TIMEOUT) {
-				break;
-			}
-
+		boolean isClosed = dfs.isFileClosed(path);
+		while (!isClosed && deadline.hasTimeLeft()) {
 			try {
 				Thread.sleep(500L);
 			} catch (InterruptedException e1) {
-				// ignore it
+				throw new IOException("Recovering the lease failed: ", e1);
 			}
-			isclosed = dfs.isFileClosed(path);
+			isClosed = dfs.isFileClosed(path);
 		}
-		return isclosed;
+		return isClosed;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
index 09a9e54..d0b3a7a 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.util;
 
 import org.apache.flink.configuration.ConfigConstants;
-
 import org.apache.flink.util.FlinkRuntimeException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.io.Text;

http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopResumableWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopResumableWriterTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopResumableWriterTest.java
new file mode 100644
index 0000000..5827a55
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopResumableWriterTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.AbstractResumableWriterTest;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.OperatingSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+/**
+ * Tests for the {@link HadoopRecoverableWriter}, run against a {@link MiniDFSCluster}
+ * that is started once for the whole test class.
+ */
+public class HadoopResumableWriterTest extends AbstractResumableWriterTest {
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+	private static MiniDFSCluster hdfsCluster;
+
+	/** The cached file system instance. */
+	private static FileSystem fileSystem;
+
+	private static Path basePath;
+
+	/** Skips the tests of this class unless Hadoop 2.7 or newer is available. */
+	public static void testHadoopVersion() {
+		Assume.assumeTrue(HadoopUtils.isMinHadoopVersion(2, 7));
+	}
+
+	/** Skips the tests of this class on Windows, where the HDFS mini cluster cannot start. */
+	public static void verifyOS() {
+		Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows());
+	}
+
+	/**
+	 * Checks the preconditions and starts the mini HDFS cluster.
+	 */
+	@BeforeClass
+	public static void createHDFS() throws Exception {
+		// JUnit 4 does not define an order among several @BeforeClass methods, so the
+		// assumptions are checked here explicitly, before any attempt to start the cluster.
+		testHadoopVersion();
+		verifyOS();
+
+		final File baseDir = TEMP_FOLDER.newFolder();
+
+		final Configuration hdConf = new Configuration();
+		hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+
+		final MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+		hdfsCluster = builder.build();
+
+		final org.apache.hadoop.fs.FileSystem hdfs = hdfsCluster.getFileSystem();
+
+		fileSystem = new HadoopFileSystem(hdfs);
+		basePath = new Path(hdfs.getUri() + "/tests");
+	}
+
+	/**
+	 * Deletes the test directory and shuts the mini HDFS cluster down.
+	 */
+	@AfterClass
+	public static void destroyHDFS() throws Exception {
+		if (hdfsCluster != null) {
+			try {
+				// basePath stays null if setup failed after the cluster was built
+				if (basePath != null) {
+					hdfsCluster.getFileSystem().delete(new org.apache.hadoop.fs.Path(basePath.toUri()), true);
+				}
+			} finally {
+				// always release the cluster, even if cleaning up the test directory failed
+				hdfsCluster.shutdown();
+			}
+		}
+	}
+
+	@Override
+	public Path getBasePath() {
+		return basePath;
+	}
+
+	@Override
+	public FileSystem initializeFileSystem() {
+		return fileSystem;
+	}
+}