You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2020/05/18 14:14:31 UTC

[flink] branch release-1.11 updated (d8a77cb -> 4349e93)

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from d8a77cb  [FLINK-17792][tests] Catch and log exception if jstack fails
     new a2deff2  [minor] Allow relative paths in LocalFileSystem
     new b8fab2f  [FLINK-17593][Connectors/FileSystem] Turn BucketStateSerializerTest into an upgrade test
     new 4d684df  [FLINK-17593][Connectors/FileSystem] Support arbitrary recovery mechanism for PartFileWriter
     new 4349e93  [FLINK-17593] Update BucketStateSerializerTest for v2

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/core/fs/RecoverableWriter.java    |   6 +-
 .../flink/core/fs/local/LocalFileSystem.java       |   8 +-
 .../core/fs/local/LocalRecoverableSerializer.java  |   4 +-
 .../core/fs/local/LocalRecoverableWriter.java      |   3 +-
 .../runtime/fs/hdfs/HadoopRecoverableWriter.java   |   2 +-
 .../sink/filesystem/AbstractPartFileWriter.java    |  58 +++
 .../api/functions/sink/filesystem/Bucket.java      | 166 ++++----
 .../functions/sink/filesystem/BucketFactory.java   |   7 +-
 .../api/functions/sink/filesystem/BucketState.java |  32 +-
 .../sink/filesystem/BucketStateSerializer.java     | 146 ++++---
 .../functions/sink/filesystem/BucketWriter.java    | 109 ++++++
 .../api/functions/sink/filesystem/Buckets.java     |  37 +-
 .../sink/filesystem/BulkBucketWriter.java          |  72 ++++
 .../functions/sink/filesystem/BulkPartWriter.java  |  56 +--
 .../sink/filesystem/DefaultBucketFactoryImpl.java  |  13 +-
 .../sink/filesystem/InProgressFileWriter.java      |  70 ++++
 .../OutputStreamBasedPartFileWriter.java           | 296 ++++++++++++++
 .../functions/sink/filesystem/PartFileWriter.java  | 141 -------
 .../sink/filesystem/RowWiseBucketWriter.java       |  68 ++++
 .../sink/filesystem/RowWisePartWriter.java         |  50 +--
 .../sink/filesystem/StreamingFileSink.java         |   4 +-
 .../sink/filesystem/WriterProperties.java          |  67 ++++
 .../sink/filesystem/BucketAssignerITCases.java     |   3 +-
 .../sink/filesystem/BucketStateSerializerTest.java | 427 +++++++++++++--------
 .../api/functions/sink/filesystem/BucketTest.java  |  58 ++-
 .../api/functions/sink/filesystem/BucketsTest.java |  19 +-
 .../sink/filesystem/RollingPolicyTest.java         |   3 +-
 .../api/functions/sink/filesystem/TestUtils.java   |   7 +-
 .../filesystem/utils/NoOpRecoverableWriter.java    |   2 +-
 .../bucket-state-migration-test/empty-v1/snapshot  | Bin 0 -> 128 bytes
 .../bucket-state-migration-test/empty-v2/snapshot  | Bin 0 -> 128 bytes
 ...inprogress.a88d5993-77bc-44ce-880b-9f2a43b59ab4 |   2 +
 ...inprogress.7c0f2bd7-3078-48e8-9af2-d8773fb949c5 |   2 +
 ...inprogress.6729a640-0585-4785-a652-89802950c663 |   2 +
 ...inprogress.b4bcb0e9-5c9e-45dd-8963-1b163343544d |   2 +
 ...inprogress.e1e9e48d-0db6-4dd7-8a4d-fb4ebe7ed8ac |   2 +
 .../full-no-in-progress-v1-template/snapshot       | Bin 0 -> 1537 bytes
 ...inprogress.1e22e72d-0ab2-493b-8b00-9edac4252cec |   2 +
 ...inprogress.3821f491-9fa1-48b2-b66b-655352a3c8ec |   2 +
 ...inprogress.0af18f41-d8f8-4a4e-a92e-de12851be20b |   2 +
 ...inprogress.a3d0f4d2-d6ad-4f83-ba62-ed4b1fa86db2 |   2 +
 ...inprogress.666acf3e-935c-4621-8171-f7c897496524 |   2 +
 .../full-no-in-progress-v2-template/snapshot       | Bin 0 -> 1597 bytes
 ...inprogress.8fec17e9-5d54-4fa9-aebb-70736fe03c82 |   2 +
 ...inprogress.0035b171-2759-403a-8d6c-4612b28a7a6c |   2 +
 ...inprogress.49da8048-af6b-4665-b4f6-b659cb38dc97 |   2 +
 ...inprogress.d13ec4e0-07b5-4f4e-9be8-9fb457cbcde9 |   2 +
 ...inprogress.123ac2c7-f92a-476a-a848-1369b93d82a7 |   2 +
 ...inprogress.32f5a28f-20e1-48da-9951-10e795133d64 |   1 +
 .../full-v1-template/snapshot                      | Bin 0 -> 1613 bytes
 ...inprogress.9731063e-2b28-4701-8cc1-e706480b8022 |   2 +
 ...inprogress.1d423406-097a-4deb-bfde-d023d3477cd5 |   2 +
 ...inprogress.6a837aa3-4736-4098-a878-fdeffe227628 |   2 +
 ...inprogress.f121b73d-ac74-4fbd-b70d-f13e51c9132c |   2 +
 ...inprogress.a156884a-f090-4c3f-a271-0b63ab539c45 |   2 +
 ...inprogress.83c527c5-14dc-4d49-9f99-c915f2224f6a |   1 +
 .../full-v2-template/snapshot                      | Bin 0 -> 1685 bytes
 ...inprogress.a70190d6-d080-43a8-b414-746b09d3a8a0 |   1 +
 .../only-in-progress-v1/snapshot                   | Bin 0 -> 404 bytes
 ...inprogress.10833090-dd8c-4e36-884d-bb9758a3a8ef |   1 +
 .../only-in-progress-v2/snapshot                   | Bin 0 -> 416 bytes
 61 files changed, 1325 insertions(+), 653 deletions(-)
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/AbstractPartFileWriter.java
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
 delete mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/WriterProperties.java
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/empty-v1/snapshot
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/empty-v2/snapshot
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-0.inprogress.a88d5993-77bc-44ce-880b-9f2a43b59ab4
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-1.inprogress.7c0f2bd7-3078-48e8-9af2-d8773fb949c5
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-2.inprogress.6729a640-0585-4785-a652-89802950c663
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-3.inprogress.b4bcb0e9-5c9e-45dd-8963-1b163343544d
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-4.inprogress.e1e9e48d-0db6-4dd7-8a4d-fb4ebe7ed8ac
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/snapshot
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-0.inprogress.1e22e72d-0ab2-493b-8b00-9edac4252cec
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-1.inprogress.3821f491-9fa1-48b2-b66b-655352a3c8ec
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-2.inprogress.0af18f41-d8f8-4a4e-a92e-de12851be20b
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-3.inprogress.a3d0f4d2-d6ad-4f83-ba62-ed4b1fa86db2
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-4.inprogress.666acf3e-935c-4621-8171-f7c897496524
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/snapshot
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-0.inprogress.8fec17e9-5d54-4fa9-aebb-70736fe03c82
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-1.inprogress.0035b171-2759-403a-8d6c-4612b28a7a6c
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-2.inprogress.49da8048-af6b-4665-b4f6-b659cb38dc97
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-3.inprogress.d13ec4e0-07b5-4f4e-9be8-9fb457cbcde9
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-4.inprogress.123ac2c7-f92a-476a-a848-1369b93d82a7
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-5.inprogress.32f5a28f-20e1-48da-9951-10e795133d64
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/snapshot
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-0.inprogress.9731063e-2b28-4701-8cc1-e706480b8022
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-1.inprogress.1d423406-097a-4deb-bfde-d023d3477cd5
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-2.inprogress.6a837aa3-4736-4098-a878-fdeffe227628
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-3.inprogress.f121b73d-ac74-4fbd-b70d-f13e51c9132c
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-4.inprogress.a156884a-f090-4c3f-a271-0b63ab539c45
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-5.inprogress.83c527c5-14dc-4d49-9f99-c915f2224f6a
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/snapshot
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/only-in-progress-v1/bucket/test-bucket/.part-0-0.inprogress.a70190d6-d080-43a8-b414-746b09d3a8a0
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/only-in-progress-v1/snapshot
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/only-in-progress-v2/bucket/test-bucket/.part-0-0.inprogress.10833090-dd8c-4e36-884d-bb9758a3a8ef
 create mode 100644 flink-streaming-java/src/test/resources/bucket-state-migration-test/only-in-progress-v2/snapshot


[flink] 02/04: [FLINK-17593][Connectors/FileSystem] Turn BucketStateSerializerTest into an upgrade test

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b8fab2ffdc8f07b4fb7043dd96b25633f8e2eed9
Author: GuoWei Ma <gu...@gmail.com>
AuthorDate: Mon May 18 15:19:07 2020 +0800

    [FLINK-17593][Connectors/FileSystem] Turn BucketStateSerializerTest into an upgrade test
---
 .../sink/filesystem/BucketStateSerializerTest.java | 424 +++++++++++++--------
 .../bucket-state-migration-test/empty-v1/snapshot  | Bin 0 -> 128 bytes
 ...inprogress.a88d5993-77bc-44ce-880b-9f2a43b59ab4 |   2 +
 ...inprogress.7c0f2bd7-3078-48e8-9af2-d8773fb949c5 |   2 +
 ...inprogress.6729a640-0585-4785-a652-89802950c663 |   2 +
 ...inprogress.b4bcb0e9-5c9e-45dd-8963-1b163343544d |   2 +
 ...inprogress.e1e9e48d-0db6-4dd7-8a4d-fb4ebe7ed8ac |   2 +
 .../full-no-in-progress-v1-template/snapshot       | Bin 0 -> 1537 bytes
 ...inprogress.8fec17e9-5d54-4fa9-aebb-70736fe03c82 |   2 +
 ...inprogress.0035b171-2759-403a-8d6c-4612b28a7a6c |   2 +
 ...inprogress.49da8048-af6b-4665-b4f6-b659cb38dc97 |   2 +
 ...inprogress.d13ec4e0-07b5-4f4e-9be8-9fb457cbcde9 |   2 +
 ...inprogress.123ac2c7-f92a-476a-a848-1369b93d82a7 |   2 +
 ...inprogress.32f5a28f-20e1-48da-9951-10e795133d64 |   1 +
 .../full-v1-template/snapshot                      | Bin 0 -> 1613 bytes
 ...inprogress.a70190d6-d080-43a8-b414-746b09d3a8a0 |   1 +
 .../only-in-progress-v1/snapshot                   | Bin 0 -> 404 bytes
 17 files changed, 282 insertions(+), 164 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
index f2c1f8b..81c5766 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
@@ -18,253 +18,349 @@
 
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
-import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.util.FileUtils;
 
 import org.junit.Assert;
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
-import java.io.File;
 import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.startsWith;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.iterableWithSize;
+import static org.junit.Assert.assertThat;
 
 /**
- * Tests for the {@link BucketStateSerializer}.
+ * Tests for the {@link BucketStateSerializer} that verify we can still read snapshots written using
+ * an older version of the serializer. We keep snapshots for all previous versions in version
+ * control (including the current version). The tests verify that the current version of the
+ * serializer can still read data from all previous versions.
  */
+@RunWith(Parameterized.class)
 public class BucketStateSerializerTest {
 
+	private static final int CURRENT_VERSION = 1;
+
+	@Parameterized.Parameters(name = "Previous Version = {0}")
+	public static Collection<Integer> previousVersions() {
+		return Arrays.asList(1);
+	}
+
+	@Parameterized.Parameter
+	public Integer previousVersion;
+
 	private static final String IN_PROGRESS_CONTENT = "writing";
 	private static final String PENDING_CONTENT = "wrote";
 
+	private static final String BUCKET_ID = "test-bucket";
+
 	@ClassRule
 	public static TemporaryFolder tempFolder = new TemporaryFolder();
 
+	private static java.nio.file.Path getResourcePath(
+		String scenarioName,
+		int version) {
+		return Paths.get("src/test/resources/")
+			.resolve("bucket-state-migration-test")
+			.resolve(scenarioName + "-v" + version);
+	}
+
+	private static java.nio.file.Path getSnapshotPath(
+		String scenarioName,
+		int version) {
+		java.nio.file.Path basePath = getResourcePath(scenarioName, version);
+		return basePath.resolve("snapshot");
+	}
+
+	private static java.nio.file.Path getOutputPath(String scenarioName, int version) {
+		java.nio.file.Path basePath = getResourcePath(scenarioName, version);
+		return basePath.resolve("bucket");
+	}
+
 	@Test
-	public void testSerializationEmpty() throws IOException {
-		final File testFolder = tempFolder.newFolder();
-		final FileSystem fs = FileSystem.get(testFolder.toURI());
-		final RecoverableWriter writer = fs.createRecoverableWriter();
+	@Ignore
+	public void prepareDeserializationEmpty() throws IOException {
 
-		final Path testBucket = new Path(testFolder.getPath(), "test");
+		final String scenarioName = "empty";
+		final java.nio.file.Path scenarioPath = getResourcePath(scenarioName, CURRENT_VERSION);
 
-		final BucketState<String> bucketState = new BucketState<>(
-				"test", testBucket, Long.MAX_VALUE, null, new HashMap<>());
+		FileUtils.deleteDirectory(scenarioPath.toFile());
+		Files.createDirectories(scenarioPath);
 
-		final SimpleVersionedSerializer<BucketState<String>> serializer =
-				new BucketStateSerializer<>(
-						writer.getResumeRecoverableSerializer(),
-						writer.getCommitRecoverableSerializer(),
-						SimpleVersionedStringSerializer.INSTANCE
-				);
+		final java.nio.file.Path outputPath = getOutputPath(scenarioName, CURRENT_VERSION);
+		final Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString());
 
-		byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
-		final BucketState<String> recoveredState =  SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+		final Bucket<String, String> bucket =
+			createNewBucket(testBucketPath);
 
-		Assert.assertEquals(testBucket, recoveredState.getBucketPath());
-		Assert.assertNull(recoveredState.getInProgressResumableFile());
-		Assert.assertTrue(recoveredState.getCommittableFilesPerCheckpoint().isEmpty());
+		final BucketState<String> bucketState = bucket.onReceptionOfCheckpoint(0);
+
+		byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(
+			bucketStateSerializer(),
+			bucketState);
+		Files.write(getSnapshotPath(scenarioName, CURRENT_VERSION), bytes);
 	}
 
 	@Test
-	public void testSerializationOnlyInProgress() throws IOException {
-		final File testFolder = tempFolder.newFolder();
-		final FileSystem fs = FileSystem.get(testFolder.toURI());
+	public void testSerializationEmpty() throws IOException {
 
-		final Path testBucket = new Path(testFolder.getPath(), "test");
+		final String scenarioName = "empty";
+		final java.nio.file.Path outputPath = getOutputPath(scenarioName, previousVersion);
+		final Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString());
+		final BucketState<String> recoveredState = readBucketState(scenarioName, previousVersion);
 
-		final RecoverableWriter writer = fs.createRecoverableWriter();
-		final RecoverableFsDataOutputStream stream = writer.open(testBucket);
-		stream.write(IN_PROGRESS_CONTENT.getBytes(Charset.forName("UTF-8")));
+		final Bucket<String, String> bucket = restoreBucket(0, recoveredState);
 
-		final RecoverableWriter.ResumeRecoverable current = stream.persist();
+		Assert.assertEquals(testBucketPath, bucket.getBucketPath());
+		Assert.assertNull(bucket.getInProgressPart());
+		Assert.assertTrue(bucket.getPendingPartsPerCheckpoint().isEmpty());
+	}
 
-		final BucketState<String> bucketState = new BucketState<>(
-				"test", testBucket, Long.MAX_VALUE, current, new HashMap<>());
+	@Test
+	@Ignore
+	public void prepareDeserializationOnlyInProgress() throws IOException {
+
+		final String scenarioName = "only-in-progress";
+		final java.nio.file.Path scenarioPath = getResourcePath(scenarioName, CURRENT_VERSION);
+		FileUtils.deleteDirectory(scenarioPath.toFile());
+		Files.createDirectories(scenarioPath);
 
-		final SimpleVersionedSerializer<BucketState<String>> serializer =
-				new BucketStateSerializer<>(
-						writer.getResumeRecoverableSerializer(),
-						writer.getCommitRecoverableSerializer(),
-						SimpleVersionedStringSerializer.INSTANCE
-				);
+		final java.nio.file.Path outputPath = getOutputPath(scenarioName, CURRENT_VERSION);
+		final Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString());
 
-		final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
+		final Bucket<String, String> bucket =
+			createNewBucket(testBucketPath);
 
-		// to simulate that everything is over for file.
-		stream.close();
+		bucket.write(IN_PROGRESS_CONTENT, System.currentTimeMillis());
 
-		final BucketState<String> recoveredState =  SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+		final BucketState<String> bucketState = bucket.onReceptionOfCheckpoint(0);
 
-		Assert.assertEquals(testBucket, recoveredState.getBucketPath());
+		final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(
+			bucketStateSerializer(), bucketState);
 
-		FileStatus[] statuses = fs.listStatus(testBucket.getParent());
-		Assert.assertEquals(1L, statuses.length);
-		Assert.assertTrue(
-				statuses[0].getPath().getPath().startsWith(
-						(new Path(testBucket.getParent(), ".test.inprogress")).getPath())
-		);
+		Files.write(getSnapshotPath(scenarioName, CURRENT_VERSION), bytes);
 	}
 
 	@Test
