Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/11 15:26:51 UTC

[GitHub] asfgit closed pull request #6795: [FLINK-9752][s3-fs-connector] Add s3 recoverable writer.

asfgit closed pull request #6795: [FLINK-9752][s3-fs-connector] Add s3 recoverable writer.
URL: https://github.com/apache/flink/pull/6795
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.java
index 8108670822a..af5b8bb2efa 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.core.fs;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
 import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
 
@@ -28,6 +29,7 @@
  * The stream initially writes to hidden files or temp files and only creates the
  * target file once it is closed and "committed".
  */
+@PublicEvolving
 public abstract class RecoverableFsDataOutputStream extends FSDataOutputStream {
 
 	/**
@@ -44,10 +46,15 @@
 	public abstract Committer closeForCommit() throws IOException;
 
 	/**
-	 * Closes the stream releasing all local resources, but not finalizing the
-	 * file that the stream writes to.
+	 * Closes this stream. Closing the stream releases the local resources that the stream
+	 * uses, but does NOT result in durability of previously written data. This method
+	 * should be interpreted as a "close in order to dispose" or "close on failure".
 	 *
-	 * <p>This method should be understood as "close to dispose on failure".
+	 * <p>In order to persist all previously written data, one needs to call the
+	 * {@link #closeForCommit()} method and call {@link Committer#commit()} on the returned
+	 * committer object.
+	 *
+	 * @throws IOException Thrown if an error occurred during closing.
 	 */
 	@Override
 	public abstract void close() throws IOException;
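
The close()/closeForCommit() contract documented above is used by callers of a recoverable stream roughly as follows. This is an illustrative sketch against the local file system, not part of the patch; the target directory and payload are assumptions.

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;

import java.io.File;
import java.nio.charset.StandardCharsets;

public class RecoverableWriteSketch {

	public static void main(String[] args) throws Exception {
		// illustrative target location; any FileSystem that supports recoverable writers works
		final File dir = new File(System.getProperty("java.io.tmpdir"), "recoverable-demo");
		dir.mkdirs();
		final Path target = new Path(new File(dir, "part-0").toURI());

		final FileSystem fs = target.getFileSystem();
		final RecoverableWriter writer = fs.createRecoverableWriter();

		final RecoverableFsDataOutputStream out = writer.open(target);
		out.write("some data".getBytes(StandardCharsets.UTF_8));

		// close() alone would only dispose local resources; to make the data durable
		// and visible under 'target', close for commit and then commit.
		final RecoverableFsDataOutputStream.Committer committer = out.closeForCommit();
		committer.commit();
	}
}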
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
index bbba793141f..e5bfdb84a02 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.core.fs;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
@@ -89,6 +90,7 @@
  * generics at the cost of doing some explicit casts in the implementation that would
  * otherwise have been implicitly generated by the generics compiler.
  */
+@PublicEvolving
 public interface RecoverableWriter {
 
 	/**
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
similarity index 97%
rename from flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java
rename to flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
index 80773058a33..ab37a078cd8 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
@@ -39,7 +39,7 @@
  * A base test-suite for the {@link RecoverableWriter}.
  * This should be subclassed to test each filesystem specific writer.
  */
-public abstract class AbstractResumableWriterTest extends TestLogger {
+public abstract class AbstractRecoverableWriterTest extends TestLogger {
 
 	private static final Random RND = new Random();
 
@@ -53,20 +53,20 @@
 
 	public abstract Path getBasePath() throws Exception;
 
-	public abstract FileSystem initializeFileSystem();
+	public abstract FileSystem initializeFileSystem() throws Exception;
 
 	public Path getBasePathForTest() {
 		return basePathForTest;
 	}
 
-	private FileSystem getFileSystem() {
+	private FileSystem getFileSystem() throws Exception {
 		if (fileSystem == null) {
 			fileSystem = initializeFileSystem();
 		}
 		return fileSystem;
 	}
 
-	private RecoverableWriter getNewFileSystemWriter() throws IOException {
+	private RecoverableWriter getNewFileSystemWriter() throws Exception {
 		return getFileSystem().createRecoverableWriter();
 	}
 
@@ -358,7 +358,7 @@ public void testResumeWithWrongOffset() throws Exception {
 		fail();
 	}
 
-	private Map<Path, String> getFileContentByPath(Path directory) throws IOException {
+	private Map<Path, String> getFileContentByPath(Path directory) throws Exception {
 		Map<Path, String> contents = new HashMap<>();
 
 		final FileStatus[] filesInBucket = getFileSystem().listStatus(directory);
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemRecoverableWriterTest.java
similarity index 89%
rename from flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java
rename to flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemRecoverableWriterTest.java
index d347609c5bf..30905834925 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemRecoverableWriterTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.core.fs.local;
 
-import org.apache.flink.core.fs.AbstractResumableWriterTest;
+import org.apache.flink.core.fs.AbstractRecoverableWriterTest;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 
@@ -28,7 +28,7 @@
 /**
  * Tests for the {@link LocalRecoverableWriter}.
  */
-public class LocalFileSystemResumableWriterTest extends AbstractResumableWriterTest {
+public class LocalFileSystemRecoverableWriterTest extends AbstractRecoverableWriterTest {
 
 	@ClassRule
 	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 065ba5a66f8..dceca1e36bb 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -206,7 +206,7 @@ public RecoverableWriter createRecoverableWriter() throws IOException {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	static org.apache.hadoop.fs.Path toHadoopPath(Path path) {
+	public static org.apache.hadoop.fs.Path toHadoopPath(Path path) {
 		return new org.apache.hadoop.fs.Path(path.toUri());
 	}
 
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopResumableWriterTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterTest.java
similarity index 95%
rename from flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopResumableWriterTest.java
rename to flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterTest.java
index 5827a5529a7..1ff42c079c9 100644
--- a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopResumableWriterTest.java
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.fs.hdfs;
 
-import org.apache.flink.core.fs.AbstractResumableWriterTest;
+import org.apache.flink.core.fs.AbstractRecoverableWriterTest;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.util.HadoopUtils;
@@ -37,7 +37,7 @@
 /**
  * Tests for the {@link HadoopRecoverableWriter}.
  */
-public class HadoopResumableWriterTest extends AbstractResumableWriterTest {
+public class HadoopRecoverableWriterTest extends AbstractRecoverableWriterTest {
 
 	@ClassRule
 	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index 58144f08927..318fd39829c 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -21,13 +21,17 @@
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.net.URI;
 
@@ -36,6 +40,24 @@
  */
 public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
 
+	public static final ConfigOption<Long> PART_UPLOAD_MIN_SIZE = ConfigOptions
+			.key("s3.upload.min.part.size")
+			.defaultValue(FlinkS3FileSystem.S3_MULTIPART_MIN_PART_SIZE)
+			.withDescription(
+					"This option is relevant to the Recoverable Writer and sets the min size of data that is " +
+					"buffered locally before being sent to S3. Flink also takes care of checkpointing locally " +
+					"buffered data. This value cannot be less than 5MB or greater than 5GB (limits set by Amazon)."
+			);
+
+	public static final ConfigOption<Integer> MAX_CONCURRENT_UPLOADS = ConfigOptions
+			.key("s3.upload.max.concurrent.uploads")
+			.defaultValue(Runtime.getRuntime().availableProcessors())
+			.withDescription(
+					"This option is relevant to the Recoverable Writer and limits the number of " +
+					"parts that can be concurrently in-flight. By default, this is set to " +
+					Runtime.getRuntime().availableProcessors() + "."
+			);
+
 	/**
 	 * The substring to be replaced by random entropy in checkpoint paths.
 	 */
@@ -116,7 +138,19 @@ public FileSystem create(URI fsUri) throws IOException {
 				}
 			}
 
-			return new FlinkS3FileSystem(fs, entropyInjectionKey, numEntropyChars);
+			final String localTmpDirectory = flinkConfig.getString(CoreOptions.TMP_DIRS);
+			final long s3minPartSize = flinkConfig.getLong(PART_UPLOAD_MIN_SIZE);
+			final int maxConcurrentUploads = flinkConfig.getInteger(MAX_CONCURRENT_UPLOADS);
+			final S3MultiPartUploader s3AccessHelper = getS3AccessHelper(fs);
+
+			return new FlinkS3FileSystem(
+					fs,
+					localTmpDirectory,
+					entropyInjectionKey,
+					numEntropyChars,
+					s3AccessHelper,
+					s3minPartSize,
+					maxConcurrentUploads);
 		}
 		catch (IOException e) {
 			throw e;
@@ -130,5 +164,8 @@ public FileSystem create(URI fsUri) throws IOException {
 
 	protected abstract URI getInitURI(
 		URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig);
+
+	@Nullable
+	protected abstract S3MultiPartUploader getS3AccessHelper(org.apache.hadoop.fs.FileSystem fs);
 }
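
The two options introduced above, s3.upload.min.part.size and s3.upload.max.concurrent.uploads, are regular Flink configuration options. Below is a minimal sketch of setting them programmatically before the file systems are initialized; the concrete values are illustrative, and in a deployment they would normally live in flink-conf.yaml instead.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;

public class S3WriterConfigSketch {

	public static void main(String[] args) throws Exception {
		final Configuration conf = new Configuration();

		// minimum size of data buffered locally before a part is uploaded to S3
		// (must be between 5 MB and 5 GB, limits set by Amazon); 64 MiB is illustrative
		conf.setLong("s3.upload.min.part.size", 64L << 20);

		// maximum number of part uploads concurrently in flight per stream
		conf.setInteger("s3.upload.max.concurrent.uploads", 4);

		// make the configuration visible to the pluggable file systems
		FileSystem.initialize(conf);
	}
}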
 
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
index 84883bbafc1..553edde75b0 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
@@ -20,11 +20,22 @@
 
 import org.apache.flink.core.fs.EntropyInjectingFileSystem;
 import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.s3.common.utils.RefCountedFile;
+import org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator;
+import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3RecoverableWriter;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.function.FunctionWithException;
 
 import javax.annotation.Nullable;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 
 /**
@@ -39,15 +50,23 @@
 
 	private final int entropyLength;
 
-	/**
-	 * Creates a FlinkS3FileSystem based on the given Hadoop S3 file system.
-	 * The given Hadoop file system object is expected to be initialized already.
-	 *
-	 * @param hadoopS3FileSystem The Hadoop FileSystem that will be used under the hood.
-	 */
-	public FlinkS3FileSystem(org.apache.hadoop.fs.FileSystem hadoopS3FileSystem) {
-		this(hadoopS3FileSystem, null, -1);
-	}
+	// ------------------- Recoverable Writer Parameters -------------------
+
+	/** The minimum size of a part in the multipart upload, except for the last part: 5 MiB. */
+	public static final long S3_MULTIPART_MIN_PART_SIZE = 5L << 20;
+
+	private final String localTmpDir;
+
+	private final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator;
+
+	@Nullable
+	private final S3MultiPartUploader s3UploadHelper;
+
+	private final Executor uploadThreadPool;
+
+	private final long s3uploadPartSize;
+
+	private final int maxConcurrentUploadsPerStream;
 
 	/**
 	 * Creates a FlinkS3FileSystem based on the given Hadoop S3 file system.
@@ -61,8 +80,12 @@ public FlinkS3FileSystem(org.apache.hadoop.fs.FileSystem hadoopS3FileSystem) {
 	 */
 	public FlinkS3FileSystem(
 			org.apache.hadoop.fs.FileSystem hadoopS3FileSystem,
+			String localTmpDirectory,
 			@Nullable String entropyInjectionKey,
-			int entropyLength) {
+			int entropyLength,
+			@Nullable S3MultiPartUploader s3UploadHelper,
+			long s3uploadPartSize,
+			int maxConcurrentUploadsPerStream) {
 
 		super(hadoopS3FileSystem);
 
@@ -72,6 +95,16 @@ public FlinkS3FileSystem(
 
 		this.entropyInjectionKey = entropyInjectionKey;
 		this.entropyLength = entropyLength;
+
+		// recoverable writer parameter configuration initialization
+		this.localTmpDir = Preconditions.checkNotNull(localTmpDirectory);
+		this.tmpFileCreator = RefCountedTmpFileCreator.inDirectories(new File(localTmpDirectory));
+		this.s3UploadHelper = s3UploadHelper;
+		this.uploadThreadPool = Executors.newCachedThreadPool();
+
+		Preconditions.checkArgument(s3uploadPartSize >= S3_MULTIPART_MIN_PART_SIZE);
+		this.s3uploadPartSize = s3uploadPartSize;
+		this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
 	}
 
 	// ------------------------------------------------------------------------
@@ -91,4 +124,24 @@ public String generateEntropy() {
 	public FileSystemKind getKind() {
 		return FileSystemKind.OBJECT_STORE;
 	}
+
+	public String getLocalTmpDir() {
+		return localTmpDir;
+	}
+
+	@Override
+	public RecoverableWriter createRecoverableWriter() throws IOException {
+		if (s3UploadHelper == null) {
+			// this is the case for Presto
+			throw new UnsupportedOperationException("This s3 file system implementation does not support recoverable writers.");
+		}
+
+		return S3RecoverableWriter.writer(
+				getHadoopFileSystem(),
+				tmpFileCreator,
+				s3UploadHelper,
+				uploadThreadPool,
+				s3uploadPartSize,
+				maxConcurrentUploadsPerStream);
+	}
 }
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/BackPressuringExecutor.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/BackPressuringExecutor.java
new file mode 100644
index 00000000000..d0dd7c8f734
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/BackPressuringExecutor.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An executor decorator that allows only a certain number of concurrent executions.
+ * The {@link #execute(Runnable)} method blocks once that number of executions is exceeded.
+ */
+@Internal
+public final class BackPressuringExecutor implements Executor {
+
+	/** The executor for the actual execution. */
+	private final Executor delegate;
+
+	/** The semaphore to track permits and block until permits are available. */
+	private final Semaphore permits;
+
+	public BackPressuringExecutor(Executor delegate, int numConcurrentExecutions) {
+		checkArgument(numConcurrentExecutions > 0, "numConcurrentExecutions must be > 0");
+		this.delegate = checkNotNull(delegate, "delegate");
+		this.permits = new Semaphore(numConcurrentExecutions, true);
+	}
+
+	@Override
+	public void execute(Runnable command) {
+		// To not block interrupts here (faster cancellation) we acquire interruptibly.
+		// Unfortunately, we need to rethrow this as a RuntimeException (suboptimal), because
+		// the method signature does not permit anything else, and we want to maintain the
+		// Executor interface for transparent drop-in.
+		try {
+			permits.acquire();
+		}
+		catch (InterruptedException e) {
+			Thread.currentThread().interrupt();
+			throw new FlinkRuntimeException("interrupted:", e);
+		}
+
+		final SemaphoreReleasingRunnable runnable = new SemaphoreReleasingRunnable(command, permits);
+		try {
+			delegate.execute(runnable);
+		} catch (RejectedExecutionException e) {
+			runnable.release();
+			ExceptionUtils.rethrow(e, e.getMessage());
+		} catch (Throwable t) {
+			ExceptionUtils.rethrow(t, t.getMessage());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class SemaphoreReleasingRunnable implements Runnable {
+
+		private final Runnable delegate;
+
+		private final Semaphore toRelease;
+
+		private final AtomicBoolean released = new AtomicBoolean();
+
+		SemaphoreReleasingRunnable(Runnable delegate, Semaphore toRelease) {
+			this.delegate = delegate;
+			this.toRelease = toRelease;
+		}
+
+		@Override
+		public void run() {
+			try {
+				delegate.run();
+			}
+			finally {
+				release();
+			}
+		}
+
+		void release() {
+			if (released.compareAndSet(false, true)) {
+				toRelease.release();
+			}
+		}
+	}
+}
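
A short usage sketch of the executor above: submissions beyond the permitted number of concurrent tasks block the calling thread until a running task finishes. The thread pool, task count, and sleep are illustrative assumptions.

import org.apache.flink.fs.s3.common.utils.BackPressuringExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BackPressuringExecutorSketch {

	public static void main(String[] args) throws InterruptedException {
		final ExecutorService pool = Executors.newCachedThreadPool();

		// at most 4 tasks in flight; the 5th call to execute() blocks until a permit is free
		final Executor limited = new BackPressuringExecutor(pool, 4);

		for (int i = 0; i < 16; i++) {
			final int taskId = i;
			limited.execute(() -> {
				try {
					Thread.sleep(100); // stands in for one part upload
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
				System.out.println("finished task " + taskId);
			});
		}

		pool.shutdown();
		pool.awaitTermination(1, TimeUnit.MINUTES);
	}
}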
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/OffsetAwareOutputStream.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/OffsetAwareOutputStream.java
new file mode 100644
index 00000000000..dc6e5ded737
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/OffsetAwareOutputStream.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.IOUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An {@link OutputStream} that keeps track of its current length.
+ */
+@Internal
+public final class OffsetAwareOutputStream implements Closeable {
+
+	private final OutputStream currentOut;
+
+	private long position;
+
+	OffsetAwareOutputStream(OutputStream currentOut, long position) {
+		this.currentOut = checkNotNull(currentOut);
+		this.position = position;
+	}
+
+	public long getLength() {
+		return position;
+	}
+
+	public void write(byte[] b, int off, int len) throws IOException {
+		currentOut.write(b, off, len);
+		position += len;
+	}
+
+	public void flush() throws IOException {
+		currentOut.flush();
+	}
+
+	@Override
+	public void close() {
+		IOUtils.closeQuietly(currentOut);
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCounted.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCounted.java
new file mode 100644
index 00000000000..84b0fa08611
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCounted.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.utils;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface to simply add reference counting functionality.
+ */
+@Internal
+public interface RefCounted {
+
+	/**
+	 * Increases the reference counter.
+	 */
+	void retain();
+
+	/**
+	 * Decreases the reference counter.
+	 *
+	 * @return {@code true} if the reference
+	 * counter reached 0, {@code false} otherwise.
+	 */
+	boolean release();
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java
new file mode 100644
index 00000000000..8f3aff899f1
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link RefCountedFile} that also uses an in-memory buffer for buffering small writes.
+ * This is done to avoid frequent 'flushes' of the file stream to disk.
+ */
+@Internal
+public class RefCountedBufferingFileStream extends RefCountedFSOutputStream {
+
+	public static final int BUFFER_SIZE = 4096;
+
+	private final RefCountedFile currentTmpFile;
+
+	/** The write buffer. */
+	private final byte[] buffer;
+
+	/** Current position in the buffer, must be in [0, buffer.length]. */
+	private int positionInBuffer;
+
+	private boolean closed;
+
+	@VisibleForTesting
+	public RefCountedBufferingFileStream(
+			final RefCountedFile file,
+			final int bufferSize) {
+
+		checkArgument(bufferSize > 0L);
+
+		this.currentTmpFile = checkNotNull(file);
+		this.buffer = new byte[bufferSize];
+		this.positionInBuffer = 0;
+		this.closed = false;
+	}
+
+	@Override
+	public InputStream getInputStream() throws IOException {
+		return Files.newInputStream(currentTmpFile.getFile().toPath(), StandardOpenOption.READ);
+	}
+
+	@Override
+	public long getPos() {
+		return currentTmpFile.getLength() + positionInBuffer;
+	}
+
+	@Override
+	public void write(int b) throws IOException {
+		if (positionInBuffer >= buffer.length) {
+			flush();
+		}
+
+		requireOpen();
+
+		buffer[positionInBuffer++] = (byte) b;
+	}
+
+	@Override
+	public void write(byte[] b, int off, int len) throws IOException {
+		if (len >= buffer.length) {
+			// circumvent the internal buffer for large writes
+			flush();
+			currentTmpFile.write(b, off, len);
+			return;
+		}
+
+		requireOpen();
+
+		if (len > buffer.length - positionInBuffer) {
+			flush();
+		}
+
+		System.arraycopy(b, off, buffer, positionInBuffer, len);
+		positionInBuffer += len;
+	}
+
+	@Override
+	public void flush() throws IOException {
+		currentTmpFile.write(buffer, 0, positionInBuffer);
+		currentTmpFile.flush();
+		positionInBuffer = 0;
+	}
+
+	@Override
+	public void sync() throws IOException {
+		throw new UnsupportedOperationException("S3RecoverableFsDataOutputStream cannot sync state to S3. " +
+				"Use persist() to create a persistent recoverable intermediate point.");
+	}
+
+	@Override
+	public boolean isClosed() throws IOException {
+		return closed;
+	}
+
+	@Override
+	public void close() {
+		if (!closed) {
+			currentTmpFile.closeStream();
+			closed = true;
+		}
+	}
+
+	@Override
+	public void retain() {
+		currentTmpFile.retain();
+	}
+
+	@Override
+	public boolean release() {
+		return currentTmpFile.release();
+	}
+
+	private void requireOpen() throws IOException {
+		if (closed) {
+			throw new IOException("Stream closed.");
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "Reference Counted File with {" +
+				"path=\'" + currentTmpFile.getFile().toPath().toAbsolutePath() + "\'" +
+				", size=" + getPos() +
+				", reference counter=" + currentTmpFile.getReferenceCounter() +
+				", closed=" + closed +
+				'}';
+	}
+
+	@VisibleForTesting
+	int getPositionInBuffer() {
+		return positionInBuffer;
+	}
+
+	@VisibleForTesting
+	public int getReferenceCounter() {
+		return currentTmpFile.getReferenceCounter();
+	}
+
+	// ------------------------- Factory Methods -------------------------
+
+	public static RefCountedBufferingFileStream openNew(
+			final FunctionWithException<File, RefCountedFile, IOException> tmpFileProvider) throws IOException {
+
+		return new RefCountedBufferingFileStream(
+				tmpFileProvider.apply(null),
+				BUFFER_SIZE);
+	}
+
+	public static RefCountedBufferingFileStream restore(
+			final FunctionWithException<File, RefCountedFile, IOException> tmpFileProvider,
+			final File initialTmpFile) throws IOException {
+
+		return new RefCountedBufferingFileStream(
+				tmpFileProvider.apply(initialTmpFile),
+				BUFFER_SIZE);
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFSOutputStream.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFSOutputStream.java
new file mode 100644
index 00000000000..d4b962e1463
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFSOutputStream.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A {@link FSDataOutputStream} with the {@link RefCounted} functionality.
+ */
+@Internal
+public abstract class RefCountedFSOutputStream extends FSDataOutputStream implements RefCounted {
+
+	/**
+	 * Gets an {@link InputStream} that allows reading the contents of the file.
+	 *
+	 * @return An input stream to the contents of the file.
+	 */
+	public abstract InputStream getInputStream() throws IOException;
+
+	/**
+	 * Checks if the file is closed for writes.
+	 *
+	 * @return {@code true} if the file is closed, {@code false} otherwise.
+	 */
+	public abstract boolean isClosed() throws IOException;
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java
new file mode 100644
index 00000000000..178763631e1
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A reference counted file which is deleted as soon as no caller
+ * holds a reference to the wrapped {@link File}.
+ */
+@Internal
+public class RefCountedFile implements RefCounted {
+
+	private final File file;
+
+	private final OffsetAwareOutputStream stream;
+
+	private final AtomicInteger references;
+
+	private boolean closed;
+
+	private RefCountedFile(
+			final File file,
+			final OutputStream currentOut,
+			final long bytesInCurrentPart) {
+		this.file = checkNotNull(file);
+		this.references = new AtomicInteger(1);
+		this.stream = new OffsetAwareOutputStream(
+				currentOut,
+				bytesInCurrentPart);
+		this.closed = false;
+	}
+
+	public File getFile() {
+		return file;
+	}
+
+	public OffsetAwareOutputStream getStream() {
+		return stream;
+	}
+
+	public long getLength() {
+		return stream.getLength();
+	}
+
+	public void write(byte[] b, int off, int len) throws IOException {
+		requireOpened();
+		if (len > 0) {
+			stream.write(b, off, len);
+		}
+	}
+
+	public void flush() throws IOException {
+		requireOpened();
+		stream.flush();
+	}
+
+	public void closeStream() {
+		if (!closed) {
+			IOUtils.closeQuietly(stream);
+			closed = true;
+		}
+	}
+
+	@Override
+	public void retain() {
+		references.incrementAndGet();
+	}
+
+	@Override
+	public boolean release() {
+		if (references.decrementAndGet() == 0) {
+			return tryClose();
+		}
+		return false;
+	}
+
+	private boolean tryClose() {
+		try {
+			Files.deleteIfExists(file.toPath());
+			return true;
+		} catch (Throwable t) {
+			ExceptionUtils.rethrowIfFatalError(t);
+		}
+		return false;
+	}
+
+	private void requireOpened() throws IOException {
+		if (closed) {
+			throw new IOException("Stream closed.");
+		}
+	}
+
+	@VisibleForTesting
+	int getReferenceCounter() {
+		return references.get();
+	}
+
+	// ------------------------------ Factory methods for initializing a temporary file ------------------------------
+
+	public static RefCountedFile newFile(
+			final File file,
+			final OutputStream currentOut) throws IOException {
+		return new RefCountedFile(file, currentOut, 0L);
+	}
+
+	public static RefCountedFile restoredFile(
+			final File file,
+			final OutputStream currentOut,
+			final long bytesInCurrentPart) {
+		return new RefCountedFile(file, currentOut, bytesInCurrentPart);
+	}
+}
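
To illustrate the reference counting contract above: the wrapped file lives as long as at least one reference is held, and the final release() deletes it from disk. A minimal sketch; the file name and payload are illustrative.

import org.apache.flink.fs.s3.common.utils.RefCountedFile;

import java.io.File;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

public class RefCountedFileSketch {

	public static void main(String[] args) throws Exception {
		final File tmp = File.createTempFile("flink-part-", ".tmp");
		final OutputStream out = Files.newOutputStream(tmp.toPath());

		// the reference count starts at 1, held by the creator
		final RefCountedFile refFile = RefCountedFile.newFile(tmp, out);

		final byte[] data = "buffered part data".getBytes(StandardCharsets.UTF_8);
		refFile.write(data, 0, data.length);
		refFile.closeStream();

		refFile.retain();                  // e.g. a second reference handed to an async uploader
		boolean gone = refFile.release();  // false: the creator still holds a reference
		gone = refFile.release();          // true: last reference dropped, file deleted
		System.out.println("deleted: " + gone + ", exists: " + tmp.exists());
	}
}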
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java
new file mode 100644
index 00000000000..7a928d0c99a
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A utility class that creates local {@link RefCountedFile reference counted files} that serve as temporary files.
+ */
+@Internal
+public class RefCountedTmpFileCreator implements FunctionWithException<File, RefCountedFile, IOException> {
+
+	private final File[] tempDirectories;
+
+	private final AtomicInteger next;
+
+	/**
+	 * Creates a new RefCountedTmpFileCreator.
+	 *
+	 * @param tempDirectories The temp directories, not null, and at least one.
+	 */
+	private RefCountedTmpFileCreator(File... tempDirectories) {
+		checkArgument(tempDirectories.length > 0, "tempDirectories must not be empty");
+		for (File f : tempDirectories) {
+			if (f == null) {
+				throw new IllegalArgumentException("tempDirectories contains null entries");
+			}
+		}
+
+		this.tempDirectories = tempDirectories.clone();
+		this.next = new AtomicInteger(new Random().nextInt(this.tempDirectories.length));
+	}
+
+	/**
+	 * Gets the next temp file and a stream to that temp file.
+	 * This creates the temp file atomically, making sure no previous file is overwritten.
+	 *
+	 * <p>This method is safe against concurrent use.
+	 *
+	 * @return A pair of temp file and output stream to that temp file.
+	 * @throws IOException Thrown, if the stream to the temp file could not be opened.
+	 */
+	@Override
+	public RefCountedFile apply(File file) throws IOException {
+		final File directory = tempDirectories[nextIndex()];
+
+		while (true) {
+			try {
+				if (file == null) {
+					final File newFile = new File(directory, ".tmp_" + UUID.randomUUID());
+					final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
+					return RefCountedFile.newFile(newFile, out);
+				} else {
+					final OutputStream out = Files.newOutputStream(file.toPath(), StandardOpenOption.APPEND);
+					return RefCountedFile.restoredFile(file, out, file.length());
+				}
+			} catch (FileAlreadyExistsException ignored) {
+				// fall through the loop and retry
+			}
+		}
+	}
+
+	private int nextIndex() {
+		int currIndex, newIndex;
+		do {
+			currIndex = next.get();
+			newIndex = currIndex + 1;
+			if (newIndex == tempDirectories.length) {
+				newIndex = 0;
+			}
+		}
+		while (!next.compareAndSet(currIndex, newIndex));
+
+		return currIndex;
+	}
+
+	public static RefCountedTmpFileCreator inDirectories(File... tmpDirectories) {
+		return new RefCountedTmpFileCreator(tmpDirectories);
+	}
+}
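
A brief sketch of how the creator above is used: apply(null) atomically creates a fresh ".tmp_<uuid>" file in one of the configured directories, while apply(existingFile) re-opens that file in append mode, which is the restore-after-failure path. The directory location is an illustrative assumption.

import org.apache.flink.fs.s3.common.utils.RefCountedFile;
import org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator;

import java.io.File;

public class TmpFileCreatorSketch {

	public static void main(String[] args) throws Exception {
		final File tmpDir = new File(System.getProperty("java.io.tmpdir"), "flink-s3-tmp");
		tmpDir.mkdirs();

		final RefCountedTmpFileCreator creator = RefCountedTmpFileCreator.inDirectories(tmpDir);

		// null argument: create a brand new temp file
		final RefCountedFile fresh = creator.apply(null);
		fresh.closeStream();
		System.out.println("new temp file: " + fresh.getFile());

		// existing file argument: re-open it for appending (the restore case)
		final RefCountedFile restored = creator.apply(fresh.getFile());
		System.out.println("restored at offset: " + restored.getLength());
		restored.closeStream();

		// dropping the last reference of each wrapper deletes the underlying file
		fresh.release();
		restored.release();
	}
}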
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/MultiPartUploadInfo.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/MultiPartUploadInfo.java
new file mode 100644
index 00000000000..c4b0b2d500d
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/MultiPartUploadInfo.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.writer;
+
+import org.apache.flink.annotation.Internal;
+
+import com.amazonaws.services.s3.model.PartETag;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A data structure containing information concerning the in-flight MPU.
+ */
+@Internal
+final class MultiPartUploadInfo {
+
+	private final String objectName;
+
+	private final String uploadId;
+
+	private final List<PartETag> completeParts;
+
+	private final Optional<File> incompletePart;
+
+	/**
+	 * This counts both the parts that are already uploaded and
+	 * the ones that are queued to be uploaded at the {@link RecoverableMultiPartUpload}.
+	 */
+	private int numberOfRegisteredParts;
+
+	/**
+	 * This is the total expected size of the upload, including the parts
+	 * that are queued but not yet uploaded.
+	 */
+	private long expectedSizeInBytes;
+
+	MultiPartUploadInfo(
+			final String objectName,
+			final String uploadId,
+			final List<PartETag> completeParts,
+			final long numBytes,
+			final Optional<File> incompletePart) {
+
+		checkArgument(numBytes >= 0L);
+
+		this.objectName = checkNotNull(objectName);
+		this.uploadId = checkNotNull(uploadId);
+		this.completeParts = checkNotNull(completeParts);
+		this.incompletePart = checkNotNull(incompletePart);
+
+		this.numberOfRegisteredParts = completeParts.size();
+		this.expectedSizeInBytes = numBytes;
+	}
+
+	String getObjectName() {
+		return objectName;
+	}
+
+	String getUploadId() {
+		return uploadId;
+	}
+
+	int getNumberOfRegisteredParts() {
+		return numberOfRegisteredParts;
+	}
+
+	long getExpectedSizeInBytes() {
+		return expectedSizeInBytes;
+	}
+
+	Optional<File> getIncompletePart() {
+		return incompletePart;
+	}
+
+	List<PartETag> getCopyOfEtagsOfCompleteParts() {
+		return new ArrayList<>(completeParts);
+	}
+
+	void registerNewPart(long length) {
+		this.expectedSizeInBytes += length;
+		this.numberOfRegisteredParts++;
+	}
+
+	void registerCompletePart(PartETag eTag) {
+		completeParts.add(eTag);
+	}
+
+	int getRemainingParts() {
+		return numberOfRegisteredParts - completeParts.size();
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUpload.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUpload.java
new file mode 100644
index 00000000000..bac1ae64c78
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUpload.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.s3.common.utils.RefCountedFSOutputStream;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * An uploader for parts of a multipart upload (MPU).
+ * The caller can add parts to the MPU and snapshot its state
+ * to be recovered after a failure.
+ */
+@Internal
+interface RecoverableMultiPartUpload {
+
+	/**
+	 * Creates a snapshot of this MultiPartUpload (MPU) and returns a
+	 * {@link RecoverableFsDataOutputStream.Committer Committer} which can be used to finalize the MPU.
+	 *
+	 * @return The {@link RecoverableFsDataOutputStream.Committer Committer} that can be used to complete the MPU.
+	 */
+	RecoverableFsDataOutputStream.Committer snapshotAndGetCommitter() throws IOException;
+
+	/**
+	 * Creates a snapshot of this MultiPartUpload, from which the upload can be resumed.
+	 *
+	 * @param incompletePartFile The file containing the in-progress part which has not yet reached the minimum
+	 *                           part size in order to be uploaded.
+	 *
+	 * @return The {@link RecoverableWriter.ResumeRecoverable ResumeRecoverable} which
+	 * can be used to resume the upload.
+	 */
+	RecoverableWriter.ResumeRecoverable snapshotAndGetRecoverable(
+			@Nullable final RefCountedFSOutputStream incompletePartFile) throws IOException;
+
+	/**
+	 * Adds a part to the uploads without any size limitations.
+	 *
+	 * @param file The file with the part data.
+	 *
+	 * @throws IOException If this method throws an exception, the RecoverableMultiPartUpload
+	 *                     should not be used any more, but recovered instead.
+	 */
+	void uploadPart(final RefCountedFSOutputStream file) throws IOException;
+
+	/**
+	 * In case of an incomplete part which had not reached the minimum part size at the time of
+	 * snapshotting, this returns a file containing the in-progress part at which writing can be resumed.
+	 */
+	Optional<File> getIncompletePart();
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
new file mode 100644
index 00000000000..80042ce22f7
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.fs.s3.common.utils.RefCountedFSOutputStream;
+
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.UploadPartResult;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An uploader for parts of a multipart upload. The uploader can snapshot its state to
+ * be recovered after a failure.
+ *
+ * <p><b>Note:</b> This class is NOT thread safe and relies on external synchronization.
+ *
+ * <p><b>Note:</b> If any of the methods to add parts throws an exception, this class may be
+ * in an inconsistent state (bookkeeping wise) and should be discarded and recovered.
+ */
+@Internal
+@NotThreadSafe
+final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload {
+
+	private final S3MultiPartUploader s3MPUploader;
+
+	private final Executor uploadThreadPool;
+
+	private final Deque<CompletableFuture<PartETag>> uploadsInProgress;
+
+	private final String namePrefixForTempObjects;
+
+	private final MultiPartUploadInfo currentUploadInfo;
+
+	// ------------------------------------------------------------------------
+
+	private RecoverableMultiPartUploadImpl(
+			S3MultiPartUploader s3uploader,
+			Executor uploadThreadPool,
+			String uploadId,
+			String objectName,
+			List<PartETag> partsSoFar,
+			long numBytes,
+			Optional<File> incompletePart
+	) {
+		checkArgument(numBytes >= 0L);
+
+		this.s3MPUploader = checkNotNull(s3uploader);
+		this.uploadThreadPool = checkNotNull(uploadThreadPool);
+		this.currentUploadInfo = new MultiPartUploadInfo(objectName, uploadId, partsSoFar, numBytes, incompletePart);
+		this.namePrefixForTempObjects = incompleteObjectNamePrefix(objectName);
+		this.uploadsInProgress = new ArrayDeque<>();
+	}
+
+	/**
+	 * Adds a part to the uploads without any size limitations.
+	 *
+	 * <p>This method is non-blocking and does not wait for the part upload to complete.
+	 *
+	 * @param file The file with the part data.
+	 *
+	 * @throws IOException If this method throws an exception, the RecoverableMultiPartUpload
+	 *                     should not be used any more, but recovered instead.
+	 */
+	@Override
+	public void uploadPart(RefCountedFSOutputStream file) throws IOException {
+		// this is to guarantee that nobody is
+		// writing to the file we are uploading.
+		checkState(file.isClosed());
+
+		final CompletableFuture<PartETag> future = new CompletableFuture<>();
+		uploadsInProgress.add(future);
+
+		final long partLength = file.getPos();
+		currentUploadInfo.registerNewPart(partLength);
+
+		file.retain(); // keep the file while the async upload still runs
+		uploadThreadPool.execute(new UploadTask(s3MPUploader, currentUploadInfo, file, future));
+	}
+
+	@Override
+	public Optional<File> getIncompletePart() {
+		return currentUploadInfo.getIncompletePart();
+	}
+
+	@Override
+	public S3Committer snapshotAndGetCommitter() throws IOException {
+		final S3Recoverable snapshot = snapshotAndGetRecoverable(null);
+
+		return new S3Committer(
+				s3MPUploader,
+				snapshot.getObjectName(),
+				snapshot.uploadId(),
+				snapshot.parts(),
+				snapshot.numBytesInParts());
+	}
+
+	/**
+	 * Creates a snapshot of this MultiPartUpload, from which the upload can be resumed.
+	 *
+	 * <p>Data buffered locally which is less than
+	 * {@link org.apache.flink.fs.s3.common.FlinkS3FileSystem#S3_MULTIPART_MIN_PART_SIZE S3_MULTIPART_MIN_PART_SIZE},
+	 * and therefore cannot be uploaded as a part of the MPU, is sent to S3 as an independent object.
+	 *
+	 * <p>This implementation currently blocks until all part uploads are complete and returns
+	 * a completed future.
+	 */
+	@Override
+	public S3Recoverable snapshotAndGetRecoverable(@Nullable final RefCountedFSOutputStream incompletePartFile) throws IOException {
+
+		final String incompletePartObjectName = safelyUploadSmallPart(incompletePartFile);
+
+		// make sure all other uploads are complete
+		// this currently makes the method blocking,
+		// to be made non-blocking in the future
+		awaitPendingPartsUpload();
+
+		final String objectName = currentUploadInfo.getObjectName();
+		final String uploadId = currentUploadInfo.getUploadId();
+		final List<PartETag> completedParts = currentUploadInfo.getCopyOfEtagsOfCompleteParts();
+		final long sizeInBytes = currentUploadInfo.getExpectedSizeInBytes();
+
+		if (incompletePartObjectName == null) {
+			return new S3Recoverable(objectName, uploadId, completedParts, sizeInBytes);
+		} else {
+			return new S3Recoverable(objectName, uploadId, completedParts, sizeInBytes, incompletePartObjectName, incompletePartFile.getPos());
+		}
+	}
+
+	@Nullable
+	private String safelyUploadSmallPart(@Nullable RefCountedFSOutputStream file) throws IOException {
+
+		if (file == null || file.getPos() == 0L) {
+			return null;
+		}
+
+		// first, upload the trailing data file. during that time, other in-progress uploads may complete.
+		final String incompletePartObjectName = createTmpObjectName();
+		file.retain();
+		try (InputStream inputStream = file.getInputStream()) {
+
+			// TODO: staged incomplete parts are not cleaned up as
+			// they do not fall under the user's global TTL on S3.
+			// Figure out a way to clean them.
+
+			s3MPUploader.uploadIncompletePart(incompletePartObjectName, inputStream, file.getPos());
+		}
+		finally {
+			file.release();
+		}
+		return incompletePartObjectName;
+	}
+
+	// ------------------------------------------------------------------------
+	//  utils
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	static String incompleteObjectNamePrefix(String objectName) {
+		checkNotNull(objectName);
+
+		final int lastSlash = objectName.lastIndexOf('/');
+		final String parent;
+		final String child;
+
+		if (lastSlash == -1) {
+			parent = "";
+			child = objectName;
+		} else {
+			parent = objectName.substring(0, lastSlash + 1);
+			child = objectName.substring(lastSlash + 1);
+		}
+		return parent + (child.isEmpty() ? "" : '_') + child + "_tmp_";
+	}
+
+	private void awaitPendingPartsUpload() throws IOException {
+		checkState(currentUploadInfo.getRemainingParts() == uploadsInProgress.size());
+
+		while (currentUploadInfo.getRemainingParts() > 0) {
+			CompletableFuture<PartETag> next = uploadsInProgress.peekFirst();
+			PartETag nextPart = awaitPendingPartUploadToComplete(next);
+			currentUploadInfo.registerCompletePart(nextPart);
+			uploadsInProgress.removeFirst();
+		}
+	}
+
+	private PartETag awaitPendingPartUploadToComplete(CompletableFuture<PartETag> upload) throws IOException {
+		final PartETag completedUploadEtag;
+		try {
+			completedUploadEtag = upload.get();
+		}
+		catch (InterruptedException e) {
+			Thread.currentThread().interrupt();
+			throw new IOException("Interrupted while waiting for part uploads to complete");
+		}
+		catch (ExecutionException e) {
+			throw new IOException("Uploading parts failed", e.getCause());
+		}
+		return completedUploadEtag;
+	}
+
+	private String createTmpObjectName() {
+		return namePrefixForTempObjects + UUID.randomUUID().toString();
+	}
+
+	// ------------------------------------------------------------------------
+	//  factory methods
+	// ------------------------------------------------------------------------
+
+	public static RecoverableMultiPartUploadImpl newUpload(
+			final S3MultiPartUploader s3uploader,
+			final Executor uploadThreadPool,
+			final String objectName) throws IOException {
+
+		final String multiPartUploadId = s3uploader.startMultiPartUpload(objectName);
+
+		return new RecoverableMultiPartUploadImpl(
+				s3uploader,
+				uploadThreadPool,
+				multiPartUploadId,
+				objectName,
+				new ArrayList<>(),
+				0L,
+				Optional.empty());
+	}
+
+	public static RecoverableMultiPartUploadImpl recoverUpload(
+			final S3MultiPartUploader s3uploader,
+			final Executor uploadThreadPool,
+			final String multipartUploadId,
+			final String objectName,
+			final List<PartETag> partsSoFar,
+			final long numBytesSoFar,
+			final Optional<File> incompletePart) {
+
+		return new RecoverableMultiPartUploadImpl(
+				s3uploader,
+				uploadThreadPool,
+				multipartUploadId,
+				objectName,
+				new ArrayList<>(partsSoFar),
+				numBytesSoFar,
+				incompletePart);
+
+	}
+
+	// ------------------------------------------------------------------------
+	//  internal classes
+	// ------------------------------------------------------------------------
+
+	private static class UploadTask implements Runnable {
+
+		private final S3MultiPartUploader s3uploader;
+
+		private final String objectName;
+
+		private final String uploadId;
+
+		private final int partNumber;
+
+		private final RefCountedFSOutputStream file;
+
+		private final CompletableFuture<PartETag> future;
+
+		UploadTask(
+				final S3MultiPartUploader s3uploader,
+				final MultiPartUploadInfo currentUpload,
+				final RefCountedFSOutputStream file,
+				final CompletableFuture<PartETag> future) {
+
+			checkNotNull(currentUpload);
+
+			this.objectName = currentUpload.getObjectName();
+			this.uploadId = currentUpload.getUploadId();
+			this.partNumber = currentUpload.getNumberOfRegisteredParts();
+
+			// these are limits put by Amazon
+			checkArgument(partNumber >= 1  && partNumber <= 10_000);
+
+			this.s3uploader = checkNotNull(s3uploader);
+			this.file = checkNotNull(file);
+			this.future = checkNotNull(future);
+		}
+
+		@Override
+		public void run() {
+			try (final InputStream inputStream = file.getInputStream()) {
+				final UploadPartResult result = s3uploader.uploadPart(objectName, uploadId, partNumber, inputStream, file.getPos());
+				future.complete(new PartETag(result.getPartNumber(), result.getETag()));
+				file.release();
+			}
+			catch (Throwable t) {
+				future.completeExceptionally(t);
+			}
+		}
+	}
+}
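
The per-part upload above follows a simple hand-off: the caller enqueues a CompletableFuture, an UploadTask running on the upload thread pool completes it with the part's ETag, and snapshotAndGetRecoverable later drains the queue. A stripped-down sketch of that pattern, independent of S3 (all names below are illustrative, not part of this patch):

    import java.io.IOException;
    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;

    // Sketch only: mirrors the future hand-off used by RecoverableMultiPartUploadImpl.
    final class PartUploadHandOffSketch {

        private final Deque<CompletableFuture<String>> uploadsInProgress = new ArrayDeque<>();
        private final Executor uploadThreadPool = Executors.newFixedThreadPool(2);

        void uploadPart(final int partNumber) {
            final CompletableFuture<String> etagFuture = new CompletableFuture<>();
            uploadsInProgress.addLast(etagFuture);
            uploadThreadPool.execute(() -> {
                try {
                    // a real task would upload the part and complete with its ETag
                    etagFuture.complete("etag-" + partNumber);
                } catch (Throwable t) {
                    etagFuture.completeExceptionally(t);
                }
            });
        }

        void awaitPendingPartsUpload() throws IOException {
            while (!uploadsInProgress.isEmpty()) {
                try {
                    uploadsInProgress.removeFirst().get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while waiting for part uploads to complete");
                } catch (ExecutionException e) {
                    throw new IOException("Uploading parts failed", e.getCause());
                }
            }
        }
    }
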
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java
new file mode 100644
index 00000000000..1fc8bf1eaa8
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Data object to commit an S3 MultiPartUpload.
+ */
+public final class S3Committer implements RecoverableFsDataOutputStream.Committer {
+
+	private static final Logger LOG = LoggerFactory.getLogger(S3Committer.class);
+
+	private final S3MultiPartUploader s3uploader;
+
+	private final String uploadId;
+
+	private final String objectName;
+
+	private final List<PartETag> parts;
+
+	private final long totalLength;
+
+	S3Committer(S3MultiPartUploader s3uploader, String objectName, String uploadId, List<PartETag> parts, long totalLength) {
+		this.s3uploader = checkNotNull(s3uploader);
+		this.objectName = checkNotNull(objectName);
+		this.uploadId = checkNotNull(uploadId);
+		this.parts = checkNotNull(parts);
+		this.totalLength = totalLength;
+	}
+
+	@Override
+	public void commit() throws IOException {
+		if (totalLength > 0L) {
+			LOG.info("Committing {} with MPU ID {}", objectName, uploadId);
+
+			final AtomicInteger errorCount = new AtomicInteger();
+			s3uploader.commitMultiPartUpload(objectName, uploadId, parts, totalLength, errorCount);
+
+			if (errorCount.get() == 0) {
+				LOG.debug("Successfully committed {} with MPU ID {}", objectName, uploadId);
+			} else {
+				LOG.debug("Successfully committed {} with MPU ID {} after {} retries.", objectName, uploadId, errorCount.get());
+			}
+		} else {
+			LOG.debug("No data to commit for file: {}", objectName);
+		}
+	}
+
+	@Override
+	public void commitAfterRecovery() throws IOException {
+		if (totalLength > 0L) {
+			LOG.info("Trying to commit after recovery {} with MPU ID {}", objectName, uploadId);
+
+			try {
+				s3uploader.commitMultiPartUpload(objectName, uploadId, parts, totalLength, new AtomicInteger());
+			} catch (IOException e) {
+				LOG.info("Failed to commit after recovery {} with MPU ID {}. " +
+						"Checking if file was committed before...", objectName, uploadId);
+				LOG.trace("Exception when committing:", e);
+
+				try {
+					ObjectMetadata metadata = s3uploader.getObjectMetadata(objectName);
+					if (totalLength != metadata.getContentLength()) {
+						String message = String.format("Inconsistent result for object %s: conflicting lengths. " +
+										"Recovered committer for upload %s indicates %s bytes, present object is %s bytes",
+								objectName, uploadId, totalLength, metadata.getContentLength());
+						LOG.warn(message);
+						throw new IOException(message, e);
+					}
+				} catch (FileNotFoundException fnf) {
+					LOG.warn("Object {} not existing after failed recovery commit with MPU ID {}", objectName, uploadId);
+					throw new IOException(String.format("Recovering commit failed for object %s. " +
+							"Object does not exist and MultiPart Upload %s is not valid.", objectName, uploadId), e);
+				}
+			}
+		} else {
+			LOG.debug("No data to commit for file: {}", objectName);
+		}
+	}
+
+	@Override
+	public RecoverableWriter.CommitRecoverable getRecoverable() {
+		return new S3Recoverable(objectName, uploadId, parts, totalLength);
+	}
+}
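
After a failover there is no live stream to close; the committer is instead re-created from its recoverable state. A minimal sketch of that path through the generic RecoverableWriter API (the writer instance and the stored CommitRecoverable are assumptions):

    import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
    import org.apache.flink.core.fs.RecoverableWriter;

    import java.io.IOException;

    // Sketch only: 'writer' is the S3 RecoverableWriter, 'pendingCommit' a
    // CommitRecoverable that was serialized into checkpoint state before the failure.
    final class CommitAfterFailoverSketch {

        static void commitAfterFailover(
                RecoverableWriter writer,
                RecoverableWriter.CommitRecoverable pendingCommit) throws IOException {

            final RecoverableFsDataOutputStream.Committer committer =
                    writer.recoverForCommit(pendingCommit);

            // commitAfterRecovery() tolerates the case where an earlier attempt
            // already published the object, as implemented in S3Committer above
            committer.commitAfterRecovery();
        }
    }
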
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3ConfigOptions.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3ConfigOptions.java
new file mode 100644
index 00000000000..421880554e5
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3ConfigOptions.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.writer;
+
+/**
+ * Configuration keys for the S3 file systems based on Hadoop's s3a.
+ */
+public final class S3ConfigOptions {
+
+	// ------------------------------------------------------------------------
+
+	/** Not meant to be instantiated. */
+	private S3ConfigOptions() {}
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3MultiPartUploader.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3MultiPartUploader.java
new file mode 100644
index 00000000000..da227a47488
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3MultiPartUploader.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.writer;
+
+import org.apache.flink.annotation.Internal;
+
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.UploadPartResult;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * An interface that abstracts away the Multi-Part Upload (MPU) functionality offered by S3,
+ * from the specific implementation of the file system. This is needed so that we can
+ * accommodate both Hadoop S3 and Presto.
+ *
+ * <p>Multipart uploads are convenient for large objects. These are uploaded in
+ * multiple parts, and the multi-part upload is the equivalent of a transaction, where
+ * the upload with all its parts will be either committed or discarded.
+ */
+@Internal
+public interface S3MultiPartUploader {
+
+	/**
+	 * Initializes a Multi-Part Upload.
+	 *
+	 * @param key the key whose value we want to upload in parts.
+	 * @return The id of the initiated Multi-Part Upload which will be used during the uploading of the parts.
+	 * @throws IOException
+	 */
+	String startMultiPartUpload(String key) throws IOException;
+
+	/**
+	 * Uploads a part and associates it with the MPU with the provided {@code uploadId}.
+	 *
+	 * @param key the key this MPU is associated with.
+	 * @param uploadId the id of the MPU.
+	 * @param partNumber the number of the part being uploaded (has to be in [1 ... 10000]).
+	 * @param file the (local) file holding the part to be uploaded.
+	 * @param length the length of the part.
+	 * @return The {@link UploadPartResult result} of the attempt to upload the part.
+	 * @throws IOException
+	 */
+	UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream file, long length) throws IOException;
+
+	/**
+	 * Uploads an object under the given {@code key}, independently of any MPU. This is
+	 * used to stage the trailing, incomplete part of an upload as a separate object.
+	 *
+	 * <p>Contrary to {@link #uploadPart(String, String, int, InputStream, long)}, this part can
+	 * be smaller than the minimum part size imposed by S3.
+	 *
+	 * @param key the key used to identify this part.
+	 * @param file the (local) file holding the data to be uploaded.
+	 * @param length the size of the data to be uploaded.
+	 * @return The {@link PutObjectResult result} of the attempt to stage the incomplete part.
+	 * @throws IOException
+	 */
+	PutObjectResult uploadIncompletePart(String key, InputStream file, long length) throws IOException;
+
+	/**
+	 * Finalizes a Multi-Part Upload.
+	 *
+	 * @param key the key identifying the object we finished uploading.
+	 * @param uploadId the id of the MPU.
+	 * @param partETags the list of {@link PartETag ETags} associated with this MPU.
+	 * @param length the size of the uploaded object.
+	 * @param errorCount a counter that will be used to count any failed attempts to commit the MPU.
+	 * @return The {@link CompleteMultipartUploadResult result} of the attempt to finalize the MPU.
+	 * @throws IOException
+	 */
+	CompleteMultipartUploadResult commitMultiPartUpload(String key, String uploadId, List<PartETag> partETags, long length, AtomicInteger errorCount) throws IOException;
+
+	/**
+	 * Fetches the metadata associated with a given key on S3.
+	 *
+	 * @param key the key.
+	 * @return The associated {@link ObjectMetadata}.
+	 * @throws IOException
+	 */
+	ObjectMetadata getObjectMetadata(String key) throws IOException;
+}
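
Taken together, the interface describes a two-phase protocol: start an MPU, upload parts against the returned id, and finally commit the collected ETags. A minimal sketch of that sequence, assuming some S3MultiPartUploader implementation and a local part file of sufficient size (both hypothetical):

    import com.amazonaws.services.s3.model.PartETag;
    import com.amazonaws.services.s3.model.UploadPartResult;

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    // Sketch only: assumes access to the S3MultiPartUploader interface above.
    final class TwoPhaseUploadSketch {

        static void uploadInTwoPhases(S3MultiPartUploader uploader, Path partFile) throws IOException {
            final String key = "some/object";
            final long partLength = Files.size(partFile);

            // phase 1: start the MPU and upload parts against the returned id
            final String uploadId = uploader.startMultiPartUpload(key);
            final List<PartETag> etags = new ArrayList<>();
            try (InputStream in = Files.newInputStream(partFile)) {
                final UploadPartResult result = uploader.uploadPart(key, uploadId, 1, in, partLength);
                etags.add(new PartETag(result.getPartNumber(), result.getETag()));
            }

            // phase 2: committing makes the object visible under 'key'
            uploader.commitMultiPartUpload(key, uploadId, etags, partLength, new AtomicInteger());
        }
    }
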
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Recoverable.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Recoverable.java
new file mode 100644
index 00000000000..9ea992a9d8f
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Recoverable.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.writer;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+
+import com.amazonaws.services.s3.model.PartETag;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Data object to recover an S3 MultiPartUpload for a recoverable output stream.
+ */
+public final class S3Recoverable implements RecoverableWriter.ResumeRecoverable {
+
+	private final String uploadId;
+
+	private final String objectName;
+
+	private final List<PartETag> parts;
+
+	@Nullable
+	private final String lastPartObject;
+
+	private long numBytesInParts;
+
+	private long lastPartObjectLength;
+
+	S3Recoverable(
+			String objectName,
+			String uploadId,
+			List<PartETag> parts,
+			long numBytesInParts
+	) {
+		this(objectName, uploadId, parts, numBytesInParts, null, -1L);
+	}
+
+	S3Recoverable(
+			String objectName,
+			String uploadId,
+			List<PartETag> parts,
+			long numBytesInParts,
+			@Nullable String lastPartObject,
+			long lastPartObjectLength
+	) {
+		checkArgument(numBytesInParts >= 0L);
+		checkArgument(lastPartObject == null || lastPartObjectLength > 0L);
+
+		this.objectName = checkNotNull(objectName);
+		this.uploadId = checkNotNull(uploadId);
+		this.parts = checkNotNull(parts);
+		this.numBytesInParts = numBytesInParts;
+
+		this.lastPartObject = lastPartObject;
+		this.lastPartObjectLength = lastPartObjectLength;
+	}
+
+	// ------------------------------------------------------------------------
+
+	public String uploadId() {
+		return uploadId;
+	}
+
+	public String getObjectName() {
+		return objectName;
+	}
+
+	public List<PartETag> parts() {
+		return parts;
+	}
+
+	public long numBytesInParts() {
+		return numBytesInParts;
+	}
+
+	@Nullable
+	public String incompleteObjectName() {
+		return lastPartObject;
+	}
+
+	public long incompleteObjectLength() {
+		return lastPartObjectLength;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		StringBuilder buf = new StringBuilder(128);
+		buf.append("S3Recoverable: ");
+		buf.append("key=").append(objectName);
+		buf.append(", uploadId=").append(uploadId);
+		buf.append(", bytesInParts=").append(numBytesInParts);
+		buf.append(", parts=[");
+		int num = 0;
+		for (PartETag part : parts) {
+			if (0 != num++) {
+				buf.append(", ");
+			}
+			buf.append(part.getPartNumber()).append('=').append(part.getETag());
+		}
+		buf.append("], trailingPart=").append(lastPartObject);
+		buf.append("trailingPartLen=").append(lastPartObjectLength);
+
+		return buf.toString();
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java
new file mode 100644
index 00000000000..220ddd58eb9
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream;
+import org.apache.flink.fs.s3.common.utils.RefCountedFSOutputStream;
+import org.apache.flink.fs.s3.common.utils.RefCountedFile;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.apache.commons.io.IOUtils;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.fs.s3.common.FlinkS3FileSystem.S3_MULTIPART_MIN_PART_SIZE;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A RecoverableFsDataOutputStream to S3 that is based on a recoverable multipart upload.
+ *
+ * <p>This class is NOT thread-safe. Concurrent writes to this stream result in corrupt or
+ * lost data.
+ *
+ * <p>The {@link #close()} method may be called concurrently when cancelling / shutting down.
+ * It will still ensure that local transient resources (like streams and temp files) are cleaned up,
+ * but will not touch data previously persisted in S3.
+ */
+@PublicEvolving
+@NotThreadSafe
+public final class S3RecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
+
+	/** Lock that guards the critical sections when new parts are rolled over.
+	 * Despite the class being declared not thread safe, we protect certain regions to
+	 * at least enable concurrent close() calls during cancellation or abort/cleanup. */
+	private final ReentrantLock lock = new ReentrantLock();
+
+	private final RecoverableMultiPartUpload upload;
+
+	private final FunctionWithException<File, RefCountedFile, IOException> tmpFileProvider;
+
+	/**
+	 * The number of bytes at which we start a new part of the multipart upload.
+	 * This has to be at least the non-configurable minimum
+	 * {@link org.apache.flink.fs.s3.common.FlinkS3FileSystem#S3_MULTIPART_MIN_PART_SIZE S3_MULTIPART_MIN_PART_SIZE},
+	 * which is imposed by Amazon.
+	 */
+	private final long userDefinedMinPartSize;
+
+	private RefCountedFSOutputStream fileStream;
+
+	private long bytesBeforeCurrentPart;
+
+	/**
+	 * Single constructor to initialize all. Actual setup of the parts happens in the
+	 * factory methods.
+	 */
+	S3RecoverableFsDataOutputStream(
+			RecoverableMultiPartUpload upload,
+			FunctionWithException<File, RefCountedFile, IOException> tempFileCreator,
+			RefCountedFSOutputStream initialTmpFile,
+			long userDefinedMinPartSize,
+			long bytesBeforeCurrentPart) {
+
+		checkArgument(bytesBeforeCurrentPart >= 0L);
+
+		this.upload = checkNotNull(upload);
+		this.tmpFileProvider = checkNotNull(tempFileCreator);
+		this.userDefinedMinPartSize = userDefinedMinPartSize;
+
+		this.fileStream = initialTmpFile;
+		this.bytesBeforeCurrentPart = bytesBeforeCurrentPart;
+	}
+
+	// ------------------------------------------------------------------------
+	//  stream methods
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void write(int b) throws IOException {
+		fileStream.write(b);
+	}
+
+	@Override
+	public void write(byte[] b, int off, int len) throws IOException {
+		fileStream.write(b, off, len);
+		openNewPartIfNecessary(userDefinedMinPartSize);
+	}
+
+	@Override
+	public void flush() throws IOException {
+		fileStream.flush();
+		openNewPartIfNecessary(userDefinedMinPartSize);
+	}
+
+	@Override
+	public long getPos() throws IOException {
+		return bytesBeforeCurrentPart + fileStream.getPos();
+	}
+
+	@Override
+	public void sync() throws IOException {
+		fileStream.sync();
+	}
+
+	@Override
+	public void close() throws IOException {
+		lock();
+		try {
+			fileStream.flush();
+		} finally {
+			IOUtils.closeQuietly(fileStream);
+			fileStream.release();
+			unlock();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  recoverable stream methods
+	// ------------------------------------------------------------------------
+
+	@Override
+	public RecoverableWriter.ResumeRecoverable persist() throws IOException {
+		lock();
+		try {
+			fileStream.flush();
+			openNewPartIfNecessary(userDefinedMinPartSize);
+
+			// We do not stop writing to the current file; we merely limit the upload to the
+			// first n bytes of the current file
+
+			return upload.snapshotAndGetRecoverable(fileStream);
+		}
+		finally {
+			unlock();
+		}
+	}
+
+	@Override
+	public Committer closeForCommit() throws IOException {
+		lock();
+		try {
+			closeAndUploadPart();
+			return upload.snapshotAndGetCommitter();
+		}
+		finally {
+			unlock();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  S3
+	// ------------------------------------------------------------------------
+
+	private void openNewPartIfNecessary(long sizeThreshold) throws IOException {
+		final long fileLength = fileStream.getPos();
+		if (fileLength >= sizeThreshold) {
+			lock();
+			try {
+				uploadCurrentAndOpenNewPart(fileLength);
+			} finally {
+				unlock();
+			}
+		}
+	}
+
+	private void uploadCurrentAndOpenNewPart(long fileLength) throws IOException {
+		bytesBeforeCurrentPart += fileLength;
+		closeAndUploadPart();
+
+		// initialize a new temp file
+		fileStream = RefCountedBufferingFileStream.openNew(tmpFileProvider);
+	}
+
+	private void closeAndUploadPart() throws IOException {
+		fileStream.flush();
+		fileStream.close();
+		if (fileStream.getPos() > 0L) {
+			upload.uploadPart(fileStream);
+		}
+		fileStream.release();
+	}
+
+	// ------------------------------------------------------------------------
+	//  locking
+	// ------------------------------------------------------------------------
+
+	private void lock() throws IOException {
+		try {
+			lock.lockInterruptibly();
+		}
+		catch (InterruptedException e) {
+			Thread.currentThread().interrupt();
+			throw new IOException("interrupted");
+		}
+	}
+
+	private void unlock() {
+		lock.unlock();
+	}
+
+	// ------------------------------------------------------------------------
+	//  factory methods
+	// ------------------------------------------------------------------------
+
+	public static S3RecoverableFsDataOutputStream newStream(
+			final RecoverableMultiPartUpload upload,
+			final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator,
+			final long userDefinedMinPartSize) throws IOException {
+
+		checkArgument(userDefinedMinPartSize >= S3_MULTIPART_MIN_PART_SIZE);
+
+		final RefCountedBufferingFileStream fileStream = boundedBufferingFileStream(tmpFileCreator, Optional.empty());
+
+		return new S3RecoverableFsDataOutputStream(
+				upload,
+				tmpFileCreator,
+				fileStream,
+				userDefinedMinPartSize,
+				0L);
+	}
+
+	public static S3RecoverableFsDataOutputStream recoverStream(
+			final RecoverableMultiPartUpload upload,
+			final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator,
+			final long userDefinedMinPartSize,
+			final long bytesBeforeCurrentPart) throws IOException {
+
+		checkArgument(userDefinedMinPartSize >= S3_MULTIPART_MIN_PART_SIZE);
+
+		final RefCountedBufferingFileStream fileStream = boundedBufferingFileStream(
+				tmpFileCreator,
+				upload.getIncompletePart());
+
+		return new S3RecoverableFsDataOutputStream(
+				upload,
+				tmpFileCreator,
+				fileStream,
+				userDefinedMinPartSize,
+				bytesBeforeCurrentPart);
+	}
+
+	private static RefCountedBufferingFileStream boundedBufferingFileStream(
+			final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator,
+			final Optional<File> incompletePart) throws IOException {
+
+		if (!incompletePart.isPresent()) {
+			return RefCountedBufferingFileStream.openNew(tmpFileCreator);
+		}
+
+		final File file = incompletePart.get();
+		return RefCountedBufferingFileStream.restore(tmpFileCreator, file);
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
new file mode 100644
index 00000000000..b201981f31a
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.fs.s3.common.utils.BackPressuringExecutor;
+import org.apache.flink.fs.s3.common.utils.OffsetAwareOutputStream;
+import org.apache.flink.fs.s3.common.utils.RefCountedFile;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+
+/**
+ * A factory for creating or recovering {@link RecoverableMultiPartUpload multipart uploads}.
+ */
+@Internal
+final class S3RecoverableMultipartUploadFactory {
+
+	private final org.apache.hadoop.fs.FileSystem fs;
+
+	private final S3MultiPartUploader twoPhaseUploader;
+
+	private final FunctionWithException<File, RefCountedFile, IOException> tmpFileSupplier;
+
+	private final int maxConcurrentUploadsPerStream;
+
+	private final Executor executor;
+
+	S3RecoverableMultipartUploadFactory(
+			final FileSystem fs,
+			final S3MultiPartUploader twoPhaseUploader,
+			final int maxConcurrentUploadsPerStream,
+			final Executor executor,
+			final FunctionWithException<File, RefCountedFile, IOException> tmpFileSupplier) {
+
+		this.fs = Preconditions.checkNotNull(fs);
+		this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
+		this.executor = executor;
+		this.twoPhaseUploader = twoPhaseUploader;
+		this.tmpFileSupplier = tmpFileSupplier;
+	}
+
+	RecoverableMultiPartUpload getNewRecoverableUpload(Path path) throws IOException {
+
+		return RecoverableMultiPartUploadImpl.newUpload(
+				twoPhaseUploader,
+				limitedExecutor(),
+				pathToObjectName(path));
+	}
+
+	RecoverableMultiPartUpload recoverRecoverableUpload(S3Recoverable recoverable) throws IOException {
+		final Optional<File> incompletePart = downloadLastDataChunk(recoverable);
+
+		return RecoverableMultiPartUploadImpl.recoverUpload(
+				twoPhaseUploader,
+				limitedExecutor(),
+				recoverable.uploadId(),
+				recoverable.getObjectName(),
+				recoverable.parts(),
+				recoverable.numBytesInParts(),
+				incompletePart);
+	}
+
+	@VisibleForTesting
+	Optional<File> downloadLastDataChunk(S3Recoverable recoverable) throws IOException {
+
+		final String objectName = recoverable.incompleteObjectName();
+		if (objectName == null) {
+			return Optional.empty();
+		}
+
+		// download the file (simple way)
+		final RefCountedFile fileAndStream = tmpFileSupplier.apply(null);
+		final File file = fileAndStream.getFile();
+
+		long numBytes = 0L;
+
+		try (
+				final OffsetAwareOutputStream outStream = fileAndStream.getStream();
+				final org.apache.hadoop.fs.FSDataInputStream inStream =
+						fs.open(new org.apache.hadoop.fs.Path('/' + objectName))
+		) {
+			final byte[] buffer = new byte[32 * 1024];
+
+			int numRead;
+			while ((numRead = inStream.read(buffer)) > 0) {
+				outStream.write(buffer, 0, numRead);
+				numBytes += numRead;
+			}
+		}
+
+		// some sanity checks
+		if (numBytes != file.length() || numBytes != fileAndStream.getStream().getLength()) {
+			throw new IOException(String.format("Error recovering writer: " +
+							"Downloading the last data chunk file gives incorrect length. " +
+							"File=%d bytes, Stream=%d bytes",
+					file.length(), numBytes));
+		}
+
+		if (numBytes != recoverable.incompleteObjectLength()) {
+			throw new IOException(String.format("Error recovering writer: " +
+							"Downloading the last data chunk file gives incorrect length." +
+							"File length is %d bytes, RecoveryData indicates %d bytes",
+					numBytes, recoverable.incompleteObjectLength()));
+		}
+
+		return Optional.of(file);
+	}
+
+	@VisibleForTesting
+	String pathToObjectName(final Path path) {
+		org.apache.hadoop.fs.Path hadoopPath = HadoopFileSystem.toHadoopPath(path);
+		if (!hadoopPath.isAbsolute()) {
+			hadoopPath = new org.apache.hadoop.fs.Path(fs.getWorkingDirectory(), hadoopPath);
+		}
+
+		return hadoopPath.toUri().getScheme() != null && hadoopPath.toUri().getPath().isEmpty()
+				? ""
+				: hadoopPath.toUri().getPath().substring(1);
+	}
+
+	private Executor limitedExecutor() {
+		return maxConcurrentUploadsPerStream <= 0 ?
+				executor :
+				new BackPressuringExecutor(executor, maxConcurrentUploadsPerStream);
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableSerializer.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableSerializer.java
new file mode 100644
index 00000000000..54f37fe414a
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableSerializer.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import com.amazonaws.services.s3.model.PartETag;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Serializer implementation for a {@link S3Recoverable}.
+ */
+@Internal
+final class S3RecoverableSerializer implements SimpleVersionedSerializer<S3Recoverable> {
+
+	static final S3RecoverableSerializer INSTANCE = new S3RecoverableSerializer();
+
+	private static final Charset CHARSET = StandardCharsets.UTF_8;
+
+	private static final int MAGIC_NUMBER = 0x98761432;
+
+	/** Do not instantiate, use reusable {@link #INSTANCE} instead. */
+	private S3RecoverableSerializer() {}
+
+	@Override
+	public int getVersion() {
+		return 1;
+	}
+
+	@Override
+	public byte[] serialize(S3Recoverable obj) throws IOException {
+		final List<PartETag> partList = obj.parts();
+		final PartETag[] parts = partList.toArray(new PartETag[partList.size()]);
+
+		final byte[] keyBytes = obj.getObjectName().getBytes(CHARSET);
+		final byte[] uploadIdBytes = obj.uploadId().getBytes(CHARSET);
+
+		final byte[][] etags = new byte[parts.length][];
+		int partEtagBytes = 0;
+		for (int i = 0; i < parts.length; i++) {
+			etags[i] = parts[i].getETag().getBytes(CHARSET);
+			partEtagBytes += etags[i].length + 2 * Integer.BYTES;
+		}
+
+		final String lastObjectKey = obj.incompleteObjectName();
+		final byte[] lastPartBytes = lastObjectKey == null ? null : lastObjectKey.getBytes(CHARSET);
+
+		final byte[] targetBytes = new byte[
+						Integer.BYTES + // magic number
+						Integer.BYTES + keyBytes.length +
+						Integer.BYTES + uploadIdBytes.length +
+						Integer.BYTES + partEtagBytes +
+						Long.BYTES +
+						Integer.BYTES + (lastPartBytes == null ? 0 : lastPartBytes.length) +
+						Long.BYTES
+				];
+
+		ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN);
+		bb.putInt(MAGIC_NUMBER);
+
+		bb.putInt(keyBytes.length);
+		bb.put(keyBytes);
+
+		bb.putInt(uploadIdBytes.length);
+		bb.put(uploadIdBytes);
+
+		bb.putInt(etags.length);
+		for (int i = 0; i < parts.length; i++) {
+			PartETag pe = parts[i];
+			bb.putInt(pe.getPartNumber());
+			bb.putInt(etags[i].length);
+			bb.put(etags[i]);
+		}
+
+		bb.putLong(obj.numBytesInParts());
+
+		if (lastPartBytes == null) {
+			bb.putInt(0);
+		}
+		else {
+			bb.putInt(lastPartBytes.length);
+			bb.put(lastPartBytes);
+		}
+
+		bb.putLong(obj.incompleteObjectLength());
+
+		return targetBytes;
+	}
+
+	@Override
+	public S3Recoverable deserialize(int version, byte[] serialized) throws IOException {
+		switch (version) {
+			case 1:
+				return deserializeV1(serialized);
+			default:
+				throw new IOException("Unrecognized version or corrupt state: " + version);
+		}
+	}
+
+	private static S3Recoverable deserializeV1(byte[] serialized) throws IOException {
+		final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+
+		if (bb.getInt() != MAGIC_NUMBER) {
+			throw new IOException("Corrupt data: Unexpected magic number.");
+		}
+
+		final byte[] keyBytes = new byte[bb.getInt()];
+		bb.get(keyBytes);
+
+		final byte[] uploadIdBytes = new byte[bb.getInt()];
+		bb.get(uploadIdBytes);
+
+		final int numParts = bb.getInt();
+		final ArrayList<PartETag> parts = new ArrayList<>(numParts);
+		for (int i = 0; i < numParts; i++) {
+			final int partNum = bb.getInt();
+			final byte[] buffer = new byte[bb.getInt()];
+			bb.get(buffer);
+			parts.add(new PartETag(partNum, new String(buffer, CHARSET)));
+		}
+
+		final long numBytes = bb.getLong();
+
+		final String lastPart;
+		final int lastObjectArraySize = bb.getInt();
+		if (lastObjectArraySize == 0) {
+			lastPart = null;
+		} else {
+			byte[] lastPartBytes = new byte[lastObjectArraySize];
+			bb.get(lastPartBytes);
+			lastPart = new String(lastPartBytes, CHARSET);
+		}
+
+		final long lastPartLength = bb.getLong();
+
+		return new S3Recoverable(
+				new String(keyBytes, CHARSET),
+				new String(uploadIdBytes, CHARSET),
+				parts,
+				numBytes,
+				lastPart,
+				lastPartLength);
+	}
+}
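
Because the serializer is versioned, the version returned by getVersion() has to be stored next to the serialized bytes and passed back on deserialization. A round-trip sketch (it must live in the same package, since both the serializer and S3Recoverable are package-private; the recoverable would typically come from a stream's persist() call):

    import java.io.IOException;

    // Sketch only: package org.apache.flink.fs.s3.common.writer is assumed.
    final class SerializerRoundTripSketch {

        static byte[] toBytes(S3Recoverable recoverable) throws IOException {
            // the current version (1) must be stored alongside these bytes
            return S3RecoverableSerializer.INSTANCE.serialize(recoverable);
        }

        static S3Recoverable fromBytes(int storedVersion, byte[] bytes) throws IOException {
            // unknown versions fail with an IOException in deserialize()
            return S3RecoverableSerializer.INSTANCE.deserialize(storedVersion, bytes);
        }
    }
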
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
new file mode 100644
index 00000000000..2a84308d5b0
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.fs.s3.common.utils.RefCountedFile;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.fs.s3.common.FlinkS3FileSystem.S3_MULTIPART_MIN_PART_SIZE;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * An implementation of the {@link RecoverableWriter} against S3.
+ *
+ * <p>This implementation makes heavy use of MultiPart Uploads in S3 to persist
+ * intermediate data as soon as possible.
+ *
+ * <p>This class partially reuses utility classes and implementations from the Hadoop
+ * project, specifically around configuring S3 requests and handling retries.
+ */
+@PublicEvolving
+public class S3RecoverableWriter implements RecoverableWriter {
+
+	private final FunctionWithException<File, RefCountedFile, IOException> tempFileCreator;
+
+	private final long userDefinedMinPartSize;
+
+	private final S3RecoverableMultipartUploadFactory uploadFactory;
+
+	@VisibleForTesting
+	S3RecoverableWriter(
+			final S3RecoverableMultipartUploadFactory uploadFactory,
+			final FunctionWithException<File, RefCountedFile, IOException> tempFileCreator,
+			final long userDefinedMinPartSize) {
+
+		this.uploadFactory = Preconditions.checkNotNull(uploadFactory);
+		this.tempFileCreator = Preconditions.checkNotNull(tempFileCreator);
+		this.userDefinedMinPartSize = userDefinedMinPartSize;
+	}
+
+	@Override
+	public RecoverableFsDataOutputStream open(Path path) throws IOException {
+		final RecoverableMultiPartUpload upload = uploadFactory.getNewRecoverableUpload(path);
+
+		return S3RecoverableFsDataOutputStream.newStream(
+				upload,
+				tempFileCreator,
+				userDefinedMinPartSize);
+	}
+
+	@Override
+	public Committer recoverForCommit(RecoverableWriter.CommitRecoverable recoverable) throws IOException {
+		final S3Recoverable s3recoverable = castToS3Recoverable(recoverable);
+		final S3RecoverableFsDataOutputStream recovered = recover(s3recoverable);
+		return recovered.closeForCommit();
+	}
+
+	@Override
+	public S3RecoverableFsDataOutputStream recover(RecoverableWriter.ResumeRecoverable recoverable) throws IOException {
+		final S3Recoverable s3recoverable = castToS3Recoverable(recoverable);
+
+		final RecoverableMultiPartUpload upload = uploadFactory.recoverRecoverableUpload(s3recoverable);
+
+		return S3RecoverableFsDataOutputStream.recoverStream(
+				upload,
+				tempFileCreator,
+				userDefinedMinPartSize,
+				s3recoverable.numBytesInParts());
+	}
+
+	@Override
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	public SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitRecoverableSerializer() {
+		return (SimpleVersionedSerializer) S3RecoverableSerializer.INSTANCE;
+	}
+
+	@Override
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	public SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumeRecoverableSerializer() {
+		return (SimpleVersionedSerializer) S3RecoverableSerializer.INSTANCE;
+	}
+
+	@Override
+	public boolean supportsResume() {
+		return true;
+	}
+
+	// --------------------------- Utils ---------------------------
+
+	private static S3Recoverable castToS3Recoverable(RecoverableWriter.CommitRecoverable recoverable) {
+		if (recoverable instanceof S3Recoverable) {
+			return (S3Recoverable) recoverable;
+		}
+		throw new IllegalArgumentException(
+				"S3 File System cannot recover recoverable for other file system: " + recoverable);
+	}
+
+	// --------------------------- Static Constructor ---------------------------
+
+	public static S3RecoverableWriter writer(
+			final FileSystem fs,
+			final FunctionWithException<File, RefCountedFile, IOException> tempFileCreator,
+			final S3MultiPartUploader twoPhaseUploader,
+			final Executor uploadThreadPool,
+			final long userDefinedMinPartSize,
+			final int maxConcurrentUploadsPerStream) {
+
+		checkArgument(userDefinedMinPartSize >= S3_MULTIPART_MIN_PART_SIZE);
+
+		final S3RecoverableMultipartUploadFactory uploadFactory =
+				new S3RecoverableMultipartUploadFactory(
+						fs,
+						twoPhaseUploader,
+						maxConcurrentUploadsPerStream,
+						uploadThreadPool,
+						tempFileCreator);
+
+		return new S3RecoverableWriter(uploadFactory, tempFileCreator, userDefinedMinPartSize);
+	}
+}
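
A minimal sketch of the intended lifecycle through the generic RecoverableWriter API: write, persist at a checkpoint, resume after a failure, and finally close for commit (the writer instance and the target path are assumptions, not part of this patch):

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
    import org.apache.flink.core.fs.RecoverableWriter;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    // Sketch only: 'writer' would be an S3RecoverableWriter obtained from the S3 file system.
    final class WriterLifecycleSketch {

        static void writeResumeAndCommit(RecoverableWriter writer) throws IOException {
            final RecoverableFsDataOutputStream out = writer.open(new Path("s3://bucket/path/part-0"));
            out.write("some data".getBytes(StandardCharsets.UTF_8));

            // at a checkpoint: snapshot the stream and keep the recoverable in state
            final RecoverableWriter.ResumeRecoverable resumable = out.persist();

            // after a failure: resume writing from the snapshot with a fresh stream
            final RecoverableFsDataOutputStream resumed = writer.recover(resumable);
            resumed.write("more data".getBytes(StandardCharsets.UTF_8));

            // when the part is finished: close for commit and publish the object
            final RecoverableFsDataOutputStream.Committer committer = resumed.closeForCommit();
            committer.commit();
        }
    }
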
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
index 832bbf42e9a..ebb03599302 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
@@ -19,10 +19,14 @@
 package org.apache.flink.fs.s3.common;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import javax.annotation.Nullable;
+
 import java.net.URI;
 import java.util.Collections;
 
@@ -71,6 +75,12 @@ protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopC
 			return fsUri;
 		}
 
+		@Nullable
+		@Override
+		protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) {
+			return null;
+		}
+
 		@Override
 		public String getScheme() {
 			return "test";
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java
new file mode 100644
index 00000000000..5b7d1ccfb9f
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.utils;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.UUID;
+
+/**
+ * Tests for the {@link RefCountedBufferingFileStream}.
+ */
+public class RefCountedBufferingFileStreamTest {
+
+	private static final int BUFFER_SIZE = 10;
+
+	@Rule
+	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Test
+	public void testSmallWritesGoToBuffer() throws IOException {
+		RefCountedBufferingFileStream stream = getStreamToTest();
+
+		final byte[] contentToWrite = bytesOf("hello");
+		stream.write(contentToWrite);
+
+		Assert.assertEquals(contentToWrite.length, stream.getPositionInBuffer());
+		Assert.assertEquals(contentToWrite.length, stream.getPos());
+
+		stream.close();
+		stream.release();
+	}
+
+	@Test(expected = IOException.class)
+	public void testExceptionWhenWritingToClosedFile() throws IOException {
+		RefCountedBufferingFileStream stream = getStreamToTest();
+
+		final byte[] contentToWrite = bytesOf("hello");
+		stream.write(contentToWrite);
+
+		Assert.assertEquals(contentToWrite.length, stream.getPositionInBuffer());
+		Assert.assertEquals(contentToWrite.length, stream.getPos());
+
+		stream.close();
+
+		stream.write(contentToWrite);
+	}
+
+	@Test
+	public void testBigWritesGoToFile() throws IOException {
+		RefCountedBufferingFileStream stream = getStreamToTest();
+
+		final byte[] contentToWrite = bytesOf("hello big world");
+		stream.write(contentToWrite);
+
+		Assert.assertEquals(0, stream.getPositionInBuffer());
+		Assert.assertEquals(contentToWrite.length, stream.getPos());
+
+		stream.close();
+		stream.release();
+	}
+
+	@Test
+	public void testSpillingWhenBufferGetsFull() throws IOException {
+		RefCountedBufferingFileStream stream = getStreamToTest();
+
+		final byte[] firstContentToWrite = bytesOf("hello");
+		stream.write(firstContentToWrite);
+
+		Assert.assertEquals(firstContentToWrite.length, stream.getPositionInBuffer());
+		Assert.assertEquals(firstContentToWrite.length, stream.getPos());
+
+		final byte[] secondContentToWrite = bytesOf(" world!");
+		stream.write(secondContentToWrite);
+
+		Assert.assertEquals(secondContentToWrite.length, stream.getPositionInBuffer());
+		Assert.assertEquals(firstContentToWrite.length + secondContentToWrite.length, stream.getPos());
+
+		stream.close();
+		stream.release();
+	}
+
+	@Test
+	public void testFlush() throws IOException {
+		RefCountedBufferingFileStream stream = getStreamToTest();
+
+		final byte[] contentToWrite = bytesOf("hello");
+		stream.write(contentToWrite);
+
+		Assert.assertEquals(contentToWrite.length, stream.getPositionInBuffer());
+		Assert.assertEquals(contentToWrite.length, stream.getPos());
+
+		stream.flush();
+
+		Assert.assertEquals(0, stream.getPositionInBuffer());
+		Assert.assertEquals(contentToWrite.length, stream.getPos());
+
+		final byte[] contentRead = new byte[contentToWrite.length];
+		stream.getInputStream().read(contentRead, 0, contentRead.length);
+		Assert.assertTrue(Arrays.equals(contentToWrite, contentRead));
+
+		stream.release();
+	}
+
+	// ---------------------------- Utility Classes ----------------------------
+
+	private RefCountedBufferingFileStream getStreamToTest() throws IOException {
+		return new RefCountedBufferingFileStream(getRefCountedFileWithContent(), BUFFER_SIZE);
+	}
+
+	private RefCountedFile getRefCountedFileWithContent() throws IOException {
+		final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID());
+		final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
+
+		return RefCountedFile.newFile(newFile, out);
+	}
+
+	private static byte[] bytesOf(String str) {
+		return str.getBytes(StandardCharsets.UTF_8);
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
new file mode 100644
index 00000000000..1042683c916
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.utils;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.UUID;
+
+/**
+ * Tests for the {@link RefCountedFile}.
+ */
+public class RefCountedFileTest {
+
+	@Rule
+	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Test
+	public void releaseToZeroRefCounterShouldDeleteTheFile() throws IOException {
+		final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID());
+		final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
+
+		RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, out);
+		verifyTheFileIsStillThere();
+
+		fileUnderTest.release();
+
+		Assert.assertEquals(0L, Files.list(temporaryFolder.getRoot().toPath()).count());
+	}
+
+	@Test
+	public void retainsShouldRequirePlusOneReleasesToDeleteTheFile() throws IOException {
+		final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID());
+		final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
+
+		// the reference counter always starts with 1 (not 0). This is why we need +1 releases
+		RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, out);
+		verifyTheFileIsStillThere();
+
+		fileUnderTest.retain();
+		fileUnderTest.retain();
+
+		Assert.assertEquals(3, fileUnderTest.getReferenceCounter());
+
+		fileUnderTest.release();
+		Assert.assertEquals(2, fileUnderTest.getReferenceCounter());
+		verifyTheFileIsStillThere();
+
+		fileUnderTest.release();
+		Assert.assertEquals(1, fileUnderTest.getReferenceCounter());
+		verifyTheFileIsStillThere();
+
+		fileUnderTest.release();
+		// the file is deleted now
+		Assert.assertEquals(0L, Files.list(temporaryFolder.getRoot().toPath()).count());
+	}
+
+	@Test
+	public void writeShouldSucceed() throws IOException {
+		byte[] content = bytesOf("hello world");
+
+		final RefCountedFile fileUnderTest = getClosedRefCountedFileWithContent(content);
+		long fileLength = fileUnderTest.getLength();
+
+		Assert.assertEquals(content.length, fileLength);
+	}
+
+	@Test
+	public void closeShouldNotReleaseReference() throws IOException {
+		getClosedRefCountedFileWithContent("hello world");
+		verifyTheFileIsStillThere();
+	}
+
+	@Test(expected = IOException.class)
+	public void writeAfterCloseShouldThrowException() throws IOException {
+		final RefCountedFile fileUnderTest = getClosedRefCountedFileWithContent("hello world");
+		byte[] content = bytesOf("Hello Again");
+		fileUnderTest.write(content, 0, content.length);
+	}
+
+	@Test(expected = IOException.class)
+	public void flushAfterCloseShouldThrowException() throws IOException {
+		final RefCountedFile fileUnderTest = getClosedRefCountedFileWithContent("hello world");
+		fileUnderTest.flush();
+	}
+
+	// ------------------------------------- Utilities -------------------------------------
+
+	private void verifyTheFileIsStillThere() throws IOException {
+		Assert.assertEquals(1L, Files.list(temporaryFolder.getRoot().toPath()).count());
+	}
+
+	private RefCountedFile getClosedRefCountedFileWithContent(String content) throws IOException {
+		return getClosedRefCountedFileWithContent(bytesOf(content));
+	}
+
+	private RefCountedFile getClosedRefCountedFileWithContent(byte[] content) throws IOException {
+		final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID());
+		final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
+
+		final RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, out);
+
+		fileUnderTest.write(content, 0, content.length);
+
+		fileUnderTest.closeStream();
+		return fileUnderTest;
+	}
+
+	private static byte[] bytesOf(String str) {
+		return str.getBytes(StandardCharsets.UTF_8);
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/IncompletePartPrefixTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/IncompletePartPrefixTest.java
new file mode 100644
index 00000000000..a3164f18a16
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/IncompletePartPrefixTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.writer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link RecoverableMultiPartUploadImpl#incompleteObjectNamePrefix(String)}.
+ */
+public class IncompletePartPrefixTest {
+
+	@Test(expected = NullPointerException.class)
+	public void nullObjectNameShouldThrowException() {
+		RecoverableMultiPartUploadImpl.incompleteObjectNamePrefix(null);
+	}
+
+	@Test
+	public void emptyInitialNameShouldSucceed() {
+		String objectNamePrefix = RecoverableMultiPartUploadImpl.incompleteObjectNamePrefix("");
+		Assert.assertEquals("_tmp_", objectNamePrefix);
+	}
+
+	@Test
+	public void nameWithoutSlashShouldSucceed() {
+		String objectNamePrefix = RecoverableMultiPartUploadImpl.incompleteObjectNamePrefix("no_slash_path");
+		Assert.assertEquals("_no_slash_path_tmp_", objectNamePrefix);
+	}
+
+	@Test
+	public void nameWithOnlySlashShouldSucceed() {
+		String objectNamePrefix = RecoverableMultiPartUploadImpl.incompleteObjectNamePrefix("/");
+		Assert.assertEquals("/_tmp_", objectNamePrefix);
+	}
+
+	@Test
+	public void normalPathShouldSucceed() {
+		String objectNamePrefix = RecoverableMultiPartUploadImpl.incompleteObjectNamePrefix("/root/home/test-file");
+		Assert.assertEquals("/root/home/_test-file_tmp_", objectNamePrefix);
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
new file mode 100644
index 00000000000..72554e19773
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.writer;
+
+import org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream;
+import org.apache.flink.fs.s3.common.utils.RefCountedFile;
+import org.apache.flink.util.MathUtils;
+
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.UploadPartResult;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+
+/**
+ * Tests for the {@link RecoverableMultiPartUploadImpl}.
+ */
+public class RecoverableMultiPartUploadImplTest {
+
+	private static final int BUFFER_SIZE = 10;
+
+	private static final String TEST_OBJECT_NAME = "TEST-OBJECT";
+
+	private StubMultiPartUploader stubMultiPartUploader;
+
+	private RecoverableMultiPartUploadImpl multiPartUploadUnderTest;
+
+	@Rule
+	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws IOException {
+		stubMultiPartUploader = new StubMultiPartUploader();
+		multiPartUploadUnderTest = RecoverableMultiPartUploadImpl
+				.newUpload(stubMultiPartUploader, new MainThreadExecutor(), TEST_OBJECT_NAME);
+	}
+
+	@Test
+	public void singlePartUploadShouldBeIncluded() throws IOException {
+		final byte[] part = bytesOf("hello world");
+
+		uploadPart(part);
+
+		assertThat(stubMultiPartUploader, hasMultiPartUploadWithPart(1, part));
+	}
+
+	@Test
+	public void incompletePartShouldBeUploadedAsIndividualObject() throws IOException {
+		final byte[] incompletePart = bytesOf("Hi!");
+
+		uploadObject(incompletePart);
+
+		assertThat(stubMultiPartUploader, hasUploadedObject(incompletePart));
+	}
+
+	@Test
+	public void multiplePartAndObjectUploadsShouldBeIncluded() throws IOException {
+		final byte[] firstCompletePart = bytesOf("hello world");
+		final byte[] secondCompletePart = bytesOf("hello again");
+		final byte[] thirdIncompletePart = bytesOf("!!!");
+
+		uploadPart(firstCompletePart);
+		uploadPart(secondCompletePart);
+		uploadObject(thirdIncompletePart);
+
+		assertThat(
+				stubMultiPartUploader,
+				allOf(
+						hasMultiPartUploadWithPart(1, firstCompletePart),
+						hasMultiPartUploadWithPart(2, secondCompletePart),
+						hasUploadedObject(thirdIncompletePart)
+				)
+		);
+	}
+
+	@Test
+	public void multiplePartAndObjectUploadsShouldBeReflectedInRecoverable() throws IOException {
+		final byte[] firstCompletePart = bytesOf("hello world");
+		final byte[] secondCompletePart = bytesOf("hello again");
+		final byte[] thirdIncompletePart = bytesOf("!!!");
+
+		uploadPart(firstCompletePart);
+		uploadPart(secondCompletePart);
+
+		final S3Recoverable recoverable = uploadObject(thirdIncompletePart);
+
+		assertThat(recoverable, isEqualTo(thirdIncompletePart, firstCompletePart, secondCompletePart));
+	}
+
+	@Test
+	public void s3RecoverableReflectsTheLatestPartialObject() throws IOException {
+		final byte[] incompletePartOne = bytesOf("AB");
+		final byte[] incompletePartTwo = bytesOf("ABC");
+
+		S3Recoverable recoverableOne = uploadObject(incompletePartOne);
+		S3Recoverable recoverableTwo = uploadObject(incompletePartTwo);
+
+		assertThat(
+				recoverableTwo.incompleteObjectName(),
+				not(equalTo(recoverableOne.incompleteObjectName())));
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void uploadingNonClosedFileAsCompleteShouldThrowException() throws IOException {
+		final byte[] incompletePart = bytesOf("!!!");
+
+		final RefCountedBufferingFileStream incompletePartFile =
+				writeContent(incompletePart);
+
+		multiPartUploadUnderTest.uploadPart(incompletePartFile);
+	}
+
+	// --------------------------------- Matchers ---------------------------------
+
+	private static TypeSafeMatcher<StubMultiPartUploader> hasMultiPartUploadWithPart(
+			final int partNo, final byte[] content) {
+
+		final TestUploadPartResult expectedCompletePart =
+				createUploadPartResult(TEST_OBJECT_NAME, partNo, content);
+
+		return new TypeSafeMatcher<StubMultiPartUploader>() {
+
+			@Override
+			protected boolean matchesSafely(StubMultiPartUploader testMultipartUploader) {
+				final List<TestUploadPartResult> actualCompleteParts =
+						testMultipartUploader.getCompletePartsUploaded();
+
+				for (TestUploadPartResult result : actualCompleteParts) {
+					if (result.equals(expectedCompletePart)) {
+						return true;
+					}
+				}
+				return false;
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description
+						.appendText("a TestMultiPartUploader with complete part=")
+						.appendValue(expectedCompletePart);
+			}
+		};
+	}
+
+	private static TypeSafeMatcher<StubMultiPartUploader> hasUploadedObject(final byte[] content) {
+
+		final TestPutObjectResult expectedIncompletePart =
+				createPutObjectResult(TEST_OBJECT_NAME, content);
+
+		return new TypeSafeMatcher<StubMultiPartUploader>() {
+
+			@Override
+			protected boolean matchesSafely(StubMultiPartUploader testMultipartUploader) {
+				final List<TestPutObjectResult> actualIncompleteParts =
+						testMultipartUploader.getIncompletePartsUploaded();
+
+				for (TestPutObjectResult result : actualIncompleteParts) {
+					if (result.equals(expectedIncompletePart)) {
+						return true;
+					}
+				}
+				return false;
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description
+						.appendText("a TestMultiPartUploader with complete parts=")
+						.appendValue(expectedIncompletePart);
+			}
+		};
+	}
+
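+	// Matches an S3Recoverable against the expected complete parts (by ETag) and
+	// the incomplete part length; the incomplete object name is deliberately
+	// ignored because it contains randomness.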
+	private static TypeSafeMatcher<S3Recoverable> isEqualTo(byte[] incompletePart, byte[]... completeParts) {
+		return new TypeSafeMatcher<S3Recoverable>() {
+
+			private final S3Recoverable expectedRecoverable =
+					createS3Recoverable(incompletePart, completeParts);
+
+			@Override
+			protected boolean matchesSafely(S3Recoverable actualRecoverable) {
+
+				return Objects.equals(expectedRecoverable.getObjectName(), actualRecoverable.getObjectName())
+						&& Objects.equals(expectedRecoverable.uploadId(), actualRecoverable.uploadId())
+						&& expectedRecoverable.numBytesInParts() == actualRecoverable.numBytesInParts()
+						&& expectedRecoverable.incompleteObjectLength() == actualRecoverable.incompleteObjectLength()
+						&& compareLists(expectedRecoverable.parts(), actualRecoverable.parts());
+			}
+
+			private boolean compareLists(final List<PartETag> first, final List<PartETag> second) {
+				return Arrays.equals(
+						first.stream().map(PartETag::getETag).toArray(),
+						second.stream().map(PartETag::getETag).toArray()
+				);
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description.appendText(expectedRecoverable + " with ignored LAST_PART_OBJECT_NAME.");
+			}
+		};
+	}
+
+	// ---------------------------------- Test Utilities -----------------------------------------
+
+	private static byte[] bytesOf(String str) {
+		return str.getBytes(StandardCharsets.UTF_8);
+	}
+
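+	// Builds the S3Recoverable the test expects: one synthetic ETag per complete
+	// part, following the same createETag() convention as the stub uploader, plus
+	// the length of the incomplete part. The incomplete object name is a dummy
+	// value since it is ignored by the matcher above.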
+	private static S3Recoverable createS3Recoverable(byte[] incompletePart, byte[]... completeParts) {
+		final List<PartETag> eTags = new ArrayList<>();
+
+		int index = 1;
+		long bytesInPart = 0L;
+		for (byte[] part : completeParts) {
+			eTags.add(new PartETag(index, createETag(TEST_OBJECT_NAME, index)));
+			bytesInPart += part.length;
+			index++;
+		}
+
+		return new S3Recoverable(
+				TEST_OBJECT_NAME,
+				createMPUploadId(TEST_OBJECT_NAME),
+				eTags,
+				bytesInPart,
+				"IGNORED-DUE-TO-RANDOMNESS",
+				(long) incompletePart.length);
+	}
+
+	private static TestPutObjectResult createPutObjectResult(String key, byte[] content) {
+		final TestPutObjectResult result = new TestPutObjectResult();
+		result.setETag(createETag(key, -1));
+		result.setContent(content);
+		return result;
+	}
+
+	private static TestUploadPartResult createUploadPartResult(String key, int number, byte[] payload) {
+		final TestUploadPartResult result = new TestUploadPartResult();
+		result.setETag(createETag(key, number));
+		result.setPartNumber(number);
+		result.setContent(payload);
+		return result;
+	}
+
+	private static String createMPUploadId(String key) {
+		return "MPU-" + key;
+	}
+
+	private static String createETag(String key, int partNo) {
+		return "ETAG-" + key + '-' + partNo;
+	}
+
+	private S3Recoverable uploadObject(byte[] content) throws IOException {
+		final RefCountedBufferingFileStream incompletePartFile = writeContent(content);
+		incompletePartFile.flush();
+
+		// as in the production code, we assume that a file containing
+		// an in-progress part is flushed but not closed before being passed
+		// to the uploader.
+
+		return multiPartUploadUnderTest.snapshotAndGetRecoverable(incompletePartFile);
+	}
+
+	private void uploadPart(final byte[] content) throws IOException {
+		RefCountedBufferingFileStream partFile = writeContent(content);
+
+		// as in the production code, we assume that a file containing
+		// a completed part is closed before being passed to the uploader.
+
+		partFile.close();
+
+		multiPartUploadUnderTest.uploadPart(partFile);
+	}
+
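+	// Writes the given bytes into a fresh ref-counted buffering stream backed by a
+	// new temp file; callers flush it (for an incomplete part) or close it (for a
+	// complete part) before handing it to the uploader, mirroring production usage.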
+	private RefCountedBufferingFileStream writeContent(byte[] content) throws IOException {
+		final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID());
+		final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
+
+		final RefCountedBufferingFileStream testStream =
+				new RefCountedBufferingFileStream(RefCountedFile.newFile(newFile, out), BUFFER_SIZE);
+
+		testStream.write(content, 0, content.length);
+		return testStream;
+	}
+
+	// ---------------------------------- Test Classes -------------------------------------------
+
+	/**
+	 * A simple executor that runs the given runnable synchronously on the calling thread.
+	 */
+	private static class MainThreadExecutor implements Executor {
+
+		@Override
+		public void execute(Runnable command) {
+			command.run();
+		}
+	}
+
+	/**
+	 * A {@link S3MultiPartUploader} that simulates uploading part files to S3 by
+	 * simply putting complete and incomplete part files in lists for further validation.
+	 */
+	private static class StubMultiPartUploader implements S3MultiPartUploader {
+
+		private final List<TestUploadPartResult> completePartsUploaded = new ArrayList<>();
+		private final List<TestPutObjectResult> incompletePartsUploaded = new ArrayList<>();
+
+		public List<TestUploadPartResult> getCompletePartsUploaded() {
+			return completePartsUploaded;
+		}
+
+		public List<TestPutObjectResult> getIncompletePartsUploaded() {
+			return incompletePartsUploaded;
+		}
+
+		@Override
+		public String startMultiPartUpload(String key) throws IOException {
+			return createMPUploadId(key);
+		}
+
+		@Override
+		public UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream file, long length) throws IOException {
+			final byte[] content = getFileContentBytes(file, MathUtils.checkedDownCast(length));
+			return storeAndGetUploadPartResult(key, partNumber, content);
+		}
+
+		@Override
+		public PutObjectResult uploadIncompletePart(String key, InputStream file, long length) throws IOException {
+			final byte[] content = getFileContentBytes(file, MathUtils.checkedDownCast(length));
+			return storeAndGetPutObjectResult(key, content);
+		}
+
+		@Override
+		public CompleteMultipartUploadResult commitMultiPartUpload(
+				String key,
+				String uploadId,
+				List<PartETag> partETags,
+				long length,
+				AtomicInteger errorCount) throws IOException {
+			return null;
+		}
+
+		@Override
+		public ObjectMetadata getObjectMetadata(String key) throws IOException {
+			return null;
+		}
+
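+		// Reads exactly 'length' bytes into a fresh buffer for later comparison.
+		// The test streams are small and file-backed, so a single read() call is
+		// assumed to return the full content.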
+		private byte[] getFileContentBytes(InputStream file, int length) throws IOException {
+			final byte[] content = new byte[length];
+			file.read(content, 0, length);
+			return content;
+		}
+
+		private TestUploadPartResult storeAndGetUploadPartResult(String key, int number, byte[] payload) {
+			final TestUploadPartResult result = createUploadPartResult(key, number, payload);
+			completePartsUploaded.add(result);
+			return result;
+		}
+
+		private TestPutObjectResult storeAndGetPutObjectResult(String key, byte[] payload) {
+			final TestPutObjectResult result = createPutObjectResult(key, payload);
+			incompletePartsUploaded.add(result);
+			return result;
+		}
+	}
+
+	/**
+	 * A {@link PutObjectResult} that also contains the actual content of the uploaded part.
+	 */
+	private static class TestPutObjectResult extends PutObjectResult {
+		private static final long serialVersionUID = 1L;
+
+		private byte[] content;
+
+		void setContent(byte[] payload) {
+			this.content = payload;
+		}
+
+		public byte[] getContent() {
+			return content;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+
+			final TestPutObjectResult that = (TestPutObjectResult) o;
+			// we ignore the etag as it contains randomness
+			return Arrays.equals(getContent(), that.getContent());
+		}
+
+		@Override
+		public int hashCode() {
+			return Arrays.hashCode(getContent());
+		}
+
+		@Override
+		public String toString() {
+			return '{' +
+					" eTag=" + getETag() +
+					", payload=" + Arrays.toString(content) +
+					'}';
+		}
+	}
+
+	/**
+	 * A {@link UploadPartResult} that also contains the actual content of the uploaded part.
+	 */
+	private static class TestUploadPartResult extends UploadPartResult {
+
+		private static final long serialVersionUID = 1L;
+
+		private byte[] content;
+
+		void setContent(byte[] content) {
+			this.content = content;
+		}
+
+		public byte[] getContent() {
+			return content;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+
+			final TestUploadPartResult that = (TestUploadPartResult) o;
+			return getETag().equals(that.getETag())
+					&& getPartNumber() == that.getPartNumber()
+					&& Arrays.equals(content, that.content);
+		}
+
+		@Override
+		public int hashCode() {
+			return 31 * Objects.hash(getETag(), getPartNumber()) + Arrays.hashCode(getContent());
+		}
+
+		@Override
+		public String toString() {
+			return '{' +
+					"etag=" + getETag() +
+					", partNo=" + getPartNumber() +
+					", content=" + Arrays.toString(content) +
+					'}';
+		}
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java
new file mode 100644
index 00000000000..7a32392f885
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream;
+import org.apache.flink.fs.s3.common.utils.RefCountedFSOutputStream;
+import org.apache.flink.fs.s3.common.utils.RefCountedFile;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.SplittableRandom;
+import java.util.UUID;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for the {@link S3RecoverableFsDataOutputStream}.
+ */
+public class S3RecoverableFsDataOutputStreamTest {
+
+	private static final long USER_DEFINED_MIN_PART_SIZE = 10L;
+
+	private TestMultipartUpload multipartUploadUnderTest;
+
+	private TestFileProvider fileProvider;
+
+	private S3RecoverableFsDataOutputStream streamUnderTest;
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+	@Before
+	public void beforeTest() throws IOException {
+		fileProvider = new TestFileProvider(TEMP_FOLDER);
+
+		multipartUploadUnderTest = new TestMultipartUpload(fileProvider);
+
+		RefCountedBufferingFileStream fileStream = RefCountedBufferingFileStream.openNew(fileProvider);
+
+		streamUnderTest = new S3RecoverableFsDataOutputStream(
+				multipartUploadUnderTest,
+				fileProvider,
+				fileStream,
+				USER_DEFINED_MIN_PART_SIZE,
+				0L
+		);
+	}
+
+	@Test
+	public void simpleUsage() throws IOException {
+		streamUnderTest.write(bytesOf("hello world"));
+
+		RecoverableFsDataOutputStream.Committer committer = streamUnderTest.closeForCommit();
+		committer.commit();
+
+		assertThat(multipartUploadUnderTest, hasContent(bytesOf("hello world")));
+	}
+
+	@Test
+	public void noWritesShouldResolveInAnEmptyFile() throws IOException {
+		RecoverableFsDataOutputStream.Committer committer = streamUnderTest.closeForCommit();
+		committer.commit();
+
+		assertThat(multipartUploadUnderTest, hasContent(new byte[0]));
+	}
+
+	@Test
+	public void closingWithoutCommittingDiscardsTheData() throws IOException {
+		streamUnderTest.write(bytesOf("hello world"));
+
+		streamUnderTest.close();
+
+		assertThat(multipartUploadUnderTest, hasContent(bytesOf("")));
+	}
+
+	@Test
+	public void multipleWritesAreConcatenated() throws IOException {
+		streamUnderTest.write(bytesOf("hello"));
+		streamUnderTest.write(bytesOf(" "));
+		streamUnderTest.write(bytesOf("world"));
+
+		streamUnderTest.closeForCommit().commit();
+
+		assertThat(multipartUploadUnderTest, hasContent(bytesOf("hello world")));
+	}
+
+	@Test
+	public void writeLargeFile() throws IOException {
+		List<byte[]> testDataBuffers = createRandomLargeTestDataBuffers();
+
+		for (byte[] buffer : testDataBuffers) {
+			streamUnderTest.write(buffer);
+		}
+		streamUnderTest.closeForCommit().commit();
+
+		assertThat(multipartUploadUnderTest, hasContent(testDataBuffers));
+	}
+
+	@Test
+	public void simpleRecovery() throws IOException {
+		streamUnderTest.write(bytesOf("hello"));
+
+		streamUnderTest.persist();
+
+		streamUnderTest = reopenStreamUnderTestAfterRecovery();
+		streamUnderTest.closeForCommit().commit();
+
+		assertThat(multipartUploadUnderTest, hasContent(bytesOf("hello")));
+	}
+
+	@Test
+	public void multiplePersistsDoesNotIntroduceJunk() throws IOException {
+		streamUnderTest.write(bytesOf("hello"));
+
+		streamUnderTest.persist();
+		streamUnderTest.persist();
+		streamUnderTest.persist();
+		streamUnderTest.persist();
+
+		streamUnderTest.write(bytesOf(" "));
+		streamUnderTest.write(bytesOf("world"));
+
+		streamUnderTest.closeForCommit().commit();
+
+		assertThat(multipartUploadUnderTest, hasContent(bytesOf("hello world")));
+	}
+
+	@Test
+	public void multipleWritesAndPersists() throws IOException {
+		streamUnderTest.write(bytesOf("a"));
+
+		streamUnderTest.persist();
+		streamUnderTest.write(bytesOf("b"));
+
+		streamUnderTest.persist();
+		streamUnderTest.write(bytesOf("c"));
+
+		streamUnderTest.persist();
+		streamUnderTest.write(bytesOf("d"));
+
+		streamUnderTest.persist();
+		streamUnderTest.write(bytesOf("e"));
+
+		streamUnderTest.closeForCommit().commit();
+
+		assertThat(multipartUploadUnderTest, hasContent(bytesOf("abcde")));
+	}
+
+	@Test
+	public void multipleWritesAndPersistsWithBigChunks() throws IOException {
+		List<byte[]> testDataBuffers = createRandomLargeTestDataBuffers();
+
+		for (byte[] buffer : testDataBuffers) {
+			streamUnderTest.write(buffer);
+			streamUnderTest.persist();
+		}
+		streamUnderTest.closeForCommit().commit();
+
+		assertThat(multipartUploadUnderTest, hasContent(testDataBuffers));
+	}
+
+	@Test
+	public void addDataAfterRecovery() throws IOException {
+		streamUnderTest.write(bytesOf("hello"));
+
+		streamUnderTest.persist();
+
+		streamUnderTest = reopenStreamUnderTestAfterRecovery();
+		streamUnderTest.write(bytesOf(" "));
+		streamUnderTest.write(bytesOf("world"));
+		streamUnderTest.closeForCommit().commit();
+
+		assertThat(multipartUploadUnderTest, hasContent(bytesOf("hello world")));
+	}
+
+	@Test
+	public void discardingUnpersistedNotYetUploadedData() throws IOException {
+		streamUnderTest.write(bytesOf("hello"));
+
+		streamUnderTest.persist();
+
+		streamUnderTest.write(bytesOf("goodbye"));
+		streamUnderTest = reopenStreamUnderTestAfterRecovery();
+
+		streamUnderTest.write(bytesOf(" world"));
+		streamUnderTest.closeForCommit().commit();
+
+		assertThat(multipartUploadUnderTest, hasContent(bytesOf("hello world")));
+	}
+
+	@Test
+	public void discardingUnpersistedUploadedData() throws IOException {
+		streamUnderTest.write(bytesOf("hello"));
+
+		streamUnderTest.persist();
+		streamUnderTest.write(randomBuffer(RefCountedBufferingFileStream.BUFFER_SIZE + 1));
+		streamUnderTest = reopenStreamUnderTestAfterRecovery();
+
+		streamUnderTest.write(bytesOf(" world"));
+		streamUnderTest.closeForCommit().commit();
+
+		assertThat(multipartUploadUnderTest, hasContent(bytesOf("hello world")));
+	}
+
+	@Test
+	public void commitEmptyStreamShouldBeSuccessful() throws IOException {
+		streamUnderTest.closeForCommit().commit();
+	}
+
+	@Test(expected = IOException.class)
+	public void closeForCommitOnClosedStreamShouldFail() throws IOException {
+		streamUnderTest.closeForCommit().commit();
+		streamUnderTest.closeForCommit().commit();
+	}
+
+
+	// ------------------------------------------------------------------------------------------------------------
+	// Utils
+	// ------------------------------------------------------------------------------------------------------------
+
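+	// Simulates a failure/recovery cycle: restores a stream from the incomplete
+	// part snapshotted by the test upload, after discarding everything that was
+	// written but not persisted.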
+	private S3RecoverableFsDataOutputStream reopenStreamUnderTestAfterRecovery() throws IOException {
+		final long bytesBeforeCurrentPart = multipartUploadUnderTest.numBytes;
+		final Optional<File> incompletePart = multipartUploadUnderTest.getIncompletePart();
+
+		RefCountedBufferingFileStream fileStream = RefCountedBufferingFileStream.restore(fileProvider, incompletePart.get());
+		multipartUploadUnderTest.discardUnpersistedData();
+
+		return new S3RecoverableFsDataOutputStream(
+				multipartUploadUnderTest,
+				fileProvider,
+				fileStream,
+				USER_DEFINED_MIN_PART_SIZE,
+				bytesBeforeCurrentPart
+		);
+	}
+
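+	// Generates random buffers whose combined size is several times the
+	// user-defined minimum part size, forcing the stream under test to roll over
+	// multiple upload parts.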
+	private static List<byte[]> createRandomLargeTestDataBuffers() {
+		final List<byte[]> testData = new ArrayList<>();
+		final SplittableRandom random = new SplittableRandom();
+
+		long totalSize = 0L;
+
+		int expectedSize = (int) random.nextLong(USER_DEFINED_MIN_PART_SIZE * 5L, USER_DEFINED_MIN_PART_SIZE * 100L);
+		while (totalSize < expectedSize) {
+
+			int len = random.nextInt(0, (int) (2L * USER_DEFINED_MIN_PART_SIZE));
+			byte[] buffer = randomBuffer(random, len);
+			totalSize += buffer.length;
+			testData.add(buffer);
+		}
+		return testData;
+	}
+
+	private static byte[] randomBuffer(int len) {
+		final SplittableRandom random = new SplittableRandom();
+		return randomBuffer(random, len);
+	}
+
+	private static byte[] randomBuffer(SplittableRandom random, int len) {
+		byte[] buffer = new byte[len];
+		for (int i = 0; i < buffer.length; i++) {
+			buffer[i] = (byte) (random.nextInt() & 0xFF);
+		}
+		return buffer;
+	}
+
+	private static byte[] bytesOf(String str) {
+		return str.getBytes(StandardCharsets.UTF_8);
+	}
+
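+	// Reads back everything the stream has written so far, i.e. the content that
+	// would be handed to the uploader; a single read() is assumed to suffice for
+	// these small test files.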
+	private static byte[] readFileContents(RefCountedFSOutputStream file) throws IOException {
+		final byte[] content = new byte[MathUtils.checkedDownCast(file.getPos())];
+		try (InputStream inputStream = file.getInputStream()) {
+			int bytesRead = inputStream.read(content, 0, content.length); // TODO: 10/2/18 see if closed in download
+			Assert.assertEquals(file.getPos(), bytesRead);
+		}
+		return content;
+	}
+
+	// ------------------------------------------------------------------------------------------------------------
+	// Matchers
+	// ------------------------------------------------------------------------------------------------------------
+
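+	// Matches when the bytes that the test upload reports as published are exactly
+	// the expected content.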
+	private static TypeSafeMatcher<TestMultipartUpload> hasContent(final byte[] expectedContent) {
+		return new TypeSafeMatcher<TestMultipartUpload>() {
+			@Override
+			protected boolean matchesSafely(TestMultipartUpload testMultipartUpload) {
+				return Arrays.equals(
+						testMultipartUpload.getPublishedContents(),
+						expectedContent
+				);
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description.appendText("a TestMultipartUpload with contents='")
+						.appendValue(expectedContent)
+						.appendText("'");
+			}
+		};
+	}
+
+	private static TypeSafeMatcher<TestMultipartUpload> hasContent(final Collection<byte[]> expectedContents) throws IOException {
+
+		ByteArrayOutputStream stream = new ByteArrayOutputStream();
+		for (byte[] c : expectedContents) {
+			stream.write(c);
+		}
+
+		byte[] expectedContent = stream.toByteArray();
+
+		return new TypeSafeMatcher<TestMultipartUpload>() {
+			@Override
+			protected boolean matchesSafely(TestMultipartUpload testMultipartUpload) {
+				return Arrays.equals(
+						testMultipartUpload.getPublishedContents(),
+						expectedContent
+				);
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description.appendText("a TestMultipartUpload with contents='")
+						.appendValue(expectedContent)
+						.appendText("'");
+			}
+		};
+	}
+
+	// ------------------------------------------------------------------------------------------------------------
+	// Test Classes
+	// ------------------------------------------------------------------------------------------------------------
+
+	private static class TestMultipartUpload implements RecoverableMultiPartUpload {
+
+		private final TestFileProvider fileProvider;
+
+		private List<byte[]> uploadedContent = new ArrayList<>();
+
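+		// index into uploadedContent up to which data counts as persisted;
+		// everything after it is dropped by discardUnpersistedData()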
+		private int lastPersistedIndex;
+
+		private int numParts;
+
+		private long numBytes;
+
+		private byte[] published;
+
+		private final ByteArrayOutputStream publishedContents = new ByteArrayOutputStream();
+
+		private Optional<byte[]> uncompleted = Optional.empty();
+
+		TestMultipartUpload(TestFileProvider fileProvider) {
+			this.published = new byte[0];
+			this.lastPersistedIndex = 0;
+			this.fileProvider = fileProvider;
+		}
+
+		public void discardUnpersistedData() {
+			uploadedContent = uploadedContent.subList(0, lastPersistedIndex);
+		}
+
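+		// Materializes the last snapshotted incomplete part into a fresh temp file,
+		// mimicking what a recovery would fetch back from S3.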
+		@Override
+		public Optional<File> getIncompletePart() {
+			if (!uncompleted.isPresent()) {
+				return Optional.empty();
+			}
+			byte[] uncompletedBytes = uncompleted.get();
+			try {
+				File uncompletedTempFile = fileProvider.apply(null).getFile();
+				Files.write(uncompletedTempFile.toPath(), uncompletedBytes);
+				return Optional.of(uncompletedTempFile);
+			} catch (IOException e) {
+				throw new RuntimeException(e);
+			}
+		}
+
+		@Override
+		public RecoverableFsDataOutputStream.Committer snapshotAndGetCommitter() throws IOException {
+			lastPersistedIndex = uploadedContent.size();
+
+			return new RecoverableFsDataOutputStream.Committer() {
+				@Override
+				public void commit() throws IOException {
+					published = getPublishedContents();
+					uploadedContent.clear();
+					lastPersistedIndex = 0;
+				}
+
+				@Override
+				public void commitAfterRecovery() throws IOException {
+					if (published.length == 0) {
+						commit();
+					}
+				}
+
+				@Override
+				public RecoverableWriter.CommitRecoverable getRecoverable() {
+					return null;
+				}
+			};
+		}
+
+		@Override
+		public RecoverableWriter.ResumeRecoverable snapshotAndGetRecoverable(RefCountedFSOutputStream incompletePartFile) throws IOException {
+			lastPersistedIndex = uploadedContent.size();
+
+			if (incompletePartFile.getPos() >= 0L) {
+				byte[] bytes = readFileContents(incompletePartFile);
+				uncompleted = Optional.of(bytes);
+			}
+
+			return null;
+		}
+
+		@Override
+		public void uploadPart(RefCountedFSOutputStream file) throws IOException {
+			numParts++;
+			numBytes += file.getPos();
+
+			uploadedContent.add(readFileContents(file));
+		}
+
+		public byte[] getPublishedContents() {
+			for (int i = 0; i < lastPersistedIndex; i++) {
+				try {
+					publishedContents.write(uploadedContent.get(i));
+				} catch (IOException e) {
+					throw new RuntimeException(e);
+				}
+			}
+			return publishedContents.toByteArray();
+		}
+
+		@Override
+		public String toString() {
+			return "TestMultipartUpload{" +
+					"contents="
+					+ Arrays.toString(published)
+					+ '}';
+		}
+	}
+
+	private static class TestFileProvider implements FunctionWithException<File, RefCountedFile, IOException> {
+
+		private final TemporaryFolder folder;
+
+		TestFileProvider(TemporaryFolder folder) {
+			this.folder = Preconditions.checkNotNull(folder);
+		}
+
+		@Override
+		public RefCountedFile apply(@Nullable File file) throws IOException {
+			while (true) {
+				try {
+					if (file == null) {
+						final File newFile = new File(folder.getRoot(), ".tmp_" + UUID.randomUUID());
+						final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
+						return RefCountedFile.newFile(newFile, out);
+					} else {
+						final OutputStream out = Files.newOutputStream(file.toPath(), StandardOpenOption.APPEND);
+						return RefCountedFile.restoredFile(file, out, file.length());
+					}
+				} catch (FileAlreadyExistsException e) {
+					// fall through the loop and retry
+				}
+			}
+		}
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableSerializerTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableSerializerTest.java
new file mode 100644
index 00000000000..3e7833a5f82
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableSerializerTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.writer;
+
+import com.amazonaws.services.s3.model.PartETag;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for the {@link S3RecoverableSerializer}.
+ */
+public class S3RecoverableSerializerTest {
+
+	private final S3RecoverableSerializer serializer = S3RecoverableSerializer.INSTANCE;
+
+	private static final String TEST_OBJECT_NAME = "TEST-OBJECT";
+
+	private static final String TEST_UPLOAD_ID = "TEST-UPLOAD-ID";
+
+	private static final String INCOMPLETE_OBJECT_NAME = "TEST-INCOMPLETE-PART";
+
+	private static final String ETAG_PREFIX = "TEST-ETAG-";
+
+	@Test
+	public void serializeEmptyS3Recoverable() throws IOException {
+		S3Recoverable originalEmptyRecoverable = createTestS3Recoverable(false);
+
+		byte[] serializedRecoverable = serializer.serialize(originalEmptyRecoverable);
+		S3Recoverable copiedEmptyRecoverable = serializer.deserialize(1, serializedRecoverable);
+
+		assertThat(originalEmptyRecoverable, isEqualTo(copiedEmptyRecoverable));
+	}
+
+	@Test
+	public void serializeS3RecoverableWithoutIncompleteObject() throws IOException {
+		S3Recoverable originalNoIncompletePartRecoverable = createTestS3Recoverable(false, 1, 5, 9);
+
+		byte[] serializedRecoverable = serializer.serialize(originalNoIncompletePartRecoverable);
+		S3Recoverable copiedNoIncompletePartRecoverable = serializer.deserialize(1, serializedRecoverable);
+
+		assertThat(originalNoIncompletePartRecoverable, isEqualTo(copiedNoIncompletePartRecoverable));
+	}
+
+	@Test
+	public void serializeS3RecoverableOnlyWithIncompleteObject() throws IOException {
+		S3Recoverable originalOnlyIncompletePartRecoverable = createTestS3Recoverable(true);
+
+		byte[] serializedRecoverable = serializer.serialize(originalOnlyIncompletePartRecoverable);
+		S3Recoverable copiedOnlyIncompletePartRecoverable = serializer.deserialize(1, serializedRecoverable);
+
+		assertThat(originalOnlyIncompletePartRecoverable, isEqualTo(copiedOnlyIncompletePartRecoverable));
+	}
+
+	@Test
+	public void serializeS3RecoverableWithCompleteAndIncompleteParts() throws IOException {
+		S3Recoverable originalFullRecoverable = createTestS3Recoverable(true, 1, 5, 9);
+
+		byte[] serializedRecoverable = serializer.serialize(originalFullRecoverable);
+		S3Recoverable copiedFullRecoverable = serializer.deserialize(1, serializedRecoverable);
+
+		assertThat(originalFullRecoverable, isEqualTo(copiedFullRecoverable));
+	}
+
+	// --------------------------------- Matchers ---------------------------------
+
+	private static TypeSafeMatcher<S3Recoverable> isEqualTo(S3Recoverable expectedRecoverable) {
+		return new TypeSafeMatcher<S3Recoverable>() {
+
+			@Override
+			protected boolean matchesSafely(S3Recoverable actualRecoverable) {
+
+				return Objects.equals(expectedRecoverable.getObjectName(), actualRecoverable.getObjectName())
+						&& Objects.equals(expectedRecoverable.uploadId(), actualRecoverable.uploadId())
+						&& expectedRecoverable.numBytesInParts() == actualRecoverable.numBytesInParts()
+						&& Objects.equals(expectedRecoverable.incompleteObjectName(), actualRecoverable.incompleteObjectName())
+						&& expectedRecoverable.incompleteObjectLength() == actualRecoverable.incompleteObjectLength()
+						&& compareLists(expectedRecoverable.parts(), actualRecoverable.parts());
+			}
+
+			private boolean compareLists(final List<PartETag> first, final List<PartETag> second) {
+				return Arrays.equals(
+						first.stream().map(PartETag::getETag).toArray(),
+						second.stream().map(PartETag::getETag).toArray()
+				);
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description.appendText(expectedRecoverable + " with ignored LAST_PART_OBJECT_NAME.");
+			}
+		};
+	}
+
+	// --------------------------------- Test Utils ---------------------------------
+
+	private static S3Recoverable createTestS3Recoverable(boolean withIncompletePart, int... partNumbers) {
+		List<PartETag> etags = new ArrayList<>();
+		for (int i : partNumbers) {
+			etags.add(createEtag(i));
+		}
+
+		if (withIncompletePart) {
+			return new S3Recoverable(TEST_OBJECT_NAME, TEST_UPLOAD_ID, etags, 12345L, INCOMPLETE_OBJECT_NAME, 54321L);
+		} else {
+			return new S3Recoverable(TEST_OBJECT_NAME, TEST_UPLOAD_ID, etags, 12345L);
+		}
+	}
+
+	private static PartETag createEtag(int partNumber) {
+		return new PartETag(partNumber, ETAG_PREFIX + partNumber);
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3MultiPartUploader.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3MultiPartUploader.java
new file mode 100644
index 00000000000..f446f70e2a7
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3MultiPartUploader.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3hadoop;
+
+import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.util.MathUtils;
+
+import com.amazonaws.SdkBaseException;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AUtils;
+import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link S3MultiPartUploader} for the Hadoop S3A filesystem.
+ */
+public class HadoopS3MultiPartUploader implements S3MultiPartUploader {
+
+	private final S3AFileSystem s3a;
+
+	private final InternalWriteOperationHelper s3uploader;
+
+	public HadoopS3MultiPartUploader(S3AFileSystem s3a, Configuration conf) {
+		this.s3uploader = new InternalWriteOperationHelper(
+				checkNotNull(s3a),
+				checkNotNull(conf)
+		);
+		this.s3a = s3a;
+	}
+
+	@Override
+	public String startMultiPartUpload(String key) throws IOException {
+		return s3uploader.initiateMultiPartUpload(key);
+	}
+
+	@Override
+	public UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream inputStream, long length) throws IOException {
+		final UploadPartRequest uploadRequest = s3uploader.newUploadPartRequest(
+				key, uploadId, partNumber, MathUtils.checkedDownCast(length), inputStream, null, 0L);
+		return s3uploader.uploadPart(uploadRequest);
+	}
+
+	@Override
+	public PutObjectResult uploadIncompletePart(String key, InputStream inputStream, long length) throws IOException {
+		final PutObjectRequest putRequest = s3uploader.createPutObjectRequest(key, inputStream, length);
+		return s3uploader.putObject(putRequest);
+	}
+
+	@Override
+	public CompleteMultipartUploadResult commitMultiPartUpload(String destKey, String uploadId, List<PartETag> partETags, long length, AtomicInteger errorCount) throws IOException {
+		return s3uploader.completeMPUwithRetries(destKey, uploadId, partETags, length, errorCount);
+	}
+
+	@Override
+	public ObjectMetadata getObjectMetadata(String key) throws IOException {
+		try {
+			return s3a.getObjectMetadata(new Path('/' + key));
+		}
+		catch (SdkBaseException e) {
+			throw S3AUtils.translateException("getObjectMetadata", key, e);
+		}
+	}
+
+	/**
+	 * Internal {@link WriteOperationHelper} that is wrapped so that it only exposes
+	 * the functionality we need for the {@link S3MultiPartUploader}.
+	 */
+	private static final class InternalWriteOperationHelper extends WriteOperationHelper {
+
+		InternalWriteOperationHelper(S3AFileSystem owner, Configuration conf) {
+			super(owner, conf);
+		}
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
index 2b46dbdc671..897629f7910 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
@@ -21,11 +21,15 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
 import org.apache.flink.fs.s3.common.HadoopConfigLoader;
+import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.net.URI;
 import java.util.Collections;
 import java.util.Set;
@@ -34,6 +38,7 @@
  * Simple factory for the S3 file system.
  */
 public class S3FileSystemFactory extends AbstractS3FileSystemFactory {
+
 	private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class);
 
 	private static final Set<String> PACKAGE_PREFIXES_TO_SHADE = Collections.singleton("com.amazonaws.");
@@ -88,4 +93,11 @@ else if (scheme != null && authority == null) {
 
 		return fsUri;
 	}
+
+	@Nullable
+	@Override
+	protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) {
+		final S3AFileSystem s3Afs = (S3AFileSystem) fs;
+		return new HadoopS3MultiPartUploader(s3Afs, s3Afs.getConf());
+	}
 }
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterExceptionTest.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterExceptionTest.java
new file mode 100644
index 00000000000..b9e4ffe8a70
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterExceptionTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3hadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.s3.common.FlinkS3FileSystem;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.MAX_CONCURRENT_UPLOADS;
+import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.PART_UPLOAD_MIN_SIZE;
+
+/**
+ * Tests for the exception behavior of the
+ * {@link org.apache.flink.fs.s3.common.writer.S3RecoverableWriter S3RecoverableWriter}
+ * when streams are misused or recovered after a commit.
+ */
+public class HadoopS3RecoverableWriterExceptionTest extends TestLogger {
+
+	// ----------------------- S3 general configuration -----------------------
+
+	private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
+	private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");
+	private static final String BUCKET = System.getenv("ARTIFACTS_AWS_BUCKET");
+
+	private static final long PART_UPLOAD_MIN_SIZE_VALUE = 7L << 20;
+	private static final int MAX_CONCURRENT_UPLOADS_VALUE = 2;
+
+	// ----------------------- Test Specific configuration -----------------------
+
+	private static final Random RND = new Random();
+
+	private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
+
+	private static final Path basePath = new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR);
+
+	private static FlinkS3FileSystem fileSystem;
+
+	// this is set for every test @Before
+	private Path basePathForTest;
+
+	// ----------------------- Test Data to be used -----------------------
+
+	private static final String testData1 = "THIS IS A TEST 1.";
+	private static final String testData2 = "THIS IS A TEST 2.";
+	private static final String testData3 = "THIS IS A TEST 3.";
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+	@BeforeClass
+	public static void checkCredentialsAndSetup() throws IOException {
+		// check whether credentials exist
+		Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null);
+		Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null);
+		Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null);
+
+		// initialize configuration with valid credentials
+		final Configuration conf = new Configuration();
+		conf.setString("s3.access.key", ACCESS_KEY);
+		conf.setString("s3.secret.key", SECRET_KEY);
+
+		conf.setLong(PART_UPLOAD_MIN_SIZE, PART_UPLOAD_MIN_SIZE_VALUE);
+		conf.setInteger(MAX_CONCURRENT_UPLOADS, MAX_CONCURRENT_UPLOADS_VALUE);
+
+		final String defaultTmpDir = TEMP_FOLDER.getRoot().getAbsolutePath() + "s3_tmp_dir";
+		conf.setString(CoreOptions.TMP_DIRS, defaultTmpDir);
+
+		FileSystem.initialize(conf);
+	}
+
+	@AfterClass
+	public static void cleanUp() throws Exception {
+		getFileSystem().delete(basePath, true);
+
+		FileSystem.initialize(new Configuration());
+	}
+
+	@Before
+	public void prepare() throws Exception {
+		basePathForTest = new Path(
+				basePath,
+				StringUtils.getRandomString(RND, 16, 16, 'a', 'z'));
+
+		final String defaultTmpDir = getFileSystem().getLocalTmpDir();
+		final java.nio.file.Path path = Paths.get(defaultTmpDir);
+
+		if (!Files.exists(path)) {
+			Files.createDirectory(path);
+		}
+	}
+
+	@After
+	public void cleanup() throws Exception {
+		getFileSystem().delete(basePathForTest, true);
+	}
+
+	private static FlinkS3FileSystem getFileSystem() throws Exception {
+		if (fileSystem == null) {
+			fileSystem = (FlinkS3FileSystem) FileSystem.get(basePath.toUri());
+		}
+		return fileSystem;
+	}
+
+	@Test(expected = IOException.class)
+	public void testExceptionWritingAfterCloseForCommit() throws Exception {
+		final Path path = new Path(basePathForTest, "part-0");
+
+		final RecoverableFsDataOutputStream stream = getFileSystem().createRecoverableWriter().open(path);
+		stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+
+		stream.closeForCommit().getRecoverable();
+		stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+	}
+
+	// IMPORTANT FOR THE FOLLOWING TWO TESTS:
+
+	// These tests illustrate a difference in the user-perceived behavior of the different writers.
+	// In HDFS this will fail when trying to recover the stream, while here it will fail at "commit", i.e.
+	// when we try to "publish" the multipart upload and we realize that the MPU is no longer active.
+
+	@Test(expected = IOException.class)
+	public void testResumeAfterCommit() throws Exception {
+		final RecoverableWriter writer = getFileSystem().createRecoverableWriter();
+		final Path path = new Path(basePathForTest, "part-0");
+
+		final RecoverableFsDataOutputStream stream = writer.open(path);
+		stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+
+		final RecoverableWriter.ResumeRecoverable recoverable = stream.persist();
+		stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+
+		stream.closeForCommit().commit();
+
+		final RecoverableFsDataOutputStream recoveredStream = writer.recover(recoverable);
+		recoveredStream.closeForCommit().commit();
+	}
+
+	@Test(expected = IOException.class)
+	public void testResumeWithWrongOffset() throws Exception {
+		// this is a rather unrealistic scenario, but it is meant to trigger
+		// truncation of the file and a resume attempt with missing data.
+
+		final RecoverableWriter writer = getFileSystem().createRecoverableWriter();
+		final Path path = new Path(basePathForTest, "part-0");
+
+		final RecoverableFsDataOutputStream stream = writer.open(path);
+		stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+
+		final RecoverableWriter.ResumeRecoverable recoverable1 = stream.persist();
+		stream.write(testData2.getBytes(StandardCharsets.UTF_8));
+
+		final RecoverableWriter.ResumeRecoverable recoverable2 = stream.persist();
+		stream.write(testData3.getBytes(StandardCharsets.UTF_8));
+
+		final RecoverableFsDataOutputStream recoveredStream = writer.recover(recoverable1);
+		recoveredStream.closeForCommit().commit();
+
+		// this should throw an exception
+		final RecoverableFsDataOutputStream newRecoveredStream = writer.recover(recoverable2);
+		newRecoveredStream.closeForCommit().commit();
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java
new file mode 100644
index 00000000000..49cdb8a8e4a
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3hadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.fs.s3.common.FlinkS3FileSystem;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.MAX_CONCURRENT_UPLOADS;
+import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.PART_UPLOAD_MIN_SIZE;
+
+/**
+ * Tests for the {@link org.apache.flink.fs.s3.common.writer.S3RecoverableWriter S3RecoverableWriter}.
+ */
+public class HadoopS3RecoverableWriterTest extends TestLogger {
+
+	// ----------------------- S3 general configuration -----------------------
+
+	private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
+	private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");
+	private static final String BUCKET = System.getenv("ARTIFACTS_AWS_BUCKET");
+
+	private static final long PART_UPLOAD_MIN_SIZE_VALUE = 7L << 20;
+	private static final int MAX_CONCURRENT_UPLOADS_VALUE = 2;
+
+	// ----------------------- Test Specific configuration -----------------------
+
+	private static final Random RND = new Random();
+
+	private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
+
+	private static final Path basePath = new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR);
+
+	private static FlinkS3FileSystem fileSystem;
+
+	// this is set for every test in the @Before method
+	private Path basePathForTest;
+
+	// ----------------------- Test Data to be used -----------------------
+
+	private static final String testData1 = "THIS IS A TEST 1.";
+	private static final String testData2 = "THIS IS A TEST 2.";
+	private static final String testData3 = "THIS IS A TEST 3.";
+
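+	// a chunk of test data that is larger than PART_UPLOAD_MIN_SIZE_VALUE,
+	// used by the ...WithMultiPart tests to exercise actual multipart uploads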
+	private static final String bigDataChunk = createBigDataChunk(testData1, PART_UPLOAD_MIN_SIZE_VALUE);
+
+
+	// ----------------------- Test Lifecycle -----------------------
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+	@BeforeClass
+	public static void checkCredentialsAndSetup() throws IOException {
+		// check whether credentials exist
+		Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null);
+		Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null);
+		Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null);
+
+		// initialize configuration with valid credentials
+		final Configuration conf = new Configuration();
+		conf.setString("s3.access.key", ACCESS_KEY);
+		conf.setString("s3.secret.key", SECRET_KEY);
+
+		conf.setLong(PART_UPLOAD_MIN_SIZE, PART_UPLOAD_MIN_SIZE_VALUE);
+		conf.setInteger(MAX_CONCURRENT_UPLOADS, MAX_CONCURRENT_UPLOADS_VALUE);
+
+		final String defaultTmpDir = TEMP_FOLDER.getRoot().getAbsolutePath() + "s3_tmp_dir";
+		conf.setString(CoreOptions.TMP_DIRS, defaultTmpDir);
+
+		FileSystem.initialize(conf);
+	}
+
+	@AfterClass
+	public static void cleanUp() throws Exception {
+		getFileSystem().delete(basePath, true);
+
+		FileSystem.initialize(new Configuration());
+	}
+
+	@Before
+	public void prepare() throws Exception {
+		basePathForTest = new Path(
+				basePath,
+				StringUtils.getRandomString(RND, 16, 16, 'a', 'z'));
+
+		cleanupLocalDir();
+	}
+
+	private void cleanupLocalDir() throws Exception {
+		final String defaultTmpDir = getFileSystem().getLocalTmpDir();
+		final java.nio.file.Path defaultTmpPath = Paths.get(defaultTmpDir);
+
+		if (Files.exists(defaultTmpPath)) {
+			try (Stream<java.nio.file.Path> files = Files.list(defaultTmpPath)) {
+				files.forEach(p -> {
+					try {
+						Files.delete(p);
+					} catch (IOException e) {
+						e.printStackTrace();
+					}
+				});
+			}
+		} else {
+			Files.createDirectory(defaultTmpPath);
+		}
+	}
+
+	@After
+	public void cleanupAndCheckTmpCleanup() throws Exception {
+		final String defaultTmpDir = getFileSystem().getLocalTmpDir();
+		final java.nio.file.Path localTmpDir = Paths.get(defaultTmpDir);
+
+		// verify that the tests cleaned up the local tmp dir, then delete it.
+		Assert.assertTrue(Files.exists(localTmpDir));
+		try (Stream<java.nio.file.Path> files = Files.list(localTmpDir)) {
+			Assert.assertEquals(0L, files.count());
+		}
+		Files.delete(localTmpDir);
+
+		// also delete the S3 directory used by this test.
+		getFileSystem().delete(basePathForTest, true);
+	}
+
+	private static FlinkS3FileSystem getFileSystem() throws Exception {
+		if (fileSystem == null) {
+			fileSystem = (FlinkS3FileSystem) FileSystem.get(basePath.toUri());
+		}
+		return fileSystem;
+	}
+
+	// ----------------------- Test Normal Execution -----------------------
+
+	@Test
+	public void testCloseWithNoData() throws Exception {
+		final RecoverableWriter writer = getRecoverableWriter();
+		final Path path = new Path(basePathForTest, "part-0");
+
+		final RecoverableFsDataOutputStream stream = writer.open(path);
+
+		stream.closeForCommit().commit();
+	}
+
+	@Test
+	public void testCommitAfterNormalClose() throws Exception {
+		final RecoverableWriter writer = getRecoverableWriter();
+		final Path path = new Path(basePathForTest, "part-0");
+
+		final RecoverableFsDataOutputStream stream = writer.open(path);
+		stream.write(bytesOf(testData1));
+		stream.closeForCommit().commit();
+
+		Assert.assertEquals(testData1, getContentsOfFile(path));
+	}
+
+	@Test
+	public void testCommitAfterPersist() throws Exception {
+		final RecoverableWriter writer = getRecoverableWriter();
+		final Path path = new Path(basePathForTest, "part-0");
+
+		final RecoverableFsDataOutputStream stream = writer.open(path);
+		stream.write(bytesOf(testData1));
+		stream.persist();
+
+		stream.write(bytesOf(testData2));
+		stream.closeForCommit().commit();
+
+		Assert.assertEquals(testData1 + testData2, getContentsOfFile(path));
+	}
+
+	// ----------------------- Test Recovery -----------------------
+
+	@Test
+	public void testCommitAfterRecovery() throws Exception {
+		final Path path = new Path(basePathForTest, "part-0");
+
+		final RecoverableWriter initWriter = getRecoverableWriter();
+
+		final RecoverableFsDataOutputStream stream = initWriter.open(path);
+		stream.write(bytesOf(testData1));
+
+		stream.persist();
+		stream.persist();
+
+		// and write some more data
+		stream.write(bytesOf(testData2));
+
+		final RecoverableWriter.CommitRecoverable recoverable = stream.closeForCommit().getRecoverable();
+
+		final byte[] serializedRecoverable = initWriter.getCommitRecoverableSerializer().serialize(recoverable);
+
+		// get a new serializer from a new writer to make sure that no pre-initialized state leaks in.
+		final RecoverableWriter newWriter = getRecoverableWriter();
+
+		final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> deserializer = newWriter.getCommitRecoverableSerializer();
+		final RecoverableWriter.CommitRecoverable recoveredRecoverable = deserializer.deserialize(deserializer.getVersion(), serializedRecoverable);
+
+		final RecoverableFsDataOutputStream.Committer committer = newWriter.recoverForCommit(recoveredRecoverable);
+		committer.commitAfterRecovery();
+
+		Assert.assertEquals(testData1 + testData2, getContentsOfFile(path));
+	}
+
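+	// names of the persist points taken in testResumeAfterMultiplePersist(), used to
+	// select which ResumeRecoverable a test recovers from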
+	private static final String INIT_EMPTY_PERSIST = "EMPTY";
+	private static final String INTERM_WITH_STATE_PERSIST = "INTERM-STATE";
+	private static final String INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST = "INTERM-IMMEDIATE";
+	private static final String FINAL_WITH_EXTRA_STATE = "FINAL";
+
+	@Test
+	public void testRecoverWithEmptyState() throws Exception {
+		testResumeAfterMultiplePersistWithSmallData(
+				INIT_EMPTY_PERSIST,
+				testData3);
+	}
+
+	@Test
+	public void testRecoverWithState() throws Exception {
+		testResumeAfterMultiplePersistWithSmallData(
+				INTERM_WITH_STATE_PERSIST,
+				testData1 + testData3);
+	}
+
+	@Test
+	public void testRecoverFromIntermWithoutAdditionalState() throws Exception {
+		testResumeAfterMultiplePersistWithSmallData(
+				INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST,
+				testData1 + testData3);
+	}
+
+	@Test
+	public void testRecoverAfterMultiplePersistsState() throws Exception {
+		testResumeAfterMultiplePersistWithSmallData(
+				FINAL_WITH_EXTRA_STATE,
+				testData1 + testData2 + testData3);
+	}
+
+	@Test
+	public void testRecoverWithStateWithMultiPart() throws Exception {
+		testResumeAfterMultiplePersistWithMultiPartUploads(
+				INTERM_WITH_STATE_PERSIST,
+				bigDataChunk + bigDataChunk);
+	}
+
+	@Test
+	public void testRecoverFromIntermWithoutAdditionalStateWithMultiPart() throws Exception {
+		testResumeAfterMultiplePersistWithMultiPartUploads(
+				INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST,
+				bigDataChunk + bigDataChunk);
+	}
+
+	@Test
+	public void testRecoverAfterMultiplePersistsStateWithMultiPart() throws Exception {
+		testResumeAfterMultiplePersistWithMultiPartUploads(
+				FINAL_WITH_EXTRA_STATE,
+				bigDataChunk + bigDataChunk + bigDataChunk);
+	}
+
+	private void testResumeAfterMultiplePersistWithSmallData(
+			final String persistName,
+			final String expectedFinalContents) throws Exception {
+		testResumeAfterMultiplePersist(
+				persistName,
+				expectedFinalContents,
+				testData1,
+				testData2,
+				testData3
+		);
+	}
+
+	private void testResumeAfterMultiplePersistWithMultiPartUploads(
+			final String persistName,
+			final String expectedFinalContents) throws Exception {
+		testResumeAfterMultiplePersist(
+				persistName,
+				expectedFinalContents,
+				bigDataChunk,
+				bigDataChunk,
+				bigDataChunk
+		);
+	}
+
+	private void testResumeAfterMultiplePersist(
+			final String persistName,
+			final String expectedFinalContents,
+			final String firstItemToWrite,
+			final String secondItemToWrite,
+			final String thirdItemToWrite) throws Exception {
+
+		final Path path = new Path(basePathForTest, "part-0");
+		final RecoverableWriter initWriter = getRecoverableWriter();
+
+		final Map<String, RecoverableWriter.ResumeRecoverable> recoverables = new HashMap<>(4);
+		try (final RecoverableFsDataOutputStream stream = initWriter.open(path)) {
+			recoverables.put(INIT_EMPTY_PERSIST, stream.persist());
+
+			stream.write(bytesOf(firstItemToWrite));
+
+			recoverables.put(INTERM_WITH_STATE_PERSIST, stream.persist());
+			recoverables.put(INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST, stream.persist());
+
+			// and write some more data
+			stream.write(bytesOf(secondItemToWrite));
+
+			recoverables.put(FINAL_WITH_EXTRA_STATE, stream.persist());
+		}
+
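+		// round-trip the chosen recoverable through its serializer, as would happen
+		// when recovering it after a failure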
+		final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> serializer = initWriter.getResumeRecoverableSerializer();
+		final byte[] serializedRecoverable = serializer.serialize(recoverables.get(persistName));
+
+		// get a new serializer from a new writer to make sure that no pre-initialized state leaks in.
+		final RecoverableWriter newWriter = getRecoverableWriter();
+		final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> deserializer = newWriter.getResumeRecoverableSerializer();
+		final RecoverableWriter.ResumeRecoverable recoveredRecoverable =
+				deserializer.deserialize(serializer.getVersion(), serializedRecoverable);
+
+		final RecoverableFsDataOutputStream recoveredStream = newWriter.recover(recoveredRecoverable);
+		recoveredStream.write(bytesOf(thirdItemToWrite));
+		recoveredStream.closeForCommit().commit();
+
+		Assert.assertEquals(expectedFinalContents, getContentsOfFile(path));
+	}
+
+	// -------------------------- Test Utilities --------------------------
+
+	private String getContentsOfFile(Path path) throws Exception {
+		final StringBuilder builder = new StringBuilder();
+		try (
+				FSDataInputStream inStream = getFileSystem().open(path);
+				BufferedReader reader = new BufferedReader(new InputStreamReader(inStream))
+		) {
+			String line;
+			while ((line = reader.readLine()) != null) {
+				builder.append(line);
+			}
+		}
+		return builder.toString();
+	}
+
+	private static String createBigDataChunk(String pattern, long size) {
+		final StringBuilder stringBuilder = new StringBuilder();
+
+		int sampleLength = bytesOf(pattern).length;
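+		// repeat the pattern often enough to comfortably exceed the requested size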
+		int repeats = MathUtils.checkedDownCast(size) / sampleLength + 100;
+
+		for (int i = 0; i < repeats; i++) {
+			stringBuilder.append(pattern);
+		}
+		return stringBuilder.toString();
+	}
+
+	private static byte[] bytesOf(String str) {
+		return str.getBytes(StandardCharsets.UTF_8);
+	}
+
+	private RecoverableWriter getRecoverableWriter() throws Exception {
+		return getFileSystem().createRecoverableWriter();
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index 4b9db97bcab..b579d6ebf04 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -21,9 +21,13 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
 import org.apache.flink.fs.s3.common.HadoopConfigLoader;
+import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import com.facebook.presto.hive.PrestoS3FileSystem;
+import org.apache.hadoop.fs.FileSystem;
+
+import javax.annotation.Nullable;
 
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -86,6 +90,12 @@ else if (scheme != null && authority == null) {
 		return initUri;
 	}
 
+	@Nullable
+	@Override
+	protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) {
+		return null;
+	}
+
 	private URI createURI(String str) {
 		try {
 			return new URI(str);
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3RecoverableWriterTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3RecoverableWriterTest.java
new file mode 100644
index 00000000000..580d957db23
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3RecoverableWriterTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.fs.s3.common.FlinkS3FileSystem;
+
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.MAX_CONCURRENT_UPLOADS;
+import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.PART_UPLOAD_MIN_SIZE;
+
+/**
+ * Tests for the {@link org.apache.flink.core.fs.RecoverableWriter} of the Presto S3 FS.
+ */
+public class PrestoS3RecoverableWriterTest {
+
+	// ----------------------- S3 general configuration -----------------------
+
+	private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
+	private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");
+	private static final String BUCKET = System.getenv("ARTIFACTS_AWS_BUCKET");
+
+	private static final long PART_UPLOAD_MIN_SIZE_VALUE = 7L << 20;
+	private static final int MAX_CONCURRENT_UPLOADS_VALUE = 2;
+
+	// ----------------------- Test Specific configuration -----------------------
+
+	private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
+
+	private static final Path basePath = new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR);
+
+	// ----------------------- Test Lifecycle -----------------------
+
+	@BeforeClass
+	public static void checkCredentialsAndSetup() throws IOException {
+		// check whether credentials exist
+		Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null);
+		Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null);
+		Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null);
+
+		// initialize configuration with valid credentials
+		final Configuration conf = new Configuration();
+		conf.setString("s3.access.key", ACCESS_KEY);
+		conf.setString("s3.secret.key", SECRET_KEY);
+
+		conf.setLong(PART_UPLOAD_MIN_SIZE, PART_UPLOAD_MIN_SIZE_VALUE);
+		conf.setInteger(MAX_CONCURRENT_UPLOADS, MAX_CONCURRENT_UPLOADS_VALUE);
+
+		final String defaultTmpDir = conf.getString(CoreOptions.TMP_DIRS) + "s3_tmp_dir";
+		conf.setString(CoreOptions.TMP_DIRS, defaultTmpDir);
+
+		FileSystem.initialize(conf);
+	}
+
+	@AfterClass
+	public static void cleanUp() throws IOException {
+		FileSystem.initialize(new Configuration());
+	}
+
+	// ----------------------- Tests -----------------------
+
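+	// the Presto-based S3 file system does not provide an S3 access helper (its factory
+	// returns null from getS3AccessHelper), so it cannot create a recoverable writer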
+	@Test(expected = UnsupportedOperationException.class)
+	public void requestingRecoverableWriterShouldThrowException() throws Exception {
+		FlinkS3FileSystem fileSystem = (FlinkS3FileSystem) FileSystem.get(basePath.toUri());
+		fileSystem.createRecoverableWriter();
+	}
+}
diff --git a/tools/travis/stage.sh b/tools/travis/stage.sh
index b751383f957..3b2252ed7dd 100644
--- a/tools/travis/stage.sh
+++ b/tools/travis/stage.sh
@@ -58,6 +58,7 @@ MODULES_CONNECTORS="\
 flink-contrib/flink-connector-wikiedits,\
 flink-filesystems/flink-hadoop-fs,\
 flink-filesystems/flink-mapr-fs,\
+flink-filesystems/flink-s3-fs-base,\
 flink-filesystems/flink-s3-fs-hadoop,\
 flink-filesystems/flink-s3-fs-presto,\
 flink-formats/flink-avro,\


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services