You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2019/11/15 12:45:56 UTC

[flink] branch master updated: [FLINK-13850] refactor part file configurations into a single method

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 22b2a88  [FLINK-13850] refactor part file configurations into a single method
22b2a88 is described below

commit 22b2a8856307c310b4c75b32eeed33ba66c0206e
Author: Joao Boto <bo...@boto.pro>
AuthorDate: Thu Nov 14 20:14:02 2019 +0100

    [FLINK-13850] refactor part file configurations into a single method
    
    Closes #9533
---
 docs/dev/connectors/streamfile_sink.md             | 14 ++++
 .../api/functions/sink/filesystem/Bucket.java      | 24 +++---
 .../functions/sink/filesystem/BucketFactory.java   |  4 +-
 .../api/functions/sink/filesystem/Buckets.java     | 10 +--
 .../sink/filesystem/DefaultBucketFactoryImpl.java  |  8 +-
 .../sink/filesystem/OutputFileConfig.java          | 98 ++++++++++++++++++++++
 .../functions/sink/filesystem/PartFileConfig.java  | 53 ------------
 .../sink/filesystem/StreamingFileSink.java         | 58 +++++--------
 .../sink/filesystem/BucketAssignerITCases.java     |  2 +-
 .../api/functions/sink/filesystem/BucketTest.java  | 32 +++++--
 .../api/functions/sink/filesystem/BucketsTest.java |  6 +-
 .../functions/sink/filesystem/BulkWriterTest.java  |  3 +-
 .../sink/filesystem/RollingPolicyTest.java         |  2 +-
 .../api/functions/sink/filesystem/TestUtils.java   |  9 +-
 14 files changed, 192 insertions(+), 131 deletions(-)

diff --git a/docs/dev/connectors/streamfile_sink.md b/docs/dev/connectors/streamfile_sink.md
index c59e646..378a0f4 100644
--- a/docs/dev/connectors/streamfile_sink.md
+++ b/docs/dev/connectors/streamfile_sink.md
@@ -133,6 +133,20 @@ New buckets are created as dictated by the bucketing policy, and this doesn't af
 
 Old buckets can still receive new records as the bucketing policy is evaluated on a per-record basis.
 