-	public void testSerializationFull() throws IOException {
-		final int noOfTasks = 5;
+	public void testSerializationOnlyInProgress() throws IOException {
 
-		final File testFolder = tempFolder.newFolder();
-		final FileSystem fs = FileSystem.get(testFolder.toURI());
-		final RecoverableWriter writer = fs.createRecoverableWriter();
+		final String scenarioName = "only-in-progress";
+		final java.nio.file.Path outputPath = getOutputPath(scenarioName, previousVersion);
 
-		final Path bucketPath = new Path(testFolder.getPath());
+		final Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString());
 
-		// pending for checkpoints
-		final Map<Long, List<RecoverableWriter.CommitRecoverable>> commitRecoverables = new HashMap<>();
-		for (int i = 0; i < noOfTasks; i++) {
-			final List<RecoverableWriter.CommitRecoverable> recoverables = new ArrayList<>();
-			for (int j = 0; j < 2 + i; j++) {
-				final Path part = new Path(bucketPath, "part-" + i + '-' + j);
-
-				final RecoverableFsDataOutputStream stream = writer.open(part);
-				stream.write((PENDING_CONTENT + '-' + j).getBytes(Charset.forName("UTF-8")));
-				recoverables.add(stream.closeForCommit().getRecoverable());
-			}
-			commitRecoverables.put((long) i, recoverables);
-		}
+		final BucketState<String> recoveredState = readBucketState(scenarioName, previousVersion);
 
-		// in-progress
-		final Path testBucket = new Path(bucketPath, "test-2");
-		final RecoverableFsDataOutputStream stream = writer.open(testBucket);
-		stream.write(IN_PROGRESS_CONTENT.getBytes(Charset.forName("UTF-8")));
+		final Bucket<String, String> bucket = restoreBucket(0, recoveredState);
 
-		final RecoverableWriter.ResumeRecoverable current = stream.persist();
+		Assert.assertEquals(testBucketPath, bucket.getBucketPath());
 
-		final BucketState<String> bucketState = new BucketState<>(
-				"test-2", bucketPath, Long.MAX_VALUE, current, commitRecoverables);
-		final SimpleVersionedSerializer<BucketState<String>> serializer =
-				new BucketStateSerializer<>(
-						writer.getResumeRecoverableSerializer(),
-						writer.getCommitRecoverableSerializer(),
-						SimpleVersionedStringSerializer.INSTANCE
-				);
-		stream.close();
+		// check that the correct in-progress file writer was restored
+		Assert.assertEquals(8, bucket.getInProgressPart().getSize());
 
-		byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
+		long numFiles = Files.list(Paths.get(testBucketPath.toString()))
+			.map(file -> {
+				assertThat(
+					file.getFileName().toString(),
+					startsWith(".part-0-0.inprogress"));
+				return 1;
+			})
+			.count();
 
-		final BucketState<String> recoveredState =  SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+		assertThat(numFiles, is(1L));
+	}
 
-		Assert.assertEquals(bucketPath, recoveredState.getBucketPath());
+	@Test
+	@Ignore
+	public void prepareDeserializationFull() throws IOException {
+		prepareDeserializationFull(true, "full");
+	}
 
-		final Map<Long, List<RecoverableWriter.CommitRecoverable>> recoveredRecoverables = recoveredState.getCommittableFilesPerCheckpoint();
-		Assert.assertEquals(5L, recoveredRecoverables.size());
+	@Test
+	public void testSerializationFull() throws IOException {
+		testDeserializationFull(true, "full");
+	}
 
-		// recover and commit
-		for (Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>> entry: recoveredRecoverables.entrySet()) {
-			for (RecoverableWriter.CommitRecoverable recoverable: entry.getValue()) {
-				writer.recoverForCommit(recoverable).commit();
-			}
-		}
+	@Test
+	@Ignore
+	public void prepareDeserializationNullInProgress() throws IOException {
+		prepareDeserializationFull(false, "full-no-in-progress");
+	}
 
-		FileStatus[] filestatuses = fs.listStatus(bucketPath);
-		Set<String> paths = new HashSet<>(filestatuses.length);
-		for (FileStatus filestatus : filestatuses) {
-			paths.add(filestatus.getPath().getPath());
-		}
+	@Test
+	public void testSerializationNullInProgress() throws IOException {
+		testDeserializationFull(false, "full-no-in-progress");
+	}
 
-		for (int i = 0; i < noOfTasks; i++) {
-			for (int j = 0; j < 2 + i; j++) {
-				final String part = new Path(bucketPath, "part-" + i + '-' + j).getPath();
-				Assert.assertTrue(paths.contains(part));
-				paths.remove(part);
-			}
-		}
+	private static void prepareDeserializationFull(final boolean withInProgress, final String scenarioName) throws IOException {
 
-		// only the in-progress must be left
-		Assert.assertEquals(1L, paths.size());
+		final java.nio.file.Path scenarioPath = getResourcePath(scenarioName, CURRENT_VERSION);
+		FileUtils.deleteDirectory(Paths.get(scenarioPath.toString() + "-template").toFile());
+		Files.createDirectories(scenarioPath);
 
-		// verify that the in-progress file is still there
-		Assert.assertTrue(paths.iterator().next().startsWith(
-				(new Path(testBucket.getParent(), ".test-2.inprogress").getPath())));
-	}
+		final int noOfPendingCheckpoints = 5;
 
-	@Test
-	public void testSerializationNullInProgress() throws IOException {
-		final int noOfTasks = 5;
+		final java.nio.file.Path outputPath = getOutputPath(scenarioName, CURRENT_VERSION);
 
-		final File testFolder = tempFolder.newFolder();
-		final FileSystem fs = FileSystem.get(testFolder.toURI());
-		final RecoverableWriter writer = fs.createRecoverableWriter();
+		final Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString());
 
-		final Path bucketPath = new Path(testFolder.getPath());
+		final Bucket<String, String> bucket = createNewBucket(testBucketPath);
 
+		BucketState<String> bucketState = null;
 		// pending for checkpoints
-		final Map<Long, List<RecoverableWriter.CommitRecoverable>> commitRecoverables = new HashMap<>();
-		for (int i = 0; i < noOfTasks; i++) {
-			final List<RecoverableWriter.CommitRecoverable> recoverables = new ArrayList<>();
-			for (int j = 0; j < 2 + i; j++) {
-				final Path part = new Path(bucketPath, "test-" + i + '-' + j);
-
-				final RecoverableFsDataOutputStream stream = writer.open(part);
-				stream.write((PENDING_CONTENT + '-' + j).getBytes(Charset.forName("UTF-8")));
-				recoverables.add(stream.closeForCommit().getRecoverable());
-			}
-			commitRecoverables.put((long) i, recoverables);
+		for (int i = 0; i < noOfPendingCheckpoints; i++) {
+			// write 10 bytes to the in progress file
+			bucket.write(PENDING_CONTENT, System.currentTimeMillis());
+			bucket.write(PENDING_CONTENT, System.currentTimeMillis());
+			// every checkpoint would produce a pending file
+			bucketState = bucket.onReceptionOfCheckpoint(i);
+		}
+
+		if (withInProgress) {
+			// create an in-progress file
+			bucket.write(IN_PROGRESS_CONTENT, System.currentTimeMillis());
+
+			// 5 pending files and 1 in progress file
+			bucketState = bucket.onReceptionOfCheckpoint(noOfPendingCheckpoints);
 		}
 
-		final RecoverableWriter.ResumeRecoverable current = null;
+		final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer(), bucketState);
 
-		final BucketState<String> bucketState = new BucketState<>(
-				"", bucketPath, Long.MAX_VALUE, current, commitRecoverables);
+		Files.write(getSnapshotPath(scenarioName, CURRENT_VERSION), bytes);
 
-		final SimpleVersionedSerializer<BucketState<String>> serializer = new BucketStateSerializer<>(
-				writer.getResumeRecoverableSerializer(),
-				writer.getCommitRecoverableSerializer(),
-				SimpleVersionedStringSerializer.INSTANCE
-		);
+		// move the scenario files to a template directory.
+		// this is because the test `testSerializationFull` would change the in progress file to pending files.
+		moveToTemplateDirectory(scenarioPath);
+	}
 
