You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2020/05/18 14:12:37 UTC

[flink] 03/04: [FLINK-17593][Connectors/FileSystem] Support arbitrary recovery mechanism for PartFileWriter

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 220f7dabf24fc1e6fe77b83a00a7de47d75d5b18
Author: GuoWei Ma <gu...@gmail.com>
AuthorDate: Wed May 13 21:15:03 2020 +0800

    [FLINK-17593][Connectors/FileSystem] Support arbitrary recovery mechanism for PartFileWriter
    
    This change includes two things:
    
    1. Make the PartFileWriter generic and decouple the PartFileWriter and
    RecoverableStream. According to different pre-commit / commit methods,
    this change allows us to extend different types of PartFileWriter.
    
    2. Make the Bucket/Buckets depend on the PartFileFactory instead of
    RecoverableWriter.
---
 .../apache/flink/core/fs/RecoverableWriter.java    |   6 +-
 .../core/fs/local/LocalRecoverableWriter.java      |   2 +-
 .../runtime/fs/hdfs/HadoopRecoverableWriter.java   |   2 +-
 .../sink/filesystem/AbstractPartFileWriter.java    |  58 ++++
 .../api/functions/sink/filesystem/Bucket.java      | 166 +++++-------
 .../functions/sink/filesystem/BucketFactory.java   |   7 +-
 .../api/functions/sink/filesystem/BucketState.java |  32 +--
 .../sink/filesystem/BucketStateSerializer.java     | 146 ++++++----
 .../functions/sink/filesystem/BucketWriter.java    | 109 ++++++++
 .../api/functions/sink/filesystem/Buckets.java     |  37 +--
 .../sink/filesystem/BulkBucketWriter.java          |  72 +++++
 .../functions/sink/filesystem/BulkPartWriter.java  |  56 +---
 .../sink/filesystem/DefaultBucketFactoryImpl.java  |  13 +-
 .../sink/filesystem/InProgressFileWriter.java      |  70 +++++
 .../OutputStreamBasedPartFileWriter.java           | 296 +++++++++++++++++++++
 .../functions/sink/filesystem/PartFileWriter.java  | 141 ----------
 .../sink/filesystem/RowWiseBucketWriter.java       |  68 +++++
 .../sink/filesystem/RowWisePartWriter.java         |  50 +---
 .../sink/filesystem/StreamingFileSink.java         |   4 +-
 .../sink/filesystem/WriterProperties.java          |  67 +++++
 .../sink/filesystem/BucketAssignerITCases.java     |   3 +-
 .../sink/filesystem/BucketStateSerializerTest.java |  35 +--
 .../api/functions/sink/filesystem/BucketTest.java  |  58 ++--
 .../api/functions/sink/filesystem/BucketsTest.java |  19 +-
 .../sink/filesystem/RollingPolicyTest.java         |   3 +-
 .../api/functions/sink/filesystem/TestUtils.java   |   7 +-
 .../filesystem/utils/NoOpRecoverableWriter.java    |   2 +-
 27 files changed, 1033 insertions(+), 496 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