+### Part file configuration
+
+The names of the part files can be configured using an `OutputFileConfig`, which contains a part prefix and a part suffix.
+Each part file name is built from the prefix, the parallel subtask index of the sink, a rolling part counter and the suffix.
+For example, with the prefix "prefix" and the suffix ".ext", the sink creates files such as:
+
+```
+└── 2019-08-25--12
+    ├── prefix-0-0.ext
+    ├── prefix-0-1.ext.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
+    ├── prefix-1-0.ext
+    └── prefix-1-1.ext.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
+```
+
 ## File Formats
 
 The `StreamingFileSink` supports both row-wise and bulk encoding formats, such as [Apache Parquet](http://parquet.apache.org).
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index 4a996e7..7efd2ba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -70,7 +70,7 @@ public class Bucket<IN, BucketID> {
 
 	private final NavigableMap<Long, List<CommitRecoverable>> pendingPartsPerCheckpoint;
 
-	private final PartFileConfig partFileConfig;
+	private final OutputFileConfig outputFileConfig;
 
 	private long partCounter;
 
@@ -90,7 +90,7 @@ public class Bucket<IN, BucketID> {
 			final long initialPartCounter,
 			final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
-			final PartFileConfig partFileConfig) {
+			final OutputFileConfig outputFileConfig) {
 		this.fsWriter = checkNotNull(fsWriter);
 		this.subtaskIndex = subtaskIndex;
 		this.bucketId = checkNotNull(bucketId);
@@ -103,7 +103,7 @@ public class Bucket<IN, BucketID> {
 		this.pendingPartsPerCheckpoint = new TreeMap<>();
 		this.resumablesPerCheckpoint = new TreeMap<>();
 
-		this.partFileConfig = checkNotNull(partFileConfig);
+		this.outputFileConfig = checkNotNull(outputFileConfig);
 	}
 
 	/**
@@ -116,7 +116,7 @@ public class Bucket<IN, BucketID> {
 			final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final BucketState<BucketID> bucketState,
-			final PartFileConfig partFileConfig) throws IOException {
+			final OutputFileConfig outputFileConfig) throws IOException {
 
 		this(
 				fsWriter,
@@ -126,7 +126,7 @@ public class Bucket<IN, BucketID> {
 				initialPartCounter,
 				partFileFactory,
 				rollingPolicy,
-				partFileConfig);
+				outputFileConfig);
 
 		restoreInProgressFile(bucketState);
 		commitRecoveredPendingFiles(bucketState);
@@ -230,7 +230,7 @@ public class Bucket<IN, BucketID> {
 	}
 
 	private Path assembleNewPartPath() {
-		return new Path(bucketPath, partFileConfig.getPartPrefix() + '-' + subtaskIndex + '-' + partCounter + partFileConfig.getPartSuffix());
+		return new Path(bucketPath, outputFileConfig.getPartPrefix() + '-' + subtaskIndex + '-' + partCounter + outputFileConfig.getPartSuffix());
 	}
 
 	private CommitRecoverable closePartFile() throws IOException {
@@ -369,7 +369,7 @@ public class Bucket<IN, BucketID> {
 	 * @param partFileFactory the {@link PartFileWriter.PartFileFactory} the factory creating part file writers.
 	 * @param <IN> the type of input elements to the sink.
 	 * @param <BucketID> the type of the identifier of the bucket, as returned by the {@link BucketAssigner}
-	 * @param partFileConfig the part file configuration.
+	 * @param outputFileConfig the part file configuration.
 	 * @return The new Bucket.
 	 */
 	static <IN, BucketID> Bucket<IN, BucketID> getNew(
@@ -380,8 +380,8 @@ public class Bucket<IN, BucketID> {
 			final long initialPartCounter,
 			final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
-			final PartFileConfig partFileConfig) {
-		return new Bucket<>(fsWriter, subtaskIndex, bucketId, bucketPath, initialPartCounter, partFileFactory, rollingPolicy, partFileConfig);
+			final OutputFileConfig outputFileConfig) {
+		return new Bucket<>(fsWriter, subtaskIndex, bucketId, bucketPath, initialPartCounter, partFileFactory, rollingPolicy, outputFileConfig);
 	}
 
 	/**
@@ -393,7 +393,7 @@ public class Bucket<IN, BucketID> {
 	 * @param bucketState the initial state of the restored bucket.
 	 * @param <IN> the type of input elements to the sink.
 	 * @param <BucketID> the type of the identifier of the bucket, as returned by the {@link BucketAssigner}
-	 * @param partFileConfig the part file configuration.
+	 * @param outputFileConfig the part file configuration.
 	 * @return The restored Bucket.
 	 */
 	static <IN, BucketID> Bucket<IN, BucketID> restore(
@@ -403,7 +403,7 @@ public class Bucket<IN, BucketID> {
 			final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final BucketState<BucketID> bucketState,
-			final PartFileConfig partFileConfig) throws IOException {
-		return new Bucket<>(fsWriter, subtaskIndex, initialPartCounter, partFileFactory, rollingPolicy, bucketState, partFileConfig);
+			final OutputFileConfig outputFileConfig) throws IOException {
+		return new Bucket<>(fsWriter, subtaskIndex, initialPartCounter, partFileFactory, rollingPolicy, bucketState, outputFileConfig);
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
index c318ef2..260e82c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
@@ -39,7 +39,7 @@ interface BucketFactory<IN, BucketID> extends Serializable {
 			final long initialPartCounter,
 			final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
-			final PartFileConfig partFileConfig) throws IOException;
+			final OutputFileConfig outputFileConfig) throws IOException;
 
 	Bucket<IN, BucketID> restoreBucket(
 			final RecoverableWriter fsWriter,
@@ -48,5 +48,5 @@ interface BucketFactory<IN, BucketID> extends Serializable {
 			final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final BucketState<BucketID> bucketState,
-			final PartFileConfig partFileConfig) throws IOException;
+			final OutputFileConfig outputFileConfig) throws IOException;
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index fa7a72b..eb61d18 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -77,7 +77,7 @@ public class Buckets<IN, BucketID> {
 
 	private final RecoverableWriter fsWriter;
 
-	private final PartFileConfig partFileConfig;
+	private final OutputFileConfig outputFileConfig;
 
 	// --------------------------- State Related Fields -----------------------------
 
@@ -99,7 +99,7 @@ public class Buckets<IN, BucketID> {
 			final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final int subtaskIndex,
-			final PartFileConfig partFileConfig) throws IOException {
+			final OutputFileConfig outputFileConfig) throws IOException {
 
 		this.basePath = Preconditions.checkNotNull(basePath);
 		this.bucketAssigner = Preconditions.checkNotNull(bucketAssigner);
@@ -108,7 +108,7 @@ public class Buckets<IN, BucketID> {
 		this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
 		this.subtaskIndex = subtaskIndex;
 
-		this.partFileConfig = Preconditions.checkNotNull(partFileConfig);
+		this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
 
 		this.activeBuckets = new HashMap<>();
 		this.bucketerContext = new Buckets.BucketerContext();
@@ -186,7 +186,7 @@ public class Buckets<IN, BucketID> {
 						partFileWriterFactory,
 						rollingPolicy,
 						recoveredState,
-						partFileConfig
+						outputFileConfig
 				);
 
 		updateActiveBucketId(bucketId, restoredBucket);
@@ -293,7 +293,7 @@ public class Buckets<IN, BucketID> {
 					maxPartCounter,
 					partFileWriterFactory,
 					rollingPolicy,
-					partFileConfig);
+					outputFileConfig);
 			activeBuckets.put(bucketId, bucket);
 		}
 		return bucket;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
index dac2a5a..529b93a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
@@ -41,7 +41,7 @@ class DefaultBucketFactoryImpl<IN, BucketID> implements BucketFactory<IN, Bucket
 			final long initialPartCounter,
 			final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
-			final PartFileConfig partFileConfig) {
+			final OutputFileConfig outputFileConfig) {
 
 		return Bucket.getNew(
 				fsWriter,
@@ -51,7 +51,7 @@ class DefaultBucketFactoryImpl<IN, BucketID> implements BucketFactory<IN, Bucket
 				initialPartCounter,
 				partFileWriterFactory,
 				rollingPolicy,
-				partFileConfig);
+				outputFileConfig);
 	}
 
 	@Override
@@ -62,7 +62,7 @@ class DefaultBucketFactoryImpl<IN, BucketID> implements BucketFactory<IN, Bucket
 			final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final BucketState<BucketID> bucketState,
-			final PartFileConfig partFileConfig) throws IOException {
+			final OutputFileConfig outputFileConfig) throws IOException {
 
 		return Bucket.restore(
 				fsWriter,
@@ -71,6 +71,6 @@ class DefaultBucketFactoryImpl<IN, BucketID> implements BucketFactory<IN, Bucket
 				partFileWriterFactory,
 				rollingPolicy,
 				bucketState,
-				partFileConfig);
+				outputFileConfig);
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputFileConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputFileConfig.java
new file mode 100644
index 0000000..98b84d1
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputFileConfig.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * Part file name configuration for the {@link StreamingFileSink}.
+ *
+ * <p>It defines the prefix and the suffix of the part file names: part files are named
+ * {@code <prefix>-<subtaskIndex>-<partCounter><suffix>}.
+ */
+@PublicEvolving
+public class OutputFileConfig implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final String partPrefix;
+
+	private final String partSuffix;
+
+	/**
+	 * Creates an {@code OutputFileConfig} with the given part prefix and part suffix.
+	 *
+	 * @param partPrefix the beginning of the part file name, must not be {@code null}
+	 * @param partSuffix the ending of the part file name, must not be {@code null}
+	 */
+	public OutputFileConfig(final String partPrefix, final String partSuffix) {
+		this.partPrefix = Preconditions.checkNotNull(partPrefix);
+		this.partSuffix = Preconditions.checkNotNull(partSuffix);
+	}
+
+	/**
+	 * The prefix for the part name.
+	 */
+	String getPartPrefix() {
+		return partPrefix;
+	}
+
+	/**
+	 * The suffix for the part name.
+	 */
+	String getPartSuffix() {
+		return partSuffix;
+	}
+
+	/**
+	 * Creates a builder pre-populated with the default prefix {@code "part"} and an empty suffix.
+	 */
+	public static OutputFileConfigBuilder builder() {
+		return new OutputFileConfigBuilder();
+	}
+
+	/**
+	 * A builder to create the part file configuration.
+	 */
+	@PublicEvolving
+	public static class OutputFileConfigBuilder {
+
+		private static final String DEFAULT_PART_PREFIX = "part";
+
+		private static final String DEFAULT_PART_SUFFIX = "";
+
+		private String partPrefix;
+
+		private String partSuffix;
+
+		private OutputFileConfigBuilder() {
+			this.partPrefix = DEFAULT_PART_PREFIX;
+			this.partSuffix = DEFAULT_PART_SUFFIX;
+		}
+
+		/**
+		 * Sets the part file name prefix. Must not be {@code null}.
+		 */
+		public OutputFileConfigBuilder withPartPrefix(String prefix) {
+			this.partPrefix = Preconditions.checkNotNull(prefix);
+			return this;
+		}
+
+		/**
+		 * Sets the part file name suffix. Must not be {@code null}.
+		 */
+		public OutputFileConfigBuilder withPartSuffix(String suffix) {
+			this.partSuffix = Preconditions.checkNotNull(suffix);
+			return this;
+		}
+
+		/**
+		 * Creates the {@link OutputFileConfig} from the configured prefix and suffix.
+		 */
+		public OutputFileConfig build() {
+			return new OutputFileConfig(partPrefix, partSuffix);
+		}
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileConfig.java
deleted file mode 100644
index f3eaa59..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileConfig.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink.filesystem;
-
-import org.apache.flink.util.Preconditions;
-
-/**
- * Part file name configuration.
- * This allow to define a prefix and a suffix to the part file name.
- */
-class PartFileConfig {
-
-	public static final String DEFAULT_PART_PREFIX = "part";
-
-	public static final String DEFAULT_PART_SUFFIX = "";
-
-	private final String partPrefix;
-
-	private final String partSuffix;
-
-	PartFileConfig() {
-		this(DEFAULT_PART_PREFIX, DEFAULT_PART_SUFFIX);
-	}
-
-	PartFileConfig(final String partPrefix, final String partSuffix) {
-		this.partPrefix = Preconditions.checkNotNull(partPrefix);
-		this.partSuffix = Preconditions.checkNotNull(partSuffix);
-	}
-
-	String getPartPrefix() {
-		return partPrefix;
-	}
-
-	String getPartSuffix() {
-		return partSuffix;
-	}
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index ed7e926..cd4afc2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -67,9 +67,11 @@ import java.io.Serializable;
  * {@link StreamingFileSink#forBulkFormat(Path, BulkWriter.Factory)}.
  *
  *
- * <p>The filenames of the part files contain the part prefix, "part-", the parallel subtask index of the sink
- * and a rolling counter. For example the file {@code "part-1-17"} contains the data from
- * {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask.
+ * <p>The names of the part files can be configured using an {@link OutputFileConfig}, which contains
+ * a part prefix and a part suffix. Each name is built from the prefix, the parallel subtask index of the sink,
+ * a rolling part counter and the suffix. For example, with the prefix "prefix" and the suffix ".ext", the file
+ * {@code "prefix-1-17.ext"} contains data written by {@code subtask 1} of the sink and has the part counter
+ * {@code 17} assigned by that subtask.
  * Part files roll based on the user-specified {@link RollingPolicy}. By default, a {@link DefaultRollingPolicy}
  * is used.
  *
@@ -217,12 +219,10 @@ public class StreamingFileSink<IN>
 
 		private BucketFactory<IN, BucketID> bucketFactory;
 
-		private String partFilePrefix;
-
-		private String partFileSuffix;
+		private OutputFileConfig outputFileConfig;
 
 		protected RowFormatBuilder(Path basePath, Encoder<IN> encoder, BucketAssigner<IN, BucketID> bucketAssigner) {
-			this(basePath, encoder, bucketAssigner, DefaultRollingPolicy.builder().build(), DEFAULT_BUCKET_CHECK_INTERVAL, new DefaultBucketFactoryImpl<>(), PartFileConfig.DEFAULT_PART_PREFIX, PartFileConfig.DEFAULT_PART_SUFFIX);
+			this(basePath, encoder, bucketAssigner, DefaultRollingPolicy.builder().build(), DEFAULT_BUCKET_CHECK_INTERVAL, new DefaultBucketFactoryImpl<>(), OutputFileConfig.builder().build());
 		}
 
 		protected RowFormatBuilder(
@@ -232,16 +232,14 @@ public class StreamingFileSink<IN>
 				RollingPolicy<IN, BucketID> policy,
 				long bucketCheckInterval,
 				BucketFactory<IN, BucketID> bucketFactory,
-				String partFilePrefix,
-				String partFileSuffix) {
+				OutputFileConfig outputFileConfig) {
 			this.basePath = Preconditions.checkNotNull(basePath);
 			this.encoder = Preconditions.checkNotNull(encoder);
 			this.bucketAssigner = Preconditions.checkNotNull(assigner);
 			this.rollingPolicy = Preconditions.checkNotNull(policy);
 			this.bucketCheckInterval = bucketCheckInterval;
 			this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
-			this.partFilePrefix = Preconditions.checkNotNull(partFilePrefix);
-			this.partFileSuffix = Preconditions.checkNotNull(partFileSuffix);
+			this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
 		}
 
 		public long getBucketCheckInterval() {
@@ -263,19 +261,14 @@ public class StreamingFileSink<IN>
 			return self();
 		}
 
-		public T withPartFilePrefix(final String partPrefix) {
-			this.partFilePrefix = partPrefix;
-			return self();
-		}
-
-		public T withPartFileSuffix(final String partSuffix) {
-			this.partFileSuffix = partSuffix;
+		public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
+			this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
 			return self();
 		}
 
 		public <ID> StreamingFileSink.RowFormatBuilder<IN, ID, ? extends RowFormatBuilder<IN, ID, ?>> withNewBucketAssignerAndPolicy(final BucketAssigner<IN, ID> assigner, final RollingPolicy<IN, ID> policy) {
 			Preconditions.checkState(bucketFactory.getClass() == DefaultBucketFactoryImpl.class, "newBuilderWithBucketAssignerAndPolicy() cannot be called after specifying a customized bucket factory");
-			return new RowFormatBuilder<>(basePath, encoder, Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy), bucketCheckInterval, new DefaultBucketFactoryImpl<>(), partFilePrefix, partFileSuffix);
+			return new RowFormatBuilder<>(basePath, encoder, Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy), bucketCheckInterval, new DefaultBucketFactoryImpl<>(), outputFileConfig);
 		}
 
 		/** Creates the actual sink. */
@@ -298,7 +291,7 @@ public class StreamingFileSink<IN>
 					new RowWisePartWriter.Factory<>(encoder),
 					rollingPolicy,
 					subtaskIndex,
-					new PartFileConfig(partFilePrefix, partFileSuffix));
+					outputFileConfig);
 		}
 	}
 
@@ -320,12 +313,10 @@ public class StreamingFileSink<IN>
 
 		private BucketFactory<IN, BucketID> bucketFactory;
 
-		private String partFilePrefix;
-
-		private String partFileSuffix;
+		private OutputFileConfig outputFileConfig;
 
 		protected BulkFormatBuilder(Path basePath, BulkWriter.Factory<IN> writerFactory, BucketAssigner<IN, BucketID> assigner) {
-			this(basePath, writerFactory, assigner, DEFAULT_BUCKET_CHECK_INTERVAL, new DefaultBucketFactoryImpl<>(), PartFileConfig.DEFAULT_PART_PREFIX, PartFileConfig.DEFAULT_PART_SUFFIX);
+			this(basePath, writerFactory, assigner, DEFAULT_BUCKET_CHECK_INTERVAL, new DefaultBucketFactoryImpl<>(), OutputFileConfig.builder().build());
 		}
 
 		protected BulkFormatBuilder(
@@ -334,15 +325,13 @@ public class StreamingFileSink<IN>
 				BucketAssigner<IN, BucketID> assigner,
 				long bucketCheckInterval,
 				BucketFactory<IN, BucketID> bucketFactory,
-				String partFilePrefix,
-				String partFileSuffix) {
+				OutputFileConfig outputFileConfig) {
 			this.basePath = Preconditions.checkNotNull(basePath);
 			this.writerFactory = writerFactory;
 			this.bucketAssigner = Preconditions.checkNotNull(assigner);
 			this.bucketCheckInterval = bucketCheckInterval;
 			this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
-			this.partFilePrefix = Preconditions.checkNotNull(partFilePrefix);
-			this.partFileSuffix = Preconditions.checkNotNull(partFileSuffix);
+			this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
 		}
 
 		public long getBucketCheckInterval() {
@@ -363,21 +352,16 @@ public class StreamingFileSink<IN>
 		T withBucketFactory(final BucketFactory<IN, BucketID> factory) {
 			this.bucketFactory = Preconditions.checkNotNull(factory);
 			return self();
-	}
-
-		public T withPartFilePrefix(final String partPrefix) {
-			this.partFilePrefix = partPrefix;
-			return self();
 		}
 
-		public T withPartFileSuffix(final String partSuffix) {
-			this.partFileSuffix = partSuffix;
+		public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
+			this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
 			return self();
 		}
 
 		public <ID> StreamingFileSink.BulkFormatBuilder<IN, ID, ? extends BulkFormatBuilder<IN, ID, ?>> withNewBucketAssigner(final BucketAssigner<IN, ID> assigner) {
 			Preconditions.checkState(bucketFactory.getClass() == DefaultBucketFactoryImpl.class, "newBuilderWithBucketAssigner() cannot be called after specifying a customized bucket factory");
-			return new BulkFormatBuilder<>(basePath, writerFactory, Preconditions.checkNotNull(assigner), bucketCheckInterval, new DefaultBucketFactoryImpl<>(), partFilePrefix, partFileSuffix);
+			return new BulkFormatBuilder<>(basePath, writerFactory, Preconditions.checkNotNull(assigner), bucketCheckInterval, new DefaultBucketFactoryImpl<>(), outputFileConfig);
 		}
 
 		/** Creates the actual sink. */
@@ -394,7 +378,7 @@ public class StreamingFileSink<IN>
 					new BulkPartWriter.Factory<>(writerFactory),
 					OnCheckpointRollingPolicy.build(),
 					subtaskIndex,
-					new PartFileConfig(partFilePrefix, partFileSuffix));
+					outputFileConfig);
 		}
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
index cc91a33..02c1264 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
@@ -57,7 +57,7 @@ public class BucketAssignerITCases {
 			new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
 			rollingPolicy,
 			0,
-			new PartFileConfig()
+			OutputFileConfig.builder().build()
 		);
 
 		Bucket<String, String> bucket =
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
index 583bacf..2bcb881 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
@@ -64,7 +64,7 @@ public class BucketTest {
 
 		final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
 		final Bucket<String, String> bucketUnderTest =
-				createBucket(recoverableWriter, path, 0, 0, new PartFileConfig());
+				createBucket(recoverableWriter, path, 0, 0, OutputFileConfig.builder().build());
 
 		bucketUnderTest.write("test-element", 0L);
 
@@ -82,7 +82,7 @@ public class BucketTest {
 
 		final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
 		final Bucket<String, String> bucketUnderTest =
-				createBucket(recoverableWriter, path, 0, 0, new PartFileConfig());
+				createBucket(recoverableWriter, path, 0, 0, OutputFileConfig.builder().build());
 
 		bucketUnderTest.write("test-element", 0L);
 
@@ -99,13 +99,35 @@ public class BucketTest {
 	}
 
 	@Test
+	public void shouldCleanupResumableAfterRestoring() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
+
+		final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
+		final Bucket<String, String> bucketUnderTest =
+				createBucket(recoverableWriter, path, 0, 0, OutputFileConfig.builder().build());
+
+		bucketUnderTest.write("test-element", 0L);
+
+		final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
+		assertThat(state, hasActiveInProgressFile());
+
+		bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
+
+		final TestRecoverableWriter newRecoverableWriter = getRecoverableWriter(path);
+		restoreBucket(newRecoverableWriter, 0, 1, state, OutputFileConfig.builder().build());
+
+		assertThat(newRecoverableWriter, hasCalledDiscard(1)); // that is for checkpoints 0 and 1
+	}
+
+	@Test
 	public void shouldNotCallCleanupWithoutInProgressPartFiles() throws Exception {
 		final File outDir = TEMP_FOLDER.newFolder();
 		final Path path = new Path(outDir.toURI());
 
 		final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
 		final Bucket<String, String> bucketUnderTest =
-				createBucket(recoverableWriter, path, 0, 0, new PartFileConfig());
+				createBucket(recoverableWriter, path, 0, 0, OutputFileConfig.builder().build());
 
 		final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
 		assertThat(state, hasNoActiveInProgressFile());
@@ -403,7 +425,7 @@ public class BucketTest {
 	private Bucket<String, String> getRestoredBucketWithOnlyInProgressPart(final BaseStubWriter writer) throws IOException {
 		final BucketState<String> stateWithOnlyInProgressFile =
 				new BucketState<>("test", new Path(), 12345L, new NoOpRecoverable(), new HashMap<>());
-		return Bucket.restore(writer, 0, 1L, partFileFactory, rollingPolicy, stateWithOnlyInProgressFile, new PartFileConfig());
+		return Bucket.restore(writer, 0, 1L, partFileFactory, rollingPolicy, stateWithOnlyInProgressFile, OutputFileConfig.builder().build());
 	}
 
 	private Bucket<String, String> getRestoredBucketWithOnlyPendingParts(final BaseStubWriter writer, final int numberOfPendingParts) throws IOException {
@@ -412,7 +434,7 @@ public class BucketTest {
 
 		final BucketState<String> initStateWithOnlyInProgressFile =
 				new BucketState<>("test", new Path(), 12345L, null, completePartsPerCheckpoint);
-		return Bucket.restore(writer, 0, 1L, partFileFactory, rollingPolicy, initStateWithOnlyInProgressFile, new PartFileConfig());
+		return Bucket.restore(writer, 0, 1L, partFileFactory, rollingPolicy, initStateWithOnlyInProgressFile, OutputFileConfig.builder().build());
 	}
 
 	private Map<Long, List<RecoverableWriter.CommitRecoverable>> createPendingPartsPerCheckpoint(int noOfCheckpoints) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
index ac7071d..edbed9b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -318,7 +318,7 @@ public class BucketsTest {
 				new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
 				DefaultRollingPolicy.builder().build(),
 				2,
-				new PartFileConfig()
+				OutputFileConfig.builder().build()
 		);
 
 		buckets.onElement(
@@ -377,7 +377,7 @@ public class BucketsTest {
 				basePath,
 				rollingPolicy,
 				subtaskIdx,
-				new PartFileConfig());
+				OutputFileConfig.builder().build());
 	}
 
 	private static Buckets<String, String> createBuckets(
@@ -408,7 +408,7 @@ public class BucketsTest {
 				subtaskIdx,
 				bucketState,
 				partCounterState,
-				new PartFileConfig());
+				OutputFileConfig.builder().build());
 	}
 
 	private static Buckets<String, String> restoreBuckets(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
index bab18aa..f416d8a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
@@ -101,8 +101,7 @@ public class BulkWriterTest extends TestLogger {
 							new TestUtils.TupleToStringBucketer(),
 							new TestBulkWriterFactory(),
 							new DefaultBucketFactoryImpl<>(),
-							"prefix",
-							".ext")
+							OutputFileConfig.builder().withPartPrefix("prefix").withPartSuffix(".ext").build())
 		) {
 			testPartFilesWithStringBucketer(testHarness, outDir, ".prefix-0-0.ext.inprogress", ".prefix-0-1.ext.inprogress");
 		}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index be9db3e..2fe1943 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -204,7 +204,7 @@ public class RollingPolicyTest {
 				new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
 				rollingPolicyToTest,
 				0,
-				new PartFileConfig()
+				OutputFileConfig.builder().build()
 		);
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
index c17a2d6..8b3ab28 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
@@ -140,8 +140,7 @@ public class TestUtils {
 				bucketer,
 				writer,
 				bucketFactory,
-				PartFileConfig.DEFAULT_PART_PREFIX,
-				PartFileConfig.DEFAULT_PART_SUFFIX);
+				OutputFileConfig.builder().build());
 	}
 
 	static OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createTestSinkWithBulkEncoder(
@@ -152,16 +151,14 @@ public class TestUtils {
 			final BucketAssigner<Tuple2<String, Integer>, String> bucketer,
 			final BulkWriter.Factory<Tuple2<String, Integer>> writer,
 			final BucketFactory<Tuple2<String, Integer>, String> bucketFactory,
-			final String partFilePrefix,
-			final String partFileSuffix) throws Exception {
+			final OutputFileConfig outputFileConfig) throws Exception {
 
 		StreamingFileSink<Tuple2<String, Integer>> sink = StreamingFileSink
 			.forBulkFormat(new Path(outDir.toURI()), writer)
 			.withBucketAssigner(bucketer)
 			.withBucketCheckInterval(bucketCheckInterval)
 			.withBucketFactory(bucketFactory)
-			.withPartFilePrefix(partFilePrefix)
-			.withPartFileSuffix(partFileSuffix)
+			.withOutputFileConfig(outputFileConfig)
 			.build();
 
 		return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), MAX_PARALLELISM, totalParallelism, taskIdx);