-		byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
+	private void testDeserializationFull(final boolean withInProgress, final String scenarioName) throws IOException {
 
-		final BucketState<String> recoveredState =  SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+		try {
+			final java.nio.file.Path outputPath = getOutputPath(scenarioName, previousVersion);
+			final Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString());
+			// restore the state
+			final BucketState<String> recoveredState = readBucketStateFromTemplate(scenarioName, previousVersion);
+			final int noOfPendingCheckpoints = 5;
 
-		Assert.assertEquals(bucketPath, recoveredState.getBucketPath());
-		Assert.assertNull(recoveredState.getInProgressResumableFile());
+			// there are 5 checkpoints that have not completed.
+			final Map<Long, List<RecoverableWriter.CommitRecoverable>>
+				pendingFileRecoverables = recoveredState.getCommittableFilesPerCheckpoint();
+			Assert.assertEquals(5L, pendingFileRecoverables.size());
 
-		final Map<Long, List<RecoverableWriter.CommitRecoverable>> recoveredRecoverables = recoveredState.getCommittableFilesPerCheckpoint();
-		Assert.assertEquals(5L, recoveredRecoverables.size());
+			final Set<String> beforeRestorePaths = Files.list(outputPath.resolve(BUCKET_ID))
+				.map(file -> file.getFileName().toString())
+				.collect(Collectors.toSet());
 
-		// recover and commit
-		for (Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>> entry: recoveredRecoverables.entrySet()) {
-			for (RecoverableWriter.CommitRecoverable recoverable: entry.getValue()) {
-				writer.recoverForCommit(recoverable).commit();
+			// before restoring, all files have "inprogress" in their names
+			for (int i = 0; i < noOfPendingCheckpoints; i++) {
+				final String part = ".part-0-" + i + ".inprogress";
+				assertThat(beforeRestorePaths, hasItem(startsWith(part)));
 			}
-		}
 
-		FileStatus[] filestatuses = fs.listStatus(bucketPath);
-		Set<String> paths = new HashSet<>(filestatuses.length);
-		for (FileStatus filestatus : filestatuses) {
-			paths.add(filestatus.getPath().getPath());
-		}
+			// recover and commit
+			final Bucket bucket = restoreBucket(noOfPendingCheckpoints + 1, recoveredState);
+			Assert.assertEquals(testBucketPath, bucket.getBucketPath());
+			Assert.assertEquals(0, bucket.getPendingPartsPerCheckpoint().size());
+
+			final Set<String> afterRestorePaths = Files.list(outputPath.resolve(BUCKET_ID))
+				.map(file -> file.getFileName().toString())
+				.collect(Collectors.toSet());
+
+			// after restoring, all pending files are committed.
+			// there is no "inprogress" in the file name for the committed files.
+			for (int i = 0; i < noOfPendingCheckpoints; i++) {
+				final String part = "part-0-" + i;
+				assertThat(afterRestorePaths, hasItem(part));
+				afterRestorePaths.remove(part);
+			}
+
+			if (withInProgress) {
+				// only the in-progress must be left
+				assertThat(afterRestorePaths, iterableWithSize(1));
 
-		for (int i = 0; i < noOfTasks; i++) {
-			for (int j = 0; j < 2 + i; j++) {
-				final String part = new Path(bucketPath, "test-" + i + '-' + j).getPath();
-				Assert.assertTrue(paths.contains(part));
-				paths.remove(part);
+				// verify that the in-progress file is still there
+				assertThat(afterRestorePaths, hasItem(startsWith(".part-0-" + noOfPendingCheckpoints + ".inprogress")));
+			} else {
+				assertThat(afterRestorePaths, empty());
 			}
+		} finally {
+			FileUtils.deleteDirectory(getResourcePath(scenarioName, previousVersion).toFile());
 		}
+	}
+
+	private static Bucket<String, String> createNewBucket(final Path bucketPath) throws IOException {
+		return Bucket.getNew(
+			FileSystem.getLocalFileSystem().createRecoverableWriter(),
+			0,
+			BUCKET_ID,
+			bucketPath,
+			0,
+			new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+			DefaultRollingPolicy.builder().withMaxPartSize(10).build(),
+			OutputFileConfig.builder().build());
+	}
+
+	private static Bucket<String, String> restoreBucket(final int initialPartCounter, final BucketState<String> bucketState) throws IOException {
+		return Bucket.restore(
+			FileSystem.getLocalFileSystem().createRecoverableWriter(),
+			0,
+			initialPartCounter,
+			new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+			DefaultRollingPolicy.builder().withMaxPartSize(10).build(),
+			bucketState,
+			OutputFileConfig.builder().build());
+	}
+
+	private static BucketState<String> readBucketState(final String scenarioName, final int version) throws IOException {
+		byte[] bytes = Files.readAllBytes(getSnapshotPath(scenarioName, version));
+		return SimpleVersionedSerialization.readVersionAndDeSerialize(bucketStateSerializer(), bytes);
+	}
+
+	private static BucketState<String> readBucketStateFromTemplate(final String scenarioName, final int version) throws IOException {
+		final java.nio.file.Path scenarioPath =  getResourcePath(scenarioName, version);
+
+		// clear the scenario files first
+		FileUtils.deleteDirectory(scenarioPath.toFile());
+
+		// prepare the scenario files
+		FileUtils.copy(new Path(scenarioPath.toString() + "-template"), new Path(scenarioPath.toString()), false);
+
+		return readBucketState(scenarioName, version);
+	}
+
+	private static SimpleVersionedSerializer<BucketState<String>> bucketStateSerializer() throws IOException {
+		RecoverableWriter recoverableWriter = FileSystem.getLocalFileSystem().createRecoverableWriter();
+		return new BucketStateSerializer<>(
+			recoverableWriter.getResumeRecoverableSerializer(),
+			recoverableWriter.getCommitRecoverableSerializer(),
+			SimpleVersionedStringSerializer.INSTANCE);
+	}
 
-		// only the in-progress must be left
-		Assert.assertTrue(paths.isEmpty());
+	private static void moveToTemplateDirectory(java.nio.file.Path scenarioPath) throws IOException {
+		FileUtils.copy(new Path(scenarioPath.toString()), new Path(scenarioPath.toString() + "-template"), false);
+		FileUtils.deleteDirectory(scenarioPath.toFile());
 	}
 }
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/empty-v1/snapshot b/flink-streaming-java/src/test/resources/bucket-state-migration-test/empty-v1/snapshot
new file mode 100644
index 0000000..9700f3d
Binary files /dev/null and b/flink-streaming-java/src/test/resources/bucket-state-migration-test/empty-v1/snapshot differ
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-0.inprogress.a88d5993-77bc-44ce-880b-9f2a43b59ab4 b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-0.inprogress.a88d5993-77bc-44ce-880b-9f2a43b59ab4
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-0.inprogress.a88d5993-77bc-44ce-880b-9f2a43b59ab4
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-1.inprogress.7c0f2bd7-3078-48e8-9af2-d8773fb949c5 b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-1.inprogress.7c0f2bd7-3078-48e8-9af2-d8773fb949c5
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-1.inprogress.7c0f2bd7-3078-48e8-9af2-d8773fb949c5
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-2.inprogress.6729a640-0585-4785-a652-89802950c663 b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-2.inprogress.6729a640-0585-4785-a652-89802950c663
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-2.inprogress.6729a640-0585-4785-a652-89802950c663
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-3.inprogress.b4bcb0e9-5c9e-45dd-8963-1b163343544d b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-3.inprogress.b4bcb0e9-5c9e-45dd-8963-1b163343544d
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-3.inprogress.b4bcb0e9-5c9e-45dd-8963-1b163343544d
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-4.inprogress.e1e9e48d-0db6-4dd7-8a4d-fb4ebe7ed8ac b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-4.inprogress.e1e9e48d-0db6-4dd7-8a4d-fb4ebe7ed8ac
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-4.inprogress.e1e9e48d-0db6-4dd7-8a4d-fb4ebe7ed8ac
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/snapshot b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/snapshot
new file mode 100644
index 0000000..5d6644c
Binary files /dev/null and b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/snapshot differ
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-0.inprogress.8fec17e9-5d54-4fa9-aebb-70736fe03c82 b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-0.inprogress.8fec17e9-5d54-4fa9-aebb-70736fe03c82
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-0.inprogress.8fec17e9-5d54-4fa9-aebb-70736fe03c82
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-1.inprogress.0035b171-2759-403a-8d6c-4612b28a7a6c b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-1.inprogress.0035b171-2759-403a-8d6c-4612b28a7a6c
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-1.inprogress.0035b171-2759-403a-8d6c-4612b28a7a6c
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-2.inprogress.49da8048-af6b-4665-b4f6-b659cb38dc97 b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-2.inprogress.49da8048-af6b-4665-b4f6-b659cb38dc97
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-2.inprogress.49da8048-af6b-4665-b4f6-b659cb38dc97
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-3.inprogress.d13ec4e0-07b5-4f4e-9be8-9fb457cbcde9 b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-3.inprogress.d13ec4e0-07b5-4f4e-9be8-9fb457cbcde9
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-3.inprogress.d13ec4e0-07b5-4f4e-9be8-9fb457cbcde9
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-4.inprogress.123ac2c7-f92a-476a-a848-1369b93d82a7 b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-4.inprogress.123ac2c7-f92a-476a-a848-1369b93d82a7
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-4.inprogress.123ac2c7-f92a-476a-a848-1369b93d82a7
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-5.inprogress.32f5a28f-20e1-48da-9951-10e795133d64 b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-5.inprogress.32f5a28f-20e1-48da-9951-10e795133d64
new file mode 100644
index 0000000..631ee76
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-5.inprogress.32f5a28f-20e1-48da-9951-10e795133d64
@@ -0,0 +1 @@
+writing
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/snapshot b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/snapshot
new file mode 100644
index 0000000..dd313db
Binary files /dev/null and b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/snapshot differ
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/only-in-progress-v1/bucket/test-bucket/.part-0-0.inprogress.a70190d6-d080-43a8-b414-746b09d3a8a0 b/flink-streaming-java/src/test/resources/bucket-state-migration-test/only-in-progress-v1/bucket/test-bucket/.part-0-0.inprogress.a70190d6-d080-43a8-b414-746b09d3a8a0
new file mode 100644
index 0000000..631ee76
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/only-in-progress-v1/bucket/test-bucket/.part-0-0.inprogress.a70190d6-d080-43a8-b414-746b09d3a8a0
@@ -0,0 +1 @@
+writing
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/only-in-progress-v1/snapshot b/flink-streaming-java/src/test/resources/bucket-state-migration-test/only-in-progress-v1/snapshot
new file mode 100644
index 0000000..6e98be9
Binary files /dev/null and b/flink-streaming-java/src/test/resources/bucket-state-migration-test/only-in-progress-v1/snapshot differ


[flink] 04/04: [FLINK-17593] Update BucketStateSerializerTest for v2

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4349e93eec94d9b89c21f473c770bc03adfd652a
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Mon May 18 13:06:11 2020 +0200

    [FLINK-17593] Update BucketStateSerializerTest for v2
---
 .../sink/filesystem/BucketStateSerializerTest.java       |   4 ++--
 .../bucket-state-migration-test/empty-v2/snapshot        | Bin 0 -> 128 bytes
 ...t-0-0.inprogress.1e22e72d-0ab2-493b-8b00-9edac4252cec |   2 ++
 ...t-0-1.inprogress.3821f491-9fa1-48b2-b66b-655352a3c8ec |   2 ++
 ...t-0-2.inprogress.0af18f41-d8f8-4a4e-a92e-de12851be20b |   2 ++
 ...t-0-3.inprogress.a3d0f4d2-d6ad-4f83-ba62-ed4b1fa86db2 |   2 ++
 ...t-0-4.inprogress.666acf3e-935c-4621-8171-f7c897496524 |   2 ++
 .../full-no-in-progress-v2-template/snapshot             | Bin 0 -> 1597 bytes
 ...t-0-0.inprogress.9731063e-2b28-4701-8cc1-e706480b8022 |   2 ++
 ...t-0-1.inprogress.1d423406-097a-4deb-bfde-d023d3477cd5 |   2 ++
 ...t-0-2.inprogress.6a837aa3-4736-4098-a878-fdeffe227628 |   2 ++
 ...t-0-3.inprogress.f121b73d-ac74-4fbd-b70d-f13e51c9132c |   2 ++
 ...t-0-4.inprogress.a156884a-f090-4c3f-a271-0b63ab539c45 |   2 ++
 ...t-0-5.inprogress.83c527c5-14dc-4d49-9f99-c915f2224f6a |   1 +
 .../full-v2-template/snapshot                            | Bin 0 -> 1685 bytes
 ...t-0-0.inprogress.10833090-dd8c-4e36-884d-bb9758a3a8ef |   1 +
 .../only-in-progress-v2/snapshot                         | Bin 0 -> 416 bytes
 17 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
index cbd18c4..bc6c1e5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
@@ -61,11 +61,11 @@ import static org.junit.Assert.assertThat;
 @RunWith(Parameterized.class)
 public class BucketStateSerializerTest {
 
-	private static final int CURRENT_VERSION = 1;
+	private static final int CURRENT_VERSION = 2;
 
 	@Parameterized.Parameters(name = "Previous Version = {0}")
 	public static Collection<Integer> previousVersions() {
-		return Arrays.asList(1);
+		return Arrays.asList(1, 2);
 	}
 
 	@Parameterized.Parameter
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/empty-v2/snapshot b/flink-streaming-java/src/test/resources/bucket-state-migration-test/empty-v2/snapshot
new file mode 100644
index 0000000..9e84e8d
Binary files /dev/null and b/flink-streaming-java/src/test/resources/bucket-state-migration-test/empty-v2/snapshot differ
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-0.inprogress.1e22e72d-0ab2-493b-8b00-9edac4252cec b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-0.inprogress.1e22e72d-0ab2-493b-8b00-9edac4252cec
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-0.inprogress.1e22e72d-0ab2-493b-8b00-9edac4252cec
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-1.inprogress.3821f491-9fa1-48b2-b66b-655352a3c8ec b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-1.inprogress.3821f491-9fa1-48b2-b66b-655352a3c8ec
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-1.inprogress.3821f491-9fa1-48b2-b66b-655352a3c8ec
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-2.inprogress.0af18f41-d8f8-4a4e-a92e-de12851be20b b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-2.inprogress.0af18f41-d8f8-4a4e-a92e-de12851be20b
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-2.inprogress.0af18f41-d8f8-4a4e-a92e-de12851be20b
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-3.inprogress.a3d0f4d2-d6ad-4f83-ba62-ed4b1fa86db2 b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-3.inprogress.a3d0f4d2-d6ad-4f83-ba62-ed4b1fa86db2
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-3.inprogress.a3d0f4d2-d6ad-4f83-ba62-ed4b1fa86db2
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-4.inprogress.666acf3e-935c-4621-8171-f7c897496524 b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-4.inprogress.666acf3e-935c-4621-8171-f7c897496524
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-4.inprogress.666acf3e-935c-4621-8171-f7c897496524
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/snapshot b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/snapshot
new file mode 100644
index 0000000..4e0eaa4
Binary files /dev/null and b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/snapshot differ
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-0.inprogress.9731063e-2b28-4701-8cc1-e706480b8022 b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-0.inprogress.9731063e-2b28-4701-8cc1-e706480b8022
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-0.inprogress.9731063e-2b28-4701-8cc1-e706480b8022
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-1.inprogress.1d423406-097a-4deb-bfde-d023d3477cd5 b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-1.inprogress.1d423406-097a-4deb-bfde-d023d3477cd5
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-1.inprogress.1d423406-097a-4deb-bfde-d023d3477cd5
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-2.inprogress.6a837aa3-4736-4098-a878-fdeffe227628 b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-2.inprogress.6a837aa3-4736-4098-a878-fdeffe227628
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-2.inprogress.6a837aa3-4736-4098-a878-fdeffe227628
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-3.inprogress.f121b73d-ac74-4fbd-b70d-f13e51c9132c b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-3.inprogress.f121b73d-ac74-4fbd-b70d-f13e51c9132c
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-3.inprogress.f121b73d-ac74-4fbd-b70d-f13e51c9132c
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-4.inprogress.a156884a-f090-4c3f-a271-0b63ab539c45 b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-4.inprogress.a156884a-f090-4c3f-a271-0b63ab539c45
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-4.inprogress.a156884a-f090-4c3f-a271-0b63ab539c45
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-5.inprogress.83c527c5-14dc-4d49-9f99-c915f2224f6a b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-5.inprogress.83c527c5-14dc-4d49-9f99-c915f2224f6a
new file mode 100644
index 0000000..631ee76
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-5.inprogress.83c527c5-14dc-4d49-9f99-c915f2224f6a
@@ -0,0 +1 @@
+writing
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/snapshot b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/snapshot
new file mode 100644
index 0000000..9d9c3fc
Binary files /dev/null and b/flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v2-template/snapshot differ
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/only-in-progress-v2/bucket/test-bucket/.part-0-0.inprogress.10833090-dd8c-4e36-884d-bb9758a3a8ef b/flink-streaming-java/src/test/resources/bucket-state-migration-test/only-in-progress-v2/bucket/test-bucket/.part-0-0.inprogress.10833090-dd8c-4e36-884d-bb9758a3a8ef
new file mode 100644
index 0000000..631ee76
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/bucket-state-migration-test/only-in-progress-v2/bucket/test-bucket/.part-0-0.inprogress.10833090-dd8c-4e36-884d-bb9758a3a8ef
@@ -0,0 +1 @@
+writing
diff --git a/flink-streaming-java/src/test/resources/bucket-state-migration-test/only-in-progress-v2/snapshot b/flink-streaming-java/src/test/resources/bucket-state-migration-test/only-in-progress-v2/snapshot
new file mode 100644
index 0000000..d21f7c4
Binary files /dev/null and b/flink-streaming-java/src/test/resources/bucket-state-migration-test/only-in-progress-v2/snapshot differ


[flink] 03/04: [FLINK-17593][Connectors/FileSystem] Support arbitrary recovery mechanism for PartFileWriter

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4d684dfa1963ec976f8b22afe033fbef8548d2d0
Author: GuoWei Ma <gu...@gmail.com>
AuthorDate: Wed May 13 21:15:03 2020 +0800

    [FLINK-17593][Connectors/FileSystem] Support arbitrary recovery mechanism for PartFileWriter
    
    This change includes two things:
    
    1. Make the PartFileWriter generic and decouple the PartFileWriter and
    RecoverableStream. According to different pre-commit / commit methods,
    this change allows us to extend different types of PartFileWriter.
    
    2. Make the Bucket/Buckets depend on the PartFileFactory instead of
    RecoverableWriter.
---
 .../apache/flink/core/fs/RecoverableWriter.java    |   6 +-
 .../core/fs/local/LocalRecoverableWriter.java      |   2 +-
 .../runtime/fs/hdfs/HadoopRecoverableWriter.java   |   2 +-
 .../sink/filesystem/AbstractPartFileWriter.java    |  58 ++++
 .../api/functions/sink/filesystem/Bucket.java      | 166 +++++-------
 .../functions/sink/filesystem/BucketFactory.java   |   7 +-
 .../api/functions/sink/filesystem/BucketState.java |  32 +--
 .../sink/filesystem/BucketStateSerializer.java     | 146 ++++++----
 .../functions/sink/filesystem/BucketWriter.java    | 109 ++++++++
 .../api/functions/sink/filesystem/Buckets.java     |  37 +--
 .../sink/filesystem/BulkBucketWriter.java          |  72 +++++
 .../functions/sink/filesystem/BulkPartWriter.java  |  56 +---
 .../sink/filesystem/DefaultBucketFactoryImpl.java  |  13 +-
 .../sink/filesystem/InProgressFileWriter.java      |  70 +++++
 .../OutputStreamBasedPartFileWriter.java           | 296 +++++++++++++++++++++
 .../functions/sink/filesystem/PartFileWriter.java  | 141 ----------
 .../sink/filesystem/RowWiseBucketWriter.java       |  68 +++++
 .../sink/filesystem/RowWisePartWriter.java         |  50 +---
 .../sink/filesystem/StreamingFileSink.java         |   4 +-
 .../sink/filesystem/WriterProperties.java          |  67 +++++
 .../sink/filesystem/BucketAssignerITCases.java     |   3 +-
 .../sink/filesystem/BucketStateSerializerTest.java |  35 +--
 .../api/functions/sink/filesystem/BucketTest.java  |  58 ++--
 .../api/functions/sink/filesystem/BucketsTest.java |  19 +-
 .../sink/filesystem/RollingPolicyTest.java         |   3 +-
 .../api/functions/sink/filesystem/TestUtils.java   |   7 +-
 .../filesystem/utils/NoOpRecoverableWriter.java    |   2 +-
 27 files changed, 1033 insertions(+), 496 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
index 7d54b11..b92da88 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
@@ -138,10 +138,8 @@ public interface RecoverableWriter {
 	 * recover from a (potential) failure. These can be temporary files that were written
 	 * to the filesystem or objects that were uploaded to S3.
 	 *
-	 * <p><b>NOTE:</b> This operation should not throw an exception if the resumable has already
-	 * been cleaned up and the resources have been freed. But the contract is that it will throw
-	 * an {@link UnsupportedOperationException} if it is called for a {@code RecoverableWriter}
-	 * whose {@link #requiresCleanupOfRecoverableState()} returns {@code false}.
+	 * <p><b>NOTE:</b> This operation should not throw an exception, but return false if the cleanup did not
+	 *  happen for any reason.
 	 *
 	 * @param resumable The {@link ResumeRecoverable} whose state we want to clean-up.
 	 * @return {@code true} if the resources were successfully freed, {@code false} otherwise
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
index bae7314..2a97b85 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
@@ -77,7 +77,7 @@ public class LocalRecoverableWriter implements RecoverableWriter {
 
 	@Override
 	public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
-		throw new UnsupportedOperationException();
+		return false;
 	}
 
 	@Override
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
index d325f2c..91d76c6 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
@@ -95,7 +95,7 @@ public class HadoopRecoverableWriter implements RecoverableWriter {
 
 	@Override
 	public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
-		throw new UnsupportedOperationException();
+		return false;
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/AbstractPartFileWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/AbstractPartFileWriter.java
new file mode 100644
index 0000000..0350a8f
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/AbstractPartFileWriter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+/**
+ * An abstract writer for the currently open part file in a specific {@link Bucket}.
+ * @param <IN> the element type.
+ * @param <BucketID> the bucket id type.
+ */
+public abstract class AbstractPartFileWriter<IN, BucketID> implements InProgressFileWriter<IN, BucketID> {
+
+	private final BucketID bucketID;
+
+	private final long creationTime;
+
+	private long lastUpdateTime;
+
+	public AbstractPartFileWriter(final BucketID bucketID, final long createTime) {
+		this.bucketID = bucketID;
+		this.creationTime = createTime;
+		this.lastUpdateTime = createTime;
+	}
+
+	@Override
+	public BucketID getBucketId() {
+		return bucketID;
+	}
+
+	@Override
+	public long getCreationTime() {
+		return creationTime;
+	}
+
+	@Override
+	public long getLastUpdateTime() {
+		return lastUpdateTime;
+	}
+
+	void markWrite(long now) {
+		this.lastUpdateTime = now;
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index e7abd37..5e9a72b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -21,10 +21,6 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
-import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,48 +56,44 @@ public class Bucket<IN, BucketID> {
 
 	private final int subtaskIndex;
 
-	private final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory;
-
-	private final RecoverableWriter fsWriter;
+	private final BucketWriter<IN, BucketID> bucketWriter;
 
 	private final RollingPolicy<IN, BucketID> rollingPolicy;
 
-	private final NavigableMap<Long, ResumeRecoverable> resumablesPerCheckpoint;
+	private final NavigableMap<Long, InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverablesPerCheckpoint;
 
-	private final NavigableMap<Long, List<CommitRecoverable>> pendingPartsPerCheckpoint;
+	private final NavigableMap<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint;
 
 	private final OutputFileConfig outputFileConfig;
 
 	private long partCounter;
 
 	@Nullable
-	private PartFileWriter<IN, BucketID> inProgressPart;
+	private InProgressFileWriter<IN, BucketID> inProgressPart;
 
-	private List<CommitRecoverable> pendingPartsForCurrentCheckpoint;
+	private List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverablesForCurrentCheckpoint;
 
 	/**
 	 * Constructor to create a new empty bucket.
 	 */
 	private Bucket(
-			final RecoverableWriter fsWriter,
 			final int subtaskIndex,
 			final BucketID bucketId,
 			final Path bucketPath,
 			final long initialPartCounter,
-			final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final OutputFileConfig outputFileConfig) {
-		this.fsWriter = checkNotNull(fsWriter);
 		this.subtaskIndex = subtaskIndex;
 		this.bucketId = checkNotNull(bucketId);
 		this.bucketPath = checkNotNull(bucketPath);
 		this.partCounter = initialPartCounter;
-		this.partFileFactory = checkNotNull(partFileFactory);
+		this.bucketWriter = checkNotNull(bucketWriter);
 		this.rollingPolicy = checkNotNull(rollingPolicy);
 
-		this.pendingPartsForCurrentCheckpoint = new ArrayList<>();
-		this.pendingPartsPerCheckpoint = new TreeMap<>();
-		this.resumablesPerCheckpoint = new TreeMap<>();
+		this.pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>();
+		this.pendingFileRecoverablesPerCheckpoint = new TreeMap<>();
+		this.inProgressFileRecoverablesPerCheckpoint = new TreeMap<>();
 
 		this.outputFileConfig = checkNotNull(outputFileConfig);
 	}
@@ -110,16 +102,14 @@ public class Bucket<IN, BucketID> {
 	 * Constructor to restore a bucket from checkpointed state.
 	 */
 	private Bucket(
-			final RecoverableWriter fsWriter,
 			final int subtaskIndex,
 			final long initialPartCounter,
-			final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
+			final BucketWriter<IN, BucketID> partFileFactory,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final BucketState<BucketID> bucketState,
 			final OutputFileConfig outputFileConfig) throws IOException {
 
 		this(
-				fsWriter,
 				subtaskIndex,
 				bucketState.getBucketId(),
 				bucketState.getBucketPath(),
@@ -133,31 +123,29 @@ public class Bucket<IN, BucketID> {
 	}
 
 	private void restoreInProgressFile(final BucketState<BucketID> state) throws IOException {
-		if (!state.hasInProgressResumableFile()) {
+		if (!state.hasInProgressFileRecoverable()) {
 			return;
 		}
 
 		// we try to resume the previous in-progress file
-		final ResumeRecoverable resumable = state.getInProgressResumableFile();
+		final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = state.getInProgressFileRecoverable();
 
-		if (fsWriter.supportsResume()) {
-			final RecoverableFsDataOutputStream stream = fsWriter.recover(resumable);
-			inProgressPart = partFileFactory.resumeFrom(
-					bucketId, stream, resumable, state.getInProgressFileCreationTime());
+		if (bucketWriter.getProperties().supportsResume()) {
+			inProgressPart = bucketWriter.resumeInProgressFileFrom(
+					bucketId, inProgressFileRecoverable, state.getInProgressFileCreationTime());
 		} else {
 			// if the writer does not support resume, then we close the
 			// in-progress part and commit it, as done in the case of pending files.
-
-			fsWriter.recoverForCommit(resumable).commitAfterRecovery();
+			bucketWriter.recoverPendingFile(inProgressFileRecoverable).commitAfterRecovery();
 		}
 	}
 
 	private void commitRecoveredPendingFiles(final BucketState<BucketID> state) throws IOException {
 
 		// we commit pending files for checkpoints that precess the last successful one, from which we are recovering
-		for (List<CommitRecoverable> committables: state.getCommittableFilesPerCheckpoint().values()) {
-			for (CommitRecoverable committable: committables) {
-				fsWriter.recoverForCommit(committable).commitAfterRecovery();
+		for (List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables: state.getPendingFileRecoverablesPerCheckpoint().values()) {
+			for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable: pendingFileRecoverables) {
+				bucketWriter.recoverPendingFile(pendingFileRecoverable).commitAfterRecovery();
 			}
 		}
 	}
@@ -175,7 +163,7 @@ public class Bucket<IN, BucketID> {
 	}
 
 	boolean isActive() {
-		return inProgressPart != null || !pendingPartsForCurrentCheckpoint.isEmpty() || !pendingPartsPerCheckpoint.isEmpty();
+		return inProgressPart != null || !pendingFileRecoverablesForCurrentCheckpoint.isEmpty() || !pendingFileRecoverablesPerCheckpoint.isEmpty();
 	}
 
 	void merge(final Bucket<IN, BucketID> bucket) throws IOException {
@@ -184,16 +172,16 @@ public class Bucket<IN, BucketID> {
 
 		// There should be no pending files in the "to-merge" states.
 		// The reason is that:
-		// 1) the pendingPartsForCurrentCheckpoint is emptied whenever we take a snapshot (see prepareBucketForCheckpointing()).
-		//    So a snapshot, including the one we are recovering from, will never contain such files.
-		// 2) the files in pendingPartsPerCheckpoint are committed upon recovery (see commitRecoveredPendingFiles()).
+		// 1) the pendingFileRecoverablesForCurrentCheckpoint is emptied whenever we take a snapshot (see prepareBucketForCheckpointing()).
+		//    So a snapshot, including the one we are recovering from, will never contain such files.
+		// 2) the files in pendingFileRecoverablesPerCheckpoint are committed upon recovery (see commitRecoveredPendingFiles()).
 
-		checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty());
-		checkState(bucket.pendingPartsPerCheckpoint.isEmpty());
+		checkState(bucket.pendingFileRecoverablesForCurrentCheckpoint.isEmpty());
+		checkState(bucket.pendingFileRecoverablesPerCheckpoint.isEmpty());
 
-		CommitRecoverable committable = bucket.closePartFile();
-		if (committable != null) {
-			pendingPartsForCurrentCheckpoint.add(committable);
+		InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = bucket.closePartFile();
+		if (pendingFileRecoverable != null) {
+			pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
 		}
 
 		if (LOG.isDebugEnabled()) {
@@ -218,8 +206,7 @@ public class Bucket<IN, BucketID> {
 		closePartFile();
 
 		final Path partFilePath = assembleNewPartPath();
-		final RecoverableFsDataOutputStream stream = fsWriter.open(partFilePath);
-		inProgressPart = partFileFactory.openNew(bucketId, stream, partFilePath, currentTime);
+		inProgressPart = bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Subtask {} opening new part file \"{}\" for bucket id={}.",
@@ -233,14 +220,14 @@ public class Bucket<IN, BucketID> {
 		return new Path(bucketPath, outputFileConfig.getPartPrefix() + '-' + subtaskIndex + '-' + partCounter + outputFileConfig.getPartSuffix());
 	}
 
-	private CommitRecoverable closePartFile() throws IOException {
-		CommitRecoverable committable = null;
+	private InProgressFileWriter.PendingFileRecoverable closePartFile() throws IOException {
+		InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = null;
 		if (inProgressPart != null) {
-			committable = inProgressPart.closeForCommit();
-			pendingPartsForCurrentCheckpoint.add(committable);
+			pendingFileRecoverable = inProgressPart.closeForCommit();
+			pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
 			inProgressPart = null;
 		}
-		return committable;
+		return pendingFileRecoverable;
 	}
 
 	void disposePartFile() {
@@ -252,24 +239,16 @@ public class Bucket<IN, BucketID> {
 	BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
 		prepareBucketForCheckpointing(checkpointId);
 
-		ResumeRecoverable inProgressResumable = null;
+		InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = null;
 		long inProgressFileCreationTime = Long.MAX_VALUE;
 
 		if (inProgressPart != null) {
-			inProgressResumable = inProgressPart.persist();
+			inProgressFileRecoverable = inProgressPart.persist();
 			inProgressFileCreationTime = inProgressPart.getCreationTime();
-
-			// the following is an optimization so that writers that do not
-			// require cleanup, they do not have to keep track of resumables
-			// and later iterate over the active buckets.
-			// (see onSuccessfulCompletionOfCheckpoint())
-
-			if (fsWriter.requiresCleanupOfRecoverableState()) {
-				this.resumablesPerCheckpoint.put(checkpointId, inProgressResumable);
-			}
+			this.inProgressFileRecoverablesPerCheckpoint.put(checkpointId, inProgressFileRecoverable);
 		}
 
-		return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressResumable, pendingPartsPerCheckpoint);
+		return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressFileRecoverable, pendingFileRecoverablesPerCheckpoint);
 	}
 
 	private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
@@ -280,49 +259,46 @@ public class Bucket<IN, BucketID> {
 			closePartFile();
 		}
 
-		if (!pendingPartsForCurrentCheckpoint.isEmpty()) {
-			pendingPartsPerCheckpoint.put(checkpointId, pendingPartsForCurrentCheckpoint);
-			pendingPartsForCurrentCheckpoint = new ArrayList<>();
+		if (!pendingFileRecoverablesForCurrentCheckpoint.isEmpty()) {
+			pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverablesForCurrentCheckpoint);
+			pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>();
 		}
 	}
 
 	void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
-		checkNotNull(fsWriter);
+		checkNotNull(bucketWriter);
 
-		Iterator<Map.Entry<Long, List<CommitRecoverable>>> it =
-				pendingPartsPerCheckpoint.headMap(checkpointId, true)
+		Iterator<Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>>> it =
+				pendingFileRecoverablesPerCheckpoint.headMap(checkpointId, true)
 						.entrySet().iterator();
 
 		while (it.hasNext()) {
-			Map.Entry<Long, List<CommitRecoverable>> entry = it.next();
+			Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> entry = it.next();
 
-			for (CommitRecoverable committable : entry.getValue()) {
-				fsWriter.recoverForCommit(committable).commit();
+			for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable : entry.getValue()) {
+				bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
 			}
 			it.remove();
 		}
 
-		cleanupOutdatedResumables(checkpointId);
+		cleanupInProgressFileRecoverables(checkpointId);
 	}
 
-	private void cleanupOutdatedResumables(long checkpointId) throws IOException {
-		Iterator<Map.Entry<Long, ResumeRecoverable>> it =
-				resumablesPerCheckpoint.headMap(checkpointId, false)
+	private void cleanupInProgressFileRecoverables(long checkpointId) throws IOException {
+		Iterator<Map.Entry<Long, InProgressFileWriter.InProgressFileRecoverable>> it =
+				inProgressFileRecoverablesPerCheckpoint.headMap(checkpointId, false)
 						.entrySet().iterator();
 
 		while (it.hasNext()) {
-			final ResumeRecoverable recoverable = it.next().getValue();
+			final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = it.next().getValue();
 
-			// this check is redundant, as we only put entries in the resumablesPerCheckpoint map
-			// list when the requiresCleanupOfRecoverableState() returns true, but having it makes
+			// no extra check is needed here: every in-progress file recoverable persisted at a checkpoint is put in the
+			// inProgressFileRecoverablesPerCheckpoint map, and cleaning it up is delegated to the bucketWriter, which keeps
 			// the code more readable.
 
-			if (fsWriter.requiresCleanupOfRecoverableState()) {
-				final boolean successfullyDeleted = fsWriter.cleanupRecoverableState(recoverable);
-
-				if (LOG.isDebugEnabled() && successfullyDeleted) {
-					LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
-				}
+			final boolean successfullyDeleted = bucketWriter.cleanupInProgressFileRecoverable(inProgressFileRecoverable);
+			if (LOG.isDebugEnabled() && successfullyDeleted) {
+				LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
 			}
 			it.remove();
 		}
@@ -342,54 +318,51 @@ public class Bucket<IN, BucketID> {
 	// --------------------------- Testing Methods -----------------------------
 
 	@VisibleForTesting
-	Map<Long, List<CommitRecoverable>> getPendingPartsPerCheckpoint() {
-		return pendingPartsPerCheckpoint;
+	Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> getPendingFileRecoverablesPerCheckpoint() {
+		return pendingFileRecoverablesPerCheckpoint;
 	}
 
 	@Nullable
 	@VisibleForTesting
-	PartFileWriter<IN, BucketID> getInProgressPart() {
+	InProgressFileWriter<IN, BucketID> getInProgressPart() {
 		return inProgressPart;
 	}
 
 	@VisibleForTesting
-	List<CommitRecoverable> getPendingPartsForCurrentCheckpoint() {
-		return pendingPartsForCurrentCheckpoint;
+	List<InProgressFileWriter.PendingFileRecoverable> getPendingFileRecoverablesForCurrentCheckpoint() {
+		return pendingFileRecoverablesForCurrentCheckpoint;
 	}
 
 	// --------------------------- Static Factory Methods -----------------------------
 
 	/**
 	 * Creates a new empty {@code Bucket}.
-	 * @param fsWriter the filesystem-specific {@link RecoverableWriter}.
 	 * @param subtaskIndex the index of the subtask creating the bucket.
 	 * @param bucketId the identifier of the bucket, as returned by the {@link BucketAssigner}.
 	 * @param bucketPath the path to where the part files for the bucket will be written to.
 	 * @param initialPartCounter the initial counter for the part files of the bucket.
-	 * @param partFileFactory the {@link PartFileWriter.PartFileFactory} the factory creating part file writers.
+	 * @param bucketWriter the {@link BucketWriter} used to write part files in the bucket.
 	 * @param <IN> the type of input elements to the sink.
 	 * @param <BucketID> the type of the identifier of the bucket, as returned by the {@link BucketAssigner}
 	 * @param outputFileConfig the part file configuration.
 	 * @return The new Bucket.
 	 */
 	static <IN, BucketID> Bucket<IN, BucketID> getNew(
-			final RecoverableWriter fsWriter,
 			final int subtaskIndex,
 			final BucketID bucketId,
 			final Path bucketPath,
 			final long initialPartCounter,
-			final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final OutputFileConfig outputFileConfig) {
-		return new Bucket<>(fsWriter, subtaskIndex, bucketId, bucketPath, initialPartCounter, partFileFactory, rollingPolicy, outputFileConfig);
+		return new Bucket<>(subtaskIndex, bucketId, bucketPath, initialPartCounter, bucketWriter, rollingPolicy, outputFileConfig);
 	}
 
 	/**
 	 * Restores a {@code Bucket} from the state included in the provided {@link BucketState}.
-	 * @param fsWriter the filesystem-specific {@link RecoverableWriter}.
 	 * @param subtaskIndex the index of the subtask creating the bucket.
 	 * @param initialPartCounter the initial counter for the part files of the bucket.
-	 * @param partFileFactory the {@link PartFileWriter.PartFileFactory} the factory creating part file writers.
+	 * @param bucketWriter the {@link BucketWriter} used to write part files in the bucket.
 	 * @param bucketState the initial state of the restored bucket.
 	 * @param <IN> the type of input elements to the sink.
 	 * @param <BucketID> the type of the identifier of the bucket, as returned by the {@link BucketAssigner}
@@ -397,13 +370,12 @@ public class Bucket<IN, BucketID> {
 	 * @return The restored Bucket.
 	 */
 	static <IN, BucketID> Bucket<IN, BucketID> restore(
-			final RecoverableWriter fsWriter,
 			final int subtaskIndex,
 			final long initialPartCounter,
-			final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final BucketState<BucketID> bucketState,
 			final OutputFileConfig outputFileConfig) throws IOException {
-		return new Bucket<>(fsWriter, subtaskIndex, initialPartCounter, partFileFactory, rollingPolicy, bucketState, outputFileConfig);
+		return new Bucket<>(subtaskIndex, initialPartCounter, bucketWriter, rollingPolicy, bucketState, outputFileConfig);
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
index 260e82c..6423627 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableWriter;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -32,20 +31,18 @@ import java.io.Serializable;
 interface BucketFactory<IN, BucketID> extends Serializable {
 
 	Bucket<IN, BucketID> getNewBucket(
-			final RecoverableWriter fsWriter,
 			final int subtaskIndex,
 			final BucketID bucketId,
 			final Path bucketPath,
 			final long initialPartCounter,
-			final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final OutputFileConfig outputFileConfig) throws IOException;
 
 	Bucket<IN, BucketID> restoreBucket(
-			final RecoverableWriter fsWriter,
 			final int subtaskIndex,
 			final long initialPartCounter,
-			final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final BucketState<BucketID> bucketState,
 			final OutputFileConfig outputFileConfig) throws IOException;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
index 1829381..75c00b9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
@@ -46,30 +46,30 @@ class BucketState<BucketID> {
 	private final long inProgressFileCreationTime;
 
 	/**
-	 * A {@link RecoverableWriter.ResumeRecoverable} for the currently open
+	 * A {@link InProgressFileWriter.InProgressFileRecoverable} for the currently open
 	 * part file, or null if there is no currently open part file.
 	 */
 	@Nullable
-	private final RecoverableWriter.ResumeRecoverable inProgressResumableFile;
+	private final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable;
 
 	/**
 	 * The {@link RecoverableWriter.CommitRecoverable files} pending to be
 	 * committed, organized by checkpoint id.
 	 */
-	private final Map<Long, List<RecoverableWriter.CommitRecoverable>> committableFilesPerCheckpoint;
+	private final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint;
 
 	BucketState(
 			final BucketID bucketId,
 			final Path bucketPath,
 			final long inProgressFileCreationTime,
-			@Nullable final RecoverableWriter.ResumeRecoverable inProgressResumableFile,
-			final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommittablesPerCheckpoint
+			@Nullable final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable,
+			final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint
 	) {
 		this.bucketId = Preconditions.checkNotNull(bucketId);
 		this.bucketPath = Preconditions.checkNotNull(bucketPath);
 		this.inProgressFileCreationTime = inProgressFileCreationTime;
-		this.inProgressResumableFile = inProgressResumableFile;
-		this.committableFilesPerCheckpoint = Preconditions.checkNotNull(pendingCommittablesPerCheckpoint);
+		this.inProgressFileRecoverable = inProgressFileRecoverable;
+		this.pendingFileRecoverablesPerCheckpoint = Preconditions.checkNotNull(pendingFileRecoverablesPerCheckpoint);
 	}
 
 	BucketID getBucketId() {
@@ -84,17 +84,17 @@ class BucketState<BucketID> {
 		return inProgressFileCreationTime;
 	}
 
-	boolean hasInProgressResumableFile() {
-		return inProgressResumableFile != null;
+	boolean hasInProgressFileRecoverable() {
+		return inProgressFileRecoverable != null;
 	}
 
 	@Nullable
-	RecoverableWriter.ResumeRecoverable getInProgressResumableFile() {
-		return inProgressResumableFile;
+	InProgressFileWriter.InProgressFileRecoverable getInProgressFileRecoverable() {
+		return inProgressFileRecoverable;
 	}
 
-	Map<Long, List<RecoverableWriter.CommitRecoverable>> getCommittableFilesPerCheckpoint() {
-		return committableFilesPerCheckpoint;
+	Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> getPendingFileRecoverablesPerCheckpoint() {
+		return pendingFileRecoverablesPerCheckpoint;
 	}
 
 	@Override
@@ -105,13 +105,13 @@ class BucketState<BucketID> {
 				.append("BucketState for bucketId=").append(bucketId)
 				.append(" and bucketPath=").append(bucketPath);
 
-		if (hasInProgressResumableFile()) {
+		if (hasInProgressFileRecoverable()) {
 			strBuilder.append(", has open part file created @ ").append(inProgressFileCreationTime);
 		}
 
-		if (!committableFilesPerCheckpoint.isEmpty()) {
+		if (!pendingFileRecoverablesPerCheckpoint.isEmpty()) {
 			strBuilder.append(", has pending files for checkpoints: {");
-			for (long checkpointId: committableFilesPerCheckpoint.keySet()) {
+			for (long checkpointId: pendingFileRecoverablesPerCheckpoint.keySet()) {
 				strBuilder.append(checkpointId).append(' ');
 			}
 			strBuilder.append('}');
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
index 04de246..5863a03 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
@@ -45,119 +44,172 @@ class BucketStateSerializer<BucketID> implements SimpleVersionedSerializer<Bucke
 
 	private static final int MAGIC_NUMBER = 0x1e764b79;
 
-	private final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer;
+	private final SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer;
 
-	private final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer;
+	private final SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer;
 
 	private final SimpleVersionedSerializer<BucketID> bucketIdSerializer;
 
 	BucketStateSerializer(
-			final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer,
-			final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer,
+			final SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer,
+			final SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer,
 			final SimpleVersionedSerializer<BucketID> bucketIdSerializer
 	) {
-		this.resumableSerializer = Preconditions.checkNotNull(resumableSerializer);
-		this.commitableSerializer = Preconditions.checkNotNull(commitableSerializer);
+		this.inProgressFileRecoverableSerializer = Preconditions.checkNotNull(inProgressFileRecoverableSerializer);
+		this.pendingFileRecoverableSerializer = Preconditions.checkNotNull(pendingFileRecoverableSerializer);
 		this.bucketIdSerializer = Preconditions.checkNotNull(bucketIdSerializer);
 	}
 
 	@Override
 	public int getVersion() {
-		return 1;
+		return 2;
 	}
 
 	@Override
 	public byte[] serialize(BucketState<BucketID> state) throws IOException {
 		DataOutputSerializer out = new DataOutputSerializer(256);
 		out.writeInt(MAGIC_NUMBER);
-		serializeV1(state, out);
+		serializeV2(state, out);
 		return out.getCopyOfBuffer();
 	}
 
 	@Override
 	public BucketState<BucketID> deserialize(int version, byte[] serialized) throws IOException {
+		final DataInputDeserializer in = new DataInputDeserializer(serialized);
+
 		switch (version) {
 			case 1:
-				DataInputDeserializer in = new DataInputDeserializer(serialized);
 				validateMagicNumber(in);
 				return deserializeV1(in);
+			case 2:
+				validateMagicNumber(in);
+				return deserializeV2(in);
 			default:
 				throw new IOException("Unrecognized version or corrupt state: " + version);
 		}
 	}
 
-	@VisibleForTesting
-	void serializeV1(BucketState<BucketID> state, DataOutputView out) throws IOException {
-		SimpleVersionedSerialization.writeVersionAndSerialize(bucketIdSerializer, state.getBucketId(), out);
-		out.writeUTF(state.getBucketPath().toString());
-		out.writeLong(state.getInProgressFileCreationTime());
+	private void serializeV2(BucketState<BucketID> state, DataOutputView dataOutputView) throws IOException {
+		SimpleVersionedSerialization.writeVersionAndSerialize(bucketIdSerializer, state.getBucketId(), dataOutputView);
+		dataOutputView.writeUTF(state.getBucketPath().toString());
+		dataOutputView.writeLong(state.getInProgressFileCreationTime());
 
 		// put the current open part file
-		if (state.hasInProgressResumableFile()) {
-			final RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile();
-			out.writeBoolean(true);
-			SimpleVersionedSerialization.writeVersionAndSerialize(resumableSerializer, resumable, out);
-		}
-		else {
-			out.writeBoolean(false);
+		if (state.hasInProgressFileRecoverable()) {
+			final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = state.getInProgressFileRecoverable();
+			dataOutputView.writeBoolean(true);
+			SimpleVersionedSerialization.writeVersionAndSerialize(inProgressFileRecoverableSerializer, inProgressFileRecoverable, dataOutputView);
+		} else {
+			dataOutputView.writeBoolean(false);
 		}
 
 		// put the map of pending files per checkpoint
-		final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommitters = state.getCommittableFilesPerCheckpoint();
+		final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverables = state.getPendingFileRecoverablesPerCheckpoint();
 
-		// manually keep the version here to safe some bytes
-		out.writeInt(commitableSerializer.getVersion());
+		dataOutputView.writeInt(pendingFileRecoverableSerializer.getVersion());
 
-		out.writeInt(pendingCommitters.size());
-		for (Entry<Long, List<RecoverableWriter.CommitRecoverable>> resumablesForCheckpoint : pendingCommitters.entrySet()) {
-			List<RecoverableWriter.CommitRecoverable> resumables = resumablesForCheckpoint.getValue();
+		dataOutputView.writeInt(pendingFileRecoverables.size());
 
-			out.writeLong(resumablesForCheckpoint.getKey());
-			out.writeInt(resumables.size());
+		for (Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFilesForCheckpoint : pendingFileRecoverables.entrySet()) {
+			final List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableList = pendingFilesForCheckpoint.getValue();
 
-			for (RecoverableWriter.CommitRecoverable resumable : resumables) {
-				byte[] serialized = commitableSerializer.serialize(resumable);
-				out.writeInt(serialized.length);
-				out.write(serialized);
+			dataOutputView.writeLong(pendingFilesForCheckpoint.getKey());
+			dataOutputView.writeInt(pendingFileRecoverableList.size());
+
+			for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable : pendingFileRecoverableList) {
+				byte[] serialized = pendingFileRecoverableSerializer.serialize(pendingFileRecoverable);
+				dataOutputView.writeInt(serialized.length);
+				dataOutputView.write(serialized);
 			}
 		}
 	}
 
-	@VisibleForTesting
-	BucketState<BucketID> deserializeV1(DataInputView in) throws IOException {
+	private BucketState<BucketID> deserializeV1(DataInputView in) throws IOException {
+
+		final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer = getCommitableSerializer();
+		final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer = getResumableSerializer();
+
 		final BucketID bucketId = SimpleVersionedSerialization.readVersionAndDeSerialize(bucketIdSerializer, in);
 		final String bucketPathStr = in.readUTF();
 		final long creationTime = in.readLong();
 
 		// then get the current resumable stream
-		RecoverableWriter.ResumeRecoverable current = null;
+		InProgressFileWriter.InProgressFileRecoverable current = null;
 		if (in.readBoolean()) {
-			current = SimpleVersionedSerialization.readVersionAndDeSerialize(resumableSerializer, in);
+			current =
+				new OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverable(
+					SimpleVersionedSerialization.readVersionAndDeSerialize(resumableSerializer, in));
 		}
 
 		final int committableVersion = in.readInt();
 		final int numCheckpoints = in.readInt();
-		final HashMap<Long, List<RecoverableWriter.CommitRecoverable>> resumablesPerCheckpoint = new HashMap<>(numCheckpoints);
+		final HashMap<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablePerCheckpoint = new HashMap<>(numCheckpoints);
 
 		for (int i = 0; i < numCheckpoints; i++) {
 			final long checkpointId = in.readLong();
 			final int noOfResumables = in.readInt();
 
-			final List<RecoverableWriter.CommitRecoverable> resumables = new ArrayList<>(noOfResumables);
+			final List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables = new ArrayList<>(noOfResumables);
 			for (int j = 0; j < noOfResumables; j++) {
 				final byte[] bytes = new byte[in.readInt()];
 				in.readFully(bytes);
-				resumables.add(commitableSerializer.deserialize(committableVersion, bytes));
+				pendingFileRecoverables.add(
+					new OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable(commitableSerializer.deserialize(committableVersion, bytes)));
 			}
-			resumablesPerCheckpoint.put(checkpointId, resumables);
+			pendingFileRecoverablePerCheckpoint.put(checkpointId, pendingFileRecoverables);
 		}
 
 		return new BucketState<>(
-				bucketId,
-				new Path(bucketPathStr),
-				creationTime,
-				current,
-				resumablesPerCheckpoint);
+			bucketId,
+			new Path(bucketPathStr),
+			creationTime,
+			current,
+			pendingFileRecoverablePerCheckpoint);
+	}
+
+	private BucketState<BucketID> deserializeV2(DataInputView dataInputView) throws IOException {
+		final BucketID bucketId = SimpleVersionedSerialization.readVersionAndDeSerialize(bucketIdSerializer, dataInputView);
+		final String bucketPathStr = dataInputView.readUTF();
+		final long creationTime = dataInputView.readLong();
+
+		// then get the current resumable stream
+		InProgressFileWriter.InProgressFileRecoverable current = null;
+		if (dataInputView.readBoolean()) {
+			current = SimpleVersionedSerialization.readVersionAndDeSerialize(inProgressFileRecoverableSerializer, dataInputView);
+		}
+
+		final int pendingFileRecoverableSerializerVersion = dataInputView.readInt();
+		final int numCheckpoints = dataInputView.readInt();
+		final HashMap<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint = new HashMap<>(numCheckpoints);
+
+		for (int i = 0; i < numCheckpoints; i++) {
+			final long checkpointId = dataInputView.readLong();
+			final int numOfPendingFileRecoverables = dataInputView.readInt();
+
+			final List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables = new ArrayList<>(numOfPendingFileRecoverables);
+			for (int j = 0; j < numOfPendingFileRecoverables; j++) {
+				final byte[] bytes = new byte[dataInputView.readInt()];
+				dataInputView.readFully(bytes);
+				pendingFileRecoverables.add(pendingFileRecoverableSerializer.deserialize(pendingFileRecoverableSerializerVersion, bytes));
+			}
+			pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverables);
+		}
+
+		return new BucketState<>(bucketId, new Path(bucketPathStr), creationTime, current, pendingFileRecoverablesPerCheckpoint);
+	}
+
+	private SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumableSerializer() {
+		final OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverableSerializer
+			outputStreamBasedInProgressFileRecoverableSerializer =
+			(OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverableSerializer) inProgressFileRecoverableSerializer;
+		return outputStreamBasedInProgressFileRecoverableSerializer.getResumeSerializer();
+	}
+
+	private SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitableSerializer() {
+		final OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverableSerializer
+			outputStreamBasedPendingFileRecoverableSerializer =
+			(OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverableSerializer) pendingFileRecoverableSerializer;
+		return outputStreamBasedPendingFileRecoverableSerializer.getCommitSerializer();
 	}
 
 	private static void validateMagicNumber(DataInputView in) throws IOException {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java
new file mode 100644
index 0000000..ed3a0e2
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * An interface for factories that create the different {@link InProgressFileWriter writers}.
+ */
+@Internal
+interface BucketWriter<IN, BucketID> {
+
+	/**
+	 * Used to create a new {@link InProgressFileWriter}.
+	 * @param bucketID the id of the bucket this writer is writing to.
+	 * @param path the path this writer will write to.
+	 * @param creationTime the creation time of the file.
+	 * @return the new {@link InProgressFileWriter}
+	 * @throws IOException Thrown if creating a writer fails.
+	 */
+	InProgressFileWriter<IN, BucketID> openNewInProgressFile(
+			final BucketID bucketID,
+			final Path path,
+			final long creationTime) throws IOException;
+
+	/**
+	 * Used to resume a {@link InProgressFileWriter} from a {@link InProgressFileWriter.InProgressFileRecoverable}.
+	 * @param bucketID the id of the bucket this writer is writing to.
+	 * @param inProgressFileSnapshot the state of the part file.
+	 * @param creationTime the creation time of the file.
+	 * @return the resumed {@link InProgressFileWriter}
+	 * @throws IOException Thrown if resuming a writer fails.
+	 */
+	InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(
+			final BucketID bucketID,
+			final InProgressFileWriter.InProgressFileRecoverable inProgressFileSnapshot,
+			final long creationTime) throws IOException;
+
+	/**
+	 * @return the properties of the {@link BucketWriter}
+	 */
+	WriterProperties getProperties();
+
+	/**
+	 * Recovers a pending file for finalizing and committing.
+	 * @param pendingFileRecoverable The handle with the recovery information.
+	 * @return A pending file
+	 * @throws IOException Thrown if recovering a pending file fails.
+	 */
+	PendingFile recoverPendingFile(final InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable) throws IOException;
+
+	/**
+	 * Frees up any resources that were previously occupied in order to be able to
+	 * recover from a (potential) failure.
+	 *
+	 * <p><b>NOTE:</b> This operation should not throw an exception, but return false if the cleanup did not
+	 * happen for any reason.
+	 *
+	 * @param inProgressFileRecoverable the {@link InProgressFileWriter.InProgressFileRecoverable} whose state we want to clean-up.
+	 * @return {@code true} if the resources were successfully freed, {@code false} otherwise
+	 * (e.g. the file to be deleted was not there for any reason - already deleted or never created).
+	 * @throws IOException if an I/O error occurs
+	 */
+	boolean cleanupInProgressFileRecoverable(final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable) throws IOException;
+
+	/**
+	 * This represents a file to which no more data can be written.
+	 */
+	interface PendingFile {
+		/**
+		 * Commits the pending file, making it visible. The file will contain the exact data
+		 * as when the pending file was created.
+		 *
+		 * @throws IOException Thrown if committing fails.
+		 */
+		void commit() throws IOException;
+
+		/**
+		 * Commits the pending file, making it visible. The file will contain the exact data
+		 * as when the pending file was created.
+		 *
+		 * <p>This method tolerates situations where the file was already committed and
+		 * will not raise an exception in that case. This is important for idempotent
+		 * commit retries as they need to happen after recovery.
+		 *
+		 * @throws IOException Thrown if committing fails.
+		 */
+		void commitAfterRecovery() throws IOException;
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index f055798..0c9b73f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -21,9 +21,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.util.Preconditions;
@@ -61,7 +59,7 @@ public class Buckets<IN, BucketID> {
 
 	private final BucketAssigner<IN, BucketID> bucketAssigner;
 
-	private final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory;
+	private final BucketWriter<IN, BucketID> bucketWriter;
 
 	private final RollingPolicy<IN, BucketID> rollingPolicy;
 
@@ -78,8 +76,6 @@ public class Buckets<IN, BucketID> {
 
 	private long maxPartCounter;
 
-	private final RecoverableWriter fsWriter;
-
 	private final OutputFileConfig outputFileConfig;
 
 	// --------------------------- State Related Fields -----------------------------
@@ -92,23 +88,23 @@ public class Buckets<IN, BucketID> {
 	 * @param basePath The base path for our buckets.
 	 * @param bucketAssigner The {@link BucketAssigner} provided by the user.
 	 * @param bucketFactory The {@link BucketFactory} to be used to create buckets.
-	 * @param partFileWriterFactory The {@link PartFileWriter.PartFileFactory} to be used when writing data.
+	 * @param bucketWriter The {@link BucketWriter} to be used when writing data.
 	 * @param rollingPolicy The {@link RollingPolicy} as specified by the user.
 	 */
 	Buckets(
 			final Path basePath,
 			final BucketAssigner<IN, BucketID> bucketAssigner,
 			final BucketFactory<IN, BucketID> bucketFactory,
-			final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			@Nullable final BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener,
 			final int subtaskIndex,
-			final OutputFileConfig outputFileConfig) throws IOException {
+			final OutputFileConfig outputFileConfig) {
 
 		this.basePath = Preconditions.checkNotNull(basePath);
 		this.bucketAssigner = Preconditions.checkNotNull(bucketAssigner);
 		this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
-		this.partFileWriterFactory = Preconditions.checkNotNull(partFileWriterFactory);
+		this.bucketWriter = Preconditions.checkNotNull(bucketWriter);
 		this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
 		this.bucketLifeCycleListener = bucketLifeCycleListener;
 		this.subtaskIndex = subtaskIndex;
@@ -118,19 +114,10 @@ public class Buckets<IN, BucketID> {
 		this.activeBuckets = new HashMap<>();
 		this.bucketerContext = new Buckets.BucketerContext();
 
-		try {
-			this.fsWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
-		} catch (IOException e) {
-			LOG.error("Unable to create filesystem for path: {}", basePath);
-			throw e;
-		}
-
 		this.bucketStateSerializer = new BucketStateSerializer<>(
-				fsWriter.getResumeRecoverableSerializer(),
-				fsWriter.getCommitRecoverableSerializer(),
-				bucketAssigner.getSerializer()
-		);
-
+			bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
+			bucketWriter.getProperties().getPendingFileRecoverableSerializer(),
+			bucketAssigner.getSerializer());
 		this.maxPartCounter = 0L;
 	}
 
@@ -185,10 +172,9 @@ public class Buckets<IN, BucketID> {
 
 		final Bucket<IN, BucketID> restoredBucket = bucketFactory
 				.restoreBucket(
-						fsWriter,
 						subtaskIndex,
 						maxPartCounter,
-						partFileWriterFactory,
+						bucketWriter,
 						rollingPolicy,
 						recoveredState,
 						outputFileConfig
@@ -238,7 +224,7 @@ public class Buckets<IN, BucketID> {
 			final ListState<Long> partCounterStateContainer) throws Exception {
 
 		Preconditions.checkState(
-				fsWriter != null && bucketStateSerializer != null,
+			bucketWriter != null && bucketStateSerializer != null,
 				"sink has not been initialized");
 
 		LOG.info("Subtask {} checkpointing for checkpoint with id={} (max part counter={}).",
@@ -308,12 +294,11 @@ public class Buckets<IN, BucketID> {
 		if (bucket == null) {
 			final Path bucketPath = assembleBucketPath(bucketId);
 			bucket = bucketFactory.getNewBucket(
-					fsWriter,
 					subtaskIndex,
 					bucketId,
 					bucketPath,
 					maxPartCounter,
-					partFileWriterFactory,
+					bucketWriter,
 					rollingPolicy,
 					outputFileConfig);
 			activeBuckets.put(bucketId, bucket);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java
new file mode 100644
index 0000000..0f3cb9c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A factory that creates {@link BulkPartWriter BulkPartWriters}.
+ * @param <IN> The type of input elements.
+ * @param <BucketID> The type of ids for the buckets, as returned by the {@link BucketAssigner}.
+ */
+@Internal
+class BulkBucketWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter.OutputStreamBasedBucketWriter<IN, BucketID> {
+
+	private final BulkWriter.Factory<IN> writerFactory;
+
+	BulkBucketWriter(final RecoverableWriter recoverableWriter, BulkWriter.Factory<IN> writerFactory) throws IOException {
+		super(recoverableWriter);
+		this.writerFactory = writerFactory;
+	}
+
+	@Override
+	public InProgressFileWriter<IN, BucketID> resumeFrom(
+			final BucketID bucketId,
+			final RecoverableFsDataOutputStream stream,
+			final RecoverableWriter.ResumeRecoverable resumable,
+			final long creationTime) throws IOException {
+
+		Preconditions.checkNotNull(stream);
+		Preconditions.checkNotNull(resumable);
+
+		final BulkWriter<IN> writer = writerFactory.create(stream);
+		return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
+	}
+
+	@Override
+	public InProgressFileWriter<IN, BucketID> openNew(
+			final BucketID bucketId,
+			final RecoverableFsDataOutputStream stream,
+			final Path path,
+			final long creationTime) throws IOException {
+
+		Preconditions.checkNotNull(stream);
+		Preconditions.checkNotNull(path);
+
+		final BulkWriter<IN> writer = writerFactory.create(stream);
+		return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
index a44b0e8..b1b7864 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
@@ -20,23 +20,21 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 
 /**
- * A {@link PartFileWriter} for bulk-encoding formats that use an {@link BulkPartWriter}.
+ * An {@link InProgressFileWriter} for bulk-encoding formats that use a {@link BulkWriter}.
  * This also implements the {@link PartFileInfo}.
  */
 @Internal
-final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
+final class BulkPartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter<IN, BucketID>  {
 
 	private final BulkWriter<IN> writer;
 
-	private BulkPartWriter(
+	BulkPartWriter(
 			final BucketID bucketId,
 			final RecoverableFsDataOutputStream currentPartStream,
 			final BulkWriter<IN> writer,
@@ -46,62 +44,20 @@ final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
 	}
 
 	@Override
-	void write(IN element, long currentTime) throws IOException {
+	public void write(IN element, long currentTime) throws IOException {
 		writer.addElement(element);
 		markWrite(currentTime);
 	}
 
 	@Override
-	RecoverableWriter.ResumeRecoverable persist() {
+	public InProgressFileRecoverable persist() {
 		throw new UnsupportedOperationException("Bulk Part Writers do not support \"pause and resume\" operations.");
 	}
 
 	@Override
-	RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
+	public PendingFileRecoverable closeForCommit() throws IOException {
 		writer.flush();
 		writer.finish();
 		return super.closeForCommit();
 	}
-
-	/**
-	 * A factory that creates {@link BulkPartWriter BulkPartWriters}.
-	 * @param <IN> The type of input elements.
-	 * @param <BucketID> The type of ids for the buckets, as returned by the {@link BucketAssigner}.
-	 */
-	static class Factory<IN, BucketID> implements PartFileWriter.PartFileFactory<IN, BucketID> {
-
-		private final BulkWriter.Factory<IN> writerFactory;
-
-		Factory(BulkWriter.Factory<IN> writerFactory) {
-			this.writerFactory = writerFactory;
-		}
-
-		@Override
-		public PartFileWriter<IN, BucketID> resumeFrom(
-				final BucketID bucketId,
-				final RecoverableFsDataOutputStream stream,
-				final RecoverableWriter.ResumeRecoverable resumable,
-				final long creationTime) throws IOException {
-
-			Preconditions.checkNotNull(stream);
-			Preconditions.checkNotNull(resumable);
-
-			final BulkWriter<IN> writer = writerFactory.create(stream);
-			return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
-		}
-
-		@Override
-		public PartFileWriter<IN, BucketID> openNew(
-				final BucketID bucketId,
-				final RecoverableFsDataOutputStream stream,
-				final Path path,
-				final long creationTime) throws IOException {
-
-			Preconditions.checkNotNull(stream);
-			Preconditions.checkNotNull(path);
-
-			final BulkWriter<IN> writer = writerFactory.create(stream);
-			return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
-		}
-	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
index 529b93a..bb20b97 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableWriter;
 
 import java.io.IOException;
 
@@ -34,41 +33,37 @@ class DefaultBucketFactoryImpl<IN, BucketID> implements BucketFactory<IN, Bucket
 
 	@Override
 	public Bucket<IN, BucketID> getNewBucket(
-			final RecoverableWriter fsWriter,
 			final int subtaskIndex,
 			final BucketID bucketId,
 			final Path bucketPath,
 			final long initialPartCounter,
-			final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final OutputFileConfig outputFileConfig) {
 
 		return Bucket.getNew(
-				fsWriter,
 				subtaskIndex,
 				bucketId,
 				bucketPath,
 				initialPartCounter,
-				partFileWriterFactory,
+				bucketWriter,
 				rollingPolicy,
 				outputFileConfig);
 	}
 
 	@Override
 	public Bucket<IN, BucketID> restoreBucket(
-			final RecoverableWriter fsWriter,
 			final int subtaskIndex,
 			final long initialPartCounter,
-			final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final BucketState<BucketID> bucketState,
 			final OutputFileConfig outputFileConfig) throws IOException {
 
 		return Bucket.restore(
-				fsWriter,
 				subtaskIndex,
 				initialPartCounter,
-				partFileWriterFactory,
+				bucketWriter,
 				rollingPolicy,
 				bucketState,
 				outputFileConfig);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
new file mode 100644
index 0000000..60798d1
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+
+/**
+ * The {@link Bucket} uses the {@link InProgressFileWriter} to write elements to a part file.
+ */
+@Internal
+interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
+
+	/**
+	 * Writes an element to the part file.
+	 * @param element the element to be written.
+	 * @param currentTime the writing time.
+	 * @throws IOException Thrown if writing the element fails.
+	 */
+	void write(final IN element, final long currentTime) throws IOException;
+
+	/**
+	 * @return The state of the current part file.
+	 * @throws IOException Thrown if persisting the part file fails.
+	 */
+	InProgressFileRecoverable persist() throws IOException;
+
+
+	/**
+	 * @return The state of the pending part file. {@link Bucket} uses this to commit the pending file.
+	 * @throws IOException Thrown if an I/O error occurs.
+	 */
+	PendingFileRecoverable closeForCommit() throws IOException;
+
+	/**
+	 * Dispose the part file.
+	 */
+	void dispose();
+
+	// ------------------------------------------------------------------------
+
+
+	 /**
+	 * A handle that can be used to recover an in-progress file.
+	 */
+	interface InProgressFileRecoverable extends PendingFileRecoverable {}
+
+
+	/**
+	 * A handle that can be used to recover a pending file.
+	 */
+	interface PendingFileRecoverable {}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
new file mode 100644
index 0000000..2d8c423
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.IOUtils;
+
+import java.io.IOException;
+
+/**
+ * The base class for all part file writers that use {@link org.apache.flink.core.fs.RecoverableFsDataOutputStream}.
+ * @param <IN> the element type
+ * @param <BucketID> the bucket type
+ */
+public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> extends AbstractPartFileWriter<IN, BucketID> {
+
+	final RecoverableFsDataOutputStream currentPartStream;
+
+	OutputStreamBasedPartFileWriter(
+		final BucketID bucketID,
+		final RecoverableFsDataOutputStream recoverableFsDataOutputStream,
+		final long createTime) {
+		super(bucketID, createTime);
+		this.currentPartStream = recoverableFsDataOutputStream;
+	}
+
+	@Override
+	public InProgressFileRecoverable persist() throws IOException {
+		return new OutputStreamBasedInProgressFileRecoverable(currentPartStream.persist());
+	}
+
+	@Override
+	public PendingFileRecoverable closeForCommit() throws IOException {
+		return new OutputStreamBasedPendingFileRecoverable(currentPartStream.closeForCommit().getRecoverable());
+	}
+
+	@Override
+	public void dispose() {
+		// we can suppress exceptions here, because we do not rely on close() to
+		// flush or persist any data
+		IOUtils.closeQuietly(currentPartStream);
+	}
+
+	@Override
+	public long getSize() throws IOException {
+		return currentPartStream.getPos();
+	}
+
+	abstract static class OutputStreamBasedBucketWriter<IN, BucketID> implements BucketWriter<IN, BucketID> {
+
+		private final RecoverableWriter recoverableWriter;
+
+		OutputStreamBasedBucketWriter(final RecoverableWriter recoverableWriter) {
+			this.recoverableWriter = recoverableWriter;
+		}
+
+		@Override
+		public InProgressFileWriter<IN, BucketID> openNewInProgressFile(final BucketID bucketID, final Path path, final long creationTime) throws IOException {
+			return openNew(bucketID, recoverableWriter.open(path), path, creationTime);
+		}
+
+		@Override
+		public InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(final BucketID bucketID, final InProgressFileRecoverable inProgressFileRecoverable, final long creationTime) throws IOException {
+			final OutputStreamBasedInProgressFileRecoverable outputStreamBasedInProgressRecoverable = (OutputStreamBasedInProgressFileRecoverable) inProgressFileRecoverable;
+			return resumeFrom(
+				bucketID,
+				recoverableWriter.recover(outputStreamBasedInProgressRecoverable.getResumeRecoverable()),
+				outputStreamBasedInProgressRecoverable.getResumeRecoverable(),
+				creationTime);
+		}
+
+		@Override
+		public PendingFile recoverPendingFile(final PendingFileRecoverable pendingFileRecoverable) throws IOException {
+			final RecoverableWriter.CommitRecoverable commitRecoverable;
+
+			if (pendingFileRecoverable instanceof OutputStreamBasedPendingFileRecoverable) {
+				commitRecoverable = ((OutputStreamBasedPendingFileRecoverable) pendingFileRecoverable).getCommitRecoverable();
+			} else if (pendingFileRecoverable instanceof OutputStreamBasedInProgressFileRecoverable) {
+				commitRecoverable = ((OutputStreamBasedInProgressFileRecoverable) pendingFileRecoverable).getResumeRecoverable();
+			} else {
+				throw new IllegalArgumentException("can not recover from the pendingFileRecoverable");
+			}
+			return new OutputStreamBasedPendingFile(recoverableWriter.recoverForCommit(commitRecoverable));
+		}
+
+		@Override
+		public boolean cleanupInProgressFileRecoverable(InProgressFileRecoverable inProgressFileRecoverable) throws IOException {
+			final RecoverableWriter.ResumeRecoverable resumeRecoverable =
+				((OutputStreamBasedInProgressFileRecoverable) inProgressFileRecoverable).getResumeRecoverable();
+			return recoverableWriter.cleanupRecoverableState(resumeRecoverable);
+		}
+
+		@Override
+		public WriterProperties getProperties() {
+			return new WriterProperties(
+					new OutputStreamBasedInProgressFileRecoverableSerializer(recoverableWriter.getResumeRecoverableSerializer()),
+					new OutputStreamBasedPendingFileRecoverableSerializer(recoverableWriter.getCommitRecoverableSerializer()),
+					recoverableWriter.supportsResume());
+		}
+
+		public abstract InProgressFileWriter<IN, BucketID> openNew(
+			final BucketID bucketId,
+			final RecoverableFsDataOutputStream stream,
+			final Path path,
+			final long creationTime) throws IOException;
+
+		public abstract InProgressFileWriter<IN, BucketID> resumeFrom(
+			final BucketID bucketId,
+			final RecoverableFsDataOutputStream stream,
+			final RecoverableWriter.ResumeRecoverable resumable,
+			final long creationTime) throws IOException;
+	}
+
+	static final class OutputStreamBasedPendingFileRecoverable implements PendingFileRecoverable {
+
+		private final RecoverableWriter.CommitRecoverable commitRecoverable;
+
+		OutputStreamBasedPendingFileRecoverable(final RecoverableWriter.CommitRecoverable commitRecoverable) {
+			this.commitRecoverable = commitRecoverable;
+		}
+
+		RecoverableWriter.CommitRecoverable getCommitRecoverable() {
+			return commitRecoverable;
+		}
+	}
+
+	static final class OutputStreamBasedInProgressFileRecoverable implements InProgressFileRecoverable {
+
+		private final RecoverableWriter.ResumeRecoverable resumeRecoverable;
+
+		OutputStreamBasedInProgressFileRecoverable(final RecoverableWriter.ResumeRecoverable resumeRecoverable) {
+			this.resumeRecoverable = resumeRecoverable;
+		}
+
+		RecoverableWriter.ResumeRecoverable getResumeRecoverable() {
+			return resumeRecoverable;
+		}
+	}
+
+	static final class OutputStreamBasedPendingFile implements BucketWriter.PendingFile {
+
+		private final RecoverableFsDataOutputStream.Committer committer;
+
+		OutputStreamBasedPendingFile(final RecoverableFsDataOutputStream.Committer committer) {
+			this.committer = committer;
+		}
+
+		@Override
+		public void commit() throws IOException {
+			committer.commit();
+		}
+
+		@Override
+		public void commitAfterRecovery() throws IOException {
+			committer.commitAfterRecovery();
+		}
+	}
+
+	static class OutputStreamBasedInProgressFileRecoverableSerializer implements SimpleVersionedSerializer<InProgressFileRecoverable> {
+
+		private static final int MAGIC_NUMBER = 0xb3a4073d;
+
+		private final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumeSerializer;
+
+		OutputStreamBasedInProgressFileRecoverableSerializer(SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumeSerializer) {
+			this.resumeSerializer = resumeSerializer;
+		}
+
+		@Override
+		public int getVersion() {
+			return 1;
+		}
+
+		@Override
+		public byte[] serialize(InProgressFileRecoverable inProgressRecoverable) throws IOException {
+			OutputStreamBasedInProgressFileRecoverable outputStreamBasedInProgressRecoverable = (OutputStreamBasedInProgressFileRecoverable) inProgressRecoverable;
+			DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(256);
+			dataOutputSerializer.writeInt(MAGIC_NUMBER);
+			serializeV1(outputStreamBasedInProgressRecoverable, dataOutputSerializer);
+			return dataOutputSerializer.getCopyOfBuffer();
+		}
+
+		@Override
+		public InProgressFileRecoverable deserialize(int version, byte[] serialized) throws IOException {
+			switch (version) {
+				case 1:
+					DataInputView dataInputView = new DataInputDeserializer(serialized);
+					validateMagicNumber(dataInputView);
+					return deserializeV1(dataInputView);
+				default:
+					throw new IOException("Unrecognized version or corrupt state: " + version);
+			}
+		}
+
+		SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumeSerializer() {
+			return resumeSerializer;
+		}
+
+		private void serializeV1(final OutputStreamBasedInProgressFileRecoverable outputStreamBasedInProgressRecoverable, final DataOutputView dataOutputView) throws IOException {
+			SimpleVersionedSerialization.writeVersionAndSerialize(resumeSerializer, outputStreamBasedInProgressRecoverable.getResumeRecoverable(), dataOutputView);
+		}
+
+		private OutputStreamBasedInProgressFileRecoverable deserializeV1(final DataInputView dataInputView) throws IOException {
+			return new OutputStreamBasedInProgressFileRecoverable(SimpleVersionedSerialization.readVersionAndDeSerialize(resumeSerializer, dataInputView));
+		}
+
+		private static void validateMagicNumber(final DataInputView dataInputView) throws IOException {
+			final int magicNumber = dataInputView.readInt();
+			if (magicNumber != MAGIC_NUMBER) {
+				throw new IOException(String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
+			}
+		}
+	}
+
+	static class OutputStreamBasedPendingFileRecoverableSerializer implements SimpleVersionedSerializer<PendingFileRecoverable> {
+
+		private static final int MAGIC_NUMBER = 0x2c853c89;
+
+		private final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitSerializer;
+
+		OutputStreamBasedPendingFileRecoverableSerializer(final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitSerializer) {
+			this.commitSerializer = commitSerializer;
+		}
+
+		@Override
+		public int getVersion() {
+			return 1;
+		}
+
+		@Override
+		public byte[] serialize(PendingFileRecoverable pendingFileRecoverable) throws IOException {
+			OutputStreamBasedPendingFileRecoverable outputStreamBasedPendingFileRecoverable = (OutputStreamBasedPendingFileRecoverable) pendingFileRecoverable;
+			DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(256);
+			dataOutputSerializer.writeInt(MAGIC_NUMBER);
+			serializeV1(outputStreamBasedPendingFileRecoverable, dataOutputSerializer);
+			return dataOutputSerializer.getCopyOfBuffer();
+		}
+
+		@Override
+		public PendingFileRecoverable deserialize(int version, byte[] serialized) throws IOException {
+			switch (version) {
+				case 1:
+					DataInputDeserializer in = new DataInputDeserializer(serialized);
+					validateMagicNumber(in);
+					return deserializeV1(in);
+
+				default:
+					throw new IOException("Unrecognized version or corrupt state: " + version);
+			}
+		}
+
+		SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitSerializer() {
+			return this.commitSerializer;
+		}
+
+		private void serializeV1(final OutputStreamBasedPendingFileRecoverable outputStreamBasedPendingFileRecoverable, final DataOutputView dataOutputView) throws IOException {
+			SimpleVersionedSerialization.writeVersionAndSerialize(commitSerializer, outputStreamBasedPendingFileRecoverable.getCommitRecoverable(), dataOutputView);
+		}
+
+		private OutputStreamBasedPendingFileRecoverable deserializeV1(final DataInputView dataInputView) throws IOException {
+			return new OutputStreamBasedPendingFileRecoverable(SimpleVersionedSerialization.readVersionAndDeSerialize(commitSerializer, dataInputView));
+		}
+
+		private static void validateMagicNumber(final DataInputView dataInputView) throws IOException {
+			final int magicNumber = dataInputView.readInt();
+			if (magicNumber != MAGIC_NUMBER) {
+				throw new IOException(String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
+			}
+		}
+	}
+
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
deleted file mode 100644
index 95a2978a..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink.filesystem;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * An abstract writer for the currently open part file in a specific {@link Bucket}.
- *
- * <p>Currently, there are two subclasses, of this class:
- * <ol>
- *     <li>One for row-wise formats: the {@link RowWisePartWriter}.</li>
- *     <li>One for bulk encoding formats: the {@link BulkPartWriter}.</li>
- * </ol>
- *
- * <p>This also implements the {@link PartFileInfo}.
- */
-@Internal
-abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
-
-	private final BucketID bucketId;
-
-	private final long creationTime;
-
-	protected final RecoverableFsDataOutputStream currentPartStream;
-
-	private long lastUpdateTime;
-
-	protected PartFileWriter(
-			final BucketID bucketId,
-			final RecoverableFsDataOutputStream currentPartStream,
-			final long creationTime) {
-
-		Preconditions.checkArgument(creationTime >= 0L);
-		this.bucketId = Preconditions.checkNotNull(bucketId);
-		this.currentPartStream = Preconditions.checkNotNull(currentPartStream);
-		this.creationTime = creationTime;
-		this.lastUpdateTime = creationTime;
-	}
-
-	abstract void write(IN element, long currentTime) throws IOException;
-
-	RecoverableWriter.ResumeRecoverable persist() throws IOException {
-		return currentPartStream.persist();
-	}
-
-	RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
-		return currentPartStream.closeForCommit().getRecoverable();
-	}
-
-	void dispose() {
-		// we can suppress exceptions here, because we do not rely on close() to
-		// flush or persist any data
-		IOUtils.closeQuietly(currentPartStream);
-	}
-
-	void markWrite(long now) {
-		this.lastUpdateTime = now;
-	}
-
-	@Override
-	public BucketID getBucketId() {
-		return bucketId;
-	}
-
-	@Override
-	public long getCreationTime() {
-		return creationTime;
-	}
-
-	@Override
-	public long getSize() throws IOException {
-		return currentPartStream.getPos();
-	}
-
-	@Override
-	public long getLastUpdateTime() {
-		return lastUpdateTime;
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * An interface for factories that create the different {@link PartFileWriter writers}.
-	 */
-	interface PartFileFactory<IN, BucketID> {
-
-		/**
-		 * Used upon recovery from a failure to recover a {@link PartFileWriter writer}.
-		 * @param bucketId the id of the bucket this writer is writing to.
-		 * @param stream the filesystem-specific output stream to use when writing to the filesystem.
-		 * @param resumable the state of the stream we are resurrecting.
-		 * @param creationTime the creation time of the stream.
-		 * @return the recovered {@link PartFileWriter writer}.
-		 * @throws IOException
-		 */
-		PartFileWriter<IN, BucketID> resumeFrom(
-			final BucketID bucketId,
-			final RecoverableFsDataOutputStream stream,
-			final RecoverableWriter.ResumeRecoverable resumable,
-			final long creationTime) throws IOException;
-
-		/**
-		 * Used to create a new {@link PartFileWriter writer}.
-		 * @param bucketId the id of the bucket this writer is writing to.
-		 * @param stream the filesystem-specific output stream to use when writing to the filesystem.
-		 * @param path the part this writer will write to.
-		 * @param creationTime the creation time of the stream.
-		 * @return the new {@link PartFileWriter writer}.
-		 * @throws IOException
-		 */
-		PartFileWriter<IN, BucketID> openNew(
-			final BucketID bucketId,
-			final RecoverableFsDataOutputStream stream,
-			final Path path,
-			final long creationTime) throws IOException;
-	}
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java
new file mode 100644
index 0000000..784f8be
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A factory that creates {@link RowWisePartWriter RowWisePartWriters}.
+ * @param <IN> The type of input elements.
+ * @param <BucketID> The type of ids for the buckets, as returned by the {@link BucketAssigner}.
+ */
+@Internal
+class RowWiseBucketWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter.OutputStreamBasedBucketWriter<IN, BucketID> {
+
+	private final Encoder<IN> encoder;
+
+	RowWiseBucketWriter(final RecoverableWriter recoverableWriter, final Encoder<IN> encoder) {
+		super(recoverableWriter);
+		this.encoder = encoder;
+	}
+
+	@Override
+	public InProgressFileWriter<IN, BucketID> resumeFrom(
+			final BucketID bucketId,
+			final RecoverableFsDataOutputStream stream,
+			final RecoverableWriter.ResumeRecoverable resumable,
+			final long creationTime) {
+
+		Preconditions.checkNotNull(stream);
+		Preconditions.checkNotNull(resumable);
+
+		return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
+	}
+
+	@Override
+	public InProgressFileWriter<IN, BucketID> openNew(
+			final BucketID bucketId,
+			final RecoverableFsDataOutputStream stream,
+			final Path path,
+			final long creationTime) {
+
+		Preconditions.checkNotNull(stream);
+		Preconditions.checkNotNull(path);
+
+		return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
index 05c160c..bed9ec7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
@@ -20,23 +20,21 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.Encoder;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 
 /**
- * A {@link PartFileWriter} for row-wise formats that use an {@link Encoder}.
+ * An {@link InProgressFileWriter} for row-wise formats that use an {@link Encoder}.
  * This also implements the {@link PartFileInfo}.
  */
 @Internal
-final class RowWisePartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
+final class RowWisePartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter<IN, BucketID> {
 
 	private final Encoder<IN> encoder;
 
-	private RowWisePartWriter(
+	RowWisePartWriter(
 			final BucketID bucketId,
 			final RecoverableFsDataOutputStream currentPartStream,
 			final Encoder<IN> encoder,
@@ -46,48 +44,8 @@ final class RowWisePartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID>
 	}
 
 	@Override
-	void write(IN element, long currentTime) throws IOException {
+	public void write(final IN element, final long currentTime) throws IOException {
 		encoder.encode(element, currentPartStream);
 		markWrite(currentTime);
 	}
-
-	/**
-	 * A factory that creates {@link RowWisePartWriter RowWisePartWriters}.
-	 * @param <IN> The type of input elements.
-	 * @param <BucketID> The type of ids for the buckets, as returned by the {@link BucketAssigner}.
-	 */
-	static class Factory<IN, BucketID> implements PartFileWriter.PartFileFactory<IN, BucketID> {
-
-		private final Encoder<IN> encoder;
-
-		Factory(Encoder<IN> encoder) {
-			this.encoder = encoder;
-		}
-
-		@Override
-		public PartFileWriter<IN, BucketID> resumeFrom(
-				final BucketID bucketId,
-				final RecoverableFsDataOutputStream stream,
-				final RecoverableWriter.ResumeRecoverable resumable,
-				final long creationTime) throws IOException {
-
-			Preconditions.checkNotNull(stream);
-			Preconditions.checkNotNull(resumable);
-
-			return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
-		}
-
-		@Override
-		public PartFileWriter<IN, BucketID> openNew(
-				final BucketID bucketId,
-				final RecoverableFsDataOutputStream stream,
-				final Path path,
-				final long creationTime) throws IOException {
-
-			Preconditions.checkNotNull(stream);
-			Preconditions.checkNotNull(path);
-
-			return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
-		}
-	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index cb58529..64e4418 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -277,7 +277,7 @@ public class StreamingFileSink<IN>
 					basePath,
 					bucketAssigner,
 					bucketFactory,
-					new RowWisePartWriter.Factory<>(encoder),
+					new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), encoder),
 					rollingPolicy,
 					bucketLifeCycleListener,
 					subtaskIndex,
@@ -397,7 +397,7 @@ public class StreamingFileSink<IN>
 					basePath,
 					bucketAssigner,
 					bucketFactory,
-					new BulkPartWriter.Factory<>(writerFactory),
+					new BulkBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), writerFactory),
 					rollingPolicy,
 					bucketLifeCycleListener,
 					subtaskIndex,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/WriterProperties.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/WriterProperties.java
new file mode 100644
index 0000000..4fee03c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/WriterProperties.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class describes the properties of a {@link BucketWriter}.
+ */
+@Internal
+public class WriterProperties {
+
+	private final SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer;
+
+	private final SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer;
+
+	private final boolean supportsResume;
+
+	WriterProperties(
+			SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer,
+			SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer,
+			boolean supportsResume) {
+		this.inProgressFileRecoverableSerializer = checkNotNull(inProgressFileRecoverableSerializer);
+		this.pendingFileRecoverableSerializer = checkNotNull(pendingFileRecoverableSerializer);
+		this.supportsResume = supportsResume;
+	}
+
+	/**
+	 * @return Whether the {@link BucketWriter} supports appending data to a restored in-progress file.
+	 */
+	boolean supportsResume() {
+		return supportsResume;
+	}
+
+	/**
+	 * @return the serializer for the {@link InProgressFileWriter.PendingFileRecoverable}.
+	 */
+	SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> getPendingFileRecoverableSerializer() {
+		return pendingFileRecoverableSerializer;
+	}
+
+	/**
+	 * @return the serializer for the {@link InProgressFileWriter.InProgressFileRecoverable}.
+	 */
+	SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> getInProgressFileRecoverableSerializer() {
+		return inProgressFileRecoverableSerializer;
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
index f48e467..ff2cc5a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
@@ -54,7 +55,7 @@ public class BucketAssignerITCases {
 			basePath,
 			new BasePathBucketAssigner<>(),
 			new DefaultBucketFactoryImpl<>(),
-			new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+			new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
 			rollingPolicy,
 			null,
 			0,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
index 81c5766..cbd18c4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
@@ -136,7 +135,7 @@ public class BucketStateSerializerTest {
 
 		Assert.assertEquals(testBucketPath, bucket.getBucketPath());
 		Assert.assertNull(bucket.getInProgressPart());
-		Assert.assertTrue(bucket.getPendingPartsPerCheckpoint().isEmpty());
+		Assert.assertTrue(bucket.getPendingFileRecoverablesPerCheckpoint().isEmpty());
 	}
 
 	@Test
@@ -266,8 +265,8 @@ public class BucketStateSerializerTest {
 			final int noOfPendingCheckpoints = 5;
 
 			// there are 5 checkpoint does not complete.
-			final Map<Long, List<RecoverableWriter.CommitRecoverable>>
-				pendingFileRecoverables = recoveredState.getCommittableFilesPerCheckpoint();
+			final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>>
+				pendingFileRecoverables = recoveredState.getPendingFileRecoverablesPerCheckpoint();
 			Assert.assertEquals(5L, pendingFileRecoverables.size());
 
 			final Set<String> beforeRestorePaths = Files.list(outputPath.resolve(BUCKET_ID))
@@ -283,7 +282,7 @@ public class BucketStateSerializerTest {
 			// recover and commit
 			final Bucket bucket = restoreBucket(noOfPendingCheckpoints + 1, recoveredState);
 			Assert.assertEquals(testBucketPath, bucket.getBucketPath());
-			Assert.assertEquals(0, bucket.getPendingPartsPerCheckpoint().size());
+			Assert.assertEquals(0, bucket.getPendingFileRecoverablesForCurrentCheckpoint().size());
 
 			final Set<String> afterRestorePaths = Files.list(outputPath.resolve(BUCKET_ID))
 				.map(file -> file.getFileName().toString())
@@ -313,27 +312,37 @@ public class BucketStateSerializerTest {
 
 	private static Bucket<String, String> createNewBucket(final Path bucketPath) throws IOException {
 		return Bucket.getNew(
-			FileSystem.getLocalFileSystem().createRecoverableWriter(),
 			0,
 			BUCKET_ID,
 			bucketPath,
 			0,
-			new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+			createBucketWriter(),
 			DefaultRollingPolicy.builder().withMaxPartSize(10).build(),
 			OutputFileConfig.builder().build());
 	}
 
 	private static Bucket<String, String> restoreBucket(final int initialPartCounter, final BucketState<String> bucketState) throws IOException {
 		return Bucket.restore(
-			FileSystem.getLocalFileSystem().createRecoverableWriter(),
 			0,
 			initialPartCounter,
-			new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+			createBucketWriter(),
 			DefaultRollingPolicy.builder().withMaxPartSize(10).build(),
 			bucketState,
 			OutputFileConfig.builder().build());
 	}
 
+	private static RowWiseBucketWriter<String, String> createBucketWriter() throws IOException {
+		return new RowWiseBucketWriter<>(FileSystem.getLocalFileSystem().createRecoverableWriter(), new SimpleStringEncoder<>());
+	}
+
+	private static SimpleVersionedSerializer<BucketState<String>> bucketStateSerializer() throws IOException {
+		final RowWiseBucketWriter<String, String> bucketWriter = createBucketWriter(); // parameterized, not raw, so generic checks stay on
+		return new BucketStateSerializer<>(
+			bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
+			bucketWriter.getProperties().getPendingFileRecoverableSerializer(),
+			SimpleVersionedStringSerializer.INSTANCE);
+	}
+
 	private static BucketState<String> readBucketState(final String scenarioName, final int version) throws IOException {
 		byte[] bytes = Files.readAllBytes(getSnapshotPath(scenarioName, version));
 		return SimpleVersionedSerialization.readVersionAndDeSerialize(bucketStateSerializer(), bytes);
@@ -351,14 +360,6 @@ public class BucketStateSerializerTest {
 		return readBucketState(scenarioName, version);
 	}
 
-	private static SimpleVersionedSerializer<BucketState<String>> bucketStateSerializer() throws IOException {
-		RecoverableWriter recoverableWriter = FileSystem.getLocalFileSystem().createRecoverableWriter();
-		return new BucketStateSerializer<>(
-			recoverableWriter.getResumeRecoverableSerializer(),
-			recoverableWriter.getCommitRecoverableSerializer(),
-			SimpleVersionedStringSerializer.INSTANCE);
-	}
-
 	private static void moveToTemplateDirectory(java.nio.file.Path scenarioPath) throws IOException {
 		FileUtils.copy(new Path(scenarioPath.toString()), new Path(scenarioPath.toString() + "-template"), false);
 		FileUtils.deleteDirectory(scenarioPath.toFile());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
index ee85e55..a4d9a09 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
+import org.apache.flink.api.common.serialization.Encoder;
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -171,7 +172,7 @@ public class BucketTest {
 		return new TypeSafeMatcher<BucketState<String>>() {
 			@Override
 			protected boolean matchesSafely(BucketState<String> state) {
-				return state.getInProgressResumableFile() != null;
+				return state.getInProgressFileRecoverable() != null;
 			}
 
 			@Override
@@ -185,7 +186,7 @@ public class BucketTest {
 		return new TypeSafeMatcher<BucketState<String>>() {
 			@Override
 			protected boolean matchesSafely(BucketState<String> state) {
-				return state.getInProgressResumableFile() == null;
+				return state.getInProgressFileRecoverable() == null;
 			}
 
 			@Override
@@ -200,7 +201,7 @@ public class BucketTest {
 		return new TypeSafeMatcher<Bucket<String, String>>() {
 			@Override
 			protected boolean matchesSafely(Bucket<String, String> bucket) {
-				final PartFileWriter<String, String> inProgressPart = bucket.getInProgressPart();
+				final InProgressFileWriter<String, String> inProgressPart = bucket.getInProgressPart();
 				return isNull == (inProgressPart == null);
 			}
 
@@ -349,23 +350,21 @@ public class BucketTest {
 
 	private static final RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().build();
 
-	private static final PartFileWriter.PartFileFactory<String, String> partFileFactory =
-			new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>());
+	private static final Encoder<String> ENCODER = new SimpleStringEncoder<>(); // typed so RowWiseBucketWriter<>(writer, ENCODER) needs no unchecked conversion
 
 	private static Bucket<String, String> createBucket(
 			final RecoverableWriter writer,
 			final Path bucketPath,
 			final int subtaskIdx,
 			final int initialPartCounter,
-			final OutputFileConfig outputFileConfig) {
+			final OutputFileConfig outputFileConfig) throws IOException {
 
 		return Bucket.getNew(
-				writer,
 				subtaskIdx,
 				bucketId,
 				bucketPath,
 				initialPartCounter,
-				partFileFactory,
+				new RowWiseBucketWriter<>(writer, ENCODER),
 				rollingPolicy,
 				outputFileConfig);
 	}
@@ -378,10 +377,9 @@ public class BucketTest {
 			final OutputFileConfig outputFileConfig) throws Exception {
 
 		return Bucket.restore(
-				writer,
 				subtaskIndex,
 				initialPartCounter,
-				partFileFactory,
+				new RowWiseBucketWriter<>(writer, ENCODER),
 				rollingPolicy,
 				bucketState,
 				outputFileConfig);
@@ -402,24 +400,46 @@ public class BucketTest {
 
 	private Bucket<String, String> getRestoredBucketWithOnlyInProgressPart(final BaseStubWriter writer) throws IOException {
 		final BucketState<String> stateWithOnlyInProgressFile =
-				new BucketState<>("test", new Path(), 12345L, new NoOpRecoverable(), new HashMap<>());
-		return Bucket.restore(writer, 0, 1L, partFileFactory, rollingPolicy, stateWithOnlyInProgressFile, OutputFileConfig.builder().build());
+				new BucketState<>(
+					"test",
+					new Path(),
+					12345L,
+					new OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverable(new NoOpRecoverable()),
+					new HashMap<>());
+
+		return Bucket.restore(
+			0,
+			1L,
+			new RowWiseBucketWriter<>(writer, ENCODER),
+			rollingPolicy,
+			stateWithOnlyInProgressFile,
+			OutputFileConfig.builder().build());
 	}
 
 	private Bucket<String, String> getRestoredBucketWithOnlyPendingParts(final BaseStubWriter writer, final int numberOfPendingParts) throws IOException {
-		final Map<Long, List<RecoverableWriter.CommitRecoverable>> completePartsPerCheckpoint =
+		final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> completePartsPerCheckpoint =
 				createPendingPartsPerCheckpoint(numberOfPendingParts);
 
 		final BucketState<String> initStateWithOnlyInProgressFile =
-				new BucketState<>("test", new Path(), 12345L, null, completePartsPerCheckpoint);
-		return Bucket.restore(writer, 0, 1L, partFileFactory, rollingPolicy, initStateWithOnlyInProgressFile, OutputFileConfig.builder().build());
+				new BucketState<>(
+					"test",
+					new Path(),
+					12345L,
+					null,
+					completePartsPerCheckpoint);
+		return Bucket.restore(
+			0,
+			1L,
+			new RowWiseBucketWriter<>(writer, ENCODER),
+			rollingPolicy,
+			initStateWithOnlyInProgressFile, OutputFileConfig.builder().build());
 	}
 
-	private Map<Long, List<RecoverableWriter.CommitRecoverable>> createPendingPartsPerCheckpoint(int noOfCheckpoints) {
-		final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommittablesPerCheckpoint = new HashMap<>();
+	private Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> createPendingPartsPerCheckpoint(int noOfCheckpoints) {
+		final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingCommittablesPerCheckpoint = new HashMap<>();
 		for (int checkpointId = 0; checkpointId < noOfCheckpoints; checkpointId++) {
-			final List<RecoverableWriter.CommitRecoverable> pending = new ArrayList<>();
-			pending.add(new NoOpRecoverable());
+			final List<InProgressFileWriter.PendingFileRecoverable> pending = new ArrayList<>();
+			pending.add(new OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable(new NoOpRecoverable()));
 			pendingCommittablesPerCheckpoint.put((long) checkpointId, pending);
 		}
 		return pendingCommittablesPerCheckpoint;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
index e996444..8e2117a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils.MockListState;
@@ -93,8 +94,8 @@ public class BucketsTest {
 				return bucket.getBucketId().equals(bucketId) &&
 						bucket.getBucketPath().equals(new Path(testTmpPath, bucketId)) &&
 						bucket.getInProgressPart() == null &&
-						bucket.getPendingPartsForCurrentCheckpoint().isEmpty() &&
-						bucket.getPendingPartsPerCheckpoint().size() == 1;
+						bucket.getPendingFileRecoverablesForCurrentCheckpoint().isEmpty() &&
+						bucket.getPendingFileRecoverablesPerCheckpoint().size() == 1;
 			}
 
 			@Override
@@ -145,7 +146,7 @@ public class BucketsTest {
 		Assert.assertEquals(2L, bucketsTwo.getMaxPartCounter());
 
 		// make sure we have one in-progress file here and a pending
-		Assert.assertEquals(1L, bucketsTwo.getActiveBuckets().get("test1").getPendingPartsPerCheckpoint().size());
+		Assert.assertEquals(1L, bucketsTwo.getActiveBuckets().get("test1").getPendingFileRecoverablesPerCheckpoint().size());
 		Assert.assertNotNull(bucketsTwo.getActiveBuckets().get("test1").getInProgressPart());
 
 		final ListState<byte[]> mergedBucketStateContainer = new MockListState<>();
@@ -175,10 +176,10 @@ public class BucketsTest {
 
 		// this is due to the Bucket#merge(). The in progress file of one
 		// of the previous tasks is put in the list of pending files.
-		Assert.assertEquals(1L, bucket.getPendingPartsForCurrentCheckpoint().size());
+		Assert.assertEquals(1L, bucket.getPendingFileRecoverablesForCurrentCheckpoint().size());
 
 		// we commit the pending for previous checkpoints
-		Assert.assertTrue(bucket.getPendingPartsPerCheckpoint().isEmpty());
+		Assert.assertTrue(bucket.getPendingFileRecoverablesPerCheckpoint().isEmpty());
 	}
 
 	@Test
@@ -210,8 +211,8 @@ public class BucketsTest {
 		Assert.assertEquals("test", bucket.getBucketId());
 
 		Assert.assertNull(bucket.getInProgressPart());
-		Assert.assertEquals(1L, bucket.getPendingPartsForCurrentCheckpoint().size());
-		Assert.assertTrue(bucket.getPendingPartsPerCheckpoint().isEmpty());
+		Assert.assertEquals(1L, bucket.getPendingFileRecoverablesForCurrentCheckpoint().size());
+		Assert.assertTrue(bucket.getPendingFileRecoverablesPerCheckpoint().isEmpty());
 	}
 
 	@Test
@@ -321,7 +322,7 @@ public class BucketsTest {
 				path,
 				new VerifyingBucketAssigner(timestamp, watermark, processingTime),
 				new DefaultBucketFactoryImpl<>(),
-				new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+				new RowWiseBucketWriter<>(FileSystem.get(path.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
 				DefaultRollingPolicy.builder().build(),
 				null,
 				2,
@@ -458,7 +459,7 @@ public class BucketsTest {
 				basePath,
 				new TestUtils.StringIdentityBucketAssigner(),
 				new DefaultBucketFactoryImpl<>(),
-				new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+				new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
 				rollingPolicy,
 				bucketLifeCycleListener,
 				subtaskIdx,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index 59ea627..2a4da34 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
@@ -201,7 +202,7 @@ public class RollingPolicyTest {
 				basePath,
 				new TestUtils.StringIdentityBucketAssigner(),
 				new DefaultBucketFactoryImpl<>(),
-				new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+				new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
 				rollingPolicyToTest,
 				null,
 				0,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
index c540dc73..df678c5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
@@ -27,7 +27,6 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
-import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
@@ -50,6 +49,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy.build;
+
 /**
  * Utilities for the {@link StreamingFileSink} tests.
  */
@@ -158,7 +159,7 @@ public class TestUtils {
 			.forBulkFormat(new Path(outDir.toURI()), writer)
 			.withBucketAssigner(bucketer)
 			.withBucketCheckInterval(bucketCheckInterval)
-			.withRollingPolicy(OnCheckpointRollingPolicy.build())
+			.withRollingPolicy(build())
 			.withBucketFactory(bucketFactory)
 			.withOutputFileConfig(outputFileConfig)
 			.build();
@@ -199,7 +200,7 @@ public class TestUtils {
 		StreamingFileSink<Tuple2<String, Integer>> sink = StreamingFileSink
 				.forBulkFormat(new Path(outDir.toURI()), writer)
 				.withNewBucketAssigner(bucketer)
-				.withRollingPolicy(OnCheckpointRollingPolicy.build())
+				.withRollingPolicy(build())
 				.withBucketCheckInterval(bucketCheckInterval)
 				.withBucketFactory(bucketFactory)
 				.withOutputFileConfig(outputFileConfig)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableWriter.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableWriter.java
index e21da2a..6260a2c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableWriter.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableWriter.java
@@ -50,7 +50,7 @@ public class NoOpRecoverableWriter implements RecoverableWriter {
 
 	@Override
 	public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
-		throw new UnsupportedOperationException();
+		return false;
 	}
 
 	@Override


[flink] 01/04: [minor] Allow relative paths in LocalFileSystem

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a2deff2967b7de423b10f7f01a41c06565c37e62
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Mon May 18 09:37:07 2020 +0200

    [minor] Allow relative paths in LocalFileSystem
---
 .../main/java/org/apache/flink/core/fs/local/LocalFileSystem.java | 8 ++------
 .../apache/flink/core/fs/local/LocalRecoverableSerializer.java    | 4 ++--
 .../org/apache/flink/core/fs/local/LocalRecoverableWriter.java    | 1 -
 3 files changed, 4 insertions(+), 9 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index 6946558..1a06876 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -49,6 +49,7 @@ import java.nio.file.DirectoryNotEmptyException;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
+import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -308,14 +309,9 @@ public class LocalFileSystem extends FileSystem {
 
 	/**
 	 * Converts the given Path to a File for this file system.
-	 *
-	 * <p>If the path is not absolute, it is interpreted relative to this FileSystem's working directory.
 	 */
 	public File pathToFile(Path path) {
-		if (!path.isAbsolute()) {
-			path = new Path(getWorkingDirectory(), path);
-		}
-		return new File(path.toUri().getPath());
+		return Paths.get(path.getPath()).toFile();
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableSerializer.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableSerializer.java
index 685f7c1..8a68928 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableSerializer.java
@@ -52,8 +52,8 @@ class LocalRecoverableSerializer implements SimpleVersionedSerializer<LocalRecov
 
 	@Override
 	public byte[] serialize(LocalRecoverable obj) throws IOException {
-		final byte[] targetFileBytes = obj.targetFile().getAbsolutePath().getBytes(CHARSET);
-		final byte[] tempFileBytes = obj.tempFile().getAbsolutePath().getBytes(CHARSET);
+		final byte[] targetFileBytes = obj.targetFile().toString().getBytes(CHARSET);
+		final byte[] tempFileBytes = obj.tempFile().toString().getBytes(CHARSET);
 		final byte[] targetBytes = new byte[20 + targetFileBytes.length + tempFileBytes.length];
 
 		ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN);
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
index a43e0b6..bae7314 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
@@ -116,7 +116,6 @@ public class LocalRecoverableWriter implements RecoverableWriter {
 
 	@VisibleForTesting
 	static File generateStagingTempFilePath(File targetFile) {
-		checkArgument(targetFile.isAbsolute(), "targetFile must be absolute");
 		checkArgument(!targetFile.isDirectory(), "targetFile must not be a directory");
 
 		final File parent = targetFile.getParentFile();