Posted to commits@flink.apache.org by se...@apache.org on 2018/07/15 22:23:47 UTC

[5/6] flink git commit: [FLINK-9751] [filesystem] Add fixes and tests for Persistent Resumable Writers

[FLINK-9751] [filesystem] Add fixes and tests for Persistent Resumable Writers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d238e1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d238e1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d238e1a

Branch: refs/heads/master
Commit: 9d238e1a170a96c311b0dafa92f572c2d97bbcad
Parents: e296094
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 15 23:20:37 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 15 23:20:37 2018 +0200

----------------------------------------------------------------------
 .../LocalRecoverableFsDataOutputStream.java     |   6 +-
 .../core/io/SimpleVersionedSerializer.java      |   3 +
 .../core/fs/AbstractResumableWriterTest.java    | 380 +++++++++++++++++++
 .../LocalFileSystemResumableWriterTest.java     |  45 +++
 .../HadoopRecoverableFsDataOutputStream.java    |  62 ++-
 .../apache/flink/runtime/util/HadoopUtils.java  |   2 +-
 .../fs/hdfs/HadoopResumableWriterTest.java      |  95 +++++
 7 files changed, 572 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
index 6c6a554..fd8e8fe 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
@@ -57,11 +57,11 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
 		this.tempFile = checkNotNull(resumable.tempFile());
 
 		if (!tempFile.exists()) {
-			throw new FileNotFoundException("File Not Found: " + tempFile.getName());
+			throw new FileNotFoundException("File Not Found: " + tempFile);
 		}
 
 		if (tempFile.length() < resumable.offset()) {
-			throw new IOException("Missing data in tmp file: " + tempFile.getName());
+			throw new IOException("Missing data in tmp file: " + tempFile);
 		}
 
 		this.fos = new FileOutputStream(this.tempFile, true);
@@ -165,6 +165,8 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
 					try (FileOutputStream fos = new FileOutputStream(src, true)) {
 						fos.getChannel().truncate(expectedLength);
 					}
+				} else if (src.length() < expectedLength) {
+					throw new IOException("Missing data in tmp file: " + src);
 				}
 
 				// source still exists, so no renaming happened yet. do it!
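
For illustration, the recovery-time length check that this hunk completes can be
read in isolation as the following minimal sketch; the helper class and method
names are hypothetical, not part of this commit. A temp file longer than the
persisted offset is truncated back, and a shorter one now fails fast instead of
silently resuming over missing data.

    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.IOException;

    final class TruncateOrFailSketch {

        /** Brings 'src' back to exactly 'expectedLength' bytes, or fails. */
        static void ensureExactLength(File src, long expectedLength) throws IOException {
            if (!src.exists()) {
                throw new FileNotFoundException("File Not Found: " + src);
            }
            if (src.length() > expectedLength) {
                // trailing bytes written after the persist point: cut them away
                try (FileOutputStream fos = new FileOutputStream(src, true)) {
                    fos.getChannel().truncate(expectedLength);
                }
            } else if (src.length() < expectedLength) {
                // bytes that were already persisted are gone: recovery must fail
                throw new IOException("Missing data in tmp file: " + src);
            }
        }
    }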

http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
index 6c061a5..4dfeea2 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.core.io;
 
+import org.apache.flink.annotation.Internal;
+
 import java.io.IOException;
 
 /**
@@ -44,6 +46,7 @@ import java.io.IOException;
  * 
  * @param <E> The data type serialized / deserialized by this serializer.
  */
+@Internal
 public interface SimpleVersionedSerializer<E> extends Versioned {
 
 	/**
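
As context for the newly added @Internal annotation: an implementation of this
interface couples a serialization format to a version number, so a reader can
detect and handle data written in an older format. A minimal illustrative
implementation for plain strings might look as follows (this class is
hypothetical, not part of this commit; the method signatures are the ones the
tests below rely on):

    import org.apache.flink.core.io.SimpleVersionedSerializer;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    final class StringSerializerSketch implements SimpleVersionedSerializer<String> {

        @Override
        public int getVersion() {
            return 1;
        }

        @Override
        public byte[] serialize(String obj) throws IOException {
            return obj.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String deserialize(int version, byte[] serialized) throws IOException {
            if (version != getVersion()) {
                throw new IOException("Unrecognized version: " + version);
            }
            return new String(serialized, StandardCharsets.UTF_8);
        }
    }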

http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java
new file mode 100644
index 0000000..8077305
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.fail;
+
+/**
+ * A base test suite for the {@link RecoverableWriter}.
+ * This should be subclassed to test each filesystem-specific writer.
+ */
+public abstract class AbstractResumableWriterTest extends TestLogger {
+
+	private static final Random RND = new Random();
+
+	private static final String testData1 = "THIS IS A TEST 1.";
+	private static final String testData2 = "THIS IS A TEST 2.";
+	private static final String testData3 = "THIS IS A TEST 3.";
+
+	private Path basePathForTest;
+
+	private static FileSystem fileSystem;
+
+	public abstract Path getBasePath() throws Exception;
+
+	public abstract FileSystem initializeFileSystem();
+
+	public Path getBasePathForTest() {
+		return basePathForTest;
+	}
+
+	private FileSystem getFileSystem() {
+		if (fileSystem == null) {
+			fileSystem = initializeFileSystem();
+		}
+		return fileSystem;
+	}
+
+	private RecoverableWriter getNewFileSystemWriter() throws IOException {
+		return getFileSystem().createRecoverableWriter();
+	}
+
+	@Before
+	public void prepare() throws Exception {
+		basePathForTest = new Path(getBasePath(), randomName());
+		getFileSystem().mkdirs(basePathForTest);
+	}
+
+	@After
+	public void cleanup() throws Exception {
+		getFileSystem().delete(basePathForTest, true);
+	}
+
+	@Test
+	public void testCloseWithNoData() throws Exception {
+		final RecoverableWriter writer = getNewFileSystemWriter();
+
+		final Path testDir = getBasePathForTest();
+
+		final Path path = new Path(testDir, "part-0");
+
+		final RecoverableFsDataOutputStream stream = writer.open(path);
+		for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) {
+			Assert.assertTrue(fileContents.getKey().getName().startsWith(".part-0.inprogress."));
+			Assert.assertTrue(fileContents.getValue().isEmpty());
+		}
+
+		stream.closeForCommit().commit();
+
+		for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) {
+			Assert.assertEquals("part-0", fileContents.getKey().getName());
+			Assert.assertTrue(fileContents.getValue().isEmpty());
+		}
+	}
+
+	@Test
+	public void testCommitAfterNormalClose() throws Exception {
+		final RecoverableWriter writer = getNewFileSystemWriter();
+
+		final Path testDir = getBasePathForTest();
+
+		final Path path = new Path(testDir, "part-0");
+
+		try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+			stream.closeForCommit().commit();
+
+			for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) {
+				Assert.assertEquals("part-0", fileContents.getKey().getName());
+				Assert.assertEquals(testData1, fileContents.getValue());
+			}
+		}
+	}
+
+	@Test
+	public void testCommitAfterPersist() throws Exception {
+		final RecoverableWriter writer = getNewFileSystemWriter();
+
+		final Path testDir = getBasePathForTest();
+
+		final Path path = new Path(testDir, "part-0");
+
+		try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+			stream.persist();
+
+			stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+			stream.closeForCommit().commit();
+
+			for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) {
+				Assert.assertEquals("part-0", fileContents.getKey().getName());
+				Assert.assertEquals(testData1 + testData2, fileContents.getValue());
+			}
+		}
+	}
+
+	// TESTS FOR RECOVERY
+
+	private static final String INIT_EMPTY_PERSIST = "EMPTY";
+	private static final String INTERM_WITH_STATE_PERSIST = "INTERM-STATE";
+	private static final String INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST = "INTERM-IMMEDIATE";
+	private static final String FINAL_WITH_EXTRA_STATE = "FINAL";
+
+	@Test
+	public void testRecoverWithEmptyState() throws Exception {
+		testResumeAfterMultiplePersist(
+				INIT_EMPTY_PERSIST,
+				"",
+				testData3);
+	}
+
+	@Test
+	public void testRecoverWithState() throws Exception {
+		testResumeAfterMultiplePersist(
+				INTERM_WITH_STATE_PERSIST,
+				testData1,
+				testData1 + testData3);
+	}
+
+	@Test
+	public void testRecoverFromIntermWithoutAdditionalState() throws Exception {
+		testResumeAfterMultiplePersist(
+				INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST,
+				testData1,
+				testData1 + testData3);
+	}
+
+	@Test
+	public void testRecoverAfterMultiplePersistsState() throws Exception {
+		testResumeAfterMultiplePersist(
+				FINAL_WITH_EXTRA_STATE,
+				testData1 + testData2,
+				testData1 + testData2 + testData3);
+	}
+
+	private void testResumeAfterMultiplePersist(
+			final String persistName,
+			final String expectedPostRecoveryContents,
+			final String expectedFinalContents) throws Exception {
+
+		final Path testDir = getBasePathForTest();
+		final Path path = new Path(testDir, "part-0");
+
+		final RecoverableWriter initWriter = getNewFileSystemWriter();
+
+		final Map<String, RecoverableWriter.ResumeRecoverable> recoverables = new HashMap<>(4);
+		try (final RecoverableFsDataOutputStream stream = initWriter.open(path)) {
+			recoverables.put(INIT_EMPTY_PERSIST, stream.persist());
+
+			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+
+			recoverables.put(INTERM_WITH_STATE_PERSIST, stream.persist());
+			recoverables.put(INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST, stream.persist());
+
+			// and write some more data
+			stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+
+			recoverables.put(FINAL_WITH_EXTRA_STATE, stream.persist());
+		}
+
+		final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> serializer = initWriter.getResumeRecoverableSerializer();
+		final byte[] serializedRecoverable = serializer.serialize(recoverables.get(persistName));
+
+		// get a new serializer from a new writer to make sure that no pre-initialized state leaks in.
+		final RecoverableWriter newWriter = getNewFileSystemWriter();
+		final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> deserializer = newWriter.getResumeRecoverableSerializer();
+		final RecoverableWriter.ResumeRecoverable recoveredRecoverable =
+				deserializer.deserialize(serializer.getVersion(), serializedRecoverable);
+
+		try (final RecoverableFsDataOutputStream recoveredStream = newWriter.recover(recoveredRecoverable)) {
+
+			// we expect the data to be truncated
+			Map<Path, String> files = getFileContentByPath(testDir);
+			Assert.assertEquals(1L, files.size());
+
+			for (Map.Entry<Path, String> fileContents : files.entrySet()) {
+				Assert.assertTrue(fileContents.getKey().getName().startsWith(".part-0.inprogress."));
+				Assert.assertEquals(expectedPostRecoveryContents, fileContents.getValue());
+			}
+
+			recoveredStream.write(testData3.getBytes(StandardCharsets.UTF_8));
+			recoveredStream.closeForCommit().commit();
+
+			files = getFileContentByPath(testDir);
+			Assert.assertEquals(1L, files.size());
+
+			for (Map.Entry<Path, String> fileContents : files.entrySet()) {
+				Assert.assertEquals("part-0", fileContents.getKey().getName());
+				Assert.assertEquals(expectedFinalContents, fileContents.getValue());
+			}
+		}
+	}
+
+	@Test
+	public void testCommitAfterRecovery() throws Exception {
+		final Path testDir = getBasePathForTest();
+		final Path path = new Path(testDir, "part-0");
+
+		final RecoverableWriter initWriter = getNewFileSystemWriter();
+
+		final RecoverableWriter.CommitRecoverable recoverable;
+		try (final RecoverableFsDataOutputStream stream = initWriter.open(path)) {
+			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+
+			stream.persist();
+			stream.persist();
+
+			// and write some more data
+			stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+
+			recoverable = stream.closeForCommit().getRecoverable();
+		}
+
+		final byte[] serializedRecoverable = initWriter.getCommitRecoverableSerializer().serialize(recoverable);
+
+		// get a new serializer from a new writer to make sure that no pre-initialized state leaks in.
+		final RecoverableWriter newWriter = getNewFileSystemWriter();
+		final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> deserializer = newWriter.getCommitRecoverableSerializer();
+		final RecoverableWriter.CommitRecoverable recoveredRecoverable = deserializer.deserialize(deserializer.getVersion(), serializedRecoverable);
+
+		final RecoverableFsDataOutputStream.Committer committer = newWriter.recoverForCommit(recoveredRecoverable);
+		committer.commitAfterRecovery();
+
+		Map<Path, String> files = getFileContentByPath(testDir);
+		Assert.assertEquals(1L, files.size());
+
+		for (Map.Entry<Path, String> fileContents : files.entrySet()) {
+			Assert.assertEquals("part-0", fileContents.getKey().getName());
+			Assert.assertEquals(testData1 + testData2, fileContents.getValue());
+		}
+	}
+
+	// TESTS FOR EXCEPTIONS
+
+	@Test(expected = IOException.class)
+	public void testExceptionWritingAfterCloseForCommit() throws Exception {
+		final Path testDir = getBasePathForTest();
+
+		final RecoverableWriter writer = getNewFileSystemWriter();
+		final Path path = new Path(testDir, "part-0");
+
+		try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+
+			stream.closeForCommit().getRecoverable();
+			stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+			fail();
+		}
+	}
+
+	@Test(expected = IOException.class)
+	public void testResumeAfterCommit() throws Exception {
+		final Path testDir = getBasePathForTest();
+
+		final RecoverableWriter writer = getNewFileSystemWriter();
+		final Path path = new Path(testDir, "part-0");
+
+		RecoverableWriter.ResumeRecoverable recoverable;
+		try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+
+			recoverable = stream.persist();
+			stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+
+			stream.closeForCommit().commit();
+		}
+
+		// this should throw an exception as the file is already committed
+		writer.recover(recoverable);
+		fail();
+	}
+
+	@Test
+	public void testResumeWithWrongOffset() throws Exception {
+		// this is a rather unrealistic scenario, but it serves to trigger
+		// truncation of the file and an attempt to resume with missing data.
+
+		final Path testDir = getBasePathForTest();
+
+		final RecoverableWriter writer = getNewFileSystemWriter();
+		final Path path = new Path(testDir, "part-0");
+
+		final RecoverableWriter.ResumeRecoverable recoverable1;
+		final RecoverableWriter.ResumeRecoverable recoverable2;
+		try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+
+			recoverable1 = stream.persist();
+			stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+
+			recoverable2 = stream.persist();
+			stream.write(testData3.getBytes(StandardCharsets.UTF_8));
+		}
+
+		try (RecoverableFsDataOutputStream ignored = writer.recover(recoverable1)) {
+			// this should work fine
+		} catch (Exception e) {
+			fail();
+		}
+
+		// this should throw an exception
+		try (RecoverableFsDataOutputStream ignored = writer.recover(recoverable2)) {
+			fail();
+		} catch (IOException e) {
+			// we expect this
+			return;
+		}
+		fail();
+	}
+
+	private Map<Path, String> getFileContentByPath(Path directory) throws IOException {
+		Map<Path, String> contents = new HashMap<>();
+
+		final FileStatus[] filesInBucket = getFileSystem().listStatus(directory);
+		for (FileStatus file : filesInBucket) {
+			final long fileLength = file.getLen();
+			byte[] serContents = new byte[(int) fileLength];
+
+			FSDataInputStream stream = getFileSystem().open(file.getPath());
+			stream.read(serContents);
+
+			contents.put(file.getPath(), new String(serContents, StandardCharsets.UTF_8));
+		}
+		return contents;
+	}
+
+	private static String randomName() {
+		return StringUtils.getRandomString(RND, 16, 16, 'a', 'z');
+	}
+}
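
Condensed to its essentials, the persist/recover round trip that this suite
exercises looks like the sketch below. Class, method, and variable names are
illustrative; the API calls are the ones the tests above use.

    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
    import org.apache.flink.core.fs.RecoverableWriter;
    import org.apache.flink.core.io.SimpleVersionedSerializer;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    final class ResumeRoundTripSketch {

        static void roundTrip(FileSystem fs, Path path) throws IOException {
            final RecoverableWriter writer = fs.createRecoverableWriter();

            // write some data and take a persist point
            final byte[] checkpointed;
            try (RecoverableFsDataOutputStream out = writer.open(path)) {
                out.write("durable".getBytes(StandardCharsets.UTF_8));
                final RecoverableWriter.ResumeRecoverable resumable = out.persist();

                // a real job would store these bytes in a checkpoint
                checkpointed = writer.getResumeRecoverableSerializer().serialize(resumable);

                out.write("lost on failure".getBytes(StandardCharsets.UTF_8));
            }

            // after a failure: deserialize with a fresh writer, resume, commit
            final RecoverableWriter newWriter = fs.createRecoverableWriter();
            final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> deser =
                    newWriter.getResumeRecoverableSerializer();
            final RecoverableWriter.ResumeRecoverable recovered =
                    deser.deserialize(deser.getVersion(), checkpointed);

            try (RecoverableFsDataOutputStream out = newWriter.recover(recovered)) {
                out.write("appended after recovery".getBytes(StandardCharsets.UTF_8));
                out.closeForCommit().commit(); // atomically publishes the file at 'path'
            }
        }
    }

On recovery, everything written after the persist point ("lost on failure"
above) is truncated away, which is exactly what testResumeAfterMultiplePersist
asserts.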

http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java
new file mode 100644
index 0000000..d347609
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.local;
+
+import org.apache.flink.core.fs.AbstractResumableWriterTest;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests for the {@link LocalRecoverableWriter}.
+ */
+public class LocalFileSystemResumableWriterTest extends AbstractResumableWriterTest {
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+	@Override
+	public Path getBasePath() throws Exception {
+		return new Path(TEMP_FOLDER.newFolder().toURI());
+	}
+
+	@Override
+	public FileSystem initializeFileSystem() {
+		return FileSystem.getLocalFileSystem();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
index f944dc5..c688b32 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
@@ -19,11 +19,13 @@
 package org.apache.flink.runtime.fs.hdfs;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
 import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.lang3.time.StopWatch;
@@ -38,13 +40,18 @@ import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
+import java.time.Duration;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for Hadoop's
+ * file system abstraction.
+ */
 @Internal
 class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
 
-	private static final long LEASE_TIMEOUT = 100000L;
+	private static final long LEASE_TIMEOUT = 100_000L;
 
 	private static Method truncateHandle;
 
@@ -79,16 +86,23 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
 		this.targetFile = checkNotNull(recoverable.targetFile());
 		this.tempFile = checkNotNull(recoverable.tempFile());
 
-		// the getFileStatus will throw a FileNotFound exception if the file is not there.
-		final FileStatus tmpFileStatus = fs.getFileStatus(tempFile);
-		if (tmpFileStatus.getLen() < recoverable.offset()) {
-			throw new IOException("Missing data in tmp file: " + tempFile.getName());
+		// truncate back and append
+		try {
+			truncate(fs, tempFile, recoverable.offset());
+		} catch (Exception e) {
+			throw new IOException("Missing data in tmp file: " + tempFile, e);
 		}
 
-		// truncate back and append
-		truncate(fs, tempFile, recoverable.offset());
 		waitUntilLeaseIsRevoked(tempFile);
 		out = fs.append(tempFile);
+
+		// sanity check
+		long pos = out.getPos();
+		if (pos != recoverable.offset()) {
+			IOUtils.closeQuietly(out);
+			throw new IOException("Truncate failed: " + tempFile +
+					" (requested=" + recoverable.offset() + ", size=" + pos + ')');
+		}
 	}
 
 	@Override
@@ -108,6 +122,7 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
 
 	@Override
 	public void sync() throws IOException {
+		out.hflush();
 		out.hsync();
 	}
 
@@ -243,9 +258,22 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
 
 			if (srcStatus != null) {
 				if (srcStatus.getLen() > expectedLength) {
-					// can happen if we co from persist to recovering for commit directly
+					// can happen if we go from persist to recovering for commit directly
 					// truncate the trailing junk away
-					truncate(fs, src, expectedLength);
+					try {
+						truncate(fs, src, expectedLength);
+					} catch (Exception e) {
+						// this can happen if the file is smaller than expected
+						throw new IOException("Problem while truncating file: " + src, e);
+					}
+				}
+
+				// rename to final location (if it exists, overwrite it)
+				try {
+					fs.rename(src, dest);
+				}
+				catch (IOException e) {
+					throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
 				}
 			}
 			else if (!fs.exists(dest)) {
@@ -281,23 +309,21 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
 
 		final DistributedFileSystem dfs = (DistributedFileSystem) fs;
 		dfs.recoverLease(path);
-		boolean isclosed = dfs.isFileClosed(path);
+
+		final Deadline deadline = Deadline.now().plus(Duration.ofMillis(LEASE_TIMEOUT));
 
 		final StopWatch sw = new StopWatch();
 		sw.start();
 
-		while (!isclosed) {
-			if (sw.getTime() > LEASE_TIMEOUT) {
-				break;
-			}
-
+		boolean isClosed = dfs.isFileClosed(path);
+		while (!isClosed && deadline.hasTimeLeft()) {
 			try {
 				Thread.sleep(500L);
 			} catch (InterruptedException e1) {
-				// ignore it
+				throw new IOException("Recovering the lease failed: ", e1);
 			}
-			isclosed = dfs.isFileClosed(path);
+			isClosed = dfs.isFileClosed(path);
 		}
-		return isclosed;
+		return isClosed;
 	}
 }
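
Two behavioral notes on the hunks above: sync() now calls hflush() (flush the
data out to the datanodes) before hsync() (additionally force it to disk), and
the lease-recovery wait is now bounded by a Deadline and surfaces interruption
instead of swallowing it. Reduced to a standalone sketch, the waiting pattern
is roughly the following (the helper class is hypothetical, not part of this
commit):

    import org.apache.flink.api.common.time.Deadline;

    import java.io.IOException;
    import java.time.Duration;
    import java.util.function.BooleanSupplier;

    final class BoundedPollSketch {

        /** Polls 'condition' every 500 ms until it holds or 'timeoutMillis' elapses. */
        static boolean pollUntil(BooleanSupplier condition, long timeoutMillis) throws IOException {
            final Deadline deadline = Deadline.now().plus(Duration.ofMillis(timeoutMillis));

            boolean satisfied = condition.getAsBoolean();
            while (!satisfied && deadline.hasTimeLeft()) {
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    // fail loudly, as the revised loop does, instead of ignoring it
                    throw new IOException("Waiting for the lease was interrupted.", e);
                }
                satisfied = condition.getAsBoolean();
            }
            return satisfied;
        }
    }

In waitUntilLeaseIsRevoked the condition is dfs.isFileClosed(path); since that
call itself throws IOException, the production code inlines the loop rather
than going through a functional interface as this sketch does.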

http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
index 09a9e54..d0b3a7a 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.util;
 
 import org.apache.flink.configuration.ConfigConstants;
-
 import org.apache.flink.util.FlinkRuntimeException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.io.Text;

http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopResumableWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopResumableWriterTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopResumableWriterTest.java
new file mode 100644
index 0000000..5827a55
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopResumableWriterTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.AbstractResumableWriterTest;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.OperatingSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+/**
+ * Tests for the {@link HadoopRecoverableWriter}.
+ */
+public class HadoopResumableWriterTest extends AbstractResumableWriterTest {
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+	private static MiniDFSCluster hdfsCluster;
+
+	/** The cached file system instance. */
+	private static FileSystem fileSystem;
+
+	private static Path basePath;
+
+	@BeforeClass
+	public static void testHadoopVersion() {
+		Assume.assumeTrue(HadoopUtils.isMinHadoopVersion(2, 7));
+	}
+
+	@BeforeClass
+	public static void verifyOS() {
+		Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows());
+	}
+
+	@BeforeClass
+	public static void createHDFS() throws Exception {
+		final File baseDir = TEMP_FOLDER.newFolder();
+
+		final Configuration hdConf = new Configuration();
+		hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+
+		final MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+		hdfsCluster = builder.build();
+
+		final org.apache.hadoop.fs.FileSystem hdfs = hdfsCluster.getFileSystem();
+
+		fileSystem = new HadoopFileSystem(hdfs);
+		basePath = new Path(hdfs.getUri() + "/tests");
+	}
+
+	@AfterClass
+	public static void destroyHDFS() throws Exception {
+		if (hdfsCluster != null) {
+			hdfsCluster.getFileSystem().delete(new org.apache.hadoop.fs.Path(basePath.toUri()), true);
+			hdfsCluster.shutdown();
+		}
+	}
+
+	@Override
+	public Path getBasePath() {
+		return basePath;
+	}
+
+	@Override
+	public FileSystem initializeFileSystem() {
+		return fileSystem;
+	}
+}