index 7d54b11..b92da88 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
@@ -138,10 +138,8 @@ public interface RecoverableWriter {
 	 * recover from a (potential) failure. These can be temporary files that were written
 	 * to the filesystem or objects that were uploaded to S3.
 	 *
-	 * <p><b>NOTE:</b> This operation should not throw an exception if the resumable has already
-	 * been cleaned up and the resources have been freed. But the contract is that it will throw
-	 * an {@link UnsupportedOperationException} if it is called for a {@code RecoverableWriter}
-	 * whose {@link #requiresCleanupOfRecoverableState()} returns {@code false}.
+	 * <p><b>NOTE:</b> This operation should not throw an exception, but return {@code false} if the cleanup did not
+	 * happen for any reason.
 	 *
 	 * @param resumable The {@link ResumeRecoverable} whose state we want to clean-up.
 	 * @return {@code true} if the resources were successfully freed, {@code false} otherwise
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
index bae7314..2a97b85 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
@@ -77,7 +77,7 @@ public class LocalRecoverableWriter implements RecoverableWriter {
 
 	@Override
 	public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
-		throw new UnsupportedOperationException();
+		return false;
 	}
 
 	@Override
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
index d325f2c..91d76c6 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
@@ -95,7 +95,7 @@ public class HadoopRecoverableWriter implements RecoverableWriter {
 
 	@Override
 	public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
-		throw new UnsupportedOperationException();
+		return false;
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/AbstractPartFileWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/AbstractPartFileWriter.java
new file mode 100644
index 0000000..0350a8f
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/AbstractPartFileWriter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+/**
+ * An abstract writer for the currently open part file in a specific {@link Bucket}.
+ * @param <IN> the element type.
+ * @param <BucketID> the bucket id type.
+ */
+public abstract class AbstractPartFileWriter<IN, BucketID> implements InProgressFileWriter<IN, BucketID> {
+
+	private final BucketID bucketID;
+
+	private final long creationTime;
+
+	private long lastUpdateTime;
+
+	public AbstractPartFileWriter(final BucketID bucketID, final long createTime) {
+		this.bucketID = bucketID;
+		this.creationTime = createTime;
+		this.lastUpdateTime = createTime;
+	}
+
+	@Override
+	public BucketID getBucketId() {
+		return bucketID;
+	}
+
+	@Override
+	public long getCreationTime() {
+		return creationTime;
+	}
+
+	@Override
+	public long getLastUpdateTime() {
+		return lastUpdateTime;
+	}
+
+	void markWrite(long now) {
+		this.lastUpdateTime = now;
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index e7abd37..5e9a72b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -21,10 +21,6 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
-import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,48 +56,44 @@ public class Bucket<IN, BucketID> {
 
 	private final int subtaskIndex;
 
-	private final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory;
-
-	private final RecoverableWriter fsWriter;
+	private final BucketWriter<IN, BucketID> bucketWriter;
 
 	private final RollingPolicy<IN, BucketID> rollingPolicy;
 
-	private final NavigableMap<Long, ResumeRecoverable> resumablesPerCheckpoint;
+	private final NavigableMap<Long, InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverablesPerCheckpoint;
 
-	private final NavigableMap<Long, List<CommitRecoverable>> pendingPartsPerCheckpoint;
+	private final NavigableMap<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint;
 
 	private final OutputFileConfig outputFileConfig;
 
 	private long partCounter;
 
 	@Nullable
-	private PartFileWriter<IN, BucketID> inProgressPart;
+	private InProgressFileWriter<IN, BucketID> inProgressPart;
 
-	private List<CommitRecoverable> pendingPartsForCurrentCheckpoint;
+	private List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverablesForCurrentCheckpoint;
 
 	/**
 	 * Constructor to create a new empty bucket.
 	 */
 	private Bucket(
-			final RecoverableWriter fsWriter,
 			final int subtaskIndex,
 			final BucketID bucketId,
 			final Path bucketPath,
 			final long initialPartCounter,
-			final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final OutputFileConfig outputFileConfig) {
-		this.fsWriter = checkNotNull(fsWriter);
 		this.subtaskIndex = subtaskIndex;
 		this.bucketId = checkNotNull(bucketId);
 		this.bucketPath = checkNotNull(bucketPath);
 		this.partCounter = initialPartCounter;
-		this.partFileFactory = checkNotNull(partFileFactory);
+		this.bucketWriter = checkNotNull(bucketWriter);
 		this.rollingPolicy = checkNotNull(rollingPolicy);
 
-		this.pendingPartsForCurrentCheckpoint = new ArrayList<>();
-		this.pendingPartsPerCheckpoint = new TreeMap<>();
-		this.resumablesPerCheckpoint = new TreeMap<>();
+		this.pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>();
+		this.pendingFileRecoverablesPerCheckpoint = new TreeMap<>();
+		this.inProgressFileRecoverablesPerCheckpoint = new TreeMap<>();
 
 		this.outputFileConfig = checkNotNull(outputFileConfig);
 	}
@@ -110,16 +102,14 @@ public class Bucket<IN, BucketID> {
 	 * Constructor to restore a bucket from checkpointed state.
 	 */
 	private Bucket(
-			final RecoverableWriter fsWriter,
 			final int subtaskIndex,
 			final long initialPartCounter,
-			final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
+			final BucketWriter<IN, BucketID> partFileFactory,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final BucketState<BucketID> bucketState,
 			final OutputFileConfig outputFileConfig) throws IOException {
 
 		this(
-				fsWriter,
 				subtaskIndex,
 				bucketState.getBucketId(),
 				bucketState.getBucketPath(),
@@ -133,31 +123,29 @@ public class Bucket<IN, BucketID> {
 	}
 
 	private void restoreInProgressFile(final BucketState<BucketID> state) throws IOException {
-		if (!state.hasInProgressResumableFile()) {
+		if (!state.hasInProgressFileRecoverable()) {
 			return;
 		}
 
 		// we try to resume the previous in-progress file
-		final ResumeRecoverable resumable = state.getInProgressResumableFile();
+		final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = state.getInProgressFileRecoverable();
 
-		if (fsWriter.supportsResume()) {
-			final RecoverableFsDataOutputStream stream = fsWriter.recover(resumable);
-			inProgressPart = partFileFactory.resumeFrom(
-					bucketId, stream, resumable, state.getInProgressFileCreationTime());
+		if (bucketWriter.getProperties().supportsResume()) {
+			inProgressPart = bucketWriter.resumeInProgressFileFrom(
+					bucketId, inProgressFileRecoverable, state.getInProgressFileCreationTime());
 		} else {
 			// if the writer does not support resume, then we close the
 			// in-progress part and commit it, as done in the case of pending files.
-
-			fsWriter.recoverForCommit(resumable).commitAfterRecovery();
+			bucketWriter.recoverPendingFile(inProgressFileRecoverable).commitAfterRecovery();
 		}
 	}
 
 	private void commitRecoveredPendingFiles(final BucketState<BucketID> state) throws IOException {
 
 		// we commit pending files for checkpoints that precess the last successful one, from which we are recovering
-		for (List<CommitRecoverable> committables: state.getCommittableFilesPerCheckpoint().values()) {
-			for (CommitRecoverable committable: committables) {
-				fsWriter.recoverForCommit(committable).commitAfterRecovery();
+		for (List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables: state.getPendingFileRecoverablesPerCheckpoint().values()) {
+			for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable: pendingFileRecoverables) {
+				bucketWriter.recoverPendingFile(pendingFileRecoverable).commitAfterRecovery();
 			}
 		}
 	}
@@ -175,7 +163,7 @@ public class Bucket<IN, BucketID> {
 	}
 
 	boolean isActive() {
-		return inProgressPart != null || !pendingPartsForCurrentCheckpoint.isEmpty() || !pendingPartsPerCheckpoint.isEmpty();
+		return inProgressPart != null || !pendingFileRecoverablesForCurrentCheckpoint.isEmpty() || !pendingFileRecoverablesPerCheckpoint.isEmpty();
 	}
 
 	void merge(final Bucket<IN, BucketID> bucket) throws IOException {
@@ -184,16 +172,16 @@ public class Bucket<IN, BucketID> {
 
 		// There should be no pending files in the "to-merge" states.
 		// The reason is that:
-		// 1) the pendingPartsForCurrentCheckpoint is emptied whenever we take a snapshot (see prepareBucketForCheckpointing()).
-		//    So a snapshot, including the one we are recovering from, will never contain such files.
-		// 2) the files in pendingPartsPerCheckpoint are committed upon recovery (see commitRecoveredPendingFiles()).
+		// 1) the pendingFileRecoverablesForCurrentCheckpoint is emptied whenever we take a snapshot (see prepareBucketForCheckpointing()).
+		//    So a snapshot, including the one we are recovering from, will never contain such files.
+		// 2) the files in pendingFileRecoverablesPerCheckpoint are committed upon recovery (see commitRecoveredPendingFiles()).
 
-		checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty());
-		checkState(bucket.pendingPartsPerCheckpoint.isEmpty());
+		checkState(bucket.pendingFileRecoverablesForCurrentCheckpoint.isEmpty());
+		checkState(bucket.pendingFileRecoverablesPerCheckpoint.isEmpty());
 
-		CommitRecoverable committable = bucket.closePartFile();
-		if (committable != null) {
-			pendingPartsForCurrentCheckpoint.add(committable);
+		InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = bucket.closePartFile();
+		if (pendingFileRecoverable != null) {
+			pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
 		}
 
 		if (LOG.isDebugEnabled()) {
@@ -218,8 +206,7 @@ public class Bucket<IN, BucketID> {
 		closePartFile();
 
 		final Path partFilePath = assembleNewPartPath();
-		final RecoverableFsDataOutputStream stream = fsWriter.open(partFilePath);
-		inProgressPart = partFileFactory.openNew(bucketId, stream, partFilePath, currentTime);
+		inProgressPart = bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Subtask {} opening new part file \"{}\" for bucket id={}.",
@@ -233,14 +220,14 @@ public class Bucket<IN, BucketID> {
 		return new Path(bucketPath, outputFileConfig.getPartPrefix() + '-' + subtaskIndex + '-' + partCounter + outputFileConfig.getPartSuffix());
 	}
 
-	private CommitRecoverable closePartFile() throws IOException {
-		CommitRecoverable committable = null;
+	private InProgressFileWriter.PendingFileRecoverable closePartFile() throws IOException {
+		InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = null;
 		if (inProgressPart != null) {
-			committable = inProgressPart.closeForCommit();
-			pendingPartsForCurrentCheckpoint.add(committable);
+			pendingFileRecoverable = inProgressPart.closeForCommit();
+			pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
 			inProgressPart = null;
 		}
-		return committable;
+		return pendingFileRecoverable;
 	}
 
 	void disposePartFile() {
@@ -252,24 +239,16 @@ public class Bucket<IN, BucketID> {
 	BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
 		prepareBucketForCheckpointing(checkpointId);
 
-		ResumeRecoverable inProgressResumable = null;
+		InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = null;
 		long inProgressFileCreationTime = Long.MAX_VALUE;
 
 		if (inProgressPart != null) {
-			inProgressResumable = inProgressPart.persist();
+			inProgressFileRecoverable = inProgressPart.persist();
 			inProgressFileCreationTime = inProgressPart.getCreationTime();
-
-			// the following is an optimization so that writers that do not
-			// require cleanup, they do not have to keep track of resumables
-			// and later iterate over the active buckets.
-			// (see onSuccessfulCompletionOfCheckpoint())
-
-			if (fsWriter.requiresCleanupOfRecoverableState()) {
-				this.resumablesPerCheckpoint.put(checkpointId, inProgressResumable);
-			}
+			this.inProgressFileRecoverablesPerCheckpoint.put(checkpointId, inProgressFileRecoverable);
 		}
 
-		return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressResumable, pendingPartsPerCheckpoint);
+		return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressFileRecoverable, pendingFileRecoverablesPerCheckpoint);
 	}
 
 	private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
@@ -280,49 +259,46 @@ public class Bucket<IN, BucketID> {
 			closePartFile();
 		}
 
-		if (!pendingPartsForCurrentCheckpoint.isEmpty()) {
-			pendingPartsPerCheckpoint.put(checkpointId, pendingPartsForCurrentCheckpoint);
-			pendingPartsForCurrentCheckpoint = new ArrayList<>();
+		if (!pendingFileRecoverablesForCurrentCheckpoint.isEmpty()) {
+			pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverablesForCurrentCheckpoint);
+			pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>();
 		}
 	}
 
 	void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
-		checkNotNull(fsWriter);
+		checkNotNull(bucketWriter);
 
-		Iterator<Map.Entry<Long, List<CommitRecoverable>>> it =
-				pendingPartsPerCheckpoint.headMap(checkpointId, true)
+		Iterator<Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>>> it =
+				pendingFileRecoverablesPerCheckpoint.headMap(checkpointId, true)
 						.entrySet().iterator();
 
 		while (it.hasNext()) {
-			Map.Entry<Long, List<CommitRecoverable>> entry = it.next();
+			Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> entry = it.next();
 
-			for (CommitRecoverable committable : entry.getValue()) {
-				fsWriter.recoverForCommit(committable).commit();
+			for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable : entry.getValue()) {
+				bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
 			}
 			it.remove();
 		}
 
-		cleanupOutdatedResumables(checkpointId);
+		cleanupInProgressFileRecoverables(checkpointId);
 	}
 
-	private void cleanupOutdatedResumables(long checkpointId) throws IOException {
-		Iterator<Map.Entry<Long, ResumeRecoverable>> it =
-				resumablesPerCheckpoint.headMap(checkpointId, false)
+	private void cleanupInProgressFileRecoverables(long checkpointId) throws IOException {
+		Iterator<Map.Entry<Long, InProgressFileWriter.InProgressFileRecoverable>> it =
+				inProgressFileRecoverablesPerCheckpoint.headMap(checkpointId, false)
 						.entrySet().iterator();
 
 		while (it.hasNext()) {
-			final ResumeRecoverable recoverable = it.next().getValue();
+			final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = it.next().getValue();
 
-			// this check is redundant, as we only put entries in the resumablesPerCheckpoint map
-			// list when the requiresCleanupOfRecoverableState() returns true, but having it makes
+			// no capability check is needed here: entries are put into inProgressFileRecoverablesPerCheckpoint
+			// unconditionally and the cleanup call simply returns false when nothing was deleted, which keeps
 			// the code more readable.
 
-			if (fsWriter.requiresCleanupOfRecoverableState()) {
-				final boolean successfullyDeleted = fsWriter.cleanupRecoverableState(recoverable);
-
-				if (LOG.isDebugEnabled() && successfullyDeleted) {
-					LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
-				}
+			final boolean successfullyDeleted = bucketWriter.cleanupInProgressFileRecoverable(inProgressFileRecoverable);
+			if (LOG.isDebugEnabled() && successfullyDeleted) {
+				LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
 			}
 			it.remove();
 		}
@@ -342,54 +318,51 @@ public class Bucket<IN, BucketID> {
 	// --------------------------- Testing Methods -----------------------------
 
 	@VisibleForTesting
-	Map<Long, List<CommitRecoverable>> getPendingPartsPerCheckpoint() {
-		return pendingPartsPerCheckpoint;
+	Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> getPendingFileRecoverablesPerCheckpoint() {
+		return pendingFileRecoverablesPerCheckpoint;
 	}
 
 	@Nullable
 	@VisibleForTesting
-	PartFileWriter<IN, BucketID> getInProgressPart() {
+	InProgressFileWriter<IN, BucketID> getInProgressPart() {
 		return inProgressPart;
 	}
 
 	@VisibleForTesting
-	List<CommitRecoverable> getPendingPartsForCurrentCheckpoint() {
-		return pendingPartsForCurrentCheckpoint;
+	List<InProgressFileWriter.PendingFileRecoverable> getPendingFileRecoverablesForCurrentCheckpoint() {
+		return pendingFileRecoverablesForCurrentCheckpoint;
 	}
 
 	// --------------------------- Static Factory Methods -----------------------------
 
 	/**
 	 * Creates a new empty {@code Bucket}.
-	 * @param fsWriter the filesystem-specific {@link RecoverableWriter}.
 	 * @param subtaskIndex the index of the subtask creating the bucket.
 	 * @param bucketId the identifier of the bucket, as returned by the {@link BucketAssigner}.
 	 * @param bucketPath the path to where the part files for the bucket will be written to.
 	 * @param initialPartCounter the initial counter for the part files of the bucket.
-	 * @param partFileFactory the {@link PartFileWriter.PartFileFactory} the factory creating part file writers.
+	 * @param bucketWriter the {@link BucketWriter} used to write part files in the bucket.
 	 * @param <IN> the type of input elements to the sink.
 	 * @param <BucketID> the type of the identifier of the bucket, as returned by the {@link BucketAssigner}
 	 * @param outputFileConfig the part file configuration.
 	 * @return The new Bucket.
 	 */
 	static <IN, BucketID> Bucket<IN, BucketID> getNew(
-			final RecoverableWriter fsWriter,
 			final int subtaskIndex,
 			final BucketID bucketId,
 			final Path bucketPath,
 			final long initialPartCounter,
-			final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final OutputFileConfig outputFileConfig) {
-		return new Bucket<>(fsWriter, subtaskIndex, bucketId, bucketPath, initialPartCounter, partFileFactory, rollingPolicy, outputFileConfig);
+		return new Bucket<>(subtaskIndex, bucketId, bucketPath, initialPartCounter, bucketWriter, rollingPolicy, outputFileConfig);
 	}
 
 	/**
 	 * Restores a {@code Bucket} from the state included in the provided {@link BucketState}.
-	 * @param fsWriter the filesystem-specific {@link RecoverableWriter}.
 	 * @param subtaskIndex the index of the subtask creating the bucket.
 	 * @param initialPartCounter the initial counter for the part files of the bucket.
-	 * @param partFileFactory the {@link PartFileWriter.PartFileFactory} the factory creating part file writers.
+	 * @param bucketWriter the {@link BucketWriter} used to write part files in the bucket.
 	 * @param bucketState the initial state of the restored bucket.
 	 * @param <IN> the type of input elements to the sink.
 	 * @param <BucketID> the type of the identifier of the bucket, as returned by the {@link BucketAssigner}
@@ -397,13 +370,12 @@ public class Bucket<IN, BucketID> {
 	 * @return The restored Bucket.
 	 */
 	static <IN, BucketID> Bucket<IN, BucketID> restore(
-			final RecoverableWriter fsWriter,
 			final int subtaskIndex,
 			final long initialPartCounter,
-			final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final BucketState<BucketID> bucketState,
 			final OutputFileConfig outputFileConfig) throws IOException {
-		return new Bucket<>(fsWriter, subtaskIndex, initialPartCounter, partFileFactory, rollingPolicy, bucketState, outputFileConfig);
+		return new Bucket<>(subtaskIndex, initialPartCounter, bucketWriter, rollingPolicy, bucketState, outputFileConfig);
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
index 260e82c..6423627 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableWriter;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -32,20 +31,18 @@ import java.io.Serializable;
 interface BucketFactory<IN, BucketID> extends Serializable {
 
 	Bucket<IN, BucketID> getNewBucket(
-			final RecoverableWriter fsWriter,
 			final int subtaskIndex,
 			final BucketID bucketId,
 			final Path bucketPath,
 			final long initialPartCounter,
-			final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final OutputFileConfig outputFileConfig) throws IOException;
 
 	Bucket<IN, BucketID> restoreBucket(
-			final RecoverableWriter fsWriter,
 			final int subtaskIndex,
 			final long initialPartCounter,
-			final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final BucketState<BucketID> bucketState,
 			final OutputFileConfig outputFileConfig) throws IOException;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
index 1829381..75c00b9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
@@ -46,30 +46,30 @@ class BucketState<BucketID> {
 	private final long inProgressFileCreationTime;
 
 	/**
-	 * A {@link RecoverableWriter.ResumeRecoverable} for the currently open
+	 * A {@link InProgressFileWriter.InProgressFileRecoverable} for the currently open
 	 * part file, or null if there is no currently open part file.
 	 */
 	@Nullable
-	private final RecoverableWriter.ResumeRecoverable inProgressResumableFile;
+	private final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable;
 
 	/**
 	 * The {@link RecoverableWriter.CommitRecoverable files} pending to be
 	 * committed, organized by checkpoint id.
 	 */
-	private final Map<Long, List<RecoverableWriter.CommitRecoverable>> committableFilesPerCheckpoint;
+	private final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint;
 
 	BucketState(
 			final BucketID bucketId,
 			final Path bucketPath,
 			final long inProgressFileCreationTime,
-			@Nullable final RecoverableWriter.ResumeRecoverable inProgressResumableFile,
-			final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommittablesPerCheckpoint
+			@Nullable final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable,
+			final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint
 	) {
 		this.bucketId = Preconditions.checkNotNull(bucketId);
 		this.bucketPath = Preconditions.checkNotNull(bucketPath);
 		this.inProgressFileCreationTime = inProgressFileCreationTime;
-		this.inProgressResumableFile = inProgressResumableFile;
-		this.committableFilesPerCheckpoint = Preconditions.checkNotNull(pendingCommittablesPerCheckpoint);
+		this.inProgressFileRecoverable = inProgressFileRecoverable;
+		this.pendingFileRecoverablesPerCheckpoint = Preconditions.checkNotNull(pendingFileRecoverablesPerCheckpoint);
 	}
 
 	BucketID getBucketId() {
@@ -84,17 +84,17 @@ class BucketState<BucketID> {
 		return inProgressFileCreationTime;
 	}
 
-	boolean hasInProgressResumableFile() {
-		return inProgressResumableFile != null;
+	boolean hasInProgressFileRecoverable() {
+		return inProgressFileRecoverable != null;
 	}
 
 	@Nullable
-	RecoverableWriter.ResumeRecoverable getInProgressResumableFile() {
-		return inProgressResumableFile;
+	InProgressFileWriter.InProgressFileRecoverable getInProgressFileRecoverable() {
+		return inProgressFileRecoverable;
 	}
 
-	Map<Long, List<RecoverableWriter.CommitRecoverable>> getCommittableFilesPerCheckpoint() {
-		return committableFilesPerCheckpoint;
+	Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> getPendingFileRecoverablesPerCheckpoint() {
+		return pendingFileRecoverablesPerCheckpoint;
 	}
 
 	@Override
@@ -105,13 +105,13 @@ class BucketState<BucketID> {
 				.append("BucketState for bucketId=").append(bucketId)
 				.append(" and bucketPath=").append(bucketPath);
 
-		if (hasInProgressResumableFile()) {
+		if (hasInProgressFileRecoverable()) {
 			strBuilder.append(", has open part file created @ ").append(inProgressFileCreationTime);
 		}
 
-		if (!committableFilesPerCheckpoint.isEmpty()) {
+		if (!pendingFileRecoverablesPerCheckpoint.isEmpty()) {
 			strBuilder.append(", has pending files for checkpoints: {");
-			for (long checkpointId: committableFilesPerCheckpoint.keySet()) {
+			for (long checkpointId: pendingFileRecoverablesPerCheckpoint.keySet()) {
 				strBuilder.append(checkpointId).append(' ');
 			}
 			strBuilder.append('}');
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
index 04de246..5863a03 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
@@ -45,119 +44,172 @@ class BucketStateSerializer<BucketID> implements SimpleVersionedSerializer<Bucke
 
 	private static final int MAGIC_NUMBER = 0x1e764b79;
 
-	private final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer;
+	private final SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer;
 
-	private final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer;
+	private final SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer;
 
 	private final SimpleVersionedSerializer<BucketID> bucketIdSerializer;
 
 	BucketStateSerializer(
-			final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer,
-			final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer,
+			final SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer,
+			final SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer,
 			final SimpleVersionedSerializer<BucketID> bucketIdSerializer
 	) {
-		this.resumableSerializer = Preconditions.checkNotNull(resumableSerializer);
-		this.commitableSerializer = Preconditions.checkNotNull(commitableSerializer);
+		this.inProgressFileRecoverableSerializer = Preconditions.checkNotNull(inProgressFileRecoverableSerializer);
+		this.pendingFileRecoverableSerializer = Preconditions.checkNotNull(pendingFileRecoverableSerializer);
 		this.bucketIdSerializer = Preconditions.checkNotNull(bucketIdSerializer);
 	}
 
 	@Override
 	public int getVersion() {
-		return 1;
+		return 2;
 	}
 
 	@Override
 	public byte[] serialize(BucketState<BucketID> state) throws IOException {
 		DataOutputSerializer out = new DataOutputSerializer(256);
 		out.writeInt(MAGIC_NUMBER);
-		serializeV1(state, out);
+		serializeV2(state, out);
 		return out.getCopyOfBuffer();
 	}
 
 	@Override
 	public BucketState<BucketID> deserialize(int version, byte[] serialized) throws IOException {
+		final DataInputDeserializer in = new DataInputDeserializer(serialized);
+
 		switch (version) {
 			case 1:
-				DataInputDeserializer in = new DataInputDeserializer(serialized);
 				validateMagicNumber(in);
 				return deserializeV1(in);
+			case 2:
+				validateMagicNumber(in);
+				return deserializeV2(in);
 			default:
 				throw new IOException("Unrecognized version or corrupt state: " + version);
 		}
 	}
 
-	@VisibleForTesting
-	void serializeV1(BucketState<BucketID> state, DataOutputView out) throws IOException {
-		SimpleVersionedSerialization.writeVersionAndSerialize(bucketIdSerializer, state.getBucketId(), out);
-		out.writeUTF(state.getBucketPath().toString());
-		out.writeLong(state.getInProgressFileCreationTime());
+	private void serializeV2(BucketState<BucketID> state, DataOutputView dataOutputView) throws IOException {
+		SimpleVersionedSerialization.writeVersionAndSerialize(bucketIdSerializer, state.getBucketId(), dataOutputView);
+		dataOutputView.writeUTF(state.getBucketPath().toString());
+		dataOutputView.writeLong(state.getInProgressFileCreationTime());
 
 		// put the current open part file
-		if (state.hasInProgressResumableFile()) {
-			final RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile();
-			out.writeBoolean(true);
-			SimpleVersionedSerialization.writeVersionAndSerialize(resumableSerializer, resumable, out);
-		}
-		else {
-			out.writeBoolean(false);
+		if (state.hasInProgressFileRecoverable()) {
+			final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = state.getInProgressFileRecoverable();
+			dataOutputView.writeBoolean(true);
+			SimpleVersionedSerialization.writeVersionAndSerialize(inProgressFileRecoverableSerializer, inProgressFileRecoverable, dataOutputView);
+		} else {
+			dataOutputView.writeBoolean(false);
 		}
 
 		// put the map of pending files per checkpoint
-		final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommitters = state.getCommittableFilesPerCheckpoint();
+		final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverables = state.getPendingFileRecoverablesPerCheckpoint();
 
-		// manually keep the version here to safe some bytes
-		out.writeInt(commitableSerializer.getVersion());
+		dataOutputView.writeInt(pendingFileRecoverableSerializer.getVersion());
 
-		out.writeInt(pendingCommitters.size());
-		for (Entry<Long, List<RecoverableWriter.CommitRecoverable>> resumablesForCheckpoint : pendingCommitters.entrySet()) {
-			List<RecoverableWriter.CommitRecoverable> resumables = resumablesForCheckpoint.getValue();
+		dataOutputView.writeInt(pendingFileRecoverables.size());
 
-			out.writeLong(resumablesForCheckpoint.getKey());
-			out.writeInt(resumables.size());
+		for (Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFilesForCheckpoint : pendingFileRecoverables.entrySet()) {
+			final List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableList = pendingFilesForCheckpoint.getValue();
 
-			for (RecoverableWriter.CommitRecoverable resumable : resumables) {
-				byte[] serialized = commitableSerializer.serialize(resumable);
-				out.writeInt(serialized.length);
-				out.write(serialized);
+			dataOutputView.writeLong(pendingFilesForCheckpoint.getKey());
+			dataOutputView.writeInt(pendingFileRecoverableList.size());
+
+			for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable : pendingFileRecoverableList) {
+				byte[] serialized = pendingFileRecoverableSerializer.serialize(pendingFileRecoverable);
+				dataOutputView.writeInt(serialized.length);
+				dataOutputView.write(serialized);
 			}
 		}
 	}
 
-	@VisibleForTesting
-	BucketState<BucketID> deserializeV1(DataInputView in) throws IOException {
+	private BucketState<BucketID> deserializeV1(DataInputView in) throws IOException {
+
+		final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer = getCommitableSerializer();
+		final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer = getResumableSerializer();
+
 		final BucketID bucketId = SimpleVersionedSerialization.readVersionAndDeSerialize(bucketIdSerializer, in);
 		final String bucketPathStr = in.readUTF();
 		final long creationTime = in.readLong();
 
 		// then get the current resumable stream
-		RecoverableWriter.ResumeRecoverable current = null;
+		InProgressFileWriter.InProgressFileRecoverable current = null;
 		if (in.readBoolean()) {
-			current = SimpleVersionedSerialization.readVersionAndDeSerialize(resumableSerializer, in);
+			current =
+				new OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverable(
+					SimpleVersionedSerialization.readVersionAndDeSerialize(resumableSerializer, in));
 		}
 
 		final int committableVersion = in.readInt();
 		final int numCheckpoints = in.readInt();
-		final HashMap<Long, List<RecoverableWriter.CommitRecoverable>> resumablesPerCheckpoint = new HashMap<>(numCheckpoints);
+		final HashMap<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablePerCheckpoint = new HashMap<>(numCheckpoints);
 
 		for (int i = 0; i < numCheckpoints; i++) {
 			final long checkpointId = in.readLong();
 			final int noOfResumables = in.readInt();
 
-			final List<RecoverableWriter.CommitRecoverable> resumables = new ArrayList<>(noOfResumables);
+			final List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables = new ArrayList<>(noOfResumables);
 			for (int j = 0; j < noOfResumables; j++) {
 				final byte[] bytes = new byte[in.readInt()];
 				in.readFully(bytes);
-				resumables.add(commitableSerializer.deserialize(committableVersion, bytes));
+				pendingFileRecoverables.add(
+					new OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable(commitableSerializer.deserialize(committableVersion, bytes)));
 			}
-			resumablesPerCheckpoint.put(checkpointId, resumables);
+			pendingFileRecoverablePerCheckpoint.put(checkpointId, pendingFileRecoverables);
 		}
 
 		return new BucketState<>(
-				bucketId,
-				new Path(bucketPathStr),
-				creationTime,
-				current,
-				resumablesPerCheckpoint);
+			bucketId,
+			new Path(bucketPathStr),
+			creationTime,
+			current,
+			pendingFileRecoverablePerCheckpoint);
+	}
+
+	private BucketState<BucketID> deserializeV2(DataInputView dataInputView) throws IOException {
+		final BucketID bucketId = SimpleVersionedSerialization.readVersionAndDeSerialize(bucketIdSerializer, dataInputView);
+		final String bucketPathStr = dataInputView.readUTF();
+		final long creationTime = dataInputView.readLong();
+
+		// then get the current resumable stream
+		InProgressFileWriter.InProgressFileRecoverable current = null;
+		if (dataInputView.readBoolean()) {
+			current = SimpleVersionedSerialization.readVersionAndDeSerialize(inProgressFileRecoverableSerializer, dataInputView);
+		}
+
+		final int pendingFileRecoverableSerializerVersion = dataInputView.readInt();
+		final int numCheckpoints = dataInputView.readInt();
+		final HashMap<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint = new HashMap<>(numCheckpoints);
+
+		for (int i = 0; i < numCheckpoints; i++) {
+			final long checkpointId = dataInputView.readLong();
+			final int numOfPendingFileRecoverables = dataInputView.readInt();
+
+			final List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables = new ArrayList<>(numOfPendingFileRecoverables);
+			for (int j = 0; j < numOfPendingFileRecoverables; j++) {
+				final byte[] bytes = new byte[dataInputView.readInt()];
+				dataInputView.readFully(bytes);
+				pendingFileRecoverables.add(pendingFileRecoverableSerializer.deserialize(pendingFileRecoverableSerializerVersion, bytes));
+			}
+			pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverables);
+		}
+
+		return new BucketState<>(bucketId, new Path(bucketPathStr), creationTime, current, pendingFileRecoverablesPerCheckpoint);
+	}
+
+	private SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumableSerializer() {
+		final OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverableSerializer
+			outputStreamBasedInProgressFileRecoverableSerializer =
+			(OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverableSerializer) inProgressFileRecoverableSerializer;
+		return outputStreamBasedInProgressFileRecoverableSerializer.getResumeSerializer();
+	}
+
+	private SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitableSerializer() {
+		final OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverableSerializer
+			outputStreamBasedPendingFileRecoverableSerializer =
+			(OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverableSerializer) pendingFileRecoverableSerializer;
+		return outputStreamBasedPendingFileRecoverableSerializer.getCommitSerializer();
 	}
 
 	private static void validateMagicNumber(DataInputView in) throws IOException {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java
new file mode 100644
index 0000000..ed3a0e2
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * An interface for factories that create the different {@link InProgressFileWriter writers}.
+ */
+@Internal
+interface BucketWriter<IN, BucketID> {
+
+	/**
+	 * Used to create a new {@link InProgressFileWriter}.
+	 * @param bucketID the id of the bucket this writer is writing to.
+	 * @param path the path this writer will write to.
+	 * @param creationTime the creation time of the file.
+	 * @return the new {@link InProgressFileWriter}
+	 * @throws IOException Thrown if creating a writer fails.
+	 */
+	InProgressFileWriter<IN, BucketID> openNewInProgressFile(
+			final BucketID bucketID,
+			final Path path,
+			final long creationTime) throws IOException;
+
+	/**
+	 * Used to resume a {@link InProgressFileWriter} from a {@link InProgressFileWriter.InProgressFileRecoverable}.
+	 * @param bucketID the id of the bucket this writer is writing to.
+	 * @param inProgressFileSnapshot the state of the part file.
+	 * @param creationTime the creation time of the file.
+	 * @return the resumed {@link InProgressFileWriter}
+	 * @throws IOException Thrown if resuming a writer fails.
+	 */
+	InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(
+			final BucketID bucketID,
+			final InProgressFileWriter.InProgressFileRecoverable inProgressFileSnapshot,
+			final long creationTime) throws IOException;
+
+	/**
+	 * @return the properties of the {@link BucketWriter}
+	 */
+	WriterProperties getProperties();
+
+	/**
+	 * Recovers a pending file for finalizing and committing.
+	 * @param pendingFileRecoverable The handle with the recovery information.
+	 * @return A pending file
+	 * @throws IOException Thrown if recovering a pending file fails.
+	 */
+	PendingFile recoverPendingFile(final InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable) throws IOException;
+
+	/**
+	 * Frees up any resources that were previously occupied in order to be able to
+	 * recover from a (potential) failure.
+	 *
+	 * <p><b>NOTE:</b> This operation should not throw an exception, but return false if the cleanup did not
+	 * happen for any reason.
+	 *
+	 * @param inProgressFileRecoverable the {@link InProgressFileWriter.InProgressFileRecoverable} whose state we want to clean-up.
+	 * @return {@code true} if the resources were successfully freed, {@code false} otherwise
+	 * (e.g. the file to be deleted was not there for any reason - already deleted or never created).
+	 * @throws IOException if an I/O error occurs
+	 */
+	boolean cleanupInProgressFileRecoverable(final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable) throws IOException;
+
+	/**
+	 * This represents a file to which no more data can be written.
+	 */
+	interface PendingFile {
+		/**
+		 * Commits the pending file, making it visible. The file will contain the exact data
+		 * as when the pending file was created.
+		 *
+		 * @throws IOException Thrown if committing fails.
+		 */
+		void commit() throws IOException;
+
+		/**
+		 * Commits the pending file, making it visible. The file will contain the exact data
+		 * as when the pending file was created.
+		 *
+		 * <p>This method tolerates situations where the file was already committed and
+		 * will not raise an exception in that case. This is important for idempotent
+		 * commit retries as they need to happen after recovery.
+		 *
+		 * @throws IOException Thrown if committing fails.
+		 */
+		void commitAfterRecovery() throws IOException;
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index f055798..0c9b73f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -21,9 +21,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.util.Preconditions;
@@ -61,7 +59,7 @@ public class Buckets<IN, BucketID> {
 
 	private final BucketAssigner<IN, BucketID> bucketAssigner;
 
-	private final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory;
+	private final BucketWriter<IN, BucketID> bucketWriter;
 
 	private final RollingPolicy<IN, BucketID> rollingPolicy;
 
@@ -78,8 +76,6 @@ public class Buckets<IN, BucketID> {
 
 	private long maxPartCounter;
 
-	private final RecoverableWriter fsWriter;
-
 	private final OutputFileConfig outputFileConfig;
 
 	// --------------------------- State Related Fields -----------------------------
@@ -92,23 +88,23 @@ public class Buckets<IN, BucketID> {
 	 * @param basePath The base path for our buckets.
 	 * @param bucketAssigner The {@link BucketAssigner} provided by the user.
 	 * @param bucketFactory The {@link BucketFactory} to be used to create buckets.
-	 * @param partFileWriterFactory The {@link PartFileWriter.PartFileFactory} to be used when writing data.
+	 * @param bucketWriter The {@link BucketWriter} to be used when writing data.
 	 * @param rollingPolicy The {@link RollingPolicy} as specified by the user.
 	 */
 	Buckets(
 			final Path basePath,
 			final BucketAssigner<IN, BucketID> bucketAssigner,
 			final BucketFactory<IN, BucketID> bucketFactory,
-			final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			@Nullable final BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener,
 			final int subtaskIndex,
-			final OutputFileConfig outputFileConfig) throws IOException {
+			final OutputFileConfig outputFileConfig) {
 
 		this.basePath = Preconditions.checkNotNull(basePath);
 		this.bucketAssigner = Preconditions.checkNotNull(bucketAssigner);
 		this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
-		this.partFileWriterFactory = Preconditions.checkNotNull(partFileWriterFactory);
+		this.bucketWriter = Preconditions.checkNotNull(bucketWriter);
 		this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
 		this.bucketLifeCycleListener = bucketLifeCycleListener;
 		this.subtaskIndex = subtaskIndex;
@@ -118,19 +114,10 @@ public class Buckets<IN, BucketID> {
 		this.activeBuckets = new HashMap<>();
 		this.bucketerContext = new Buckets.BucketerContext();
 
-		try {
-			this.fsWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
-		} catch (IOException e) {
-			LOG.error("Unable to create filesystem for path: {}", basePath);
-			throw e;
-		}
-
 		this.bucketStateSerializer = new BucketStateSerializer<>(
-				fsWriter.getResumeRecoverableSerializer(),
-				fsWriter.getCommitRecoverableSerializer(),
-				bucketAssigner.getSerializer()
-		);
-
+			bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
+			bucketWriter.getProperties().getPendingFileRecoverableSerializer(),
+			bucketAssigner.getSerializer());
 		this.maxPartCounter = 0L;
 	}
 
@@ -185,10 +172,9 @@ public class Buckets<IN, BucketID> {
 
 		final Bucket<IN, BucketID> restoredBucket = bucketFactory
 				.restoreBucket(
-						fsWriter,
 						subtaskIndex,
 						maxPartCounter,
-						partFileWriterFactory,
+						bucketWriter,
 						rollingPolicy,
 						recoveredState,
 						outputFileConfig
@@ -238,7 +224,7 @@ public class Buckets<IN, BucketID> {
 			final ListState<Long> partCounterStateContainer) throws Exception {
 
 		Preconditions.checkState(
-				fsWriter != null && bucketStateSerializer != null,
+			bucketWriter != null && bucketStateSerializer != null,
 				"sink has not been initialized");
 
 		LOG.info("Subtask {} checkpointing for checkpoint with id={} (max part counter={}).",
@@ -308,12 +294,11 @@ public class Buckets<IN, BucketID> {
 		if (bucket == null) {
 			final Path bucketPath = assembleBucketPath(bucketId);
 			bucket = bucketFactory.getNewBucket(
-					fsWriter,
 					subtaskIndex,
 					bucketId,
 					bucketPath,
 					maxPartCounter,
-					partFileWriterFactory,
+					bucketWriter,
 					rollingPolicy,
 					outputFileConfig);
 			activeBuckets.put(bucketId, bucket);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java
new file mode 100644
index 0000000..0f3cb9c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A factory that creates {@link BulkPartWriter BulkPartWriters}.
+ * @param <IN> The type of input elements.
+ * @param <BucketID> The type of ids for the buckets, as returned by the {@link BucketAssigner}.
+ */
+@Internal
+class BulkBucketWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter.OutputStreamBasedBucketWriter<IN, BucketID> {
+
+	private final BulkWriter.Factory<IN> writerFactory;
+
+	BulkBucketWriter(final RecoverableWriter recoverableWriter, BulkWriter.Factory<IN> writerFactory) throws IOException {
+		super(recoverableWriter);
+		this.writerFactory = writerFactory;
+	}
+
+	@Override
+	public InProgressFileWriter<IN, BucketID> resumeFrom(
+			final BucketID bucketId,
+			final RecoverableFsDataOutputStream stream,
+			final RecoverableWriter.ResumeRecoverable resumable,
+			final long creationTime) throws IOException {
+
+		Preconditions.checkNotNull(stream);
+		Preconditions.checkNotNull(resumable);
+
+		final BulkWriter<IN> writer = writerFactory.create(stream);
+		return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
+	}
+
+	@Override
+	public InProgressFileWriter<IN, BucketID> openNew(
+			final BucketID bucketId,
+			final RecoverableFsDataOutputStream stream,
+			final Path path,
+			final long creationTime) throws IOException {
+
+		Preconditions.checkNotNull(stream);
+		Preconditions.checkNotNull(path);
+
+		final BulkWriter<IN> writer = writerFactory.create(stream);
+		return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
index a44b0e8..b1b7864 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
@@ -20,23 +20,21 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 
 /**
- * A {@link PartFileWriter} for bulk-encoding formats that use an {@link BulkPartWriter}.
+ * An {@link InProgressFileWriter} for bulk-encoding formats that use a {@link BulkWriter}.
  * This also implements the {@link PartFileInfo}.
  */
 @Internal
-final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
+final class BulkPartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter<IN, BucketID>  {
 
 	private final BulkWriter<IN> writer;
 
-	private BulkPartWriter(
+	BulkPartWriter(
 			final BucketID bucketId,
 			final RecoverableFsDataOutputStream currentPartStream,
 			final BulkWriter<IN> writer,
@@ -46,62 +44,20 @@ final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
 	}
 
 	@Override
-	void write(IN element, long currentTime) throws IOException {
+	public void write(IN element, long currentTime) throws IOException {
 		writer.addElement(element);
 		markWrite(currentTime);
 	}
 
 	@Override
-	RecoverableWriter.ResumeRecoverable persist() {
+	public InProgressFileRecoverable persist() {
 		throw new UnsupportedOperationException("Bulk Part Writers do not support \"pause and resume\" operations.");
 	}
 
 	@Override
-	RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
+	public PendingFileRecoverable closeForCommit() throws IOException {
 		writer.flush();
 		writer.finish();
 		return super.closeForCommit();
 	}
-
-	/**
-	 * A factory that creates {@link BulkPartWriter BulkPartWriters}.
-	 * @param <IN> The type of input elements.
-	 * @param <BucketID> The type of ids for the buckets, as returned by the {@link BucketAssigner}.
-	 */
-	static class Factory<IN, BucketID> implements PartFileWriter.PartFileFactory<IN, BucketID> {
-
-		private final BulkWriter.Factory<IN> writerFactory;
-
-		Factory(BulkWriter.Factory<IN> writerFactory) {
-			this.writerFactory = writerFactory;
-		}
-
-		@Override
-		public PartFileWriter<IN, BucketID> resumeFrom(
-				final BucketID bucketId,
-				final RecoverableFsDataOutputStream stream,
-				final RecoverableWriter.ResumeRecoverable resumable,
-				final long creationTime) throws IOException {
-
-			Preconditions.checkNotNull(stream);
-			Preconditions.checkNotNull(resumable);
-
-			final BulkWriter<IN> writer = writerFactory.create(stream);
-			return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
-		}
-
-		@Override
-		public PartFileWriter<IN, BucketID> openNew(
-				final BucketID bucketId,
-				final RecoverableFsDataOutputStream stream,
-				final Path path,
-				final long creationTime) throws IOException {
-
-			Preconditions.checkNotNull(stream);
-			Preconditions.checkNotNull(path);
-
-			final BulkWriter<IN> writer = writerFactory.create(stream);
-			return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
-		}
-	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
index 529b93a..bb20b97 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableWriter;
 
 import java.io.IOException;
 
@@ -34,41 +33,37 @@ class DefaultBucketFactoryImpl<IN, BucketID> implements BucketFactory<IN, Bucket
 
 	@Override
 	public Bucket<IN, BucketID> getNewBucket(
-			final RecoverableWriter fsWriter,
 			final int subtaskIndex,
 			final BucketID bucketId,
 			final Path bucketPath,
 			final long initialPartCounter,
-			final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final OutputFileConfig outputFileConfig) {
 
 		return Bucket.getNew(
-				fsWriter,
 				subtaskIndex,
 				bucketId,
 				bucketPath,
 				initialPartCounter,
-				partFileWriterFactory,
+				bucketWriter,
 				rollingPolicy,
 				outputFileConfig);
 	}
 
 	@Override
 	public Bucket<IN, BucketID> restoreBucket(
-			final RecoverableWriter fsWriter,
 			final int subtaskIndex,
 			final long initialPartCounter,
-			final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final BucketState<BucketID> bucketState,
 			final OutputFileConfig outputFileConfig) throws IOException {
 
 		return Bucket.restore(
-				fsWriter,
 				subtaskIndex,
 				initialPartCounter,
-				partFileWriterFactory,
+				bucketWriter,
 				rollingPolicy,
 				bucketState,
 				outputFileConfig);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
new file mode 100644
index 0000000..60798d1
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+
+/**
+ * The {@link Bucket} uses the {@link InProgressFileWriter} to write elements to a part file.
+ */
+@Internal
+interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
+
+	/**
+	 * Write an element to the part file.
+	 * @param element the element to be written.
+	 * @param currentTime the writing time.
+	 * @throws IOException Thrown if writing the element fails.
+	 */
+	void write(final IN element, final long currentTime) throws IOException;
+
+	/**
+	 * @return The state of the current part file.
+	 * @throws IOException Thrown if persisting the part file fails.
+	 */
+	InProgressFileRecoverable persist() throws IOException;
+
+
+	/**
+	 * @return The state of the pending part file. {@link Bucket} uses this to commit the pending file.
+	 * @throws IOException Thrown if an I/O error occurs.
+	 */
+	PendingFileRecoverable closeForCommit() throws IOException;
+
+	/**
+	 * Dispose the part file.
+	 */
+	void dispose();
+
+	// ------------------------------------------------------------------------
+
+
+	 /**
+	 * A handle that can be used to recover an in-progress file.
+	 */
+	interface InProgressFileRecoverable extends PendingFileRecoverable {}
+
+
+	/**
+	 * A handle that can be used to recover a pending file.
+	 */
+	interface PendingFileRecoverable {}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
new file mode 100644
index 0000000..2d8c423
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.IOUtils;
+
+import java.io.IOException;
+
+/**
+ * The base class for all part file writers that use a {@link org.apache.flink.core.fs.RecoverableFsDataOutputStream}.
+ * @param <IN> the element type
+ * @param <BucketID> the bucket type
+ */
+public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> extends AbstractPartFileWriter<IN, BucketID> {
+
+	final RecoverableFsDataOutputStream currentPartStream;
+
+	OutputStreamBasedPartFileWriter(
+		final BucketID bucketID,
+		final RecoverableFsDataOutputStream recoverableFsDataOutputStream,
+		final long createTime) {
+		super(bucketID, createTime);
+		this.currentPartStream = recoverableFsDataOutputStream;
+	}
+
+	@Override
+	public InProgressFileRecoverable persist() throws IOException {
+		return new OutputStreamBasedInProgressFileRecoverable(currentPartStream.persist());
+	}
+
+	@Override
+	public PendingFileRecoverable closeForCommit() throws IOException {
+		return new OutputStreamBasedPendingFileRecoverable(currentPartStream.closeForCommit().getRecoverable());
+	}
+
+	@Override
+	public void dispose() {
+		// we can suppress exceptions here, because we do not rely on close() to
+		// flush or persist any data
+		IOUtils.closeQuietly(currentPartStream);
+	}
+
+	@Override
+	public long getSize() throws IOException {
+		return currentPartStream.getPos();
+	}
+
+	abstract static class OutputStreamBasedBucketWriter<IN, BucketID> implements BucketWriter<IN, BucketID> {
+
+		private final RecoverableWriter recoverableWriter;
+
+		OutputStreamBasedBucketWriter(final RecoverableWriter recoverableWriter) {
+			this.recoverableWriter = recoverableWriter;
+		}
+
+		@Override
+		public InProgressFileWriter<IN, BucketID> openNewInProgressFile(final BucketID bucketID, final Path path, final long creationTime) throws IOException {
+			return openNew(bucketID, recoverableWriter.open(path), path, creationTime);
+		}
+
+		@Override
+		public InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(final BucketID bucketID, final InProgressFileRecoverable inProgressFileRecoverable, final long creationTime) throws IOException {
+			final OutputStreamBasedInProgressFileRecoverable outputStreamBasedInProgressRecoverable = (OutputStreamBasedInProgressFileRecoverable) inProgressFileRecoverable;
+			return resumeFrom(
+				bucketID,
+				recoverableWriter.recover(outputStreamBasedInProgressRecoverable.getResumeRecoverable()),
+				outputStreamBasedInProgressRecoverable.getResumeRecoverable(),
+				creationTime);
+		}
+
+		@Override
+		public PendingFile recoverPendingFile(final PendingFileRecoverable pendingFileRecoverable) throws IOException {
+			final RecoverableWriter.CommitRecoverable commitRecoverable;
+
+			if (pendingFileRecoverable instanceof OutputStreamBasedPendingFileRecoverable) {
+				commitRecoverable = ((OutputStreamBasedPendingFileRecoverable) pendingFileRecoverable).getCommitRecoverable();
+			} else if (pendingFileRecoverable instanceof OutputStreamBasedInProgressFileRecoverable) {
+				commitRecoverable = ((OutputStreamBasedInProgressFileRecoverable) pendingFileRecoverable).getResumeRecoverable();
+			} else {
+				throw new IllegalArgumentException("can not recover from the pendingFileRecoverable");
+			}
+			return new OutputStreamBasedPendingFile(recoverableWriter.recoverForCommit(commitRecoverable));
+		}
+
+		@Override
+		public boolean cleanupInProgressFileRecoverable(InProgressFileRecoverable inProgressFileRecoverable) throws IOException {
+			final RecoverableWriter.ResumeRecoverable resumeRecoverable =
+				((OutputStreamBasedInProgressFileRecoverable) inProgressFileRecoverable).getResumeRecoverable();
+			return recoverableWriter.cleanupRecoverableState(resumeRecoverable);
+		}
+
+		@Override
+		public WriterProperties getProperties() {
+			return new WriterProperties(
+					new OutputStreamBasedInProgressFileRecoverableSerializer(recoverableWriter.getResumeRecoverableSerializer()),
+					new OutputStreamBasedPendingFileRecoverableSerializer(recoverableWriter.getCommitRecoverableSerializer()),
+					recoverableWriter.supportsResume());
+		}
+
+		public abstract InProgressFileWriter<IN, BucketID> openNew(
+			final BucketID bucketId,
+			final RecoverableFsDataOutputStream stream,
+			final Path path,
+			final long creationTime) throws IOException;
+
+		public abstract InProgressFileWriter<IN, BucketID> resumeFrom(
+			final BucketID bucketId,
+			final RecoverableFsDataOutputStream stream,
+			final RecoverableWriter.ResumeRecoverable resumable,
+			final long creationTime) throws IOException;
+	}
+
+	static final class OutputStreamBasedPendingFileRecoverable implements PendingFileRecoverable {
+
+		private final RecoverableWriter.CommitRecoverable commitRecoverable;
+
+		OutputStreamBasedPendingFileRecoverable(final RecoverableWriter.CommitRecoverable commitRecoverable) {
+			this.commitRecoverable = commitRecoverable;
+		}
+
+		RecoverableWriter.CommitRecoverable getCommitRecoverable() {
+			return commitRecoverable;
+		}
+	}
+
+	static final class OutputStreamBasedInProgressFileRecoverable implements InProgressFileRecoverable {
+
+		private final RecoverableWriter.ResumeRecoverable resumeRecoverable;
+
+		OutputStreamBasedInProgressFileRecoverable(final RecoverableWriter.ResumeRecoverable resumeRecoverable) {
+			this.resumeRecoverable = resumeRecoverable;
+		}
+
+		RecoverableWriter.ResumeRecoverable getResumeRecoverable() {
+			return resumeRecoverable;
+		}
+	}
+
+	static final class OutputStreamBasedPendingFile implements BucketWriter.PendingFile {
+
+		private final RecoverableFsDataOutputStream.Committer committer;
+
+		OutputStreamBasedPendingFile(final RecoverableFsDataOutputStream.Committer committer) {
+			this.committer = committer;
+		}
+
+		@Override
+		public void commit() throws IOException {
+			committer.commit();
+		}
+
+		@Override
+		public void commitAfterRecovery() throws IOException {
+			committer.commitAfterRecovery();
+		}
+	}
+
+	static class OutputStreamBasedInProgressFileRecoverableSerializer implements SimpleVersionedSerializer<InProgressFileRecoverable> {
+
+		private static final int MAGIC_NUMBER = 0xb3a4073d;
+
+		private final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumeSerializer;
+
+		OutputStreamBasedInProgressFileRecoverableSerializer(SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumeSerializer) {
+			this.resumeSerializer = resumeSerializer;
+		}
+
+		@Override
+		public int getVersion() {
+			return 1;
+		}
+
+		@Override
+		public byte[] serialize(InProgressFileRecoverable inProgressRecoverable) throws IOException {
+			OutputStreamBasedInProgressFileRecoverable outputStreamBasedInProgressRecoverable = (OutputStreamBasedInProgressFileRecoverable) inProgressRecoverable;
+			DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(256);
+			dataOutputSerializer.writeInt(MAGIC_NUMBER);
+			serializeV1(outputStreamBasedInProgressRecoverable, dataOutputSerializer);
+			return dataOutputSerializer.getCopyOfBuffer();
+		}
+
+		@Override
+		public InProgressFileRecoverable deserialize(int version, byte[] serialized) throws IOException {
+			switch (version) {
+				case 1:
+					DataInputView dataInputView = new DataInputDeserializer(serialized);
+					validateMagicNumber(dataInputView);
+					return deserializeV1(dataInputView);
+				default:
+					throw new IOException("Unrecognized version or corrupt state: " + version);
+			}
+		}
+
+		SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumeSerializer() {
+			return resumeSerializer;
+		}
+
+		private void serializeV1(final OutputStreamBasedInProgressFileRecoverable outputStreamBasedInProgressRecoverable, final DataOutputView dataOutputView) throws IOException {
+			SimpleVersionedSerialization.writeVersionAndSerialize(resumeSerializer, outputStreamBasedInProgressRecoverable.getResumeRecoverable(), dataOutputView);
+		}
+
+		private OutputStreamBasedInProgressFileRecoverable deserializeV1(final DataInputView dataInputView) throws IOException {
+			return new OutputStreamBasedInProgressFileRecoverable(SimpleVersionedSerialization.readVersionAndDeSerialize(resumeSerializer, dataInputView));
+		}
+
+		private static void validateMagicNumber(final DataInputView dataInputView) throws IOException {
+			final int magicNumber = dataInputView.readInt();
+			if (magicNumber != MAGIC_NUMBER) {
+				throw new IOException(String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
+			}
+		}
+	}
+
+	static class OutputStreamBasedPendingFileRecoverableSerializer implements SimpleVersionedSerializer<PendingFileRecoverable> {
+
+		private static final int MAGIC_NUMBER = 0x2c853c89;
+
+		private final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitSerializer;
+
+		OutputStreamBasedPendingFileRecoverableSerializer(final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitSerializer) {
+			this.commitSerializer = commitSerializer;
+		}
+
+		@Override
+		public int getVersion() {
+			return 1;
+		}
+
+		@Override
+		public byte[] serialize(PendingFileRecoverable pendingFileRecoverable) throws IOException {
+			OutputStreamBasedPendingFileRecoverable outputStreamBasedPendingFileRecoverable = (OutputStreamBasedPendingFileRecoverable) pendingFileRecoverable;
+			DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(256);
+			dataOutputSerializer.writeInt(MAGIC_NUMBER);
+			serializeV1(outputStreamBasedPendingFileRecoverable, dataOutputSerializer);
+			return dataOutputSerializer.getCopyOfBuffer();
+		}
+
+		@Override
+		public PendingFileRecoverable deserialize(int version, byte[] serialized) throws IOException {
+			switch (version) {
+				case 1:
+					DataInputDeserializer in = new DataInputDeserializer(serialized);
+					validateMagicNumber(in);
+					return deserializeV1(in);
+
+				default:
+					throw new IOException("Unrecognized version or corrupt state: " + version);
+			}
+		}
+
+		SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitSerializer() {
+			return this.commitSerializer;
+		}
+
+		private void serializeV1(final OutputStreamBasedPendingFileRecoverable outputStreamBasedPendingFileRecoverable, final DataOutputView dataOutputView) throws IOException {
+			SimpleVersionedSerialization.writeVersionAndSerialize(commitSerializer, outputStreamBasedPendingFileRecoverable.getCommitRecoverable(), dataOutputView);
+		}
+
+		private OutputStreamBasedPendingFileRecoverable deserializeV1(final DataInputView dataInputView) throws IOException {
+			return new OutputStreamBasedPendingFileRecoverable(SimpleVersionedSerialization.readVersionAndDeSerialize(commitSerializer, dataInputView));
+		}
+
+		private static void validateMagicNumber(final DataInputView dataInputView) throws IOException {
+			final int magicNumber = dataInputView.readInt();
+			if (magicNumber != MAGIC_NUMBER) {
+				throw new IOException(String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
+			}
+		}
+	}
+
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
deleted file mode 100644
index 95a2978a..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink.filesystem;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * An abstract writer for the currently open part file in a specific {@link Bucket}.
- *
- * <p>Currently, there are two subclasses, of this class:
- * <ol>
- *     <li>One for row-wise formats: the {@link RowWisePartWriter}.</li>
- *     <li>One for bulk encoding formats: the {@link BulkPartWriter}.</li>
- * </ol>
- *
- * <p>This also implements the {@link PartFileInfo}.
- */
-@Internal
-abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
-
-	private final BucketID bucketId;
-
-	private final long creationTime;
-
-	protected final RecoverableFsDataOutputStream currentPartStream;
-
-	private long lastUpdateTime;
-
-	protected PartFileWriter(
-			final BucketID bucketId,
-			final RecoverableFsDataOutputStream currentPartStream,
-			final long creationTime) {
-
-		Preconditions.checkArgument(creationTime >= 0L);
-		this.bucketId = Preconditions.checkNotNull(bucketId);
-		this.currentPartStream = Preconditions.checkNotNull(currentPartStream);
-		this.creationTime = creationTime;
-		this.lastUpdateTime = creationTime;
-	}
-
-	abstract void write(IN element, long currentTime) throws IOException;
-
-	RecoverableWriter.ResumeRecoverable persist() throws IOException {
-		return currentPartStream.persist();
-	}
-
-	RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
-		return currentPartStream.closeForCommit().getRecoverable();
-	}
-
-	void dispose() {
-		// we can suppress exceptions here, because we do not rely on close() to
-		// flush or persist any data
-		IOUtils.closeQuietly(currentPartStream);
-	}
-
-	void markWrite(long now) {
-		this.lastUpdateTime = now;
-	}
-
-	@Override
-	public BucketID getBucketId() {
-		return bucketId;
-	}
-
-	@Override
-	public long getCreationTime() {
-		return creationTime;
-	}
-
-	@Override
-	public long getSize() throws IOException {
-		return currentPartStream.getPos();
-	}
-
-	@Override
-	public long getLastUpdateTime() {
-		return lastUpdateTime;
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * An interface for factories that create the different {@link PartFileWriter writers}.
-	 */
-	interface PartFileFactory<IN, BucketID> {
-
-		/**
-		 * Used upon recovery from a failure to recover a {@link PartFileWriter writer}.
-		 * @param bucketId the id of the bucket this writer is writing to.
-		 * @param stream the filesystem-specific output stream to use when writing to the filesystem.
-		 * @param resumable the state of the stream we are resurrecting.
-		 * @param creationTime the creation time of the stream.
-		 * @return the recovered {@link PartFileWriter writer}.
-		 * @throws IOException
-		 */
-		PartFileWriter<IN, BucketID> resumeFrom(
-			final BucketID bucketId,
-			final RecoverableFsDataOutputStream stream,
-			final RecoverableWriter.ResumeRecoverable resumable,
-			final long creationTime) throws IOException;
-
-		/**
-		 * Used to create a new {@link PartFileWriter writer}.
-		 * @param bucketId the id of the bucket this writer is writing to.
-		 * @param stream the filesystem-specific output stream to use when writing to the filesystem.
-		 * @param path the part this writer will write to.
-		 * @param creationTime the creation time of the stream.
-		 * @return the new {@link PartFileWriter writer}.
-		 * @throws IOException
-		 */
-		PartFileWriter<IN, BucketID> openNew(
-			final BucketID bucketId,
-			final RecoverableFsDataOutputStream stream,
-			final Path path,
-			final long creationTime) throws IOException;
-	}
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java
new file mode 100644
index 0000000..784f8be
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A factory that creates {@link RowWisePartWriter RowWisePartWriters}.
+ * @param <IN> The type of input elements.
+ * @param <BucketID> The type of ids for the buckets, as returned by the {@link BucketAssigner}.
+ */
+@Internal
+class RowWiseBucketWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter.OutputStreamBasedBucketWriter<IN, BucketID> {
+
+	private final Encoder<IN> encoder;
+
+	RowWiseBucketWriter(final RecoverableWriter recoverableWriter, final Encoder<IN> encoder) {
+		super(recoverableWriter);
+		this.encoder = encoder;
+	}
+
+	@Override
+	public InProgressFileWriter<IN, BucketID> resumeFrom(
+			final BucketID bucketId,
+			final RecoverableFsDataOutputStream stream,
+			final RecoverableWriter.ResumeRecoverable resumable,
+			final long creationTime) {
+
+		Preconditions.checkNotNull(stream);
+		Preconditions.checkNotNull(resumable);
+
+		return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
+	}
+
+	@Override
+	public InProgressFileWriter<IN, BucketID> openNew(
+			final BucketID bucketId,
+			final RecoverableFsDataOutputStream stream,
+			final Path path,
+			final long creationTime) {
+
+		Preconditions.checkNotNull(stream);
+		Preconditions.checkNotNull(path);
+
+		return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
index 05c160c..bed9ec7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
@@ -20,23 +20,21 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.Encoder;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 
 /**
- * A {@link PartFileWriter} for row-wise formats that use an {@link Encoder}.
+ * An {@link InProgressFileWriter} for row-wise formats that use an {@link Encoder}.
  * This also implements the {@link PartFileInfo}.
  */
 @Internal
-final class RowWisePartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
+final class RowWisePartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter<IN, BucketID> {
 
 	private final Encoder<IN> encoder;
 
-	private RowWisePartWriter(
+	RowWisePartWriter(
 			final BucketID bucketId,
 			final RecoverableFsDataOutputStream currentPartStream,
 			final Encoder<IN> encoder,
@@ -46,48 +44,8 @@ final class RowWisePartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID>
 	}
 
 	@Override
-	void write(IN element, long currentTime) throws IOException {
+	public void write(final IN element, final long currentTime) throws IOException {
 		encoder.encode(element, currentPartStream);
 		markWrite(currentTime);
 	}
-
-	/**
-	 * A factory that creates {@link RowWisePartWriter RowWisePartWriters}.
-	 * @param <IN> The type of input elements.
-	 * @param <BucketID> The type of ids for the buckets, as returned by the {@link BucketAssigner}.
-	 */
-	static class Factory<IN, BucketID> implements PartFileWriter.PartFileFactory<IN, BucketID> {
-
-		private final Encoder<IN> encoder;
-
-		Factory(Encoder<IN> encoder) {
-			this.encoder = encoder;
-		}
-
-		@Override
-		public PartFileWriter<IN, BucketID> resumeFrom(
-				final BucketID bucketId,
-				final RecoverableFsDataOutputStream stream,
-				final RecoverableWriter.ResumeRecoverable resumable,
-				final long creationTime) throws IOException {
-
-			Preconditions.checkNotNull(stream);
-			Preconditions.checkNotNull(resumable);
-
-			return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
-		}
-
-		@Override
-		public PartFileWriter<IN, BucketID> openNew(
-				final BucketID bucketId,
-				final RecoverableFsDataOutputStream stream,
-				final Path path,
-				final long creationTime) throws IOException {
-
-			Preconditions.checkNotNull(stream);
-			Preconditions.checkNotNull(path);
-
-			return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
-		}
-	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index cb58529..64e4418 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -277,7 +277,7 @@ public class StreamingFileSink<IN>
 					basePath,
 					bucketAssigner,
 					bucketFactory,
-					new RowWisePartWriter.Factory<>(encoder),
+					new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), encoder),
 					rollingPolicy,
 					bucketLifeCycleListener,
 					subtaskIndex,
@@ -397,7 +397,7 @@ public class StreamingFileSink<IN>
 					basePath,
 					bucketAssigner,
 					bucketFactory,
-					new BulkPartWriter.Factory<>(writerFactory),
+					new BulkBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), writerFactory),
 					rollingPolicy,
 					bucketLifeCycleListener,
 					subtaskIndex,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/WriterProperties.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/WriterProperties.java
new file mode 100644
index 0000000..4fee03c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/WriterProperties.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class describes the properties of the {@link BucketWriter}.
+ */
+@Internal
+public class WriterProperties {
+
+	private final SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer;
+
+	private final SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer;
+
+	private final boolean supportsResume;
+
+	WriterProperties(
+			SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer,
+			SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer,
+			boolean supportsResume) {
+		this.inProgressFileRecoverableSerializer = checkNotNull(inProgressFileRecoverableSerializer);
+		this.pendingFileRecoverableSerializer = checkNotNull(pendingFileRecoverableSerializer);
+		this.supportsResume = supportsResume;
+	}
+
+	/**
+	 * @return Whether the {@link BucketWriter} supports appending data to the restored in-progress file or not.
+	 */
+	boolean supportsResume() {
+		return supportsResume;
+	}
+
+	/**
+	 * @return the serializer for the {@link InProgressFileWriter.PendingFileRecoverable}.
+	 */
+	SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> getPendingFileRecoverableSerializer() {
+		return pendingFileRecoverableSerializer;
+	}
+
+	/**
+	 * @return the serializer for the {@link InProgressFileWriter.InProgressFileRecoverable}.
+	 */
+	SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> getInProgressFileRecoverableSerializer() {
+		return inProgressFileRecoverableSerializer;
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
index f48e467..ff2cc5a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
@@ -54,7 +55,7 @@ public class BucketAssignerITCases {
 			basePath,
 			new BasePathBucketAssigner<>(),
 			new DefaultBucketFactoryImpl<>(),
-			new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+			new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
 			rollingPolicy,
 			null,
 			0,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
index 81c5766..cbd18c4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
@@ -136,7 +135,7 @@ public class BucketStateSerializerTest {
 
 		Assert.assertEquals(testBucketPath, bucket.getBucketPath());
 		Assert.assertNull(bucket.getInProgressPart());
-		Assert.assertTrue(bucket.getPendingPartsPerCheckpoint().isEmpty());
+		Assert.assertTrue(bucket.getPendingFileRecoverablesPerCheckpoint().isEmpty());
 	}
 
 	@Test
@@ -266,8 +265,8 @@ public class BucketStateSerializerTest {
 			final int noOfPendingCheckpoints = 5;
 
 			// there are 5 checkpoint does not complete.
-			final Map<Long, List<RecoverableWriter.CommitRecoverable>>
-				pendingFileRecoverables = recoveredState.getCommittableFilesPerCheckpoint();
+			final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>>
+				pendingFileRecoverables = recoveredState.getPendingFileRecoverablesPerCheckpoint();
 			Assert.assertEquals(5L, pendingFileRecoverables.size());
 
 			final Set<String> beforeRestorePaths = Files.list(outputPath.resolve(BUCKET_ID))
@@ -283,7 +282,7 @@ public class BucketStateSerializerTest {
 			// recover and commit
 			final Bucket bucket = restoreBucket(noOfPendingCheckpoints + 1, recoveredState);
 			Assert.assertEquals(testBucketPath, bucket.getBucketPath());
-			Assert.assertEquals(0, bucket.getPendingPartsPerCheckpoint().size());
+			Assert.assertEquals(0, bucket.getPendingFileRecoverablesForCurrentCheckpoint().size());
 
 			final Set<String> afterRestorePaths = Files.list(outputPath.resolve(BUCKET_ID))
 				.map(file -> file.getFileName().toString())
@@ -313,27 +312,37 @@ public class BucketStateSerializerTest {
 
 	private static Bucket<String, String> createNewBucket(final Path bucketPath) throws IOException {
 		return Bucket.getNew(
-			FileSystem.getLocalFileSystem().createRecoverableWriter(),
 			0,
 			BUCKET_ID,
 			bucketPath,
 			0,
-			new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+			createBucketWriter(),
 			DefaultRollingPolicy.builder().withMaxPartSize(10).build(),
 			OutputFileConfig.builder().build());
 	}
 
 	private static Bucket<String, String> restoreBucket(final int initialPartCounter, final BucketState<String> bucketState) throws IOException {
 		return Bucket.restore(
-			FileSystem.getLocalFileSystem().createRecoverableWriter(),
 			0,
 			initialPartCounter,
-			new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+			createBucketWriter(),
 			DefaultRollingPolicy.builder().withMaxPartSize(10).build(),
 			bucketState,
 			OutputFileConfig.builder().build());
 	}
 
+	private static RowWiseBucketWriter<String, String> createBucketWriter() throws IOException {
+		return new RowWiseBucketWriter<>(FileSystem.getLocalFileSystem().createRecoverableWriter(), new SimpleStringEncoder<>());
+	}
+
+	private static SimpleVersionedSerializer<BucketState<String>> bucketStateSerializer() throws IOException {
+		final RowWiseBucketWriter bucketWriter = createBucketWriter();
+		return new BucketStateSerializer<>(
+			bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
+			bucketWriter.getProperties().getPendingFileRecoverableSerializer(),
+			SimpleVersionedStringSerializer.INSTANCE);
+	}
+
 	private static BucketState<String> readBucketState(final String scenarioName, final int version) throws IOException {
 		byte[] bytes = Files.readAllBytes(getSnapshotPath(scenarioName, version));
 		return SimpleVersionedSerialization.readVersionAndDeSerialize(bucketStateSerializer(), bytes);
@@ -351,14 +360,6 @@ public class BucketStateSerializerTest {
 		return readBucketState(scenarioName, version);
 	}
 
-	private static SimpleVersionedSerializer<BucketState<String>> bucketStateSerializer() throws IOException {
-		RecoverableWriter recoverableWriter = FileSystem.getLocalFileSystem().createRecoverableWriter();
-		return new BucketStateSerializer<>(
-			recoverableWriter.getResumeRecoverableSerializer(),
-			recoverableWriter.getCommitRecoverableSerializer(),
-			SimpleVersionedStringSerializer.INSTANCE);
-	}
-
 	private static void moveToTemplateDirectory(java.nio.file.Path scenarioPath) throws IOException {
 		FileUtils.copy(new Path(scenarioPath.toString()), new Path(scenarioPath.toString() + "-template"), false);
 		FileUtils.deleteDirectory(scenarioPath.toFile());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
index ee85e55..a4d9a09 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
+import org.apache.flink.api.common.serialization.Encoder;
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -171,7 +172,7 @@ public class BucketTest {
 		return new TypeSafeMatcher<BucketState<String>>() {
 			@Override
 			protected boolean matchesSafely(BucketState<String> state) {
-				return state.getInProgressResumableFile() != null;
+				return state.getInProgressFileRecoverable() != null;
 			}
 
 			@Override
@@ -185,7 +186,7 @@ public class BucketTest {
 		return new TypeSafeMatcher<BucketState<String>>() {
 			@Override
 			protected boolean matchesSafely(BucketState<String> state) {
-				return state.getInProgressResumableFile() == null;
+				return state.getInProgressFileRecoverable() == null;
 			}
 
 			@Override
@@ -200,7 +201,7 @@ public class BucketTest {
 		return new TypeSafeMatcher<Bucket<String, String>>() {
 			@Override
 			protected boolean matchesSafely(Bucket<String, String> bucket) {
-				final PartFileWriter<String, String> inProgressPart = bucket.getInProgressPart();
+				final InProgressFileWriter<String, String> inProgressPart = bucket.getInProgressPart();
 				return isNull == (inProgressPart == null);
 			}
 
@@ -349,23 +350,21 @@ public class BucketTest {
 
 	private static final RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().build();
 
-	private static final PartFileWriter.PartFileFactory<String, String> partFileFactory =
-			new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>());
+	private static final Encoder ENCODER = new SimpleStringEncoder<>();
 
 	private static Bucket<String, String> createBucket(
 			final RecoverableWriter writer,
 			final Path bucketPath,
 			final int subtaskIdx,
 			final int initialPartCounter,
-			final OutputFileConfig outputFileConfig) {
+			final OutputFileConfig outputFileConfig) throws IOException {
 
 		return Bucket.getNew(
-				writer,
 				subtaskIdx,
 				bucketId,
 				bucketPath,
 				initialPartCounter,
-				partFileFactory,
+				new RowWiseBucketWriter<>(writer, ENCODER),
 				rollingPolicy,
 				outputFileConfig);
 	}
@@ -378,10 +377,9 @@ public class BucketTest {
 			final OutputFileConfig outputFileConfig) throws Exception {
 
 		return Bucket.restore(
-				writer,
 				subtaskIndex,
 				initialPartCounter,
-				partFileFactory,
+				new RowWiseBucketWriter<>(writer, ENCODER),
 				rollingPolicy,
 				bucketState,
 				outputFileConfig);
@@ -402,24 +400,46 @@ public class BucketTest {
 
 	private Bucket<String, String> getRestoredBucketWithOnlyInProgressPart(final BaseStubWriter writer) throws IOException {
 		final BucketState<String> stateWithOnlyInProgressFile =
-				new BucketState<>("test", new Path(), 12345L, new NoOpRecoverable(), new HashMap<>());
-		return Bucket.restore(writer, 0, 1L, partFileFactory, rollingPolicy, stateWithOnlyInProgressFile, OutputFileConfig.builder().build());
+				new BucketState<>(
+					"test",
+					new Path(),
+					12345L,
+					new OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverable(new NoOpRecoverable()),
+					new HashMap<>());
+
+		return Bucket.restore(
+			0,
+			1L,
+			new RowWiseBucketWriter<>(writer, ENCODER),
+			rollingPolicy,
+			stateWithOnlyInProgressFile,
+			OutputFileConfig.builder().build());
 	}
 
 	private Bucket<String, String> getRestoredBucketWithOnlyPendingParts(final BaseStubWriter writer, final int numberOfPendingParts) throws IOException {
-		final Map<Long, List<RecoverableWriter.CommitRecoverable>> completePartsPerCheckpoint =
+		final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> completePartsPerCheckpoint =
 				createPendingPartsPerCheckpoint(numberOfPendingParts);
 
 		final BucketState<String> initStateWithOnlyInProgressFile =
-				new BucketState<>("test", new Path(), 12345L, null, completePartsPerCheckpoint);
-		return Bucket.restore(writer, 0, 1L, partFileFactory, rollingPolicy, initStateWithOnlyInProgressFile, OutputFileConfig.builder().build());
+				new BucketState<>(
+					"test",
+					new Path(),
+					12345L,
+					null,
+					completePartsPerCheckpoint);
+		return Bucket.restore(
+			0,
+			1L,
+			new RowWiseBucketWriter<>(writer, ENCODER),
+			rollingPolicy,
+			initStateWithOnlyInProgressFile, OutputFileConfig.builder().build());
 	}
 
-	private Map<Long, List<RecoverableWriter.CommitRecoverable>> createPendingPartsPerCheckpoint(int noOfCheckpoints) {
-		final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommittablesPerCheckpoint = new HashMap<>();
+	private Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> createPendingPartsPerCheckpoint(int noOfCheckpoints) {
+		final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingCommittablesPerCheckpoint = new HashMap<>();
 		for (int checkpointId = 0; checkpointId < noOfCheckpoints; checkpointId++) {
-			final List<RecoverableWriter.CommitRecoverable> pending = new ArrayList<>();
-			pending.add(new NoOpRecoverable());
+			final List<InProgressFileWriter.PendingFileRecoverable> pending = new ArrayList<>();
+			pending.add(new OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable(new NoOpRecoverable()));
 			pendingCommittablesPerCheckpoint.put((long) checkpointId, pending);
 		}
 		return pendingCommittablesPerCheckpoint;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
index e996444..8e2117a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils.MockListState;
@@ -93,8 +94,8 @@ public class BucketsTest {
 				return bucket.getBucketId().equals(bucketId) &&
 						bucket.getBucketPath().equals(new Path(testTmpPath, bucketId)) &&
 						bucket.getInProgressPart() == null &&
-						bucket.getPendingPartsForCurrentCheckpoint().isEmpty() &&
-						bucket.getPendingPartsPerCheckpoint().size() == 1;
+						bucket.getPendingFileRecoverablesForCurrentCheckpoint().isEmpty() &&
+						bucket.getPendingFileRecoverablesPerCheckpoint().size() == 1;
 			}
 
 			@Override
@@ -145,7 +146,7 @@ public class BucketsTest {
 		Assert.assertEquals(2L, bucketsTwo.getMaxPartCounter());
 
 		// make sure we have one in-progress file here and a pending
-		Assert.assertEquals(1L, bucketsTwo.getActiveBuckets().get("test1").getPendingPartsPerCheckpoint().size());
+		Assert.assertEquals(1L, bucketsTwo.getActiveBuckets().get("test1").getPendingFileRecoverablesPerCheckpoint().size());
 		Assert.assertNotNull(bucketsTwo.getActiveBuckets().get("test1").getInProgressPart());
 
 		final ListState<byte[]> mergedBucketStateContainer = new MockListState<>();
@@ -175,10 +176,10 @@ public class BucketsTest {
 
 		// this is due to the Bucket#merge(). The in progress file of one
 		// of the previous tasks is put in the list of pending files.
-		Assert.assertEquals(1L, bucket.getPendingPartsForCurrentCheckpoint().size());
+		Assert.assertEquals(1L, bucket.getPendingFileRecoverablesForCurrentCheckpoint().size());
 
 		// we commit the pending for previous checkpoints
-		Assert.assertTrue(bucket.getPendingPartsPerCheckpoint().isEmpty());
+		Assert.assertTrue(bucket.getPendingFileRecoverablesPerCheckpoint().isEmpty());
 	}
 
 	@Test
@@ -210,8 +211,8 @@ public class BucketsTest {
 		Assert.assertEquals("test", bucket.getBucketId());
 
 		Assert.assertNull(bucket.getInProgressPart());
-		Assert.assertEquals(1L, bucket.getPendingPartsForCurrentCheckpoint().size());
-		Assert.assertTrue(bucket.getPendingPartsPerCheckpoint().isEmpty());
+		Assert.assertEquals(1L, bucket.getPendingFileRecoverablesForCurrentCheckpoint().size());
+		Assert.assertTrue(bucket.getPendingFileRecoverablesPerCheckpoint().isEmpty());
 	}
 
 	@Test
@@ -321,7 +322,7 @@ public class BucketsTest {
 				path,
 				new VerifyingBucketAssigner(timestamp, watermark, processingTime),
 				new DefaultBucketFactoryImpl<>(),
-				new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+				new RowWiseBucketWriter<>(FileSystem.get(path.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
 				DefaultRollingPolicy.builder().build(),
 				null,
 				2,
@@ -458,7 +459,7 @@ public class BucketsTest {
 				basePath,
 				new TestUtils.StringIdentityBucketAssigner(),
 				new DefaultBucketFactoryImpl<>(),
-				new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+				new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
 				rollingPolicy,
 				bucketLifeCycleListener,
 				subtaskIdx,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index 59ea627..2a4da34 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
@@ -201,7 +202,7 @@ public class RollingPolicyTest {
 				basePath,
 				new TestUtils.StringIdentityBucketAssigner(),
 				new DefaultBucketFactoryImpl<>(),
-				new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+				new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
 				rollingPolicyToTest,
 				null,
 				0,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
index c540dc73..df678c5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
@@ -27,7 +27,6 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
-import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
@@ -50,6 +49,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy.build;
+
 /**
  * Utilities for the {@link StreamingFileSink} tests.
  */
@@ -158,7 +159,7 @@ public class TestUtils {
 			.forBulkFormat(new Path(outDir.toURI()), writer)
 			.withBucketAssigner(bucketer)
 			.withBucketCheckInterval(bucketCheckInterval)
-			.withRollingPolicy(OnCheckpointRollingPolicy.build())
+			.withRollingPolicy(build())
 			.withBucketFactory(bucketFactory)
 			.withOutputFileConfig(outputFileConfig)
 			.build();
@@ -199,7 +200,7 @@ public class TestUtils {
 		StreamingFileSink<Tuple2<String, Integer>> sink = StreamingFileSink
 				.forBulkFormat(new Path(outDir.toURI()), writer)
 				.withNewBucketAssigner(bucketer)
-				.withRollingPolicy(OnCheckpointRollingPolicy.build())
+				.withRollingPolicy(build())
 				.withBucketCheckInterval(bucketCheckInterval)
 				.withBucketFactory(bucketFactory)
 				.withOutputFileConfig(outputFileConfig)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableWriter.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableWriter.java
index e21da2a..6260a2c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableWriter.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableWriter.java
@@ -50,7 +50,7 @@ public class NoOpRecoverableWriter implements RecoverableWriter {
 
 	@Override
 	public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
-		throw new UnsupportedOperationException();
+		return false;
 	}
 
 	@Override