You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/07/15 22:23:47 UTC
[5/6] flink git commit: [FLINK-9751] [filesystem] Add fixes and tests
for Persistent Resumable Writers
[FLINK-9751] [filesystem] Add fixes and tests for Persistent Resumable Writers
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d238e1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d238e1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d238e1a
Branch: refs/heads/master
Commit: 9d238e1a170a96c311b0dafa92f572c2d97bbcad
Parents: e296094
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 15 23:20:37 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 15 23:20:37 2018 +0200
----------------------------------------------------------------------
.../LocalRecoverableFsDataOutputStream.java | 6 +-
.../core/io/SimpleVersionedSerializer.java | 3 +
.../core/fs/AbstractResumableWriterTest.java | 380 +++++++++++++++++++
.../LocalFileSystemResumableWriterTest.java | 45 +++
.../HadoopRecoverableFsDataOutputStream.java | 62 ++-
.../apache/flink/runtime/util/HadoopUtils.java | 2 +-
.../fs/hdfs/HadoopResumableWriterTest.java | 95 +++++
7 files changed, 572 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
index 6c6a554..fd8e8fe 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
@@ -57,11 +57,11 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
this.tempFile = checkNotNull(resumable.tempFile());
if (!tempFile.exists()) {
- throw new FileNotFoundException("File Not Found: " + tempFile.getName());
+ throw new FileNotFoundException("File Not Found: " + tempFile);
}
if (tempFile.length() < resumable.offset()) {
- throw new IOException("Missing data in tmp file: " + tempFile.getName());
+ throw new IOException("Missing data in tmp file: " + tempFile);
}
this.fos = new FileOutputStream(this.tempFile, true);
@@ -165,6 +165,8 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
try (FileOutputStream fos = new FileOutputStream(src, true)) {
fos.getChannel().truncate(expectedLength);
}
+ } else if (src.length() < expectedLength) {
+ throw new IOException("Missing data in tmp file: " + src);
}
// source still exists, so no renaming happened yet. do it!
http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
index 6c061a5..4dfeea2 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
@@ -18,6 +18,8 @@
package org.apache.flink.core.io;
+import org.apache.flink.annotation.Internal;
+
import java.io.IOException;
/**
@@ -44,6 +46,7 @@ import java.io.IOException;
*
* @param <E> The data type serialized / deserialized by this serializer.
*/
+@Internal
public interface SimpleVersionedSerializer<E> extends Versioned {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java
new file mode 100644
index 0000000..8077305
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.fail;
+
+/**
+ * A base test-suite for the {@link RecoverableWriter}.
+ * This should be subclassed to test each filesystem specific writer.
+ *
+ * <p>Subclasses provide the file system under test via {@link #initializeFileSystem()} and a
+ * base directory via {@link #getBasePath()}. Every test runs in its own randomly named
+ * sub-directory of that base path, which is deleted again after the test.
+ */
+public abstract class AbstractResumableWriterTest extends TestLogger {
+
+	private static final Random RND = new Random();
+
+	private static final String testData1 = "THIS IS A TEST 1.";
+	private static final String testData2 = "THIS IS A TEST 2.";
+	private static final String testData3 = "THIS IS A TEST 3.";
+
+	/** The per-test working directory, created in {@link #prepare()} and deleted in {@link #cleanup()}. */
+	private Path basePathForTest;
+
+	/**
+	 * The cached file system under test. The cache is static so that the file system is
+	 * initialized only once per test class. It is tagged with the class that created it
+	 * (see {@link #fileSystemOwner}) so that different subclasses running in the same JVM
+	 * never pick up each other's file system.
+	 */
+	private static FileSystem fileSystem;
+
+	/** The concrete test class whose {@link #initializeFileSystem()} produced {@link #fileSystem}. */
+	private static Class<?> fileSystemOwner;
+
+	/**
+	 * Returns the directory under which each test creates its own working directory.
+	 * Called from {@link #prepare()} once before every test.
+	 */
+	public abstract Path getBasePath() throws Exception;
+
+	/** Creates the file system whose {@link RecoverableWriter} is tested. */
+	public abstract FileSystem initializeFileSystem();
+
+	/** Returns the working directory of the currently running test. */
+	public Path getBasePathForTest() {
+		return basePathForTest;
+	}
+
+	private FileSystem getFileSystem() {
+		final Class<?> testClass = getClass();
+		// re-initialize if nothing is cached yet, or if the cached instance belongs to another subclass
+		if (fileSystem == null || fileSystemOwner != testClass) {
+			fileSystem = initializeFileSystem();
+			fileSystemOwner = testClass;
+		}
+		return fileSystem;
+	}
+
+	private RecoverableWriter getNewFileSystemWriter() throws IOException {
+		return getFileSystem().createRecoverableWriter();
+	}
+
+	@Before
+	public void prepare() throws Exception {
+		basePathForTest = new Path(getBasePath(), randomName());
+		getFileSystem().mkdirs(basePathForTest);
+	}
+
+	@After
+	public void cleanup() throws Exception {
+		// prepare() may have failed before the directory was assigned
+		if (basePathForTest != null) {
+			getFileSystem().delete(basePathForTest, true);
+		}
+	}
+
+	@Test
+	public void testCloseWithNoData() throws Exception {
+		final RecoverableWriter writer = getNewFileSystemWriter();
+
+		final Path testDir = getBasePathForTest();
+
+		final Path path = new Path(testDir, "part-0");
+
+		// try-with-resources so that the stream is released even if an assertion fails
+		try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+			for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) {
+				Assert.assertTrue(fileContents.getKey().getName().startsWith(".part-0.inprogress."));
+				Assert.assertTrue(fileContents.getValue().isEmpty());
+			}
+
+			stream.closeForCommit().commit();
+
+			final Map<Path, String> files = getFileContentByPath(testDir);
+			// guards against the loop below passing vacuously on an empty directory
+			Assert.assertEquals(1L, files.size());
+
+			for (Map.Entry<Path, String> fileContents : files.entrySet()) {
+				Assert.assertEquals("part-0", fileContents.getKey().getName());
+				Assert.assertTrue(fileContents.getValue().isEmpty());
+			}
+		}
+	}
+
+	@Test
+	public void testCommitAfterNormalClose() throws Exception {
+		final RecoverableWriter writer = getNewFileSystemWriter();
+
+		final Path testDir = getBasePathForTest();
+
+		final Path path = new Path(testDir, "part-0");
+
+		try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+			stream.closeForCommit().commit();
+
+			final Map<Path, String> files = getFileContentByPath(testDir);
+			Assert.assertEquals(1L, files.size());
+
+			for (Map.Entry<Path, String> fileContents : files.entrySet()) {
+				Assert.assertEquals("part-0", fileContents.getKey().getName());
+				Assert.assertEquals(testData1, fileContents.getValue());
+			}
+		}
+	}
+
+	@Test
+	public void testCommitAfterPersist() throws Exception {
+		final RecoverableWriter writer = getNewFileSystemWriter();
+
+		final Path testDir = getBasePathForTest();
+
+		final Path path = new Path(testDir, "part-0");
+
+		try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+			stream.persist();
+
+			stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+			stream.closeForCommit().commit();
+
+			final Map<Path, String> files = getFileContentByPath(testDir);
+			Assert.assertEquals(1L, files.size());
+
+			for (Map.Entry<Path, String> fileContents : files.entrySet()) {
+				Assert.assertEquals("part-0", fileContents.getKey().getName());
+				Assert.assertEquals(testData1 + testData2, fileContents.getValue());
+			}
+		}
+	}
+
+	// TESTS FOR RECOVERY
+
+	private static final String INIT_EMPTY_PERSIST = "EMPTY";
+	private static final String INTERM_WITH_STATE_PERSIST = "INTERM-STATE";
+	private static final String INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST = "INTERM-IMEDIATE";
+	private static final String FINAL_WITH_EXTRA_STATE = "FINAL";
+
+	@Test
+	public void testRecoverWithEmptyState() throws Exception {
+		testResumeAfterMultiplePersist(
+			INIT_EMPTY_PERSIST,
+			"",
+			testData3);
+	}
+
+	@Test
+	public void testRecoverWithState() throws Exception {
+		testResumeAfterMultiplePersist(
+			INTERM_WITH_STATE_PERSIST,
+			testData1,
+			testData1 + testData3);
+	}
+
+	@Test
+	public void testRecoverFromIntermWithoutAdditionalState() throws Exception {
+		testResumeAfterMultiplePersist(
+			INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST,
+			testData1,
+			testData1 + testData3);
+	}
+
+	@Test
+	public void testRecoverAfterMultiplePersistsState() throws Exception {
+		testResumeAfterMultiplePersist(
+			FINAL_WITH_EXTRA_STATE,
+			testData1 + testData2,
+			testData1 + testData2 + testData3);
+	}
+
+	/**
+	 * Writes data with several persist points, then resumes from the persist point named
+	 * {@code persistName} using a freshly created writer. Afterwards it checks the in-progress
+	 * file contents right after recovery and the committed file contents after appending
+	 * {@code testData3}.
+	 */
+	private void testResumeAfterMultiplePersist(
+			final String persistName,
+			final String expectedPostRecoveryContents,
+			final String expectedFinalContents) throws Exception {
+
+		final Path testDir = getBasePathForTest();
+		final Path path = new Path(testDir, "part-0");
+
+		final RecoverableWriter initWriter = getNewFileSystemWriter();
+
+		final Map<String, RecoverableWriter.ResumeRecoverable> recoverables = new HashMap<>(4);
+		try (final RecoverableFsDataOutputStream stream = initWriter.open(path)) {
+			recoverables.put(INIT_EMPTY_PERSIST, stream.persist());
+
+			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+
+			recoverables.put(INTERM_WITH_STATE_PERSIST, stream.persist());
+			recoverables.put(INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST, stream.persist());
+
+			// and write some more data
+			stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+
+			recoverables.put(FINAL_WITH_EXTRA_STATE, stream.persist());
+		}
+
+		final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> serializer = initWriter.getResumeRecoverableSerializer();
+		final byte[] serializedRecoverable = serializer.serialize(recoverables.get(persistName));
+
+		// get a new serializer from a new writer to make sure that no pre-initialized state leaks in.
+		final RecoverableWriter newWriter = getNewFileSystemWriter();
+		final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> deserializer = newWriter.getResumeRecoverableSerializer();
+		final RecoverableWriter.ResumeRecoverable recoveredRecoverable =
+			deserializer.deserialize(serializer.getVersion(), serializedRecoverable);
+
+		try (final RecoverableFsDataOutputStream recoveredStream = newWriter.recover(recoveredRecoverable)) {
+
+			// we expect the data to be truncated
+			Map<Path, String> files = getFileContentByPath(testDir);
+			Assert.assertEquals(1L, files.size());
+
+			for (Map.Entry<Path, String> fileContents : files.entrySet()) {
+				Assert.assertTrue(fileContents.getKey().getName().startsWith(".part-0.inprogress."));
+				Assert.assertEquals(expectedPostRecoveryContents, fileContents.getValue());
+			}
+
+			recoveredStream.write(testData3.getBytes(StandardCharsets.UTF_8));
+			recoveredStream.closeForCommit().commit();
+
+			files = getFileContentByPath(testDir);
+			Assert.assertEquals(1L, files.size());
+
+			for (Map.Entry<Path, String> fileContents : files.entrySet()) {
+				Assert.assertEquals("part-0", fileContents.getKey().getName());
+				Assert.assertEquals(expectedFinalContents, fileContents.getValue());
+			}
+		}
+	}
+
+	@Test
+	public void testCommitAfterRecovery() throws Exception {
+		final Path testDir = getBasePathForTest();
+		final Path path = new Path(testDir, "part-0");
+
+		final RecoverableWriter initWriter = getNewFileSystemWriter();
+
+		final RecoverableWriter.CommitRecoverable recoverable;
+		try (final RecoverableFsDataOutputStream stream = initWriter.open(path)) {
+			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+
+			stream.persist();
+			stream.persist();
+
+			// and write some more data
+			stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+
+			recoverable = stream.closeForCommit().getRecoverable();
+		}
+
+		final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> serializer = initWriter.getCommitRecoverableSerializer();
+		final byte[] serializedRecoverable = serializer.serialize(recoverable);
+
+		// get a new serializer from a new writer to make sure that no pre-initialized state leaks in.
+		final RecoverableWriter newWriter = getNewFileSystemWriter();
+		final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> deserializer = newWriter.getCommitRecoverableSerializer();
+		// the bytes must be read with the version of the serializer that wrote them
+		final RecoverableWriter.CommitRecoverable recoveredRecoverable =
+			deserializer.deserialize(serializer.getVersion(), serializedRecoverable);
+
+		final RecoverableFsDataOutputStream.Committer committer = newWriter.recoverForCommit(recoveredRecoverable);
+		committer.commitAfterRecovery();
+
+		Map<Path, String> files = getFileContentByPath(testDir);
+		Assert.assertEquals(1L, files.size());
+
+		for (Map.Entry<Path, String> fileContents : files.entrySet()) {
+			Assert.assertEquals("part-0", fileContents.getKey().getName());
+			Assert.assertEquals(testData1 + testData2, fileContents.getValue());
+		}
+	}
+
+	// TESTS FOR EXCEPTIONS
+
+	@Test(expected = IOException.class)
+	public void testExceptionWritingAfterCloseForCommit() throws Exception {
+		final Path testDir = getBasePathForTest();
+
+		final RecoverableWriter writer = getNewFileSystemWriter();
+		final Path path = new Path(testDir, "part-0");
+
+		try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+
+			stream.closeForCommit().getRecoverable();
+			stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+			fail();
+		}
+	}
+
+	@Test(expected = IOException.class)
+	public void testResumeAfterCommit() throws Exception {
+		final Path testDir = getBasePathForTest();
+
+		final RecoverableWriter writer = getNewFileSystemWriter();
+		final Path path = new Path(testDir, "part-0");
+
+		RecoverableWriter.ResumeRecoverable recoverable;
+		try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+
+			recoverable = stream.persist();
+			stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+
+			stream.closeForCommit().commit();
+		}
+
+		// this should throw an exception as the file is already committed
+		writer.recover(recoverable);
+		fail();
+	}
+
+	@Test
+	public void testResumeWithWrongOffset() throws Exception {
+		// this is a rather unrealistic scenario, but it is to trigger
+		// truncation of the file and try to resume with missing data.
+
+		final Path testDir = getBasePathForTest();
+
+		final RecoverableWriter writer = getNewFileSystemWriter();
+		final Path path = new Path(testDir, "part-0");
+
+		final RecoverableWriter.ResumeRecoverable recoverable1;
+		final RecoverableWriter.ResumeRecoverable recoverable2;
+		try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+
+			recoverable1 = stream.persist();
+			stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+
+			recoverable2 = stream.persist();
+			stream.write(testData3.getBytes(StandardCharsets.UTF_8));
+		}
+
+		// this should work fine; any exception propagates and fails the test with its real cause.
+		// recovering truncates the in-progress file back to the offset of recoverable1.
+		try (RecoverableFsDataOutputStream ignored = writer.recover(recoverable1)) {
+			// nothing to do, only the truncation matters
+		}
+
+		// this should throw an exception, as the file is now shorter than the offset of recoverable2
+		try (RecoverableFsDataOutputStream ignored = writer.recover(recoverable2)) {
+			fail("Resuming beyond the end of the in-progress file should fail.");
+		} catch (IOException e) {
+			// we expect this
+		}
+	}
+
+	/**
+	 * Reads every file directly inside {@code directory} and returns its contents,
+	 * decoded as UTF-8, keyed by the file's path.
+	 */
+	private Map<Path, String> getFileContentByPath(Path directory) throws IOException {
+		final Map<Path, String> contents = new HashMap<>();
+
+		final FileStatus[] filesInBucket = getFileSystem().listStatus(directory);
+		for (FileStatus file : filesInBucket) {
+			final byte[] serContents = new byte[(int) file.getLen()];
+
+			// close every opened stream; a single read() call may also return fewer bytes than requested
+			try (FSDataInputStream stream = getFileSystem().open(file.getPath())) {
+				readFully(stream, serContents);
+			}
+
+			contents.put(file.getPath(), new String(serContents, StandardCharsets.UTF_8));
+		}
+		return contents;
+	}
+
+	/** Fills {@code buffer} completely from {@code in}, failing if the stream ends early. */
+	private static void readFully(FSDataInputStream in, byte[] buffer) throws IOException {
+		int offset = 0;
+		while (offset < buffer.length) {
+			final int read = in.read(buffer, offset, buffer.length - offset);
+			if (read < 0) {
+				throw new IOException("Unexpected end of stream after " + offset + " of " + buffer.length + " bytes.");
+			}
+			offset += read;
+		}
+	}
+
+	private static String randomName() {
+		return StringUtils.getRandomString(RND, 16, 16, 'a', 'z');
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java
new file mode 100644
index 0000000..d347609
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.local;
+
+import org.apache.flink.core.fs.AbstractResumableWriterTest;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Runs the generic {@link AbstractResumableWriterTest} suite against the
+ * {@link LocalRecoverableWriter}, using the local file system and a JUnit temporary folder.
+ */
+public class LocalFileSystemResumableWriterTest extends AbstractResumableWriterTest {
+
+	/** Root of all directories handed out by {@link #getBasePath()}; removed after the class. */
+	@ClassRule
+	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+	/** Creates a fresh temporary directory and returns it as a Flink {@link Path}. */
+	@Override
+	public Path getBasePath() throws Exception {
+		final java.io.File freshDirectory = TEMP_FOLDER.newFolder();
+		return new Path(freshDirectory.toURI());
+	}
+
+	/** Returns Flink's local file system. */
+	@Override
+	public FileSystem initializeFileSystem() {
+		final FileSystem localFileSystem = FileSystem.getLocalFileSystem();
+		return localFileSystem;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
index f944dc5..c688b32 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
@@ -19,11 +19,13 @@
package org.apache.flink.runtime.fs.hdfs;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.commons.lang3.time.StopWatch;
@@ -38,13 +40,18 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
+import java.time.Duration;
import static org.apache.flink.util.Preconditions.checkNotNull;
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for Hadoop's
+ * file system abstraction.
+ */
@Internal
class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
- private static final long LEASE_TIMEOUT = 100000L;
+ private static final long LEASE_TIMEOUT = 100_000L;
private static Method truncateHandle;
@@ -79,16 +86,23 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
this.targetFile = checkNotNull(recoverable.targetFile());
this.tempFile = checkNotNull(recoverable.tempFile());
- // the getFileStatus will throw a FileNotFound exception if the file is not there.
- final FileStatus tmpFileStatus = fs.getFileStatus(tempFile);
- if (tmpFileStatus.getLen() < recoverable.offset()) {
- throw new IOException("Missing data in tmp file: " + tempFile.getName());
+ // truncate back and append
+ try {
+ truncate(fs, tempFile, recoverable.offset());
+ } catch (Exception e) {
+ throw new IOException("Missing data in tmp file: " + tempFile, e);
}
- // truncate back and append
- truncate(fs, tempFile, recoverable.offset());
waitUntilLeaseIsRevoked(tempFile);
out = fs.append(tempFile);
+
+ // sanity check
+ long pos = out.getPos();
+ if (pos != recoverable.offset()) {
+ IOUtils.closeQuietly(out);
+ throw new IOException("Truncate failed: " + tempFile +
+ " (requested=" + recoverable.offset() + " ,size=" + pos + ')');
+ }
}
@Override
@@ -108,6 +122,7 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
@Override
public void sync() throws IOException {
+ out.hflush();
out.hsync();
}
@@ -243,9 +258,22 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
if (srcStatus != null) {
if (srcStatus.getLen() > expectedLength) {
- // can happen if we co from persist to recovering for commit directly
+ // can happen if we go from persist to recovering for commit directly
// truncate the trailing junk away
- truncate(fs, src, expectedLength);
+ try {
+ truncate(fs, src, expectedLength);
+ } catch (Exception e) {
+ // this can happen if the file is smaller than expected
+ throw new IOException("Problem while truncating file: " + src, e);
+ }
+ }
+
+ // rename to final location (if it exists, overwrite it)
+ try {
+ fs.rename(src, dest);
+ }
+ catch (IOException e) {
+ throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
}
}
else if (!fs.exists(dest)) {
@@ -281,23 +309,21 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
final DistributedFileSystem dfs = (DistributedFileSystem) fs;
dfs.recoverLease(path);
- boolean isclosed = dfs.isFileClosed(path);
+
+ final Deadline deadline = Deadline.now().plus(Duration.ofMillis(LEASE_TIMEOUT));
final StopWatch sw = new StopWatch();
sw.start();
- while (!isclosed) {
- if (sw.getTime() > LEASE_TIMEOUT) {
- break;
- }
-
+ boolean isClosed = dfs.isFileClosed(path);
+ while (!isClosed && deadline.hasTimeLeft()) {
try {
Thread.sleep(500L);
} catch (InterruptedException e1) {
- // ignore it
+ throw new IOException("Recovering the lease failed: ", e1);
}
- isclosed = dfs.isFileClosed(path);
+ isClosed = dfs.isFileClosed(path);
}
- return isclosed;
+ return isClosed;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
index 09a9e54..d0b3a7a 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.util;
import org.apache.flink.configuration.ConfigConstants;
-
import org.apache.flink.util.FlinkRuntimeException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.io.Text;
http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopResumableWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopResumableWriterTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopResumableWriterTest.java
new file mode 100644
index 0000000..5827a55
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopResumableWriterTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.AbstractResumableWriterTest;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.OperatingSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+/**
+ * Tests for the {@link HadoopRecoverableWriter}.
+ *
+ * <p>Runs the generic {@link AbstractResumableWriterTest} suite against an HDFS
+ * {@link MiniDFSCluster} that is started once for the whole test class.
+ */
+public class HadoopResumableWriterTest extends AbstractResumableWriterTest {
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+	private static MiniDFSCluster hdfsCluster;
+
+	/** The cached file system instance. */
+	private static FileSystem fileSystem;
+
+	/** The HDFS directory under which all tests of this class write. */
+	private static Path basePath;
+
+	/**
+	 * Skips the tests unless the Hadoop version is at least 2.7
+	 * (presumably because the writer relies on HDFS truncate — TODO confirm).
+	 */
+	public static void testHadoopVersion() {
+		Assume.assumeTrue(HadoopUtils.isMinHadoopVersion(2, 7));
+	}
+
+	/** Skips the tests on Windows, where the HDFS mini cluster cannot be started without extensions. */
+	public static void verifyOS() {
+		Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows());
+	}
+
+	/**
+	 * Checks the preconditions and starts the HDFS mini cluster.
+	 *
+	 * <p>JUnit does not define the order in which several {@code @BeforeClass} methods of one
+	 * class run. The assumptions are therefore checked explicitly here, before the cluster is
+	 * started. Otherwise the cluster could be started (and fail) on an unsupported setup
+	 * instead of the tests being skipped.
+	 */
+	@BeforeClass
+	public static void createHDFS() throws Exception {
+		testHadoopVersion();
+		verifyOS();
+
+		final File baseDir = TEMP_FOLDER.newFolder();
+
+		final Configuration hdConf = new Configuration();
+		hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+
+		final MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+		hdfsCluster = builder.build();
+
+		final org.apache.hadoop.fs.FileSystem hdfs = hdfsCluster.getFileSystem();
+
+		fileSystem = new HadoopFileSystem(hdfs);
+		basePath = new Path(hdfs.getUri() + "/tests");
+	}
+
+	/** Deletes the test directory and always shuts the mini cluster down, even if the delete fails. */
+	@AfterClass
+	public static void destroyHDFS() throws Exception {
+		if (hdfsCluster != null) {
+			try {
+				if (basePath != null) {
+					hdfsCluster.getFileSystem().delete(new org.apache.hadoop.fs.Path(basePath.toUri()), true);
+				}
+			} finally {
+				hdfsCluster.shutdown();
+				hdfsCluster = null;
+			}
+		}
+	}
+
+	@Override
+	public Path getBasePath() {
+		return basePath;
+	}
+
+	@Override
+	public FileSystem initializeFileSystem() {
+		return fileSystem;
+	}
+}