Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/12/04 16:23:56 UTC

[GitHub] asfgit closed pull request #7161: [FLINK-10963][fs-connector, s3] Cleanup tmp S3 objects uploaded as backups of in-progress files.

asfgit closed pull request #7161: [FLINK-10963][fs-connector, s3] Cleanup tmp S3 objects uploaded as backups of in-progress files.
URL: https://github.com/apache/flink/pull/7161
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
index e5bfdb84a02..7d54b11bbbc 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
@@ -121,6 +121,34 @@
 	 */
 	RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException;
 
+	/**
+	 * Marks whether the writer requires additional cleanup/freeing of resources occupied
+	 * as part of a {@link ResumeRecoverable}, e.g. temporary files created or objects uploaded
+	 * to external systems.
+	 *
+	 * <p>If cleanup is required, then {@link #cleanupRecoverableState(ResumeRecoverable)} should
+	 * be called.
+	 *
+	 * @return {@code true} if cleanup is required, {@code false} otherwise.
+	 */
+	boolean requiresCleanupOfRecoverableState();
+
+	/**
+	 * Frees up any resources that were previously occupied in order to be able to
+	 * recover from a (potential) failure. These can be temporary files that were written
+	 * to the filesystem or objects that were uploaded to S3.
+	 *
+	 * <p><b>NOTE:</b> This operation should not throw an exception if the resumable has already
+	 * been cleaned up and the resources have been freed. But the contract is that it will throw
+	 * an {@link UnsupportedOperationException} if it is called for a {@code RecoverableWriter}
+	 * whose {@link #requiresCleanupOfRecoverableState()} returns {@code false}.
+	 *
+	 * @param resumable The {@link ResumeRecoverable} whose state we want to clean up.
+	 * @return {@code true} if the resources were successfully freed, {@code false} otherwise
+	 * (e.g. the file to be deleted was not there for any reason - already deleted or never created).
+	 */
+	boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException;
+
 	/**
 	 * Recovers a recoverable stream consistently at the point indicated by the given CommitRecoverable
 	 * for finalizing and committing. This will publish the target file with exactly the data
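
For context, here is a minimal sketch of the calling pattern implied by the new contract (illustrative only, not part of this patch; the helper name is hypothetical):

import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;

import java.io.IOException;

class ResumableCleanupSketch {

	// Hypothetical helper, not part of the patch: disposes of a resumable that is
	// no longer needed, following the contract described above.
	static void disposeIfNeeded(RecoverableWriter writer, ResumeRecoverable resumable) throws IOException {
		if (writer.requiresCleanupOfRecoverableState()) {
			// For writers that stage backup objects (e.g. S3), this deletes the tmp object.
			// A return value of false means it was already gone, which by contract is not an error.
			writer.cleanupRecoverableState(resumable);
		}
		// Writers whose requiresCleanupOfRecoverableState() returns false (local FS, HDFS)
		// throw UnsupportedOperationException from cleanupRecoverableState(), hence the guard.
	}
}
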
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
index a2f0485a003..a43e0b6b6bf 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
@@ -70,6 +70,16 @@ public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) thro
 		}
 	}
 
+	@Override
+	public boolean requiresCleanupOfRecoverableState() {
+		return false;
+	}
+
+	@Override
+	public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
 	@Override
 	public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException {
 		if (recoverable instanceof LocalRecoverable) {
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
index 305f8ee5612..03d741b4b82 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
@@ -77,6 +77,16 @@ public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) thro
 		}
 	}
 
+	@Override
+	public boolean requiresCleanupOfRecoverableState() {
+		return false;
+	}
+
+	@Override
+	public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
 	@Override
 	public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException {
 		if (recoverable instanceof HadoopFsRecoverable) {
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index 318fd39829c..6ccdeae7df8 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -25,7 +25,7 @@
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -141,7 +141,7 @@ public FileSystem create(URI fsUri) throws IOException {
 			final String localTmpDirectory = flinkConfig.getString(CoreOptions.TMP_DIRS);
 			final long s3minPartSize = flinkConfig.getLong(PART_UPLOAD_MIN_SIZE);
 			final int maxConcurrentUploads = flinkConfig.getInteger(MAX_CONCURRENT_UPLOADS);
-			final S3MultiPartUploader s3AccessHelper = getS3AccessHelper(fs);
+			final S3AccessHelper s3AccessHelper = getS3AccessHelper(fs);
 
 			return new FlinkS3FileSystem(
 					fs,
@@ -166,6 +166,6 @@ protected abstract URI getInitURI(
 		URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig);
 
 	@Nullable
-	protected abstract S3MultiPartUploader getS3AccessHelper(org.apache.hadoop.fs.FileSystem fs);
+	protected abstract S3AccessHelper getS3AccessHelper(org.apache.hadoop.fs.FileSystem fs);
 }
 
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
index 553edde75b0..5248e061a12 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
@@ -23,7 +23,7 @@
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.fs.s3.common.utils.RefCountedFile;
 import org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 import org.apache.flink.fs.s3.common.writer.S3RecoverableWriter;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.util.Preconditions;
@@ -60,7 +60,7 @@
 	private final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator;
 
 	@Nullable
-	private final S3MultiPartUploader s3UploadHelper;
+	private final S3AccessHelper s3AccessHelper;
 
 	private final Executor uploadThreadPool;
 
@@ -83,7 +83,7 @@ public FlinkS3FileSystem(
 			String localTmpDirectory,
 			@Nullable String entropyInjectionKey,
 			int entropyLength,
-			@Nullable S3MultiPartUploader s3UploadHelper,
+			@Nullable S3AccessHelper s3UploadHelper,
 			long s3uploadPartSize,
 			int maxConcurrentUploadsPerStream) {
 
@@ -99,7 +99,7 @@ public FlinkS3FileSystem(
 		// recoverable writer parameter configuration initialization
 		this.localTmpDir = Preconditions.checkNotNull(localTmpDirectory);
 		this.tmpFileCreator = RefCountedTmpFileCreator.inDirectories(new File(localTmpDirectory));
-		this.s3UploadHelper = s3UploadHelper;
+		this.s3AccessHelper = s3UploadHelper;
 		this.uploadThreadPool = Executors.newCachedThreadPool();
 
 		Preconditions.checkArgument(s3uploadPartSize >= S3_MULTIPART_MIN_PART_SIZE);
@@ -131,7 +131,7 @@ public String getLocalTmpDir() {
 
 	@Override
 	public RecoverableWriter createRecoverableWriter() throws IOException {
-		if (s3UploadHelper == null) {
+		if (s3AccessHelper == null) {
 			// this is the case for Presto
 			throw new UnsupportedOperationException("This s3 file system implementation does not support recoverable writers.");
 		}
@@ -139,7 +139,7 @@ public RecoverableWriter createRecoverableWriter() throws IOException {
 		return S3RecoverableWriter.writer(
 				getHadoopFileSystem(),
 				tmpFileCreator,
-				s3UploadHelper,
+				s3AccessHelper,
 				uploadThreadPool,
 				s3uploadPartSize,
 				maxConcurrentUploadsPerStream);
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
index 80042ce22f7..9d88e65f64d 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
@@ -58,7 +58,7 @@
 @NotThreadSafe
 final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload {
 
-	private final S3MultiPartUploader s3MPUploader;
+	private final S3AccessHelper s3AccessHelper;
 
 	private final Executor uploadThreadPool;
 
@@ -71,7 +71,7 @@
 	// ------------------------------------------------------------------------
 
 	private RecoverableMultiPartUploadImpl(
-			S3MultiPartUploader s3uploader,
+			S3AccessHelper s3AccessHelper,
 			Executor uploadThreadPool,
 			String uploadId,
 			String objectName,
@@ -81,10 +81,10 @@ private RecoverableMultiPartUploadImpl(
 	) {
 		checkArgument(numBytes >= 0L);
 
-		this.s3MPUploader = checkNotNull(s3uploader);
+		this.s3AccessHelper = checkNotNull(s3AccessHelper);
 		this.uploadThreadPool = checkNotNull(uploadThreadPool);
 		this.currentUploadInfo = new MultiPartUploadInfo(objectName, uploadId, partsSoFar, numBytes, incompletePart);
-		this.namePrefixForTempObjects = incompleteObjectNamePrefix(objectName);
+		this.namePrefixForTempObjects = createIncompletePartObjectNamePrefix(objectName);
 		this.uploadsInProgress = new ArrayDeque<>();
 	}
 
@@ -111,7 +111,7 @@ public void uploadPart(RefCountedFSOutputStream file) throws IOException {
 		currentUploadInfo.registerNewPart(partLength);
 
 		file.retain(); // keep the file while the async upload still runs
-		uploadThreadPool.execute(new UploadTask(s3MPUploader, currentUploadInfo, file, future));
+		uploadThreadPool.execute(new UploadTask(s3AccessHelper, currentUploadInfo, file, future));
 	}
 
 	@Override
@@ -124,7 +124,7 @@ public S3Committer snapshotAndGetCommitter() throws IOException {
 		final S3Recoverable snapshot = snapshotAndGetRecoverable(null);
 
 		return new S3Committer(
-				s3MPUploader,
+				s3AccessHelper,
 				snapshot.getObjectName(),
 				snapshot.uploadId(),
 				snapshot.parts(),
@@ -171,15 +171,10 @@ private String safelyUploadSmallPart(@Nullable RefCountedFSOutputStream file) th
 		}
 
 		// first, upload the trailing data file. during that time, other in-progress uploads may complete.
-		final String incompletePartObjectName = createTmpObjectName();
+		final String incompletePartObjectName = createIncompletePartObjectName();
 		file.retain();
 		try (InputStream inputStream = file.getInputStream()) {
-
-			// TODO: staged incomplete parts are not cleaned up as
-			// they do not fall under the user's global TTL on S3.
-			// Figure out a way to clean them.
-
-			s3MPUploader.uploadIncompletePart(incompletePartObjectName, inputStream, file.getPos());
+			s3AccessHelper.putObject(incompletePartObjectName, inputStream, file.getPos());
 		}
 		finally {
 			file.release();
@@ -192,7 +187,7 @@ private String safelyUploadSmallPart(@Nullable RefCountedFSOutputStream file) th
 	// ------------------------------------------------------------------------
 
 	@VisibleForTesting
-	static String incompleteObjectNamePrefix(String objectName) {
+	static String createIncompletePartObjectNamePrefix(String objectName) {
 		checkNotNull(objectName);
 
 		final int lastSlash = objectName.lastIndexOf('/');
@@ -209,6 +204,10 @@ static String incompleteObjectNamePrefix(String objectName) {
 		return parent + (child.isEmpty() ? "" : '_') + child + "_tmp_";
 	}
 
+	private String createIncompletePartObjectName() {
+		return namePrefixForTempObjects + UUID.randomUUID().toString();
+	}
+
 	private void awaitPendingPartsUpload() throws IOException {
 		checkState(currentUploadInfo.getRemainingParts() == uploadsInProgress.size());
 
@@ -235,23 +234,19 @@ private PartETag awaitPendingPartUploadToComplete(CompletableFuture<PartETag> up
 		return completedUploadEtag;
 	}
 
-	private String createTmpObjectName() {
-		return namePrefixForTempObjects + UUID.randomUUID().toString();
-	}
-
 	// ------------------------------------------------------------------------
 	//  factory methods
 	// ------------------------------------------------------------------------
 
 	public static RecoverableMultiPartUploadImpl newUpload(
-			final S3MultiPartUploader s3uploader,
+			final S3AccessHelper s3AccessHelper,
 			final Executor uploadThreadPool,
 			final String objectName) throws IOException {
 
-		final String multiPartUploadId = s3uploader.startMultiPartUpload(objectName);
+		final String multiPartUploadId = s3AccessHelper.startMultiPartUpload(objectName);
 
 		return new RecoverableMultiPartUploadImpl(
-				s3uploader,
+				s3AccessHelper,
 				uploadThreadPool,
 				multiPartUploadId,
 				objectName,
@@ -261,7 +256,7 @@ public static RecoverableMultiPartUploadImpl newUpload(
 	}
 
 	public static RecoverableMultiPartUploadImpl recoverUpload(
-			final S3MultiPartUploader s3uploader,
+			final S3AccessHelper s3AccessHelper,
 			final Executor uploadThreadPool,
 			final String multipartUploadId,
 			final String objectName,
@@ -270,7 +265,7 @@ public static RecoverableMultiPartUploadImpl recoverUpload(
 			final Optional<File> incompletePart) {
 
 		return new RecoverableMultiPartUploadImpl(
-				s3uploader,
+				s3AccessHelper,
 				uploadThreadPool,
 				multipartUploadId,
 				objectName,
@@ -286,7 +281,7 @@ public static RecoverableMultiPartUploadImpl recoverUpload(
 
 	private static class UploadTask implements Runnable {
 
-		private final S3MultiPartUploader s3uploader;
+		private final S3AccessHelper s3AccessHelper;
 
 		private final String objectName;
 
@@ -299,7 +294,7 @@ public static RecoverableMultiPartUploadImpl recoverUpload(
 		private final CompletableFuture<PartETag> future;
 
 		UploadTask(
-				final S3MultiPartUploader s3uploader,
+				final S3AccessHelper s3AccessHelper,
 				final MultiPartUploadInfo currentUpload,
 				final RefCountedFSOutputStream file,
 				final CompletableFuture<PartETag> future) {
@@ -313,7 +308,7 @@ public static RecoverableMultiPartUploadImpl recoverUpload(
 			// these are limits put by Amazon
 			checkArgument(partNumber >= 1  && partNumber <= 10_000);
 
-			this.s3uploader = checkNotNull(s3uploader);
+			this.s3AccessHelper = checkNotNull(s3AccessHelper);
 			this.file = checkNotNull(file);
 			this.future = checkNotNull(future);
 		}
@@ -321,7 +316,7 @@ public static RecoverableMultiPartUploadImpl recoverUpload(
 		@Override
 		public void run() {
 			try (final InputStream inputStream = file.getInputStream()) {
-				final UploadPartResult result = s3uploader.uploadPart(objectName, uploadId, partNumber, inputStream, file.getPos());
+				final UploadPartResult result = s3AccessHelper.uploadPart(objectName, uploadId, partNumber, inputStream, file.getPos());
 				future.complete(new PartETag(result.getPartNumber(), result.getETag()));
 				file.release();
 			}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3MultiPartUploader.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
similarity index 78%
rename from flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3MultiPartUploader.java
rename to flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
index da227a47488..bcdea3c00b0 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3MultiPartUploader.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
@@ -26,6 +26,7 @@
 import com.amazonaws.services.s3.model.PutObjectResult;
 import com.amazonaws.services.s3.model.UploadPartResult;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
@@ -41,7 +42,7 @@
  * the upload with all its parts will be either committed or discarded.
  */
 @Internal
-public interface S3MultiPartUploader {
+public interface S3AccessHelper {
 
 	/**
 	 * Initializes a Multi-Part Upload.
@@ -66,10 +67,9 @@
 	UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream file, long length) throws IOException;
 
 	/**
-	 * Uploads a part and associates it with the MPU with the provided {@code uploadId}.
-	 *
-	 * <p>Contrary to the {@link #uploadIncompletePart(String, InputStream, long)}, this part can
-	 * be smaller than the minimum part size imposed by S3.
+	 * Uploads an object to S3. In contrast to the {@link #uploadPart(String, String, int, InputStream, long)} method,
+	 * this object is not going to be associated with any MPU and, as such, it is not subject to the garbage collection
+	 * policies specified for your S3 bucket.
 	 *
 	 * @param key the key used to identify this part.
 	 * @param file the (local) file holding the data to be uploaded.
@@ -77,7 +77,7 @@
 	 * @return The {@link PutObjectResult result} of the attempt to stage the incomplete part.
 	 * @throws IOException
 	 */
-	PutObjectResult uploadIncompletePart(String key, InputStream file, long length) throws IOException;
+	PutObjectResult putObject(String key, InputStream file, long length) throws IOException;
 
 	/**
 	 * Finalizes a Multi-Part Upload.
@@ -92,6 +92,27 @@
 	 */
 	CompleteMultipartUploadResult commitMultiPartUpload(String key, String uploadId, List<PartETag> partETags, long length, AtomicInteger errorCount) throws IOException;
 
+	/**
+	 * Deletes the object associated with the provided key.
+	 *
+	 * @param key The key to be deleted.
+	 * @return {@code true} if the resources were successfully freed, {@code false} otherwise
+	 * (e.g. the file to be deleted was not there).
+	 * @throws IOException
+	 */
+	boolean deleteObject(String key) throws IOException;
+
+	/**
+	 * Gets the object associated with the provided {@code key} from S3 and
+	 * puts it in the provided {@code targetLocation}.
+	 *
+	 * @param key the key of the object to fetch.
+	 * @param targetLocation the local file to write the object's contents to.
+	 * @return The number of bytes read.
+	 * @throws IOException
+	 */
+	long getObject(String key, File targetLocation) throws IOException;
+
 	/**
 	 * Fetches the metadata associated with a given key on S3.
 	 *
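
For context, a minimal sketch of the staged-incomplete-part lifecycle for which the S3AccessHelper methods above are combined (illustrative only, not part of this patch; the key name and helper are hypothetical):

import org.apache.flink.fs.s3.common.writer.S3AccessHelper;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

class IncompletePartLifecycleSketch {

	// Hypothetical key following the "<parent>/_<child>_tmp_<uuid>" naming scheme used by the writer.
	private static final String TMP_KEY = "my-dir/_part-0-0_tmp_1234";

	static void backupRestoreAndCleanup(S3AccessHelper s3, File localPart, File restoreTarget) throws IOException {
		// 1) On persist(): stage the trailing in-progress data as a plain (non-MPU) object.
		try (InputStream in = new FileInputStream(localPart)) {
			s3.putObject(TMP_KEY, in, localPart.length());
		}

		// 2) On recovery: download the staged object back into a local file.
		long numBytes = s3.getObject(TMP_KEY, restoreTarget);

		// 3) Once the resumable is obsolete: delete the tmp object explicitly,
		//    since it is not removed by MPU abort policies.
		boolean deleted = s3.deleteObject(TMP_KEY);
	}
}
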
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java
index 1fc8bf1eaa8..5fbc5bb5d42 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java
@@ -40,7 +40,7 @@
 
 	private static final Logger LOG = LoggerFactory.getLogger(S3Committer.class);
 
-	private final S3MultiPartUploader s3uploader;
+	private final S3AccessHelper s3AccessHelper;
 
 	private final String uploadId;
 
@@ -50,8 +50,8 @@
 
 	private final long totalLength;
 
-	S3Committer(S3MultiPartUploader s3uploader, String objectName, String uploadId, List<PartETag> parts, long totalLength) {
-		this.s3uploader = checkNotNull(s3uploader);
+	S3Committer(S3AccessHelper s3AccessHelper, String objectName, String uploadId, List<PartETag> parts, long totalLength) {
+		this.s3AccessHelper = checkNotNull(s3AccessHelper);
 		this.objectName = checkNotNull(objectName);
 		this.uploadId = checkNotNull(uploadId);
 		this.parts = checkNotNull(parts);
@@ -64,7 +64,7 @@ public void commit() throws IOException {
 			LOG.info("Committing {} with MPU ID {}", objectName, uploadId);
 
 			final AtomicInteger errorCount = new AtomicInteger();
-			s3uploader.commitMultiPartUpload(objectName, uploadId, parts, totalLength, errorCount);
+			s3AccessHelper.commitMultiPartUpload(objectName, uploadId, parts, totalLength, errorCount);
 
 			if (errorCount.get() == 0) {
 				LOG.debug("Successfully committed {} with MPU ID {}", objectName, uploadId);
@@ -82,14 +82,14 @@ public void commitAfterRecovery() throws IOException {
 			LOG.info("Trying to commit after recovery {} with MPU ID {}", objectName, uploadId);
 
 			try {
-				s3uploader.commitMultiPartUpload(objectName, uploadId, parts, totalLength, new AtomicInteger());
+				s3AccessHelper.commitMultiPartUpload(objectName, uploadId, parts, totalLength, new AtomicInteger());
 			} catch (IOException e) {
 				LOG.info("Failed to commit after recovery {} with MPU ID {}. " +
 						"Checking if file was committed before...", objectName, uploadId);
 				LOG.trace("Exception when committing:", e);
 
 				try {
-					ObjectMetadata metadata = s3uploader.getObjectMetadata(objectName);
+					ObjectMetadata metadata = s3AccessHelper.getObjectMetadata(objectName);
 					if (totalLength != metadata.getContentLength()) {
 						String message = String.format("Inconsistent result for object %s: conflicting lengths. " +
 										"Recovered committer for upload %s indicates %s bytes, present object is %s bytes",
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
index b201981f31a..3727e257904 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
@@ -19,10 +19,8 @@
 package org.apache.flink.fs.s3.common.writer;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.fs.s3.common.utils.BackPressuringExecutor;
-import org.apache.flink.fs.s3.common.utils.OffsetAwareOutputStream;
 import org.apache.flink.fs.s3.common.utils.RefCountedFile;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.util.Preconditions;
@@ -43,7 +41,7 @@
 
 	private final org.apache.hadoop.fs.FileSystem fs;
 
-	private final S3MultiPartUploader twoPhaseUploader;
+	private final S3AccessHelper s3AccessHelper;
 
 	private final FunctionWithException<File, RefCountedFile, IOException> tmpFileSupplier;
 
@@ -53,7 +51,7 @@
 
 	S3RecoverableMultipartUploadFactory(
 			final FileSystem fs,
-			final S3MultiPartUploader twoPhaseUploader,
+			final S3AccessHelper s3AccessHelper,
 			final int maxConcurrentUploadsPerStream,
 			final Executor executor,
 			final FunctionWithException<File, RefCountedFile, IOException> tmpFileSupplier) {
@@ -61,23 +59,23 @@
 		this.fs = Preconditions.checkNotNull(fs);
 		this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
 		this.executor = executor;
-		this.twoPhaseUploader = twoPhaseUploader;
+		this.s3AccessHelper = s3AccessHelper;
 		this.tmpFileSupplier = tmpFileSupplier;
 	}
 
 	RecoverableMultiPartUpload getNewRecoverableUpload(Path path) throws IOException {
 
 		return RecoverableMultiPartUploadImpl.newUpload(
-				twoPhaseUploader,
+				s3AccessHelper,
 				limitedExecutor(),
 				pathToObjectName(path));
 	}
 
 	RecoverableMultiPartUpload recoverRecoverableUpload(S3Recoverable recoverable) throws IOException {
-		final Optional<File> incompletePart = downloadLastDataChunk(recoverable);
+		final Optional<File> incompletePart = recoverInProgressPart(recoverable);
 
 		return RecoverableMultiPartUploadImpl.recoverUpload(
-				twoPhaseUploader,
+				s3AccessHelper,
 				limitedExecutor(),
 				recoverable.uploadId(),
 				recoverable.getObjectName(),
@@ -86,41 +84,17 @@ RecoverableMultiPartUpload recoverRecoverableUpload(S3Recoverable recoverable) t
 				incompletePart);
 	}
 
-	@VisibleForTesting
-	Optional<File> downloadLastDataChunk(S3Recoverable recoverable) throws IOException {
+	private Optional<File> recoverInProgressPart(S3Recoverable recoverable) throws IOException {
 
-		final String objectName = recoverable.incompleteObjectName();
-		if (objectName == null) {
+		final String objectKey = recoverable.incompleteObjectName();
+		if (objectKey == null) {
 			return Optional.empty();
 		}
 
 		// download the file (simple way)
-		final RefCountedFile fileAndStream = tmpFileSupplier.apply(null);
-		final File file = fileAndStream.getFile();
-
-		long numBytes = 0L;
-
-		try (
-				final OffsetAwareOutputStream outStream = fileAndStream.getStream();
-				final org.apache.hadoop.fs.FSDataInputStream inStream =
-						fs.open(new org.apache.hadoop.fs.Path('/' + objectName))
-		) {
-			final byte[] buffer = new byte[32 * 1024];
-
-			int numRead;
-			while ((numRead = inStream.read(buffer)) > 0) {
-				outStream.write(buffer, 0, numRead);
-				numBytes += numRead;
-			}
-		}
-
-		// some sanity checks
-		if (numBytes != file.length() || numBytes != fileAndStream.getStream().getLength()) {
-			throw new IOException(String.format("Error recovering writer: " +
-							"Downloading the last data chunk file gives incorrect length. " +
-							"File=%d bytes, Stream=%d bytes",
-					file.length(), numBytes));
-		}
+		final RefCountedFile refCountedFile = tmpFileSupplier.apply(null);
+		final File file = refCountedFile.getFile();
+		final long numBytes = s3AccessHelper.getObject(objectKey, file);
 
 		if (numBytes != recoverable.incompleteObjectLength()) {
 			throw new IOException(String.format("Error recovering writer: " +
@@ -132,8 +106,7 @@ RecoverableMultiPartUpload recoverRecoverableUpload(S3Recoverable recoverable) t
 		return Optional.of(file);
 	}
 
-	@VisibleForTesting
-	String pathToObjectName(final Path path) {
+	private String pathToObjectName(final Path path) {
 		org.apache.hadoop.fs.Path hadoopPath = HadoopFileSystem.toHadoopPath(path);
 		if (!hadoopPath.isAbsolute()) {
 			hadoopPath = new org.apache.hadoop.fs.Path(fs.getWorkingDirectory(), hadoopPath);
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
index 2a84308d5b0..ddb4443c585 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
@@ -26,7 +26,6 @@
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.fs.s3.common.utils.RefCountedFile;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.FunctionWithException;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -37,6 +36,7 @@
 
 import static org.apache.flink.fs.s3.common.FlinkS3FileSystem.S3_MULTIPART_MIN_PART_SIZE;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * An implementation of the {@link RecoverableWriter} against S3.
@@ -54,16 +54,20 @@
 
 	private final long userDefinedMinPartSize;
 
+	private final S3AccessHelper s3AccessHelper;
+
 	private final S3RecoverableMultipartUploadFactory uploadFactory;
 
 	@VisibleForTesting
 	S3RecoverableWriter(
+			final S3AccessHelper s3AccessHelper,
 			final S3RecoverableMultipartUploadFactory uploadFactory,
 			final FunctionWithException<File, RefCountedFile, IOException> tempFileCreator,
 			final long userDefinedMinPartSize) {
 
-		this.uploadFactory = Preconditions.checkNotNull(uploadFactory);
-		this.tempFileCreator = Preconditions.checkNotNull(tempFileCreator);
+		this.s3AccessHelper = checkNotNull(s3AccessHelper);
+		this.uploadFactory = checkNotNull(uploadFactory);
+		this.tempFileCreator = checkNotNull(tempFileCreator);
 		this.userDefinedMinPartSize = userDefinedMinPartSize;
 	}
 
@@ -78,14 +82,14 @@ public RecoverableFsDataOutputStream open(Path path) throws IOException {
 	}
 
 	@Override
-	public Committer recoverForCommit(RecoverableWriter.CommitRecoverable recoverable) throws IOException {
+	public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException {
 		final S3Recoverable s3recoverable = castToS3Recoverable(recoverable);
 		final S3RecoverableFsDataOutputStream recovered = recover(s3recoverable);
 		return recovered.closeForCommit();
 	}
 
 	@Override
-	public S3RecoverableFsDataOutputStream recover(RecoverableWriter.ResumeRecoverable recoverable) throws IOException {
+	public S3RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException {
 		final S3Recoverable s3recoverable = castToS3Recoverable(recoverable);
 
 		final RecoverableMultiPartUpload upload = uploadFactory.recoverRecoverableUpload(s3recoverable);
@@ -97,15 +101,27 @@ public S3RecoverableFsDataOutputStream recover(RecoverableWriter.ResumeRecoverab
 				s3recoverable.numBytesInParts());
 	}
 
+	@Override
+	public boolean requiresCleanupOfRecoverableState() {
+		return true;
+	}
+
+	@Override
+	public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+		final S3Recoverable s3recoverable = castToS3Recoverable(resumable);
+		final String smallPartObjectToDelete = s3recoverable.incompleteObjectName();
+		return smallPartObjectToDelete != null && s3AccessHelper.deleteObject(smallPartObjectToDelete);
+	}
+
 	@Override
 	@SuppressWarnings({"rawtypes", "unchecked"})
-	public SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitRecoverableSerializer() {
+	public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer() {
 		return (SimpleVersionedSerializer) S3RecoverableSerializer.INSTANCE;
 	}
 
 	@Override
 	@SuppressWarnings({"rawtypes", "unchecked"})
-	public SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumeRecoverableSerializer() {
+	public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer() {
 		return (SimpleVersionedSerializer) S3RecoverableSerializer.INSTANCE;
 	}
 
@@ -116,7 +132,7 @@ public boolean supportsResume() {
 
 	// --------------------------- Utils ---------------------------
 
-	private static S3Recoverable castToS3Recoverable(RecoverableWriter.CommitRecoverable recoverable) {
+	private static S3Recoverable castToS3Recoverable(CommitRecoverable recoverable) {
 		if (recoverable instanceof S3Recoverable) {
 			return (S3Recoverable) recoverable;
 		}
@@ -129,7 +145,7 @@ private static S3Recoverable castToS3Recoverable(RecoverableWriter.CommitRecover
 	public static S3RecoverableWriter writer(
 			final FileSystem fs,
 			final FunctionWithException<File, RefCountedFile, IOException> tempFileCreator,
-			final S3MultiPartUploader twoPhaseUploader,
+			final S3AccessHelper s3AccessHelper,
 			final Executor uploadThreadPool,
 			final long userDefinedMinPartSize,
 			final int maxConcurrentUploadsPerStream) {
@@ -139,11 +155,11 @@ public static S3RecoverableWriter writer(
 		final S3RecoverableMultipartUploadFactory uploadFactory =
 				new S3RecoverableMultipartUploadFactory(
 						fs,
-						twoPhaseUploader,
+						s3AccessHelper,
 						maxConcurrentUploadsPerStream,
 						uploadThreadPool,
 						tempFileCreator);
 
-		return new S3RecoverableWriter(uploadFactory, tempFileCreator, userDefinedMinPartSize);
+		return new S3RecoverableWriter(s3AccessHelper, uploadFactory, tempFileCreator, userDefinedMinPartSize);
 	}
 }
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
index d3d25c347e8..5b15652db62 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.fs.s3.common;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -78,7 +78,7 @@ protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopC
 
 		@Nullable
 		@Override
-		protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) {
+		protected S3AccessHelper getS3AccessHelper(FileSystem fs) {
 			return null;
 		}
 
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/IncompletePartPrefixTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/IncompletePartPrefixTest.java
index a3164f18a16..9cee040c067 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/IncompletePartPrefixTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/IncompletePartPrefixTest.java
@@ -22,36 +22,36 @@
 import org.junit.Test;
 
 /**
- * Tests for the {@link RecoverableMultiPartUploadImpl#incompleteObjectNamePrefix(String)}.
+ * Tests for the {@link RecoverableMultiPartUploadImpl#createIncompletePartObjectNamePrefix(String)}.
  */
 public class IncompletePartPrefixTest {
 
 	@Test(expected = NullPointerException.class)
 	public void nullObjectNameShouldThroughException() {
-		RecoverableMultiPartUploadImpl.incompleteObjectNamePrefix(null);
+		RecoverableMultiPartUploadImpl.createIncompletePartObjectNamePrefix(null);
 	}
 
 	@Test
 	public void emptyInitialNameShouldSucceed() {
-		String objectNamePrefix = RecoverableMultiPartUploadImpl.incompleteObjectNamePrefix("");
+		String objectNamePrefix = RecoverableMultiPartUploadImpl.createIncompletePartObjectNamePrefix("");
 		Assert.assertEquals("_tmp_", objectNamePrefix);
 	}
 
 	@Test
 	public void nameWithoutSlashShouldSucceed() {
-		String objectNamePrefix = RecoverableMultiPartUploadImpl.incompleteObjectNamePrefix("no_slash_path");
+		String objectNamePrefix = RecoverableMultiPartUploadImpl.createIncompletePartObjectNamePrefix("no_slash_path");
 		Assert.assertEquals("_no_slash_path_tmp_", objectNamePrefix);
 	}
 
 	@Test
 	public void nameWithOnlySlashShouldSucceed() {
-		String objectNamePrefix = RecoverableMultiPartUploadImpl.incompleteObjectNamePrefix("/");
+		String objectNamePrefix = RecoverableMultiPartUploadImpl.createIncompletePartObjectNamePrefix("/");
 		Assert.assertEquals("/_tmp_", objectNamePrefix);
 	}
 
 	@Test
 	public void normalPathShouldSucceed() {
-		String objectNamePrefix = RecoverableMultiPartUploadImpl.incompleteObjectNamePrefix("/root/home/test-file");
+		String objectNamePrefix = RecoverableMultiPartUploadImpl.createIncompletePartObjectNamePrefix("/root/home/test-file");
 		Assert.assertEquals("/root/home/_test-file_tmp_", objectNamePrefix);
 	}
 }
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
index 72554e19773..673796d6c80 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
@@ -339,19 +339,19 @@ public void execute(Runnable command) {
 	}
 
 	/**
-	 * A {@link S3MultiPartUploader} that simulates uploading part files to S3 by
+	 * A {@link S3AccessHelper} that simulates uploading part files to S3 by
 	 * simply putting complete and incomplete part files in lists for further validation.
 	 */
-	private static class StubMultiPartUploader implements S3MultiPartUploader {
+	private static class StubMultiPartUploader implements S3AccessHelper {
 
 		private final List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> completePartsUploaded = new ArrayList<>();
 		private final List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> incompletePartsUploaded = new ArrayList<>();
 
-		public List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> getCompletePartsUploaded() {
+		List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> getCompletePartsUploaded() {
 			return completePartsUploaded;
 		}
 
-		public List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> getIncompletePartsUploaded() {
+		List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> getIncompletePartsUploaded() {
 			return incompletePartsUploaded;
 		}
 
@@ -367,11 +367,21 @@ public UploadPartResult uploadPart(String key, String uploadId, int partNumber,
 		}
 
 		@Override
-		public PutObjectResult uploadIncompletePart(String key, InputStream file, long length) throws IOException {
+		public PutObjectResult putObject(String key, InputStream file, long length) throws IOException {
 			final byte[] content = getFileContentBytes(file, MathUtils.checkedDownCast(length));
 			return storeAndGetPutObjectResult(key, content);
 		}
 
+		@Override
+		public boolean deleteObject(String key) throws IOException {
+			return false;
+		}
+
+		@Override
+		public long getObject(String key, File targetLocation) throws IOException {
+			return 0;
+		}
+
 		@Override
 		public CompleteMultipartUploadResult commitMultiPartUpload(
 				String key,
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3MultiPartUploader.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
similarity index 58%
rename from flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3MultiPartUploader.java
rename to flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
index f446f70e2a7..b9612adf63a 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3MultiPartUploader.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.fs.s3hadoop;
 
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 import org.apache.flink.util.MathUtils;
 
 import com.amazonaws.SdkBaseException;
@@ -35,24 +35,27 @@
 import org.apache.hadoop.fs.s3a.S3AUtils;
 import org.apache.hadoop.fs.s3a.WriteOperationHelper;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * An implementation of the {@link S3MultiPartUploader} for the Hadoop S3A filesystem.
+ * An implementation of the {@link S3AccessHelper} for the Hadoop S3A filesystem.
  */
-public class HadoopS3MultiPartUploader implements S3MultiPartUploader {
+public class HadoopS3AccessHelper implements S3AccessHelper {
 
 	private final S3AFileSystem s3a;
 
-	private final InternalWriteOperationHelper s3uploader;
+	private final InternalWriteOperationHelper s3accessHelper;
 
-	public HadoopS3MultiPartUploader(S3AFileSystem s3a, Configuration conf) {
-		this.s3uploader = new InternalWriteOperationHelper(
+	public HadoopS3AccessHelper(S3AFileSystem s3a, Configuration conf) {
+		this.s3accessHelper = new InternalWriteOperationHelper(
 				checkNotNull(s3a),
 				checkNotNull(conf)
 		);
@@ -61,25 +64,58 @@ public HadoopS3MultiPartUploader(S3AFileSystem s3a, Configuration conf) {
 
 	@Override
 	public String startMultiPartUpload(String key) throws IOException {
-		return s3uploader.initiateMultiPartUpload(key);
+		return s3accessHelper.initiateMultiPartUpload(key);
 	}
 
 	@Override
 	public UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream inputStream, long length) throws IOException {
-		final UploadPartRequest uploadRequest = s3uploader.newUploadPartRequest(
+		final UploadPartRequest uploadRequest = s3accessHelper.newUploadPartRequest(
 				key, uploadId, partNumber, MathUtils.checkedDownCast(length), inputStream, null, 0L);
-		return s3uploader.uploadPart(uploadRequest);
+		return s3accessHelper.uploadPart(uploadRequest);
 	}
 
 	@Override
-	public PutObjectResult uploadIncompletePart(String key, InputStream inputStream, long length) throws IOException {
-		final PutObjectRequest putRequest = s3uploader.createPutObjectRequest(key, inputStream, length);
-		return s3uploader.putObject(putRequest);
+	public PutObjectResult putObject(String key, InputStream inputStream, long length) throws IOException {
+		final PutObjectRequest putRequest = s3accessHelper.createPutObjectRequest(key, inputStream, length);
+		return s3accessHelper.putObject(putRequest);
 	}
 
 	@Override
 	public CompleteMultipartUploadResult commitMultiPartUpload(String destKey, String uploadId, List<PartETag> partETags, long length, AtomicInteger errorCount) throws IOException {
-		return s3uploader.completeMPUwithRetries(destKey, uploadId, partETags, length, errorCount);
+		return s3accessHelper.completeMPUwithRetries(destKey, uploadId, partETags, length, errorCount);
+	}
+
+	@Override
+	public boolean deleteObject(String key) throws IOException {
+		return s3a.delete(new org.apache.hadoop.fs.Path('/' + key), false);
+	}
+
+	@Override
+	public long getObject(String key, File targetLocation) throws IOException {
+		long numBytes = 0L;
+		try (
+				final OutputStream outStream = new FileOutputStream(targetLocation);
+				final org.apache.hadoop.fs.FSDataInputStream inStream =
+						s3a.open(new org.apache.hadoop.fs.Path('/' + key))
+		) {
+			final byte[] buffer = new byte[32 * 1024];
+
+			int numRead;
+			while ((numRead = inStream.read(buffer)) != -1) {
+				outStream.write(buffer, 0, numRead);
+				numBytes += numRead;
+			}
+		}
+
+		// some sanity checks
+		if (numBytes != targetLocation.length()) {
+			throw new IOException(String.format("Error recovering writer: " +
+							"Downloading the last data chunk file gives incorrect length. " +
+							"File=%d bytes, Stream=%d bytes",
+					targetLocation.length(), numBytes));
+		}
+
+		return numBytes;
 	}
 
 	@Override
@@ -94,7 +130,7 @@ public ObjectMetadata getObjectMetadata(String key) throws IOException {
 
 	/**
 	 * Internal {@link WriteOperationHelper} that is wrapped so that it only exposes
-	 * the functionality we need for the {@link S3MultiPartUploader}.
+	 * the functionality we need for the {@link S3AccessHelper}.
 	 */
 	private static final class InternalWriteOperationHelper extends WriteOperationHelper {
 
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
index 897629f7910..2637e7b2e23 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
@@ -21,7 +21,7 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
 import org.apache.flink.fs.s3.common.HadoopConfigLoader;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
@@ -96,8 +96,8 @@ else if (scheme != null && authority == null) {
 
 	@Nullable
 	@Override
-	protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) {
+	protected S3AccessHelper getS3AccessHelper(FileSystem fs) {
 		final S3AFileSystem s3Afs = (S3AFileSystem) fs;
-		return new HadoopS3MultiPartUploader(s3Afs, s3Afs.getConf());
+		return new HadoopS3AccessHelper(s3Afs, s3Afs.getConf());
 	}
 }
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterITCase.java
index 17fb02b188c..6c8619de0ed 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterITCase.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterITCase.java
@@ -27,6 +27,7 @@
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.fs.s3.common.FlinkS3FileSystem;
+import org.apache.flink.fs.s3.common.writer.S3Recoverable;
 import org.apache.flink.testutils.s3.S3TestCredentials;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.StringUtils;
@@ -42,6 +43,7 @@
 import org.junit.rules.TemporaryFolder;
 
 import java.io.BufferedReader;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
@@ -213,6 +215,50 @@ public void testCommitAfterPersist() throws Exception {
 		Assert.assertEquals(testData1 + testData2, getContentsOfFile(path));
 	}
 
+	@Test(expected = FileNotFoundException.class)
+	public void testCleanupRecoverableState() throws Exception {
+		final RecoverableWriter writer = getRecoverableWriter();
+		final Path path = new Path(basePathForTest, "part-0");
+
+		final RecoverableFsDataOutputStream stream = writer.open(path);
+		stream.write(bytesOf(testData1));
+		S3Recoverable recoverable = (S3Recoverable) stream.persist();
+
+		stream.closeForCommit().commit();
+
+		// the data is still there, as we have not yet deleted the tmp object
+		final String content = getContentsOfFile(new Path('/' + recoverable.incompleteObjectName()));
+		Assert.assertEquals(testData1, content);
+
+		boolean successfullyDeletedState = writer.cleanupRecoverableState(recoverable);
+		Assert.assertTrue(successfullyDeletedState);
+
+		// this should throw the exception as we deleted the file.
+		getContentsOfFile(new Path('/' + recoverable.incompleteObjectName()));
+	}
+
+	@Test
+	public void testCallingDeleteObjectTwiceDoesNotThroughException() throws Exception {
+		final RecoverableWriter writer = getRecoverableWriter();
+		final Path path = new Path(basePathForTest, "part-0");
+
+		final RecoverableFsDataOutputStream stream = writer.open(path);
+		stream.write(bytesOf(testData1));
+		S3Recoverable recoverable = (S3Recoverable) stream.persist();
+
+		stream.closeForCommit().commit();
+
+		// the data is still there, as we have not yet deleted the tmp object
+		final String content = getContentsOfFile(new Path('/' + recoverable.incompleteObjectName()));
+		Assert.assertEquals(testData1, content);
+
+		boolean successfullyDeletedState = writer.cleanupRecoverableState(recoverable);
+		Assert.assertTrue(successfullyDeletedState);
+
+		boolean unsuccessfulDeletion = writer.cleanupRecoverableState(recoverable);
+		Assert.assertFalse(unsuccessfulDeletion);
+	}
+
 	// ----------------------- Test Recovery -----------------------
 
 	@Test
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index b579d6ebf04..0fb28571960 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -21,7 +21,7 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
 import org.apache.flink.fs.s3.common.HadoopConfigLoader;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import com.facebook.presto.hive.PrestoS3FileSystem;
@@ -92,7 +92,7 @@ else if (scheme != null && authority == null) {
 
 	@Nullable
 	@Override
-	protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) {
+	protected S3AccessHelper getS3AccessHelper(FileSystem fs) {
 		return null;
 	}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index 65a7628578c..b59c84ea92e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -21,8 +21,10 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
+import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,11 +33,15 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Objects;
+import java.util.TreeMap;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A bucket is the directory organization of the output of the {@link StreamingFileSink}.
@@ -62,14 +68,16 @@
 
 	private final RollingPolicy<IN, BucketID> rollingPolicy;
 
-	private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPartsPerCheckpoint;
+	private final NavigableMap<Long, ResumeRecoverable> resumablesPerCheckpoint;
+
+	private final NavigableMap<Long, List<CommitRecoverable>> pendingPartsPerCheckpoint;
 
 	private long partCounter;
 
 	@Nullable
 	private PartFileWriter<IN, BucketID> inProgressPart;
 
-	private List<RecoverableWriter.CommitRecoverable> pendingPartsForCurrentCheckpoint;
+	private List<CommitRecoverable> pendingPartsForCurrentCheckpoint;
 
 	/**
 	 * Constructor to create a new empty bucket.
@@ -83,16 +91,17 @@ private Bucket(
 			final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
 			final RollingPolicy<IN, BucketID> rollingPolicy) {
 
-		this.fsWriter = Preconditions.checkNotNull(fsWriter);
+		this.fsWriter = checkNotNull(fsWriter);
 		this.subtaskIndex = subtaskIndex;
-		this.bucketId = Preconditions.checkNotNull(bucketId);
-		this.bucketPath = Preconditions.checkNotNull(bucketPath);
+		this.bucketId = checkNotNull(bucketId);
+		this.bucketPath = checkNotNull(bucketPath);
 		this.partCounter = initialPartCounter;
-		this.partFileFactory = Preconditions.checkNotNull(partFileFactory);
-		this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
+		this.partFileFactory = checkNotNull(partFileFactory);
+		this.rollingPolicy = checkNotNull(rollingPolicy);
 
 		this.pendingPartsForCurrentCheckpoint = new ArrayList<>();
-		this.pendingPartsPerCheckpoint = new HashMap<>();
+		this.pendingPartsPerCheckpoint = new TreeMap<>();
+		this.resumablesPerCheckpoint = new TreeMap<>();
 	}
 
 	/**
@@ -120,20 +129,26 @@ private Bucket(
 	}
 
 	private void restoreInProgressFile(final BucketState<BucketID> state) throws IOException {
+		if (!state.hasInProgressResumableFile()) {
+			return;
+		}
 
 		// we try to resume the previous in-progress file
-		if (state.hasInProgressResumableFile()) {
-			final RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile();
-			inProgressPart = partFileFactory.resumeFrom(
-					bucketId, fsWriter, resumable, state.getInProgressFileCreationTime());
+		final ResumeRecoverable resumable = state.getInProgressResumableFile();
+		final RecoverableFsDataOutputStream stream = fsWriter.recover(resumable);
+		inProgressPart = partFileFactory.resumeFrom(
+				bucketId, stream, resumable, state.getInProgressFileCreationTime());
+
+		if (fsWriter.requiresCleanupOfRecoverableState()) {
+			fsWriter.cleanupRecoverableState(resumable);
 		}
 	}
 
 	private void commitRecoveredPendingFiles(final BucketState<BucketID> state) throws IOException {
 
 		// we commit pending files for checkpoints that precede the last successful one, from which we are recovering
-		for (List<RecoverableWriter.CommitRecoverable> committables: state.getCommittableFilesPerCheckpoint().values()) {
-			for (RecoverableWriter.CommitRecoverable committable: committables) {
+		for (List<CommitRecoverable> committables: state.getCommittableFilesPerCheckpoint().values()) {
+			for (CommitRecoverable committable: committables) {
 				fsWriter.recoverForCommit(committable).commitAfterRecovery();
 			}
 		}
@@ -156,8 +171,8 @@ boolean isActive() {
 	}
 
 	void merge(final Bucket<IN, BucketID> bucket) throws IOException {
-		Preconditions.checkNotNull(bucket);
-		Preconditions.checkState(Objects.equals(bucket.bucketPath, bucketPath));
+		checkNotNull(bucket);
+		checkState(Objects.equals(bucket.bucketPath, bucketPath));
 
 		// There should be no pending files in the "to-merge" states.
 		// The reason is that:
@@ -165,10 +180,10 @@ void merge(final Bucket<IN, BucketID> bucket) throws IOException {
 		//    So a snapshot, including the one we are recovering from, will never contain such files.
 		// 2) the files in pendingPartsPerCheckpoint are committed upon recovery (see commitRecoveredPendingFiles()).
 
-		Preconditions.checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty());
-		Preconditions.checkState(bucket.pendingPartsPerCheckpoint.isEmpty());
+		checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty());
+		checkState(bucket.pendingPartsPerCheckpoint.isEmpty());
 
-		RecoverableWriter.CommitRecoverable committable = bucket.closePartFile();
+		CommitRecoverable committable = bucket.closePartFile();
 		if (committable != null) {
 			pendingPartsForCurrentCheckpoint.add(committable);
 		}
@@ -195,7 +210,8 @@ private void rollPartFile(final long currentTime) throws IOException {
 		closePartFile();
 
 		final Path partFilePath = assembleNewPartPath();
-		inProgressPart = partFileFactory.openNew(bucketId, fsWriter, partFilePath, currentTime);
+		final RecoverableFsDataOutputStream stream = fsWriter.open(partFilePath);
+		inProgressPart = partFileFactory.openNew(bucketId, stream, partFilePath, currentTime);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Subtask {} opening new part file \"{}\" for bucket id={}.",
@@ -209,8 +225,8 @@ private Path assembleNewPartPath() {
 		return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter);
 	}
 
-	private RecoverableWriter.CommitRecoverable closePartFile() throws IOException {
-		RecoverableWriter.CommitRecoverable committable = null;
+	private CommitRecoverable closePartFile() throws IOException {
+		CommitRecoverable committable = null;
 		if (inProgressPart != null) {
 			committable = inProgressPart.closeForCommit();
 			pendingPartsForCurrentCheckpoint.add(committable);
@@ -228,12 +244,21 @@ void disposePartFile() {
 	BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
 		prepareBucketForCheckpointing(checkpointId);
 
-		RecoverableWriter.ResumeRecoverable inProgressResumable = null;
+		ResumeRecoverable inProgressResumable = null;
 		long inProgressFileCreationTime = Long.MAX_VALUE;
 
 		if (inProgressPart != null) {
 			inProgressResumable = inProgressPart.persist();
 			inProgressFileCreationTime = inProgressPart.getCreationTime();
+
+			// the following is an optimization: writers that do not require
+			// cleanup do not have to keep track of resumables, and they do not
+			// have to iterate over the active buckets later on
+			// (see onSuccessfulCompletionOfCheckpoint()).
+
+			if (fsWriter.requiresCleanupOfRecoverableState()) {
+				this.resumablesPerCheckpoint.put(checkpointId, inProgressResumable);
+			}
 		}
 
 		return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressResumable, pendingPartsPerCheckpoint);
@@ -254,19 +279,36 @@ private void prepareBucketForCheckpointing(long checkpointId) throws IOException
 	}
 
 	void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
-		Preconditions.checkNotNull(fsWriter);
+		checkNotNull(fsWriter);
+
+		Iterator<Map.Entry<Long, List<CommitRecoverable>>> it =
+				pendingPartsPerCheckpoint.headMap(checkpointId, true)
+						.entrySet().iterator();
+
+		while (it.hasNext()) {
+			Map.Entry<Long, List<CommitRecoverable>> entry = it.next();
+
+			for (CommitRecoverable committable : entry.getValue()) {
+				fsWriter.recoverForCommit(committable).commit();
+			}
+			it.remove();
+		}
+
+		cleanupOutdatedResumables(checkpointId);
+	}
 
-		Iterator<Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>>> it =
-				pendingPartsPerCheckpoint.entrySet().iterator();
+	private void cleanupOutdatedResumables(long checkpointId) throws IOException {
+		Iterator<Map.Entry<Long, ResumeRecoverable>> it =
+				resumablesPerCheckpoint.headMap(checkpointId, false)
+						.entrySet().iterator();
 
 		while (it.hasNext()) {
-			Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>> entry = it.next();
+			final ResumeRecoverable recoverable = it.next().getValue();
+			final boolean successfullyDeleted = fsWriter.cleanupRecoverableState(recoverable);
+			it.remove();
 
-			if (entry.getKey() <= checkpointId) {
-				for (RecoverableWriter.CommitRecoverable committable : entry.getValue()) {
-					fsWriter.recoverForCommit(committable).commit();
-				}
-				it.remove();
+			if (LOG.isDebugEnabled() && successfullyDeleted) {
+				LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
 			}
 		}
 	}
@@ -285,7 +327,7 @@ void onProcessingTime(long timestamp) throws IOException {
 	// --------------------------- Testing Methods -----------------------------
 
 	@VisibleForTesting
-	Map<Long, List<RecoverableWriter.CommitRecoverable>> getPendingPartsPerCheckpoint() {
+	Map<Long, List<CommitRecoverable>> getPendingPartsPerCheckpoint() {
 		return pendingPartsPerCheckpoint;
 	}
 
@@ -296,7 +338,7 @@ void onProcessingTime(long timestamp) throws IOException {
 	}
 
 	@VisibleForTesting
-	List<RecoverableWriter.CommitRecoverable> getPendingPartsForCurrentCheckpoint() {
+	List<CommitRecoverable> getPendingPartsForCurrentCheckpoint() {
 		return pendingPartsForCurrentCheckpoint;
 	}
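
For context, the contract used above can be summarized in a short caller-side sketch (this is not part of the diff; the helper method and variable names are illustrative). After recovering a stream from a ResumeRecoverable that will not be needed again, the caller asks the writer whether that recoverable occupies external resources and, if so, frees them:

    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
    import org.apache.flink.core.fs.RecoverableWriter;

    import java.io.IOException;

    // Hypothetical helper, for illustration only.
    static RecoverableFsDataOutputStream resumeAndCleanup(
            Path path,
            RecoverableWriter.ResumeRecoverable resumable) throws IOException {

        final RecoverableWriter writer = FileSystem.get(path.toUri()).createRecoverableWriter();
        final RecoverableFsDataOutputStream stream = writer.recover(resumable);

        // Writers that back a ResumeRecoverable with external state (e.g. a temporary
        // S3 object holding the in-progress data) advertise it via the new getter;
        // a 'false' return from the cleanup call only means there was nothing left to delete.
        if (writer.requiresCleanupOfRecoverableState()) {
            writer.cleanupRecoverableState(resumable);
        }
        return stream;
    }

The boolean getter is what lets writers without such external state skip the bookkeeping of the resumablesPerCheckpoint map introduced above.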
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
index 005ae4e737f..a44b0e8aea3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
@@ -79,14 +79,13 @@ void write(IN element, long currentTime) throws IOException {
 		@Override
 		public PartFileWriter<IN, BucketID> resumeFrom(
 				final BucketID bucketId,
-				final RecoverableWriter fileSystemWriter,
+				final RecoverableFsDataOutputStream stream,
 				final RecoverableWriter.ResumeRecoverable resumable,
 				final long creationTime) throws IOException {
 
-			Preconditions.checkNotNull(fileSystemWriter);
+			Preconditions.checkNotNull(stream);
 			Preconditions.checkNotNull(resumable);
 
-			final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable);
 			final BulkWriter<IN> writer = writerFactory.create(stream);
 			return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
 		}
@@ -94,14 +93,13 @@ void write(IN element, long currentTime) throws IOException {
 		@Override
 		public PartFileWriter<IN, BucketID> openNew(
 				final BucketID bucketId,
-				final RecoverableWriter fileSystemWriter,
+				final RecoverableFsDataOutputStream stream,
 				final Path path,
 				final long creationTime) throws IOException {
 
-			Preconditions.checkNotNull(fileSystemWriter);
+			Preconditions.checkNotNull(stream);
 			Preconditions.checkNotNull(path);
 
-			final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path);
 			final BulkWriter<IN> writer = writerFactory.create(stream);
 			return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
 		}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
index 662454ba200..95a2978a4c6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
@@ -111,7 +111,7 @@ public long getLastUpdateTime() {
 		/**
 		 * Used upon recovery from a failure to recover a {@link PartFileWriter writer}.
 		 * @param bucketId the id of the bucket this writer is writing to.
-		 * @param fileSystemWriter the filesystem-specific writer to use when writing to the filesystem.
+		 * @param stream the filesystem-specific output stream to use when writing to the filesystem.
 		 * @param resumable the state of the stream we are resurrecting.
 		 * @param creationTime the creation time of the stream.
 		 * @return the recovered {@link PartFileWriter writer}.
@@ -119,14 +119,14 @@ public long getLastUpdateTime() {
 		 */
 		PartFileWriter<IN, BucketID> resumeFrom(
 			final BucketID bucketId,
-			final RecoverableWriter fileSystemWriter,
+			final RecoverableFsDataOutputStream stream,
 			final RecoverableWriter.ResumeRecoverable resumable,
 			final long creationTime) throws IOException;
 
 		/**
 		 * Used to create a new {@link PartFileWriter writer}.
 		 * @param bucketId the id of the bucket this writer is writing to.
-		 * @param fileSystemWriter the filesystem-specific writer to use when writing to the filesystem.
+		 * @param stream the filesystem-specific output stream to use when writing to the filesystem.
 		 * @param path the part this writer will write to.
 		 * @param creationTime the creation time of the stream.
 		 * @return the new {@link PartFileWriter writer}.
@@ -134,7 +134,7 @@ public long getLastUpdateTime() {
 		 */
 		PartFileWriter<IN, BucketID> openNew(
 			final BucketID bucketId,
-			final RecoverableWriter fileSystemWriter,
+			final RecoverableFsDataOutputStream stream,
 			final Path path,
 			final long creationTime) throws IOException;
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
index 2478b79a527..05c160c2629 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
@@ -67,28 +67,26 @@ void write(IN element, long currentTime) throws IOException {
 		@Override
 		public PartFileWriter<IN, BucketID> resumeFrom(
 				final BucketID bucketId,
-				final RecoverableWriter fileSystemWriter,
+				final RecoverableFsDataOutputStream stream,
 				final RecoverableWriter.ResumeRecoverable resumable,
 				final long creationTime) throws IOException {
 
-			Preconditions.checkNotNull(fileSystemWriter);
+			Preconditions.checkNotNull(stream);
 			Preconditions.checkNotNull(resumable);
 
-			final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable);
 			return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
 		}
 
 		@Override
 		public PartFileWriter<IN, BucketID> openNew(
 				final BucketID bucketId,
-				final RecoverableWriter fileSystemWriter,
+				final RecoverableFsDataOutputStream stream,
 				final Path path,
 				final long creationTime) throws IOException {
 
-			Preconditions.checkNotNull(fileSystemWriter);
+			Preconditions.checkNotNull(stream);
 			Preconditions.checkNotNull(path);
 
-			final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path);
 			return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
 		}
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
new file mode 100644
index 00000000000..f328fd75d25
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.core.fs.local.LocalRecoverableWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link Bucket}.
+ */
+public class BucketTest {
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+	@Test
+	public void shouldNotCleanupResumablesThatArePartOfTheAckedCheckpoint() throws IOException {
+		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
+
+		final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
+		final Bucket<String, String> bucketUnderTest =
+				createBucket(recoverableWriter, path, 0, 0);
+
+		bucketUnderTest.write("test-element", 0L);
+
+		final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
+		assertThat(state, hasActiveInProgressFile());
+
+		bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
+		assertThat(recoverableWriter, hasCalledDiscard(0)); // no discard: the resumable of the acknowledged checkpoint is still valid.
+	}
+
+	@Test
+	public void shouldCleanupOutdatedResumablesOnCheckpointAck() throws IOException {
+		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
+
+		final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
+		final Bucket<String, String> bucketUnderTest =
+				createBucket(recoverableWriter, path, 0, 0);
+
+		bucketUnderTest.write("test-element", 0L);
+
+		final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
+		assertThat(state, hasActiveInProgressFile());
+
+		bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
+
+		bucketUnderTest.onReceptionOfCheckpoint(1L);
+		bucketUnderTest.onReceptionOfCheckpoint(2L);
+
+		bucketUnderTest.onSuccessfulCompletionOfCheckpoint(2L);
+		assertThat(recoverableWriter, hasCalledDiscard(2)); // that is for checkpoints 0 and 1
+	}
+
+	@Test
+	public void shouldCleanupResumableAfterRestoring() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
+
+		final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
+		final Bucket<String, String> bucketUnderTest =
+				createBucket(recoverableWriter, path, 0, 0);
+
+		bucketUnderTest.write("test-element", 0L);
+
+		final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
+		assertThat(state, hasActiveInProgressFile());
+
+		bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
+
+		final TestRecoverableWriter newRecoverableWriter = getRecoverableWriter(path);
+		restoreBucket(newRecoverableWriter, 0, 1, state);
+
+		assertThat(newRecoverableWriter, hasCalledDiscard(1)); // the in-progress resumable from checkpoint 0 is cleaned up upon restore.
+	}
+
+	@Test
+	public void shouldNotCallCleanupWithoutInProgressPartFiles() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
+
+		final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
+		final Bucket<String, String> bucketUnderTest =
+				createBucket(recoverableWriter, path, 0, 0);
+
+		final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
+		assertThat(state, hasNoActiveInProgressFile());
+
+		bucketUnderTest.onReceptionOfCheckpoint(1L);
+		bucketUnderTest.onReceptionOfCheckpoint(2L);
+
+		bucketUnderTest.onSuccessfulCompletionOfCheckpoint(2L);
+		assertThat(recoverableWriter, hasCalledDiscard(0)); // we have no in-progress file.
+	}
+
+	// ------------------------------- Matchers --------------------------------
+
+	private static TypeSafeMatcher<TestRecoverableWriter> hasCalledDiscard(int times) {
+		return new TypeSafeMatcher<TestRecoverableWriter>() {
+			@Override
+			protected boolean matchesSafely(TestRecoverableWriter writer) {
+				return writer.getCleanupCallCounter() == times;
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description
+						.appendText("the TestRecoverableWriter to have called cleanupRecoverableState() ")
+						.appendValue(times)
+						.appendText(" times.");
+			}
+		};
+	}
+
+	private static TypeSafeMatcher<BucketState<String>> hasActiveInProgressFile() {
+		return new TypeSafeMatcher<BucketState<String>>() {
+			@Override
+			protected boolean matchesSafely(BucketState<String> state) {
+				return state.getInProgressResumableFile() != null;
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description.appendText("a BucketState with active in-progress file.");
+			}
+		};
+	}
+
+	private static TypeSafeMatcher<BucketState<String>> hasNoActiveInProgressFile() {
+		return new TypeSafeMatcher<BucketState<String>>() {
+			@Override
+			protected boolean matchesSafely(BucketState<String> state) {
+				return state.getInProgressResumableFile() == null;
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description.appendText("a BucketState with no active in-progress file.");
+			}
+		};
+	}
+
+	// ------------------------------- Mock Classes --------------------------------
+
+	private static class TestRecoverableWriter extends LocalRecoverableWriter {
+
+		private int cleanupCallCounter = 0;
+
+		TestRecoverableWriter(LocalFileSystem fs) {
+			super(fs);
+		}
+
+		int getCleanupCallCounter() {
+			return cleanupCallCounter;
+		}
+
+		@Override
+		public boolean requiresCleanupOfRecoverableState() {
+			// here we return true so that the cleanupRecoverableState() is called.
+			return true;
+		}
+
+		@Override
+		public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+			cleanupCallCounter++;
+			return false;
+		}
+
+		@Override
+		public String toString() {
+			return "TestRecoverableWriter has called cleanupRecoverableState() " + cleanupCallCounter + " times.";
+		}
+	}
+
+	// ------------------------------- Utility Methods --------------------------------
+
+	private static final String bucketId = "testing-bucket";
+
+	private static final RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.create().build();
+
+	private static final PartFileWriter.PartFileFactory<String, String> partFileFactory =
+			new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>());
+
+	private static Bucket<String, String> createBucket(
+			final RecoverableWriter writer,
+			final Path bucketPath,
+			final int subtaskIdx,
+			final int initialPartCounter) {
+
+		return Bucket.getNew(
+				writer,
+				subtaskIdx,
+				bucketId,
+				bucketPath,
+				initialPartCounter,
+				partFileFactory,
+				rollingPolicy);
+	}
+
+	private static Bucket<String, String> restoreBucket(
+			final RecoverableWriter writer,
+			final int subtaskIndex,
+			final long initialPartCounter,
+			final BucketState<String> bucketState) throws Exception {
+
+		return Bucket.restore(
+				writer,
+				subtaskIndex,
+				initialPartCounter,
+				partFileFactory,
+				rollingPolicy,
+				bucketState
+		);
+	}
+
+	private static TestRecoverableWriter getRecoverableWriter(Path path) {
+		try {
+			final FileSystem fs = FileSystem.get(path.toUri());
+			if (!(fs instanceof LocalFileSystem)) {
+				fail("Expected Local FS but got a " + fs.getClass().getName() + " for path: " + path);
+			}
+			return new TestRecoverableWriter((LocalFileSystem) fs);
+		} catch (IOException e) {
+			fail();
+		}
+		return null;
+	}
+}
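
A side note on the two headMap() calls in onSuccessfulCompletionOfCheckpoint() and cleanupOutdatedResumables() above: pending files are committed for all checkpoints up to and including the acknowledged one, whereas resumables are only cleaned for checkpoints strictly before it, since the resumable taken for the acknowledged checkpoint is still the valid resume point. A minimal, Flink-free sketch of that inclusive/exclusive distinction (the class and values are placeholders, not the production types):

    import java.util.TreeMap;

    public class HeadMapSketch {
        public static void main(String[] args) {
            TreeMap<Long, String> resumablesPerCheckpoint = new TreeMap<>();
            resumablesPerCheckpoint.put(0L, "resumable-0");
            resumablesPerCheckpoint.put(1L, "resumable-1");
            resumablesPerCheckpoint.put(2L, "resumable-2");

            long ackedCheckpointId = 2L;

            // the strictly-older entries (0 and 1) are the ones that would be handed to
            // cleanupRecoverableState() and removed; the entry of the acknowledged
            // checkpoint survives, which is what the hasCalledDiscard(2) assertion
            // in the test above relies on.
            resumablesPerCheckpoint.headMap(ackedCheckpointId, false).clear();

            System.out.println(resumablesPerCheckpoint); // prints {2=resumable-2}
        }
    }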


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services