You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/07/15 22:23:43 UTC
[1/6] flink git commit: [FLINK-9751] [filesystem] Add
PersistentResumableWriter interface.
Repository: flink
Updated Branches:
refs/heads/master 695bc56a9 -> 7a912e603
[FLINK-9751] [filesystem] Add PersistentResumableWriter interface.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e2960949
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e2960949
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e2960949
Branch: refs/heads/master
Commit: e2960949bf84534978cb0be9f1a53cd783b43001
Parents: 66e0a27
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 13 13:53:51 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 15 23:18:56 2018 +0200
----------------------------------------------------------------------
.../org/apache/flink/core/fs/FileSystem.java | 18 ++
.../core/fs/RecoverableFsDataOutputStream.java | 90 ++++++
.../apache/flink/core/fs/RecoverableWriter.java | 173 +++++++++++
.../flink/core/fs/local/LocalFileSystem.java | 22 +-
.../flink/core/fs/local/LocalRecoverable.java | 74 +++++
.../LocalRecoverableFsDataOutputStream.java | 190 ++++++++++++
.../fs/local/LocalRecoverableSerializer.java | 99 ++++++
.../core/fs/local/LocalRecoverableWriter.java | 124 ++++++++
.../flink/runtime/fs/hdfs/HadoopFileSystem.java | 11 +-
.../runtime/fs/hdfs/HadoopFsRecoverable.java | 73 +++++
.../HadoopRecoverableFsDataOutputStream.java | 303 +++++++++++++++++++
.../fs/hdfs/HadoopRecoverableSerializer.java | 100 ++++++
.../fs/hdfs/HadoopRecoverableWriter.java | 134 ++++++++
.../apache/flink/runtime/util/HadoopUtils.java | 20 ++
14 files changed, 1425 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 27255f9..d451109 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -494,6 +494,24 @@ public abstract class FileSystem {
public abstract FSDataInputStream open(Path f) throws IOException;
/**
+ * Creates a new {@link RecoverableWriter}. A recoverable writer creates streams that can
+ * persist and recover their intermediate state.
+ * Persisting and recovering intermediate state is a core building block for writing to
+ * files that span multiple checkpoints.
+ *
+ * <p>The returned object can act as a shared factory to open and recover multiple streams.
+ *
+ * <p>This method is optional on file systems and various file system implementations may
+ * not support this method, throwing an {@code UnsupportedOperationException}.
+ *
+ * @return A RecoverableWriter for this file system.
+ * @throws IOException Thrown, if the recoverable writer cannot be instantiated.
+ */
+ public RecoverableWriter createRecoverableWriter() throws IOException {
+ throw new UnsupportedOperationException("This file system does not support recoverable writers.");
+ }
+
+ /**
* Return the number of bytes that large input files should be optimally be split into to minimize I/O time.
*
* @return the number of bytes that large input files should be optimally be split into to minimize I/O time
http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.java
new file mode 100644
index 0000000..8108670
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
+import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
+
+import java.io.IOException;
+
+/**
+ * An output stream to a file system that can be recovered at well defined points.
+ * The stream initially writes to hidden files or temp files and only creates the
+ * target file once it is closed and "committed".
+ */
+public abstract class RecoverableFsDataOutputStream extends FSDataOutputStream {
+
+ /**
+ * Ensures all data so far is persistent (similar to {@link #sync()}) and returns
+ * a handle to recover the stream at the current position.
+ */
+ public abstract ResumeRecoverable persist() throws IOException;
+
+ /**
+ * Closes the stream, ensuring persistence of all data (similar to {@link #sync()}).
+ * This returns a Committer that can be used to publish (make visible) the file
+ * that the stream was writing to.
+ */
+ public abstract Committer closeForCommit() throws IOException;
+
+ /**
+ * Closes the stream releasing all local resources, but not finalizing the
+ * file that the stream writes to.
+ *
+ * <p>This method should be understood as "close to dispose on failure".
+ */
+ @Override
+ public abstract void close() throws IOException;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A committer can publish the file of a stream that was closed.
+ * The Committer can be recovered via a {@link CommitRecoverable}.
+ */
+ public interface Committer {
+
+ /**
+ * Commits the file, making it visible. The file will contain the exact data
+ * as when the committer was created.
+ *
+ * @throws IOException Thrown if committing fails.
+ */
+ void commit() throws IOException;
+
+ /**
+ * Commits the file, making it visible. The file will contain the exact data
+ * as when the committer was created.
+ *
+ * <p>This method tolerates situations where the file was already committed and
+ * will not raise an exception in that case. This is important for idempotent
+ * commit retries as they need to happen after recovery.
+ *
+ * @throws IOException Thrown if committing fails.
+ */
+ void commitAfterRecovery() throws IOException;
+
+ /**
+ * Gets a recoverable object to recover the committer. The recovered committer
+ * will commit the file with the exact same data as this committer would commit
+ * it.
+ */
+ CommitRecoverable getRecoverable();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
new file mode 100644
index 0000000..bbba793
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+
+/**
+ * The RecoverableWriter creates and recovers {@link RecoverableFsDataOutputStream}.
+ * It can be used to write data to a file system in a way that the writing can be
+ * resumed consistently after a failure and recovery without loss of data or possible
+ * duplication of bytes.
+ *
+ * <p>The streams do not make the files they write to immediately visible, but instead write
+ * to temp files or other temporary storage. To publish the data atomically in the
+ * end, the stream offers the {@link RecoverableFsDataOutputStream#closeForCommit()} method
+ * to create a committer that publishes the result.
+ *
+ * <p>These writers are useful in the context of checkpointing. The example below illustrates
+ * how to use them:
+ *
+ * <pre>{@code
+ * // --------- initial run --------
+ * RecoverableWriter writer = fileSystem.createRecoverableWriter();
+ * RecoverableFsDataOutputStream out = writer.open(path);
+ * out.write(...);
+ *
+ * // persist intermediate state
+ * ResumeRecoverable intermediateState = out.persist();
+ * storeInCheckpoint(intermediateState);
+ *
+ * // --------- recovery --------
+ * ResumeRecoverable lastCheckpointState = ...; // get state from checkpoint
+ * RecoverableWriter writer = fileSystem.createRecoverableWriter();
+ * RecoverableFsDataOutputStream out = writer.recover(lastCheckpointState);
+ *
+ * out.write(...); // append more data
+ *
+ * out.closeForCommit().commit(); // close stream and publish all the data
+ *
+ * // --------- recovery without appending --------
+ * ResumeRecoverable lastCheckpointState = ...; // get state from checkpoint
+ * RecoverableWriter writer = fileSystem.createRecoverableWriter();
+ * Committer committer = writer.recoverForCommit(lastCheckpointState);
+ * committer.commit(); // publish the state as of the last checkpoint
+ * }</pre>
+ *
+ * <h3>Recovery</h3>
+ *
+ * <p>Recovery relies on data persistence in the target file system or object store. While the
+ * code itself works with the specific primitives that the target storage offers, recovery will
+ * fail if the data written so far was deleted by an external factor.
+ * For example, some implementations stage data in temp files or object parts. If these
+ * were deleted by someone or by an automated cleanup policy, then resuming
+ * may fail. This is not surprising and should be expected, but we want to explicitly point
+ * this out here.
+ *
+ * <p>Specific care is needed for systems like S3, where the implementation uses Multipart Uploads
+ * to incrementally upload and persist parts of the result. Timeouts for Multipart Uploads
+ * and lifetime of Parts in unfinished Multipart Uploads need to be set in the bucket policy
+ * high enough to accommodate the recovery. These values are typically in the days, so regular
+ * recovery is typically not a problem. What can become an issue is situations where a Flink
+ * application is hard killed (all processes or containers removed) and then one tries to
+ * manually recover the application from an externalized checkpoint some days later. In that
+ * case, systems like S3 may have removed uncommitted parts and recovery will not succeed.
+ *
+ * <h3>Implementer's Note</h3>
+ *
+ * <p>From the perspective of the implementer, it would be desirable to make this class
+ * generic with respect to the concrete types of 'CommitRecoverable' and 'ResumeRecoverable'.
+ * However, we found that this makes the code more clumsy to use and we hence dropped the
+ * generics at the cost of doing some explicit casts in the implementation that would
+ * otherwise have been implicitly generated by the generics compiler.
+ */
+public interface RecoverableWriter {
+
+ /**
+ * Opens a new recoverable stream to write to the given path.
+ * Whether existing files will be overwritten is implementation specific and should
+ * not be relied upon.
+ *
+ * @param path The path of the file/object to write to.
+ * @return A new RecoverableFsDataOutputStream writing a new file/object.
+ *
+ * @throws IOException Thrown if the stream could not be opened/initialized.
+ */
+ RecoverableFsDataOutputStream open(Path path) throws IOException;
+
+ /**
+ * Resumes a recoverable stream consistently at the point indicated by the given ResumeRecoverable.
+ * Future writes to the stream will continue / append the file as of that point.
+ *
+ * <p>This method is optional and whether it is supported is indicated through the
+ * {@link #supportsResume()} method.
+ *
+ * @param resumable The opaque handle with the recovery information.
+ * @return A recoverable stream writing to the file/object as it was at the point when the
+ * ResumeRecoverable was created.
+ *
+ * @throws IOException Thrown, if resuming fails.
+ * @throws UnsupportedOperationException Thrown if this optional method is not supported.
+ */
+ RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException;
+
+ /**
+ * Recovers a recoverable stream consistently at the point indicated by the given CommitRecoverable
+ * for finalizing and committing. This will publish the target file with exactly the data
+ * that was written up to the point when the CommitRecoverable was created.
+ *
+ * @param resumable The opaque handle with the recovery information.
+ * @return A committer that publishes the target file.
+ *
+ * @throws IOException Thrown, if recovery fails.
+ */
+ RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable) throws IOException;
+
+ /**
+ * The serializer for the CommitRecoverable types created in this writer.
+ * This serializer should be used to store the CommitRecoverable in checkpoint
+ * state or other forms of persistent state.
+ */
+ SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer();
+
+ /**
+ * The serializer for the ResumeRecoverable types created in this writer.
+ * This serializer should be used to store the ResumeRecoverable in checkpoint
+ * state or other forms of persistent state.
+ */
+ SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer();
+
+ /**
+ * Checks whether the writer and its streams support resuming (appending to) files after
+ * recovery (via the {@link #recover(ResumeRecoverable)} method).
+ *
+ * <p>If true, then this writer supports the {@link #recover(ResumeRecoverable)} method.
+ * If false, then that method may not be supported and streams can only be recovered via
+ * {@link #recoverForCommit(CommitRecoverable)}.
+ */
+ boolean supportsResume();
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A handle to an in-progress stream with a defined and persistent amount of data.
+ * The handle can be used to recover the stream as of exactly that point and
+ * publish the result file.
+ */
+ interface CommitRecoverable {}
+
+ /**
+ * A handle to an in-progress stream with a defined and persistent amount of data.
+ * The handle can be used to recover the stream exactly as of that point and either
+ * publish the result file or keep appending data to the stream.
+ */
+ interface ResumeRecoverable extends CommitRecoverable {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index 9ebec6e..7b99e1b 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -142,11 +142,9 @@ public class LocalFileSystem extends FileSystem {
return new LocalDataInputStream(file);
}
- private File pathToFile(Path path) {
- if (!path.isAbsolute()) {
- path = new Path(getWorkingDirectory(), path);
- }
- return new File(path.toUri().getPath());
+ @Override
+ public LocalRecoverableWriter createRecoverableWriter() throws IOException {
+ return new LocalRecoverableWriter(this);
}
@Override
@@ -309,6 +307,20 @@ public class LocalFileSystem extends FileSystem {
// ------------------------------------------------------------------------
/**
+ * Converts the given Path to a File for this file system.
+ *
+ * <p>If the path is not absolute, it is interpreted relative to this FileSystem's working directory.
+ */
+ public File pathToFile(Path path) {
+ if (!path.isAbsolute()) {
+ path = new Path(getWorkingDirectory(), path);
+ }
+ return new File(path.toUri().getPath());
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
* Gets the URI that represents the local file system.
* That URI is {@code "file:/"} on Windows platforms and {@code "file:///"} on other
* UNIX family platforms.
http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverable.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverable.java
new file mode 100644
index 0000000..8454c3a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverable.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.local;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
+import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
+
+import java.io.File;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the resume and commit descriptor objects for local recoverable streams.
+ */
+@Internal
+class LocalRecoverable implements CommitRecoverable, ResumeRecoverable {
+
+ /** The file path for the final result file. */
+ private final File targetFile;
+
+ /** The file path of the staging file. */
+ private final File tempFile;
+
+ /** The position to resume from. */
+ private final long offset;
+
+ /**
+ * Creates a resumable for the given file at the given position.
+ *
+ * @param targetFile The file to resume.
+ * @param offset The position to resume from.
+ */
+ LocalRecoverable(File targetFile, File tempFile, long offset) {
+ checkArgument(offset >= 0, "offset must be >= 0");
+ this.targetFile = checkNotNull(targetFile, "targetFile");
+ this.tempFile = checkNotNull(tempFile, "tempFile");
+ this.offset = offset;
+ }
+
+ public File targetFile() {
+ return targetFile;
+ }
+
+ public File tempFile() {
+ return tempFile;
+ }
+
+ public long offset() {
+ return offset;
+ }
+
+ @Override
+ public String toString() {
+ return "LocalRecoverable " + tempFile + " @ " + offset + " -> " + targetFile;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
new file mode 100644
index 0000000..6c6a554
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.local;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
+import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.AtomicMoveNotSupportedException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link RecoverableFsDataOutputStream} for the {@link LocalFileSystem}.
+ */
+@Internal
+class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
+
+ private final File targetFile;
+
+ private final File tempFile;
+
+ private final FileOutputStream fos;
+
+ LocalRecoverableFsDataOutputStream(File targetFile, File tempFile) throws IOException {
+ this.targetFile = checkNotNull(targetFile);
+ this.tempFile = checkNotNull(tempFile);
+ this.fos = new FileOutputStream(tempFile);
+ }
+
+ LocalRecoverableFsDataOutputStream(LocalRecoverable resumable) throws IOException {
+ this.targetFile = checkNotNull(resumable.targetFile());
+ this.tempFile = checkNotNull(resumable.tempFile());
+
+ if (!tempFile.exists()) {
+ throw new FileNotFoundException("File Not Found: " + tempFile.getName());
+ }
+
+ if (tempFile.length() < resumable.offset()) {
+ throw new IOException("Missing data in tmp file: " + tempFile.getName());
+ }
+
+ this.fos = new FileOutputStream(this.tempFile, true);
+ this.fos.getChannel().truncate(resumable.offset());
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ fos.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ fos.write(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ fos.flush();
+ }
+
+ @Override
+ public void sync() throws IOException {
+ fos.getFD().sync();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return fos.getChannel().position();
+ }
+
+ @Override
+ public ResumeRecoverable persist() throws IOException {
+ // we call both flush and sync in order to ensure persistence on mounted
+ // file systems, like NFS, EBS, EFS, ...
+ flush();
+ sync();
+
+ return new LocalRecoverable(targetFile, tempFile, getPos());
+ }
+
+ @Override
+ public Committer closeForCommit() throws IOException {
+ final long pos = getPos();
+ close();
+ return new LocalCommitter(new LocalRecoverable(targetFile, tempFile, pos));
+ }
+
+ @Override
+ public void close() throws IOException {
+ fos.close();
+ }
+
+ // ------------------------------------------------------------------------
+
+ static class LocalCommitter implements Committer {
+
+ private final LocalRecoverable recoverable;
+
+ LocalCommitter(LocalRecoverable recoverable) {
+ this.recoverable = checkNotNull(recoverable);
+ }
+
+ @Override
+ public void commit() throws IOException {
+ final File src = recoverable.tempFile();
+ final File dest = recoverable.targetFile();
+
+ // sanity check
+ if (src.length() != recoverable.offset()) {
+ // something was done to this file since the committer was created.
+ // this is not the "clean" case
+ throw new IOException("Cannot clean commit: File has trailing junk data.");
+ }
+
+ // rather than fall into default recovery, handle errors explicitly
+ // in order to improve error messages
+ try {
+ Files.move(src.toPath(), dest.toPath(), StandardCopyOption.ATOMIC_MOVE);
+ }
+ catch (UnsupportedOperationException | AtomicMoveNotSupportedException e) {
+ if (!src.renameTo(dest)) {
+ throw new IOException("Committing file failed, could not rename " + src + " -> " + dest);
+ }
+ }
+ catch (FileAlreadyExistsException e) {
+ throw new IOException("Committing file failed. Target file already exists: " + dest);
+ }
+ }
+
+ @Override
+ public void commitAfterRecovery() throws IOException {
+ final File src = recoverable.tempFile();
+ final File dest = recoverable.targetFile();
+ final long expectedLength = recoverable.offset();
+
+ if (src.exists()) {
+ if (src.length() > expectedLength) {
+ // can happen if we go from persist to recovering for commit directly
+ // truncate the trailing junk away
+ try (FileOutputStream fos = new FileOutputStream(src, true)) {
+ fos.getChannel().truncate(expectedLength);
+ }
+ }
+
+ // source still exists, so no renaming happened yet. do it!
+ Files.move(src.toPath(), dest.toPath(), StandardCopyOption.ATOMIC_MOVE);
+ }
+ else if (!dest.exists()) {
+ // neither exists - that can be a sign of
+ // - (1) a serious problem (file system loss of data)
+ // - (2) a recovery of a savepoint that is some time old and the users
+ // removed the files in the meantime.
+
+ // TODO how to handle this?
+ // We probably need an option for users whether this should log,
+ // or result in an exception or unrecoverable exception
+ }
+ }
+
+ @Override
+ public CommitRecoverable getRecoverable() {
+ return recoverable;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableSerializer.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableSerializer.java
new file mode 100644
index 0000000..685f7c1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableSerializer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.local;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Simple serializer for the {@link LocalRecoverable}.
+ */
+@Internal
+class LocalRecoverableSerializer implements SimpleVersionedSerializer<LocalRecoverable> {
+
+ static final LocalRecoverableSerializer INSTANCE = new LocalRecoverableSerializer();
+
+ private static final Charset CHARSET = StandardCharsets.UTF_8;
+
+ private static final int MAGIC_NUMBER = 0x1e744b57;
+
+ /**
+ * Do not instantiate, use reusable {@link #INSTANCE} instead.
+ */
+ private LocalRecoverableSerializer() {}
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+
+ @Override
+ public byte[] serialize(LocalRecoverable obj) throws IOException {
+ final byte[] targetFileBytes = obj.targetFile().getAbsolutePath().getBytes(CHARSET);
+ final byte[] tempFileBytes = obj.tempFile().getAbsolutePath().getBytes(CHARSET);
+ final byte[] targetBytes = new byte[20 + targetFileBytes.length + tempFileBytes.length];
+
+ ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN);
+ bb.putInt(MAGIC_NUMBER);
+ bb.putLong(obj.offset());
+ bb.putInt(targetFileBytes.length);
+ bb.putInt(tempFileBytes.length);
+ bb.put(targetFileBytes);
+ bb.put(tempFileBytes);
+
+ return targetBytes;
+ }
+
+ @Override
+ public LocalRecoverable deserialize(int version, byte[] serialized) throws IOException {
+ switch (version) {
+ case 1:
+ return deserializeV1(serialized);
+ default:
+ throw new IOException("Unrecognized version or corrupt state: " + version);
+ }
+ }
+
+ private static LocalRecoverable deserializeV1(byte[] serialized) throws IOException {
+ final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+
+ if (bb.getInt() != MAGIC_NUMBER) {
+ throw new IOException("Corrupt data: Unexpected magic number.");
+ }
+
+ final long offset = bb.getLong();
+ final byte[] targetFileBytes = new byte[bb.getInt()];
+ final byte[] tempFileBytes = new byte[bb.getInt()];
+ bb.get(targetFileBytes);
+ bb.get(tempFileBytes);
+
+ final String targetPath = new String(targetFileBytes, CHARSET);
+ final String tempPath = new String(tempFileBytes, CHARSET);
+
+ return new LocalRecoverable(new File(targetPath), new File(tempPath), offset);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
new file mode 100644
index 0000000..a2f0485
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.local;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link RecoverableWriter} for the {@link LocalFileSystem}.
+ */
+@Internal
+public class LocalRecoverableWriter implements RecoverableWriter {
+
+ private final LocalFileSystem fs;
+
+ public LocalRecoverableWriter(LocalFileSystem fs) {
+ this.fs = checkNotNull(fs);
+ }
+
+ @Override
+ public RecoverableFsDataOutputStream open(Path filePath) throws IOException {
+ final File targetFile = fs.pathToFile(filePath);
+ final File tempFile = generateStagingTempFilePath(targetFile);
+
+ // try to create the parent
+ final File parent = tempFile.getParentFile();
+ if (parent != null && !parent.mkdirs() && !parent.exists()) {
+ throw new IOException("Failed to create the parent directory: " + parent);
+ }
+
+ return new LocalRecoverableFsDataOutputStream(targetFile, tempFile);
+ }
+
+ @Override
+ public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException {
+ if (recoverable instanceof LocalRecoverable) {
+ return new LocalRecoverableFsDataOutputStream((LocalRecoverable) recoverable);
+ }
+ else {
+ throw new IllegalArgumentException(
+ "LocalFileSystem cannot recover recoverable for other file system: " + recoverable);
+ }
+ }
+
+ @Override
+ public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException {
+ if (recoverable instanceof LocalRecoverable) {
+ return new LocalRecoverableFsDataOutputStream.LocalCommitter((LocalRecoverable) recoverable);
+ }
+ else {
+ throw new IllegalArgumentException(
+ "LocalFileSystem cannot recover recoverable for other file system: " + recoverable);
+ }
+ }
+
+ @Override
+ public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer() {
+ @SuppressWarnings("unchecked")
+ SimpleVersionedSerializer<CommitRecoverable> typedSerializer = (SimpleVersionedSerializer<CommitRecoverable>)
+ (SimpleVersionedSerializer<?>) LocalRecoverableSerializer.INSTANCE;
+
+ return typedSerializer;
+ }
+
+ @Override
+ public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer() {
+ @SuppressWarnings("unchecked")
+ SimpleVersionedSerializer<ResumeRecoverable> typedSerializer = (SimpleVersionedSerializer<ResumeRecoverable>)
+ (SimpleVersionedSerializer<?>) LocalRecoverableSerializer.INSTANCE;
+
+ return typedSerializer;
+ }
+
+ @Override
+ public boolean supportsResume() {
+ return true;
+ }
+
+ @VisibleForTesting
+ static File generateStagingTempFilePath(File targetFile) {
+ checkArgument(targetFile.isAbsolute(), "targetFile must be absolute");
+ checkArgument(!targetFile.isDirectory(), "targetFile must not be a directory");
+
+ final File parent = targetFile.getParentFile();
+ final String name = targetFile.getName();
+
+ checkArgument(parent != null, "targetFile must not be the root directory");
+
+ while (true) {
+ File candidate = new File(parent, "." + name + ".inprogress." + UUID.randomUUID().toString());
+ if (!candidate.exists()) {
+ return candidate;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index f17d7e1..065ba5a 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
import java.io.IOException;
import java.net.URI;
@@ -193,11 +194,19 @@ public class HadoopFileSystem extends FileSystem {
return fsKind;
}
+ @Override
+ public RecoverableWriter createRecoverableWriter() throws IOException {
+     // Only a subset of file systems and versions supports this writer. The
+     // writer's constructor checks scheme and version eagerly, which gives
+     // better error messages for unsupported setups.
+     final RecoverableWriter writer = new HadoopRecoverableWriter(fs);
+     return writer;
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
- private static org.apache.hadoop.fs.Path toHadoopPath(Path path) {
+ static org.apache.hadoop.fs.Path toHadoopPath(Path path) {
return new org.apache.hadoop.fs.Path(path.toUri());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsRecoverable.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsRecoverable.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsRecoverable.java
new file mode 100644
index 0000000..1eaa8f4
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsRecoverable.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
+import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
+
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Descriptor object that serves as both the resume and the commit handle for
+ * Hadoop's file system abstraction. It holds the path of the final result file,
+ * the path of the staging file, and a position within the staging file.
+ */
+class HadoopFsRecoverable implements CommitRecoverable, ResumeRecoverable {
+
+ /** Path of the final result file. */
+ private final Path targetFile;
+
+ /** Path of the staging file. */
+ private final Path tempFile;
+
+ /** Position to resume from. */
+ private final long offset;
+
+ /**
+  * Creates a recoverable for the given target and staging file at the given position.
+  *
+  * @param targetFile The path of the final result file, must not be null.
+  * @param tempFile The path of the staging file, must not be null.
+  * @param offset The position to resume from, must be {@code >= 0}.
+  */
+ HadoopFsRecoverable(Path targetFile, Path tempFile, long offset) {
+     // the offset is validated first, followed by the two paths
+     checkArgument(offset >= 0, "offset must be >= 0");
+     final Path checkedTarget = checkNotNull(targetFile, "targetFile");
+     final Path checkedTemp = checkNotNull(tempFile, "tempFile");
+
+     this.targetFile = checkedTarget;
+     this.tempFile = checkedTemp;
+     this.offset = offset;
+ }
+
+ public Path targetFile() {
+     return this.targetFile;
+ }
+
+ public Path tempFile() {
+     return this.tempFile;
+ }
+
+ public long offset() {
+     return this.offset;
+ }
+
+ @Override
+ public String toString() {
+     final StringBuilder description = new StringBuilder("HadoopFsRecoverable ");
+     description.append(tempFile);
+     description.append(" @ ").append(offset);
+     description.append(" -> ").append(targetFile);
+     return description.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
new file mode 100644
index 0000000..f944dc5
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
+import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link RecoverableFsDataOutputStream} for Hadoop's file system abstraction.
+ *
+ * <p>Data is written to a staging (temp) file. Committing renames the staging file
+ * to the target file. Resuming truncates the staging file back to the persisted
+ * offset and appends to it.
+ *
+ * <p>The truncate call is looked up via reflection, and resuming additionally
+ * requires the file system to be a {@link DistributedFileSystem}, because the lease
+ * of the staging file is recovered before appending.
+ */
+@Internal
+class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
+
+ /** Maximum time (compared against the stop watch time) to wait for a lease to be revoked. */
+ private static final long LEASE_TIMEOUT = 100000L;
+
+ /** Reflective handle to {@code FileSystem.truncate(Path, long)}, resolved lazily. */
+ private static Method truncateHandle;
+
+ private final FileSystem fs;
+
+ private final Path targetFile;
+
+ private final Path tempFile;
+
+ private final FSDataOutputStream out;
+
+ /**
+  * Creates a new stream that writes to a freshly created staging file.
+  *
+  * @param fs The Hadoop file system to write to.
+  * @param targetFile The path of the final result file.
+  * @param tempFile The path of the staging file to create.
+  * @throws IOException Thrown if the staging file cannot be created.
+  */
+ HadoopRecoverableFsDataOutputStream(
+         FileSystem fs,
+         Path targetFile,
+         Path tempFile) throws IOException {
+
+     ensureTruncateInitialized();
+
+     this.fs = checkNotNull(fs);
+     this.targetFile = checkNotNull(targetFile);
+     this.tempFile = checkNotNull(tempFile);
+     this.out = fs.create(tempFile);
+ }
+
+ /**
+  * Creates a stream that resumes writing to the staging file described by the
+  * given recoverable.
+  *
+  * @param fs The Hadoop file system to write to.
+  * @param recoverable The descriptor of the staging file and the offset to resume from.
+  * @throws IOException Thrown if the staging file is missing, shorter than the
+  *                     offset to resume from, or cannot be truncated / re-opened.
+  */
+ HadoopRecoverableFsDataOutputStream(
+         FileSystem fs,
+         HadoopFsRecoverable recoverable) throws IOException {
+
+     ensureTruncateInitialized();
+
+     this.fs = checkNotNull(fs);
+     this.targetFile = checkNotNull(recoverable.targetFile());
+     this.tempFile = checkNotNull(recoverable.tempFile());
+
+     // the getFileStatus will throw a FileNotFound exception if the file is not there.
+     final FileStatus tmpFileStatus = fs.getFileStatus(tempFile);
+     if (tmpFileStatus.getLen() < recoverable.offset()) {
+         throw new IOException("Missing data in tmp file: " + tempFile.getName());
+     }
+
+     // truncate back and append
+     truncate(fs, tempFile, recoverable.offset());
+     // NOTE(review): the result of the wait is not checked; if the lease was not revoked
+     // within the timeout, the append below is attempted regardless.
+     waitUntilLeaseIsRevoked(tempFile);
+     out = fs.append(tempFile);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+     out.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+     out.write(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+     out.hflush();
+ }
+
+ @Override
+ public void sync() throws IOException {
+     out.hsync();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+     return out.getPos();
+ }
+
+ @Override
+ public ResumeRecoverable persist() throws IOException {
+     // sync first, so that the returned position refers to synced data
+     sync();
+     return new HadoopFsRecoverable(targetFile, tempFile, getPos());
+ }
+
+ @Override
+ public Committer closeForCommit() throws IOException {
+     // the position has to be read before the stream is closed
+     final long pos = getPos();
+     close();
+     return new HadoopFsCommitter(fs, new HadoopFsRecoverable(targetFile, tempFile, pos));
+ }
+
+ @Override
+ public void close() throws IOException {
+     out.close();
+ }
+
+ // ------------------------------------------------------------------------
+ //  Reflection utils for truncation
+ //    These are needed to compile against Hadoop versions before
+ //    Hadoop 2.7, which have no truncation calls for HDFS.
+ // ------------------------------------------------------------------------
+
+ /**
+  * Resolves the reflective handle to the public {@code truncate(Path, long)} method
+  * of the Hadoop {@link FileSystem}, if that has not happened yet.
+  *
+  * @throws FlinkRuntimeException Thrown if no public truncate method can be found.
+  */
+ private static void ensureTruncateInitialized() throws FlinkRuntimeException {
+     if (truncateHandle == null) {
+         Method truncateMethod;
+         try {
+             truncateMethod = FileSystem.class.getMethod("truncate", Path.class, long.class);
+         }
+         catch (NoSuchMethodException e) {
+             throw new FlinkRuntimeException(
+                 "Could not find a public truncate method on the Hadoop File System.", e);
+         }
+
+         if (!Modifier.isPublic(truncateMethod.getModifiers())) {
+             throw new FlinkRuntimeException("Could not find a public truncate method on the Hadoop File System.");
+         }
+
+         truncateHandle = truncateMethod;
+     }
+ }
+
+ /**
+  * Truncates the given file to the given length through the reflective truncate handle.
+  *
+  * @param hadoopFs The file system that holds the file.
+  * @param file The file to truncate.
+  * @param length The length to truncate the file to.
+  * @throws IOException Thrown if the truncate call fails, or cannot be invoked.
+  */
+ static void truncate(FileSystem hadoopFs, Path file, long length) throws IOException {
+     // A HadoopFsCommitter can be created without any stream constructor having run,
+     // so the handle is resolved here rather than assumed to be present.
+     ensureTruncateInitialized();
+
+     try {
+         truncateHandle.invoke(hadoopFs, file, length);
+     }
+     catch (InvocationTargetException e) {
+         // unwrap the exception thrown by the truncate call itself
+         ExceptionUtils.rethrowIOException(e.getTargetException());
+     }
+     catch (Throwable t) {
+         throw new IOException(
+             "Truncation of file failed because of access/linking problems with Hadoop's truncate call. " +
+                 "This is most likely a dependency conflict or class loading problem.", t);
+     }
+ }
+
+ // ------------------------------------------------------------------------
+ //  Committer
+ // ------------------------------------------------------------------------
+
+ /**
+  * Implementation of a committer for the Hadoop File System abstraction.
+  * This implementation commits by renaming the temp file to the final file path.
+  * The temp file is truncated before renaming in case there is trailing garbage data.
+  */
+ static class HadoopFsCommitter implements Committer {
+
+     private final FileSystem fs;
+     private final HadoopFsRecoverable recoverable;
+
+     HadoopFsCommitter(FileSystem fs, HadoopFsRecoverable recoverable) {
+         this.fs = checkNotNull(fs);
+         this.recoverable = checkNotNull(recoverable);
+     }
+
+     /**
+      * Commits by renaming the staging file to the target file. Fails if the staging
+      * file cannot be accessed or its length differs from the expected length.
+      */
+     @Override
+     public void commit() throws IOException {
+         final Path src = recoverable.tempFile();
+         final Path dest = recoverable.targetFile();
+         final long expectedLength = recoverable.offset();
+
+         final FileStatus srcStatus;
+         try {
+             srcStatus = fs.getFileStatus(src);
+         }
+         catch (IOException e) {
+             throw new IOException("Cannot clean commit: Staging file does not exist.", e);
+         }
+
+         if (srcStatus.getLen() != expectedLength) {
+             // something was done to this file since the committer was created.
+             // this is not the "clean" case
+             throw new IOException("Cannot clean commit: File has trailing junk data.");
+         }
+
+         try {
+             fs.rename(src, dest);
+         }
+         catch (IOException e) {
+             throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
+         }
+     }
+
+     /**
+      * Commits after a recovery. If the staging file still exists, trailing data beyond
+      * the expected length is truncated away and the file is renamed to the target file.
+      * If the staging file is gone, nothing is done.
+      */
+     @Override
+     public void commitAfterRecovery() throws IOException {
+         final Path src = recoverable.tempFile();
+         final Path dest = recoverable.targetFile();
+         final long expectedLength = recoverable.offset();
+
+         FileStatus srcStatus = null;
+         try {
+             srcStatus = fs.getFileStatus(src);
+         }
+         catch (FileNotFoundException e) {
+             // status remains null
+         }
+         catch (IOException e) {
+             throw new IOException(
+                 "Committing during recovery failed: Could not access status of source file.", e);
+         }
+
+         if (srcStatus != null) {
+             if (srcStatus.getLen() > expectedLength) {
+                 // can happen if we go from persist to recovering for commit directly
+                 // truncate the trailing junk away
+                 truncate(fs, src, expectedLength);
+             }
+
+             // publish the staging file under its final name
+             try {
+                 fs.rename(src, dest);
+             }
+             catch (IOException e) {
+                 throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
+             }
+         }
+         else if (!fs.exists(dest)) {
+             // neither exists - that can be a sign of
+             //   - (1) a serious problem (file system loss of data)
+             //   - (2) a recovery of a savepoint that is some time old and the users
+             //         removed the files in the meantime.
+
+             // TODO how to handle this?
+             // We probably need an option for users whether this should log,
+             // or result in an exception or unrecoverable exception
+         }
+     }
+
+     @Override
+     public CommitRecoverable getRecoverable() {
+         return recoverable;
+     }
+ }
+
+ /**
+  * Called when resuming execution after a failure and waits until the lease
+  * of the file we are resuming is free.
+  *
+  * <p>The lease of the file we are resuming writing/committing to may still
+  * belong to the process that failed previously and whose state we are
+  * recovering.
+  *
+  * @param path The path to the file we want to resume writing to.
+  * @return True, if the file was closed before the timeout expired, false otherwise.
+  * @throws IOException Thrown if the lease recovery fails, or the waiting thread is interrupted.
+  */
+ private boolean waitUntilLeaseIsRevoked(final Path path) throws IOException {
+     Preconditions.checkState(fs instanceof DistributedFileSystem);
+
+     final DistributedFileSystem dfs = (DistributedFileSystem) fs;
+     dfs.recoverLease(path);
+     boolean isclosed = dfs.isFileClosed(path);
+
+     final StopWatch sw = new StopWatch();
+     sw.start();
+
+     while (!isclosed) {
+         if (sw.getTime() > LEASE_TIMEOUT) {
+             break;
+         }
+
+         try {
+             Thread.sleep(500L);
+         } catch (InterruptedException e1) {
+             // restore the interrupt flag and stop waiting, instead of silently
+             // continuing to poll for up to the full timeout
+             Thread.currentThread().interrupt();
+             throw new IOException(
+                 "Interrupted while waiting for the lease of " + path + " to be revoked.", e1);
+         }
+         isclosed = dfs.isFileClosed(path);
+     }
+     return isclosed;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableSerializer.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableSerializer.java
new file mode 100644
index 0000000..941e96a
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableSerializer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Simple serializer for the {@link HadoopFsRecoverable}.
+ *
+ * <p>The version 1 byte layout is (all little endian): magic number (int), offset (long),
+ * length of the target path (int), length of the temp path (int), followed by the
+ * UTF-8 bytes of the target path and of the temp path.
+ */
+@Internal
+class HadoopRecoverableSerializer implements SimpleVersionedSerializer<HadoopFsRecoverable> {
+
+ static final HadoopRecoverableSerializer INSTANCE = new HadoopRecoverableSerializer();
+
+ private static final Charset CHARSET = StandardCharsets.UTF_8;
+
+ private static final int MAGIC_NUMBER = 0xd7436c5e;
+
+ /** Size of the fixed part: magic number (4) + offset (8) + two length fields (4 each). */
+ private static final int HEADER_LENGTH = 20;
+
+ /**
+  * Do not instantiate, use reusable {@link #INSTANCE} instead.
+  */
+ private HadoopRecoverableSerializer() {}
+
+ @Override
+ public int getVersion() {
+     return 1;
+ }
+
+ @Override
+ public byte[] serialize(HadoopFsRecoverable obj) throws IOException {
+     final byte[] targetFileBytes = obj.targetFile().toString().getBytes(CHARSET);
+     final byte[] tempFileBytes = obj.tempFile().toString().getBytes(CHARSET);
+     final byte[] targetBytes = new byte[HEADER_LENGTH + targetFileBytes.length + tempFileBytes.length];
+
+     ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN);
+     bb.putInt(MAGIC_NUMBER);
+     bb.putLong(obj.offset());
+     bb.putInt(targetFileBytes.length);
+     bb.putInt(tempFileBytes.length);
+     bb.put(targetFileBytes);
+     bb.put(tempFileBytes);
+
+     return targetBytes;
+ }
+
+ @Override
+ public HadoopFsRecoverable deserialize(int version, byte[] serialized) throws IOException {
+     switch (version) {
+         case 1:
+             return deserializeV1(serialized);
+         default:
+             throw new IOException("Unrecognized version or corrupt state: " + version);
+     }
+ }
+
+ /**
+  * Reads a {@link HadoopFsRecoverable} from the version 1 byte layout.
+  *
+  * @param serialized The serialized bytes.
+  * @return The deserialized recoverable.
+  * @throws IOException Thrown if the data is truncated, carries an unexpected magic number,
+  *                     a negative offset, or path lengths that do not fit the data.
+  */
+ private static HadoopFsRecoverable deserializeV1(byte[] serialized) throws IOException {
+     if (serialized.length < HEADER_LENGTH) {
+         throw new IOException("Corrupt data: Unexpected end of data.");
+     }
+
+     final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+
+     if (bb.getInt() != MAGIC_NUMBER) {
+         throw new IOException("Corrupt data: Unexpected magic number.");
+     }
+
+     final long offset = bb.getLong();
+     if (offset < 0) {
+         // the recoverable's constructor would reject this with an IllegalArgumentException
+         throw new IOException("Corrupt data: Negative offset.");
+     }
+
+     final int targetFileLength = bb.getInt();
+     final int tempFileLength = bb.getInt();
+
+     // validate the declared lengths before allocating, so that corrupt data results in
+     // an IOException rather than an unchecked exception or an oversized allocation
+     if (targetFileLength < 0 || tempFileLength < 0
+             || (long) targetFileLength + tempFileLength > bb.remaining()) {
+         throw new IOException("Corrupt data: Invalid path lengths.");
+     }
+
+     final byte[] targetFileBytes = new byte[targetFileLength];
+     final byte[] tempFileBytes = new byte[tempFileLength];
+     bb.get(targetFileBytes);
+     bb.get(tempFileBytes);
+
+     final String targetPath = new String(targetFileBytes, CHARSET);
+     final String tempPath = new String(tempFileBytes, CHARSET);
+
+     return new HadoopFsRecoverable(new Path(targetPath), new Path(tempPath), offset);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
new file mode 100644
index 0000000..305f8ee
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableWriter} for
+ * Hadoop's file system abstraction.
+ */
+@Internal
+public class HadoopRecoverableWriter implements RecoverableWriter {
+
+ /** The Hadoop file system on which the writer operates. */
+ private final org.apache.hadoop.fs.FileSystem fs;
+
+ /**
+ * Creates a new Recoverable writer.
+ * @param fs The Hadoop file system on which the writer operates.
+ */
+ public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {
+ this.fs = checkNotNull(fs);
+
+ // This writer is only supported on a subset of file systems, and on
+ // specific versions. We check these schemes and versions eagerly for
+ // better error messages.
+ if (!"hdfs".equalsIgnoreCase(fs.getScheme()) || !HadoopUtils.isMinHadoopVersion(2, 7)) {
+ throw new UnsupportedOperationException(
+ "Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer");
+ }
+ }
+
+ @Override
+ public RecoverableFsDataOutputStream open(Path filePath) throws IOException {
+ final org.apache.hadoop.fs.Path targetFile = HadoopFileSystem.toHadoopPath(filePath);
+ final org.apache.hadoop.fs.Path tempFile = generateStagingTempFilePath(fs, targetFile);
+ return new HadoopRecoverableFsDataOutputStream(fs, targetFile, tempFile);
+ }
+
+ @Override
+ public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException {
+ if (recoverable instanceof HadoopFsRecoverable) {
+ return new HadoopRecoverableFsDataOutputStream(fs, (HadoopFsRecoverable) recoverable);
+ }
+ else {
+ throw new IllegalArgumentException(
+ "Hadoop File System cannot recover a recoverable for another file system: " + recoverable);
+ }
+ }
+
+ @Override
+ public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException {
+ if (recoverable instanceof HadoopFsRecoverable) {
+ return new HadoopRecoverableFsDataOutputStream.HadoopFsCommitter(fs, (HadoopFsRecoverable) recoverable);
+ }
+ else {
+ throw new IllegalArgumentException(
+ "Hadoop File System cannot recover a recoverable for another file system: " + recoverable);
+ }
+ }
+
+ @Override
+ public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer() {
+ @SuppressWarnings("unchecked")
+ SimpleVersionedSerializer<CommitRecoverable> typedSerializer = (SimpleVersionedSerializer<CommitRecoverable>)
+ (SimpleVersionedSerializer<?>) HadoopRecoverableSerializer.INSTANCE;
+
+ return typedSerializer;
+ }
+
+ @Override
+ public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer() {
+ @SuppressWarnings("unchecked")
+ SimpleVersionedSerializer<ResumeRecoverable> typedSerializer = (SimpleVersionedSerializer<ResumeRecoverable>)
+ (SimpleVersionedSerializer<?>) HadoopRecoverableSerializer.INSTANCE;
+
+ return typedSerializer;
+ }
+
+ @Override
+ public boolean supportsResume() {
+ return true;
+ }
+
+ @VisibleForTesting
+ static org.apache.hadoop.fs.Path generateStagingTempFilePath(
+ org.apache.hadoop.fs.FileSystem fs,
+ org.apache.hadoop.fs.Path targetFile) throws IOException {
+
+ checkArgument(targetFile.isAbsolute(), "targetFile must be absolute");
+
+ final org.apache.hadoop.fs.Path parent = targetFile.getParent();
+ final String name = targetFile.getName();
+
+ checkArgument(parent != null, "targetFile must not be the root directory");
+
+ while (true) {
+ org.apache.hadoop.fs.Path candidate = new org.apache.hadoop.fs.Path(
+ parent, "." + name + ".inprogress." + UUID.randomUUID().toString());
+ if (!fs.exists(candidate)) {
+ return candidate;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e2960949/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
index 5b14f43..09a9e54 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -20,12 +20,14 @@ package org.apache.flink.runtime.util;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.util.FlinkRuntimeException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,4 +124,22 @@ public class HadoopUtils {
}
return false;
}
+
+ /**
+ * Checks if the Hadoop dependency is at least of the given version.
+ */
+ public static boolean isMinHadoopVersion(int major, int minor) throws FlinkRuntimeException {
+ String versionString = VersionInfo.getVersion();
+ String[] versionParts = versionString.split("\\.");
+
+ if (versionParts.length < 2) {
+ throw new FlinkRuntimeException(
+ "Cannot determine version of Hadoop, unexpected version string: " + versionString);
+ }
+
+ int maj = Integer.parseInt(versionParts[0]);
+ int min = Integer.parseInt(versionParts[1]);
+
+ return maj > major || (maj == major && min >= minor);
+ }
}
[4/6] flink git commit: [hotfix] [core] Fix checkstyle workaround in
FileSystem
Posted by se...@apache.org.
[hotfix] [core] Fix checkstyle workaround in FileSystem
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2475989
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2475989
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2475989
Branch: refs/heads/master
Commit: f247598982fd82aeb2c609434cf875daef3ffabb
Parents: 548e9ed
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 13 12:33:29 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 15 23:18:56 2018 +0200
----------------------------------------------------------------------
.../main/java/org/apache/flink/core/fs/FileSystem.java | 10 ++++------
1 file changed, 4 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f2475989/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 8698595..27255f9 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -229,9 +229,7 @@ public abstract class FileSystem {
/** The default filesystem scheme to be used, configured during process-wide initialization.
* This value defaults to the local file systems scheme {@code 'file:///'} or {@code 'file:/'}. */
- //CHECKSTYLE.OFF: StaticVariableName
- private static URI DEFAULT_SCHEME;
- //CHECKSTYLE.ON: StaticVariableName
+ private static URI defaultScheme;
// ------------------------------------------------------------------------
// Initialization
@@ -275,11 +273,11 @@ public abstract class FileSystem {
// also read the default file system scheme
final String stringifiedUri = config.getString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, null);
if (stringifiedUri == null) {
- DEFAULT_SCHEME = null;
+ defaultScheme = null;
}
else {
try {
- DEFAULT_SCHEME = new URI(stringifiedUri);
+ defaultScheme = new URI(stringifiedUri);
}
catch (URISyntaxException e) {
throw new IllegalConfigurationException("The default file system scheme ('" +
@@ -427,7 +425,7 @@ public abstract class FileSystem {
* @return The default file system URI
*/
public static URI getDefaultFsUri() {
- return DEFAULT_SCHEME != null ? DEFAULT_SCHEME : LocalFileSystem.getLocalFsURI();
+ return defaultScheme != null ? defaultScheme : LocalFileSystem.getLocalFsURI();
}
// ------------------------------------------------------------------------
[6/6] flink git commit: [FLINK-9751] [filesystem] Use FileChannel
directly in LocalRecoverableFsDataOutputStream
Posted by se...@apache.org.
[FLINK-9751] [filesystem] Use FileChannel directly in LocalRecoverableFsDataOutputStream
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a912e60
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a912e60
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a912e60
Branch: refs/heads/master
Commit: 7a912e603a040acfc46d394e86794b2c031298cb
Parents: 9d238e1
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 13 15:13:21 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 15 23:20:44 2018 +0200
----------------------------------------------------------------------
.../LocalRecoverableFsDataOutputStream.java | 28 +++++++++++++-------
1 file changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7a912e60/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
index fd8e8fe..df9288e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
@@ -27,10 +27,14 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -44,12 +48,16 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
private final File tempFile;
- private final FileOutputStream fos;
+ private final FileChannel fileChannel;
+
+ private final OutputStream fos;
LocalRecoverableFsDataOutputStream(File targetFile, File tempFile) throws IOException {
this.targetFile = checkNotNull(targetFile);
this.tempFile = checkNotNull(tempFile);
- this.fos = new FileOutputStream(tempFile);
+
+ this.fileChannel = FileChannel.open(tempFile.toPath(), StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
+ this.fos = Channels.newOutputStream(fileChannel);
}
LocalRecoverableFsDataOutputStream(LocalRecoverable resumable) throws IOException {
@@ -57,15 +65,15 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
this.tempFile = checkNotNull(resumable.tempFile());
if (!tempFile.exists()) {
- throw new FileNotFoundException("File Not Found: " + tempFile);
+ throw new FileNotFoundException("File Not Found: " + tempFile.getAbsolutePath());
}
- if (tempFile.length() < resumable.offset()) {
- throw new IOException("Missing data in tmp file: " + tempFile);
+ this.fileChannel = FileChannel.open(tempFile.toPath(), StandardOpenOption.WRITE, StandardOpenOption.APPEND);
+ if (this.fileChannel.position() < resumable.offset()) {
+ throw new IOException("Missing data in tmp file: " + tempFile.getAbsolutePath());
}
-
- this.fos = new FileOutputStream(this.tempFile, true);
- this.fos.getChannel().truncate(resumable.offset());
+ this.fileChannel.truncate(resumable.offset());
+ this.fos = Channels.newOutputStream(fileChannel);
}
@Override
@@ -85,12 +93,12 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
@Override
public void sync() throws IOException {
- fos.getFD().sync();
+ fileChannel.force(true);
}
@Override
public long getPos() throws IOException {
- return fos.getChannel().position();
+ return fileChannel.position();
}
@Override
[2/6] flink git commit: [hotfix] [core] Fix checkstyle in
flink-core:'org.apache.flink.api.common.io.compression'
Posted by se...@apache.org.
[hotfix] [core] Fix checkstyle in flink-core:'org.apache.flink.api.common.io.compression'
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/548e9ed5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/548e9ed5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/548e9ed5
Branch: refs/heads/master
Commit: 548e9ed55efaf9eb22c5f3031558f0371df0d08c
Parents: 695bc56
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 13 15:23:38 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 15 23:18:56 2018 +0200
----------------------------------------------------------------------
.../common/io/compression/Bzip2InputStreamFactory.java | 13 +++++++------
.../compression/DeflateInflaterInputStreamFactory.java | 6 ++----
.../io/compression/GzipInflaterInputStreamFactory.java | 7 +++----
.../common/io/compression/XZInputStreamFactory.java | 12 +++++++-----
4 files changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/548e9ed5/flink-core/src/main/java/org/apache/flink/api/common/io/compression/Bzip2InputStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/Bzip2InputStreamFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/Bzip2InputStreamFactory.java
index d204907..a1692a8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/Bzip2InputStreamFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/Bzip2InputStreamFactory.java
@@ -15,26 +15,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.api.common.io.compression;
import org.apache.flink.annotation.Internal;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Collections;
-import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
-
+/**
+ * Factory for Bzip2 decompressors.
+ */
@Internal
public class Bzip2InputStreamFactory implements InflaterInputStreamFactory<BZip2CompressorInputStream> {
- private static Bzip2InputStreamFactory INSTANCE = null;
+ private static final Bzip2InputStreamFactory INSTANCE = new Bzip2InputStreamFactory();
public static Bzip2InputStreamFactory getInstance() {
- if (INSTANCE == null) {
- INSTANCE = new Bzip2InputStreamFactory();
- }
return INSTANCE;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/548e9ed5/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java
index b5051e6..33f7d51 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.api.common.io.compression;
import org.apache.flink.annotation.Internal;
@@ -31,12 +32,9 @@ import java.util.zip.InflaterInputStream;
@Internal
public class DeflateInflaterInputStreamFactory implements InflaterInputStreamFactory<InflaterInputStream> {
- private static DeflateInflaterInputStreamFactory INSTANCE = null;
+ private static final DeflateInflaterInputStreamFactory INSTANCE = new DeflateInflaterInputStreamFactory();
public static DeflateInflaterInputStreamFactory getInstance() {
- if (INSTANCE == null) {
- INSTANCE = new DeflateInflaterInputStreamFactory();
- }
return INSTANCE;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/548e9ed5/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java
index 478eb2d..335e365 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.api.common.io.compression;
import org.apache.flink.annotation.Internal;
@@ -31,14 +32,12 @@ import java.util.zip.GZIPInputStream;
@Internal
public class GzipInflaterInputStreamFactory implements InflaterInputStreamFactory<GZIPInputStream> {
- private static GzipInflaterInputStreamFactory INSTANCE = null;
+ private static final GzipInflaterInputStreamFactory INSTANCE = new GzipInflaterInputStreamFactory();
public static GzipInflaterInputStreamFactory getInstance() {
- if (INSTANCE == null) {
- INSTANCE = new GzipInflaterInputStreamFactory();
- }
return INSTANCE;
}
+
@Override
public GZIPInputStream create(InputStream in) throws IOException {
return new GZIPInputStream(in);
http://git-wip-us.apache.org/repos/asf/flink/blob/548e9ed5/flink-core/src/main/java/org/apache/flink/api/common/io/compression/XZInputStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/XZInputStreamFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/XZInputStreamFactory.java
index c80de40..0802ab6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/XZInputStreamFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/XZInputStreamFactory.java
@@ -15,25 +15,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.api.common.io.compression;
-import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
import org.apache.flink.annotation.Internal;
+import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
+
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Collections;
+/**
+ * Factory for XZ decompressors.
+ */
@Internal
public class XZInputStreamFactory implements InflaterInputStreamFactory<XZCompressorInputStream> {
- private static XZInputStreamFactory INSTANCE = null;
+ private static final XZInputStreamFactory INSTANCE = new XZInputStreamFactory();
public static XZInputStreamFactory getInstance() {
- if (INSTANCE == null) {
- INSTANCE = new XZInputStreamFactory();
- }
return INSTANCE;
}
[3/6] flink git commit: [hotfix] [core] Remove unused class
AbstractMultiFSDataInputStream
Posted by se...@apache.org.
[hotfix] [core] Remove unused class AbstractMultiFSDataInputStream
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/66e0a271
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/66e0a271
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/66e0a271
Branch: refs/heads/master
Commit: 66e0a271ad24277330f1de54db7798a2389a671a
Parents: f247598
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jun 29 19:27:58 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 15 23:18:56 2018 +0200
----------------------------------------------------------------------
.../core/fs/AbstractMultiFSDataInputStream.java | 115 -------------------
1 file changed, 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/66e0a271/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java
deleted file mode 100644
index e01ac2e..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.fs;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
-
-import java.io.EOFException;
-import java.io.IOException;
-
-/**
- * Abstract base class for wrappers over multiple {@link FSDataInputStream}, which gives a contiguous view on all inner
- * streams and makes them look like a single stream, in which we can read, seek, etc.
- */
-@Internal
-public abstract class AbstractMultiFSDataInputStream extends FSDataInputStream {
-
- /** Inner stream for the currently accessed segment of the virtual global stream. */
- protected FSDataInputStream delegate;
-
- /** Position in the virtual global stream. */
- protected long totalPos;
-
- /** Total available bytes in the virtual global stream. */
- protected long totalAvailable;
-
- public AbstractMultiFSDataInputStream() {
- this.totalPos = 0L;
- }
-
- @Override
- public void seek(long desired) throws IOException {
-
- if (desired == totalPos) {
- return;
- }
-
- Preconditions.checkArgument(desired >= 0L);
-
- if (desired > totalAvailable) {
- throw new EOFException();
- }
-
- IOUtils.closeQuietly(delegate);
- delegate = getSeekedStreamForOffset(desired);
-
- this.totalPos = desired;
- }
-
- @Override
- public long getPos() throws IOException {
- return totalPos;
- }
-
- @Override
- public int read() throws IOException {
-
- if (null == delegate) {
- return -1;
- }
-
- int val = delegate.read();
-
- if (-1 == val) {
- IOUtils.closeQuietly(delegate);
- if (totalPos < totalAvailable) {
- delegate = getSeekedStreamForOffset(totalPos);
- } else {
- delegate = null;
- }
- return read();
- }
-
- ++totalPos;
- return val;
- }
-
- @Override
- public void close() throws IOException {
- IOUtils.closeQuietly(delegate);
- }
-
- @Override
- public long skip(long n) throws IOException {
- seek(totalPos + n);
- return n;
- }
-
- /**
- * Delivers a the right stream for the given global stream offset. The returned stream is already seeked to the
- * right local offset that correctly reflects the global offset.
- *
- * @param globalStreamOffset the global offset to which we seek
- * @return a sub-stream, seeked to the correct local offset w.r.t. the global offset.
- * @throws IOException
- */
- protected abstract FSDataInputStream getSeekedStreamForOffset(long globalStreamOffset) throws IOException;
-}
[5/6] flink git commit: [FLINK-9751] [filesystem] Add fixes and tests
for Persistent Resumable Writers
Posted by se...@apache.org.
[FLINK-9751] [filesystem] Add fixes and tests for Persistent Resumable Writers
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d238e1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d238e1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d238e1a
Branch: refs/heads/master
Commit: 9d238e1a170a96c311b0dafa92f572c2d97bbcad
Parents: e296094
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 15 23:20:37 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 15 23:20:37 2018 +0200
----------------------------------------------------------------------
.../LocalRecoverableFsDataOutputStream.java | 6 +-
.../core/io/SimpleVersionedSerializer.java | 3 +
.../core/fs/AbstractResumableWriterTest.java | 380 +++++++++++++++++++
.../LocalFileSystemResumableWriterTest.java | 45 +++
.../HadoopRecoverableFsDataOutputStream.java | 62 ++-
.../apache/flink/runtime/util/HadoopUtils.java | 2 +-
.../fs/hdfs/HadoopResumableWriterTest.java | 95 +++++
7 files changed, 572 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
index 6c6a554..fd8e8fe 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
@@ -57,11 +57,11 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
this.tempFile = checkNotNull(resumable.tempFile());
if (!tempFile.exists()) {
- throw new FileNotFoundException("File Not Found: " + tempFile.getName());
+ throw new FileNotFoundException("File Not Found: " + tempFile);
}
if (tempFile.length() < resumable.offset()) {
- throw new IOException("Missing data in tmp file: " + tempFile.getName());
+ throw new IOException("Missing data in tmp file: " + tempFile);
}
this.fos = new FileOutputStream(this.tempFile, true);
@@ -165,6 +165,8 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
try (FileOutputStream fos = new FileOutputStream(src, true)) {
fos.getChannel().truncate(expectedLength);
}
+ } else if (src.length() < expectedLength) {
+ throw new IOException("Missing data in tmp file: " + src);
}
// source still exists, so no renaming happened yet. do it!
http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
index 6c061a5..4dfeea2 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
@@ -18,6 +18,8 @@
package org.apache.flink.core.io;
+import org.apache.flink.annotation.Internal;
+
import java.io.IOException;
/**
@@ -44,6 +46,7 @@ import java.io.IOException;
*
* @param <E> The data type serialized / deserialized by this serializer.
*/
+@Internal
public interface SimpleVersionedSerializer<E> extends Versioned {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java
new file mode 100644
index 0000000..8077305
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.fail;
+
+/**
+ * A base test-suite for the {@link RecoverableWriter}.
+ * This should be subclassed to test each filesystem specific writer.
+ */
+public abstract class AbstractResumableWriterTest extends TestLogger {
+
+ private static final Random RND = new Random();
+
+ private static final String testData1 = "THIS IS A TEST 1.";
+ private static final String testData2 = "THIS IS A TEST 2.";
+ private static final String testData3 = "THIS IS A TEST 3.";
+
+ private Path basePathForTest;
+
+ private static FileSystem fileSystem;
+
+ public abstract Path getBasePath() throws Exception;
+
+ public abstract FileSystem initializeFileSystem();
+
+ public Path getBasePathForTest() {
+ return basePathForTest;
+ }
+
+ private FileSystem getFileSystem() {
+ if (fileSystem == null) {
+ fileSystem = initializeFileSystem();
+ }
+ return fileSystem;
+ }
+
+ private RecoverableWriter getNewFileSystemWriter() throws IOException {
+ return getFileSystem().createRecoverableWriter();
+ }
+
+ @Before
+ public void prepare() throws Exception {
+ basePathForTest = new Path(getBasePath(), randomName());
+ getFileSystem().mkdirs(basePathForTest);
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ getFileSystem().delete(basePathForTest, true);
+ }
+
+ @Test
+ public void testCloseWithNoData() throws Exception {
+ final RecoverableWriter writer = getNewFileSystemWriter();
+
+ final Path testDir = getBasePathForTest();
+
+ final Path path = new Path(testDir, "part-0");
+
+ final RecoverableFsDataOutputStream stream = writer.open(path);
+ for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) {
+ Assert.assertTrue(fileContents.getKey().getName().startsWith(".part-0.inprogress."));
+ Assert.assertTrue(fileContents.getValue().isEmpty());
+ }
+
+ stream.closeForCommit().commit();
+
+ for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) {
+ Assert.assertEquals("part-0", fileContents.getKey().getName());
+ Assert.assertTrue(fileContents.getValue().isEmpty());
+ }
+ }
+
+ @Test
+ public void testCommitAfterNormalClose() throws Exception {
+ final RecoverableWriter writer = getNewFileSystemWriter();
+
+ final Path testDir = getBasePathForTest();
+
+ final Path path = new Path(testDir, "part-0");
+
+ try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+ stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+ stream.closeForCommit().commit();
+
+ for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) {
+ Assert.assertEquals("part-0", fileContents.getKey().getName());
+ Assert.assertEquals(testData1, fileContents.getValue());
+ }
+ }
+ }
+
+ @Test
+ public void testCommitAfterPersist() throws Exception {
+ final RecoverableWriter writer = getNewFileSystemWriter();
+
+ final Path testDir = getBasePathForTest();
+
+ final Path path = new Path(testDir, "part-0");
+
+ try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+ stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+ stream.persist();
+
+ stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+ stream.closeForCommit().commit();
+
+ for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) {
+ Assert.assertEquals("part-0", fileContents.getKey().getName());
+ Assert.assertEquals(testData1 + testData2, fileContents.getValue());
+ }
+ }
+ }
+
+ // TESTS FOR RECOVERY
+
+ private static final String INIT_EMPTY_PERSIST = "EMPTY";
+ private static final String INTERM_WITH_STATE_PERSIST = "INTERM-STATE";
+ private static final String INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST = "INTERM-IMEDIATE";
+ private static final String FINAL_WITH_EXTRA_STATE = "FINAL";
+
+ @Test
+ public void testRecoverWithEmptyState() throws Exception {
+ testResumeAfterMultiplePersist(
+ INIT_EMPTY_PERSIST,
+ "",
+ testData3);
+ }
+
+ @Test
+ public void testRecoverWithState() throws Exception {
+ testResumeAfterMultiplePersist(
+ INTERM_WITH_STATE_PERSIST,
+ testData1,
+ testData1 + testData3);
+ }
+
+ @Test
+ public void testRecoverFromIntermWithoutAdditionalState() throws Exception {
+ testResumeAfterMultiplePersist(
+ INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST,
+ testData1,
+ testData1 + testData3);
+ }
+
+ @Test
+ public void testRecoverAfterMultiplePersistsState() throws Exception {
+ testResumeAfterMultiplePersist(
+ FINAL_WITH_EXTRA_STATE,
+ testData1 + testData2,
+ testData1 + testData2 + testData3);
+ }
+
+ private void testResumeAfterMultiplePersist(
+ final String persistName,
+ final String expectedPostRecoveryContents,
+ final String expectedFinalContents) throws Exception {
+
+ final Path testDir = getBasePathForTest();
+ final Path path = new Path(testDir, "part-0");
+
+ final RecoverableWriter initWriter = getNewFileSystemWriter();
+
+ final Map<String, RecoverableWriter.ResumeRecoverable> recoverables = new HashMap<>(4);
+ try (final RecoverableFsDataOutputStream stream = initWriter.open(path)) {
+ recoverables.put(INIT_EMPTY_PERSIST, stream.persist());
+
+ stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+
+ recoverables.put(INTERM_WITH_STATE_PERSIST, stream.persist());
+ recoverables.put(INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST, stream.persist());
+
+ // and write some more data
+ stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+
+ recoverables.put(FINAL_WITH_EXTRA_STATE, stream.persist());
+ }
+
+ final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> serializer = initWriter.getResumeRecoverableSerializer();
+ final byte[] serializedRecoverable = serializer.serialize(recoverables.get(persistName));
+
+ // get a new serializer from a new writer to make sure that no pre-initialized state leaks in.
+ final RecoverableWriter newWriter = getNewFileSystemWriter();
+ final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> deserializer = newWriter.getResumeRecoverableSerializer();
+ final RecoverableWriter.ResumeRecoverable recoveredRecoverable =
+ deserializer.deserialize(serializer.getVersion(), serializedRecoverable);
+
+ try (final RecoverableFsDataOutputStream recoveredStream = newWriter.recover(recoveredRecoverable)) {
+
+ // we expect the data to be truncated
+ Map<Path, String> files = getFileContentByPath(testDir);
+ Assert.assertEquals(1L, files.size());
+
+ for (Map.Entry<Path, String> fileContents : files.entrySet()) {
+ Assert.assertTrue(fileContents.getKey().getName().startsWith(".part-0.inprogress."));
+ Assert.assertEquals(expectedPostRecoveryContents, fileContents.getValue());
+ }
+
+ recoveredStream.write(testData3.getBytes(StandardCharsets.UTF_8));
+ recoveredStream.closeForCommit().commit();
+
+ files = getFileContentByPath(testDir);
+ Assert.assertEquals(1L, files.size());
+
+ for (Map.Entry<Path, String> fileContents : files.entrySet()) {
+ Assert.assertEquals("part-0", fileContents.getKey().getName());
+ Assert.assertEquals(expectedFinalContents, fileContents.getValue());
+ }
+ }
+ }
+
+ @Test
+ public void testCommitAfterRecovery() throws Exception {
+ final Path testDir = getBasePathForTest();
+ final Path path = new Path(testDir, "part-0");
+
+ final RecoverableWriter initWriter = getNewFileSystemWriter();
+
+ final RecoverableWriter.CommitRecoverable recoverable;
+ try (final RecoverableFsDataOutputStream stream = initWriter.open(path)) {
+ stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+
+ stream.persist();
+ stream.persist();
+
+ // and write some more data
+ stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+
+ recoverable = stream.closeForCommit().getRecoverable();
+ }
+
+ final byte[] serializedRecoverable = initWriter.getCommitRecoverableSerializer().serialize(recoverable);
+
+ // get a new serializer from a new writer to make sure that no pre-initialized state leaks in.
+ final RecoverableWriter newWriter = getNewFileSystemWriter();
+ final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> deserializer = newWriter.getCommitRecoverableSerializer();
+ final RecoverableWriter.CommitRecoverable recoveredRecoverable = deserializer.deserialize(deserializer.getVersion(), serializedRecoverable);
+
+ final RecoverableFsDataOutputStream.Committer committer = newWriter.recoverForCommit(recoveredRecoverable);
+ committer.commitAfterRecovery();
+
+ Map<Path, String> files = getFileContentByPath(testDir);
+ Assert.assertEquals(1L, files.size());
+
+ for (Map.Entry<Path, String> fileContents : files.entrySet()) {
+ Assert.assertEquals("part-0", fileContents.getKey().getName());
+ Assert.assertEquals(testData1 + testData2, fileContents.getValue());
+ }
+ }
+
+ // TESTS FOR EXCEPTIONS
+
+ @Test(expected = IOException.class)
+ public void testExceptionWritingAfterCloseForCommit() throws Exception {
+ final Path testDir = getBasePathForTest();
+
+ final RecoverableWriter writer = getNewFileSystemWriter();
+ final Path path = new Path(testDir, "part-0");
+
+ try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+ stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+
+ stream.closeForCommit().getRecoverable();
+ stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+ fail();
+ }
+ }
+
+ @Test(expected = IOException.class)
+ public void testResumeAfterCommit() throws Exception {
+ final Path testDir = getBasePathForTest();
+
+ final RecoverableWriter writer = getNewFileSystemWriter();
+ final Path path = new Path(testDir, "part-0");
+
+ RecoverableWriter.ResumeRecoverable recoverable;
+ try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+ stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+
+ recoverable = stream.persist();
+ stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+
+ stream.closeForCommit().commit();
+ }
+
+ // this should throw an exception as the file is already committed
+ writer.recover(recoverable);
+ fail();
+ }
+
+	/**
+	 * Persists the same stream twice, resumes from the first persist point and then
+	 * expects resuming from the second persist point to fail with an {@link IOException}.
+	 */
+	@Test
+	public void testResumeWithWrongOffset() throws Exception {
+		// this is a rather unrealistic scenario, but it is to trigger
+		// truncation of the file and try to resume with missing data.
+
+		final Path testDir = getBasePathForTest();
+
+		final RecoverableWriter writer = getNewFileSystemWriter();
+		final Path path = new Path(testDir, "part-0");
+
+		final RecoverableWriter.ResumeRecoverable recoverable1;
+		final RecoverableWriter.ResumeRecoverable recoverable2;
+		try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+
+			recoverable1 = stream.persist();
+			stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+
+			recoverable2 = stream.persist();
+			stream.write(testData3.getBytes(StandardCharsets.UTF_8));
+		}
+
+		// this should work fine; an unexpected exception propagates with its
+		// stack trace instead of being replaced by a bare fail()
+		try (RecoverableFsDataOutputStream ignored = writer.recover(recoverable1)) {
+			// nothing to write, opening the stream for recovery is all this step needs
+		}
+
+		// this should throw an exception
+		try (RecoverableFsDataOutputStream ignored = writer.recover(recoverable2)) {
+			fail("Expected an IOException when resuming from the second persist point.");
+		} catch (IOException e) {
+			// we expect this
+		}
+	}
+
+	/**
+	 * Reads every entry listed for the given directory and returns the contents,
+	 * decoded as UTF-8, keyed by the path of the file they were read from.
+	 *
+	 * @param directory the directory whose entries are read
+	 * @return a map from file path to the decoded file contents
+	 * @throws IOException if listing or reading fails, or a file ends before its reported length
+	 */
+	private Map<Path, String> getFileContentByPath(Path directory) throws IOException {
+		final Map<Path, String> contents = new HashMap<>();
+
+		final FileStatus[] filesInBucket = getFileSystem().listStatus(directory);
+		for (FileStatus file : filesInBucket) {
+			final long fileLength = file.getLen();
+			final byte[] serContents = new byte[(int) fileLength];
+
+			// close the stream once the file is read, otherwise one stream leaks per file
+			try (FSDataInputStream stream = getFileSystem().open(file.getPath())) {
+				// a single read() may return fewer bytes than requested, so read until the buffer is full
+				int filled = 0;
+				while (filled < serContents.length) {
+					final int numRead = stream.read(serContents, filled, serContents.length - filled);
+					if (numRead < 0) {
+						throw new IOException("Unexpected end of file: " + file.getPath());
+					}
+					filled += numRead;
+				}
+			}
+
+			contents.put(file.getPath(), new String(serContents, StandardCharsets.UTF_8));
+		}
+		return contents;
+	}
+
+ private static String randomName() {
+ return StringUtils.getRandomString(RND, 16, 16, 'a', 'z');
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java
new file mode 100644
index 0000000..d347609
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.local;
+
+import org.apache.flink.core.fs.AbstractResumableWriterTest;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Runs the tests inherited from {@link AbstractResumableWriterTest} against the
+ * local file system, covering the {@link LocalRecoverableWriter}.
+ */
+public class LocalFileSystemResumableWriterTest extends AbstractResumableWriterTest {
+
+ /** Root folder for the test data; managed by JUnit as a class rule. */
+ @ClassRule
+ public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+ /** Supplies the local file system as the file system under test. */
+ @Override
+ public FileSystem initializeFileSystem() {
+ final FileSystem localFs = FileSystem.getLocalFileSystem();
+ return localFs;
+ }
+
+ /** Creates a new folder under {@link #TEMP_FOLDER} on every call and returns it as the base path. */
+ @Override
+ public Path getBasePath() throws Exception {
+ final java.net.URI freshFolder = TEMP_FOLDER.newFolder().toURI();
+ return new Path(freshFolder);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
index f944dc5..c688b32 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
@@ -19,11 +19,13 @@
package org.apache.flink.runtime.fs.hdfs;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.commons.lang3.time.StopWatch;
@@ -38,13 +40,18 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
+import java.time.Duration;
import static org.apache.flink.util.Preconditions.checkNotNull;
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for Hadoop's
+ * file system abstraction.
+ */
@Internal
class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
- private static final long LEASE_TIMEOUT = 100000L;
+ private static final long LEASE_TIMEOUT = 100_000L;
private static Method truncateHandle;
@@ -79,16 +86,23 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
this.targetFile = checkNotNull(recoverable.targetFile());
this.tempFile = checkNotNull(recoverable.tempFile());
- // the getFileStatus will throw a FileNotFound exception if the file is not there.
- final FileStatus tmpFileStatus = fs.getFileStatus(tempFile);
- if (tmpFileStatus.getLen() < recoverable.offset()) {
- throw new IOException("Missing data in tmp file: " + tempFile.getName());
+ // truncate back and append
+ try {
+ truncate(fs, tempFile, recoverable.offset());
+ } catch (Exception e) {
+ throw new IOException("Missing data in tmp file: " + tempFile, e);
}
- // truncate back and append
- truncate(fs, tempFile, recoverable.offset());
waitUntilLeaseIsRevoked(tempFile);
out = fs.append(tempFile);
+
+ // sanity check
+ long pos = out.getPos();
+ if (pos != recoverable.offset()) {
+ IOUtils.closeQuietly(out);
+ throw new IOException("Truncate failed: " + tempFile +
+ " (requested=" + recoverable.offset() + " ,size=" + pos + ')');
+ }
}
@Override
@@ -108,6 +122,7 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
@Override
public void sync() throws IOException {
+ out.hflush();
out.hsync();
}
@@ -243,9 +258,22 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
if (srcStatus != null) {
if (srcStatus.getLen() > expectedLength) {
- // can happen if we co from persist to recovering for commit directly
+ // can happen if we go from persist to recovering for commit directly
// truncate the trailing junk away
- truncate(fs, src, expectedLength);
+ try {
+ truncate(fs, src, expectedLength);
+ } catch (Exception e) {
+ // this can happen if the file is smaller than expected
+ throw new IOException("Problem while truncating file: " + src, e);
+ }
+ }
+
+ // rename to final location (if it exists, overwrite it)
+ try {
+ fs.rename(src, dest);
+ }
+ catch (IOException e) {
+ throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
}
}
else if (!fs.exists(dest)) {
@@ -281,23 +309,21 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
final DistributedFileSystem dfs = (DistributedFileSystem) fs;
dfs.recoverLease(path);
- boolean isclosed = dfs.isFileClosed(path);
+
+ final Deadline deadline = Deadline.now().plus(Duration.ofMillis(LEASE_TIMEOUT));
final StopWatch sw = new StopWatch();
sw.start();
- while (!isclosed) {
- if (sw.getTime() > LEASE_TIMEOUT) {
- break;
- }
-
+ boolean isClosed = dfs.isFileClosed(path);
+ while (!isClosed && deadline.hasTimeLeft()) {
try {
Thread.sleep(500L);
} catch (InterruptedException e1) {
- // ignore it
+ throw new IOException("Recovering the lease failed: ", e1);
}
- isclosed = dfs.isFileClosed(path);
+ isClosed = dfs.isFileClosed(path);
}
- return isclosed;
+ return isClosed;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
index 09a9e54..d0b3a7a 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.util;
import org.apache.flink.configuration.ConfigConstants;
-
import org.apache.flink.util.FlinkRuntimeException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.io.Text;
http://git-wip-us.apache.org/repos/asf/flink/blob/9d238e1a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopResumableWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopResumableWriterTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopResumableWriterTest.java
new file mode 100644
index 0000000..5827a55
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopResumableWriterTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.AbstractResumableWriterTest;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.OperatingSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+/**
+ * Tests for the {@link HadoopRecoverableWriter}, executed against an in-process
+ * {@link MiniDFSCluster}.
+ *
+ * <p>The class is skipped (not failed) when the Hadoop version or the operating
+ * system assumptions checked in {@link #createHDFS()} do not hold.
+ */
+public class HadoopResumableWriterTest extends AbstractResumableWriterTest {
+
+ @ClassRule
+ public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+ private static MiniDFSCluster hdfsCluster;
+
+ /** The cached file system instance. */
+ private static FileSystem fileSystem;
+
+ private static Path basePath;
+
+ /**
+ * Skips the tests unless {@code HadoopUtils.isMinHadoopVersion(2, 7)} holds.
+ * Invoked from {@link #createHDFS()} so that it runs before the cluster is started.
+ */
+ public static void testHadoopVersion() {
+ Assume.assumeTrue(HadoopUtils.isMinHadoopVersion(2, 7));
+ }
+
+ /**
+ * Skips the tests on Windows.
+ * Invoked from {@link #createHDFS()} so that it runs before the cluster is started.
+ */
+ public static void verifyOS() {
+ Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows());
+ }
+
+ /**
+ * Checks the preconditions and then starts the HDFS mini cluster used by all tests.
+ *
+ * @throws Exception if the temporary folder or the cluster cannot be created
+ */
+ @BeforeClass
+ public static void createHDFS() throws Exception {
+ // JUnit 4 does not define the relative order of several @BeforeClass methods
+ // in one class, so the assumptions are checked here, in a fixed order, before
+ // any attempt to start the cluster; a failed assumption skips the class
+ testHadoopVersion();
+ verifyOS();
+
+ final File baseDir = TEMP_FOLDER.newFolder();
+
+ final Configuration hdConf = new Configuration();
+ hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+
+ final MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+ hdfsCluster = builder.build();
+
+ final org.apache.hadoop.fs.FileSystem hdfs = hdfsCluster.getFileSystem();
+
+ fileSystem = new HadoopFileSystem(hdfs);
+ basePath = new Path(hdfs.getUri() + "/tests");
+ }
+
+ /**
+ * Deletes the test directory and shuts the mini cluster down.
+ *
+ * @throws Exception if deleting the test directory fails
+ */
+ @AfterClass
+ public static void destroyHDFS() throws Exception {
+ if (hdfsCluster != null) {
+ try {
+ // basePath is still null if setup failed after the cluster was built
+ if (basePath != null) {
+ hdfsCluster.getFileSystem().delete(new org.apache.hadoop.fs.Path(basePath.toUri()), true);
+ }
+ } finally {
+ // always release the cluster, even if the cleanup above failed
+ hdfsCluster.shutdown();
+ }
+ }
+ }
+
+ @Override
+ public Path getBasePath() {
+ return basePath;
+ }
+
+ @Override
+ public FileSystem initializeFileSystem() {
+ return fileSystem;
+ }
+}