Posted to commits@flink.apache.org by se...@apache.org on 2018/07/20 17:05:18 UTC

[2/5] flink git commit: [FLINK-9903] [DataStream API] Refactor StreamingFileSink / add bulk encoders

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index 0406afc..c208079 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -20,8 +20,8 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.api.common.serialization.Encoder;
-import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
@@ -30,29 +30,23 @@ import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerial
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies.DefaultRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies.OnCheckpointRollingPolicy;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.Preconditions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
+import java.io.Serializable;
 
 /**
  * Sink that emits its input elements to {@link FileSystem} files within buckets. This is
@@ -69,7 +63,9 @@ import java.util.Map;
  * be written to inside the base directory. The {@code Bucketer} can, for example, use time or
  * a property of the element to determine the bucket directory. The default {@code Bucketer} is a
  * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify
- * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}.
+ * a custom {@code Bucketer} using the {@code withBucketer(Bucketer)} method, after calling
+ * {@link StreamingFileSink#forRowFormat(Path, Encoder)} or
+ * {@link StreamingFileSink#forBulkFormat(Path, BulkWriter.Factory)}.
  *
  *
  * <p>The filenames of the part files contain the part prefix, "part-", the parallel subtask index of the sink
@@ -94,19 +90,6 @@ import java.util.Map;
  * state are transferred into the {@code finished} state while any {@code in-progress} files are rolled back, so that
  * they do not contain data that arrived after the checkpoint from which we restore.
  *
- * <p><b>NOTE:</b>
- * <ol>
- *     <li>
- *         If checkpointing is not enabled the pending files will never be moved to the finished state.
- *     </li>
- *     <li>
- *         The part files are written using an instance of {@link Encoder}. By default, a
- *         {@link SimpleStringEncoder} is used, which writes the result of {@code toString()} for
- *         every element, separated by newlines. You can configure the writer using the
- *         {@link #setEncoder(Encoder)}.
- *     </li>
- * </ol>
- *
  * @param <IN> Type of the elements emitted by this sink
  */
 @PublicEvolving
@@ -116,8 +99,6 @@ public class StreamingFileSink<IN>
 
 	private static final long serialVersionUID = 1L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(StreamingFileSink.class);
-
 	// -------------------------- state descriptors ---------------------------
 
 	private static final ListStateDescriptor<byte[]> BUCKET_STATE_DESC =
@@ -128,298 +109,264 @@ public class StreamingFileSink<IN>
 
 	// ------------------------ configuration fields --------------------------
 
-	private final Path basePath;
-
-	private final BucketFactory<IN> bucketFactory;
-
-	private long bucketCheckInterval = 60L * 1000L;
-
-	private Bucketer<IN> bucketer;
+	private final long bucketCheckInterval;
 
-	private Encoder<IN> encoder;
-
-	private RollingPolicy rollingPolicy;
+	private final StreamingFileSink.BucketsBuilder<IN, ?> bucketsBuilder;
 
 	// --------------------------- runtime fields -----------------------------
 
-	private transient BucketerContext bucketerContext;
-
-	private transient RecoverableWriter fileSystemWriter;
+	private transient Buckets<IN, ?> buckets;
 
 	private transient ProcessingTimeService processingTimeService;
 
-	private transient Map<String, Bucket<IN>> activeBuckets;
+	// --------------------------- State Related Fields -----------------------------
 
-	//////////////////			State Related Fields			/////////////////////
+	private transient ListState<byte[]> bucketStates;
 
-	private transient BucketStateSerializer bucketStateSerializer;
+	private transient ListState<Long> maxPartCountersState;
 
-	private transient ListState<byte[]> restoredBucketStates;
+	/**
+	 * Creates a new {@code StreamingFileSink} that writes files to the given base directory.
+	 */
+	private StreamingFileSink(
+			final StreamingFileSink.BucketsBuilder<IN, ?> bucketsBuilder,
+			final long bucketCheckInterval) {
+
+		Preconditions.checkArgument(bucketCheckInterval > 0L);
 
-	private transient ListState<Long> restoredMaxCounters;
+		this.bucketsBuilder = Preconditions.checkNotNull(bucketsBuilder);
+		this.bucketCheckInterval = bucketCheckInterval;
+	}
 
-	private transient long initMaxPartCounter;
+	// ------------------------------------------------------------------------
 
-	private transient long maxPartCounterUsed;
+	// --------------------------- Sink Builders  -----------------------------
 
 	/**
-	 * Creates a new {@code StreamingFileSink} that writes files to the given base directory.
-	 *
-	 * <p>This uses a {@link DateTimeBucketer} as {@link Bucketer} and a {@link SimpleStringEncoder} as a writer.
-	 *
-	 * @param basePath The directory to which to write the bucket files.
+	 * Creates the builder for a {@code StreamingFileSink} with row-encoding format.
+	 * @param basePath the base path where all the buckets are going to be created as sub-directories.
+	 * @param encoder the {@link Encoder} to be used when writing elements in the buckets.
+	 * @param <IN> the type of incoming elements
+	 * @return The builder where the remaining configuration parameters for the sink can be set.
+	 * In order to instantiate the sink, call {@link RowFormatBuilder#build()} after specifying the desired parameters.
 	 */
-	public StreamingFileSink(Path basePath) {
-		this(basePath, new DefaultBucketFactory<>());
+	public static <IN> StreamingFileSink.RowFormatBuilder<IN, String> forRowFormat(
+			final Path basePath, final Encoder<IN> encoder) {
+		return new StreamingFileSink.RowFormatBuilder<>(basePath, encoder, new DateTimeBucketer<>());
 	}
 
-	@VisibleForTesting
-	StreamingFileSink(Path basePath, BucketFactory<IN> bucketFactory) {
-		this.basePath = Preconditions.checkNotNull(basePath);
-		this.bucketer = new DateTimeBucketer<>();
-		this.encoder = new SimpleStringEncoder<>();
-		this.rollingPolicy = DefaultRollingPolicy.create().build();
-		this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
+	/**
+	 * Creates the builder for a {@link StreamingFileSink} with bulk-encoding format.
+	 * @param basePath the base path where all the buckets are going to be created as sub-directories.
+	 * @param writerFactory the {@link BulkWriter.Factory} to be used when writing elements in the buckets.
+	 * @param <IN> the type of incoming elements
+	 * @return The builder where the remaining configuration parameters for the sink can be set.
+	 * In order to instantiate the sink, call {@link BulkFormatBuilder#build()} after specifying the desired parameters.
+	 */
+	public static <IN> StreamingFileSink.BulkFormatBuilder<IN, String> forBulkFormat(
+			final Path basePath, final BulkWriter.Factory<IN> writerFactory) {
+		return new StreamingFileSink.BulkFormatBuilder<>(basePath, writerFactory, new DateTimeBucketer<>());
 	}
 
-	public StreamingFileSink<IN> setEncoder(Encoder<IN> encoder) {
-		this.encoder = Preconditions.checkNotNull(encoder);
-		return this;
-	}
+	/**
+	 * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}.
+	 */
+	private abstract static class BucketsBuilder<IN, BucketID> implements Serializable {
 
-	public StreamingFileSink<IN> setBucketer(Bucketer<IN> bucketer) {
-		this.bucketer = Preconditions.checkNotNull(bucketer);
-		return this;
-	}
+		private static final long serialVersionUID = 1L;
 
-	public StreamingFileSink<IN> setBucketCheckInterval(long interval) {
-		this.bucketCheckInterval = interval;
-		return this;
+		abstract Buckets<IN, BucketID> createBuckets(final int subtaskIndex) throws IOException;
 	}
 
-	public StreamingFileSink<IN> setRollingPolicy(RollingPolicy policy) {
-		this.rollingPolicy = Preconditions.checkNotNull(policy);
-		return this;
-	}
+	/**
+	 * A builder for configuring the sink for row-wise encoding formats.
+	 */
+	@PublicEvolving
+	public static class RowFormatBuilder<IN, BucketID> extends StreamingFileSink.BucketsBuilder<IN, BucketID> {
 
-	@Override
-	public void notifyCheckpointComplete(long checkpointId) throws Exception {
-		final Iterator<Map.Entry<String, Bucket<IN>>> activeBucketIt =
-				activeBuckets.entrySet().iterator();
-
-		while (activeBucketIt.hasNext()) {
-			Bucket<IN> bucket = activeBucketIt.next().getValue();
-			bucket.commitUpToCheckpoint(checkpointId);
-
-			if (!bucket.isActive()) {
-				// We've dealt with all the pending files and the writer for this bucket is not currently open.
-				// Therefore this bucket is currently inactive and we can remove it from our state.
-				activeBucketIt.remove();
-			}
-		}
-	}
+		private static final long serialVersionUID = 1L;
 
-	@Override
-	public void snapshotState(FunctionSnapshotContext context) throws Exception {
-		Preconditions.checkState(
-				restoredBucketStates != null && fileSystemWriter != null && bucketStateSerializer != null,
-				"sink has not been initialized");
+		private long bucketCheckInterval = 60L * 1000L;
+
+		private final Path basePath;
 
-		restoredBucketStates.clear();
-		for (Bucket<IN> bucket : activeBuckets.values()) {
+		private final Encoder<IN> encoder;
 
-			final PartFileInfo info = bucket.getInProgressPartInfo();
-			final long checkpointTimestamp = context.getCheckpointTimestamp();
+		private Bucketer<IN, BucketID> bucketer;
 
-			if (info != null && rollingPolicy.shouldRoll(info, checkpointTimestamp)) {
-				// we also check here so that we do not have to always
-				// wait for the "next" element to arrive.
-				bucket.closePartFile();
-			}
+		private RollingPolicy<BucketID> rollingPolicy;
 
-			final BucketState bucketState = bucket.snapshot(context.getCheckpointId());
-			restoredBucketStates.add(SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer, bucketState));
+		private BucketFactory<IN, BucketID> bucketFactory = new DefaultBucketFactory<>();
+
+		RowFormatBuilder(Path basePath, Encoder<IN> encoder, Bucketer<IN, BucketID> bucketer) {
+			this.basePath = Preconditions.checkNotNull(basePath);
+			this.encoder = Preconditions.checkNotNull(encoder);
+			this.bucketer = Preconditions.checkNotNull(bucketer);
+			this.rollingPolicy = DefaultRollingPolicy.create().build();
 		}
 
-		restoredMaxCounters.clear();
-		restoredMaxCounters.add(maxPartCounterUsed);
-	}
+		public StreamingFileSink.RowFormatBuilder<IN, BucketID> withBucketCheckInterval(final long interval) {
+			this.bucketCheckInterval = interval;
+			return this;
+		}
 
-	@Override
-	public void initializeState(FunctionInitializationContext context) throws Exception {
-		initFileSystemWriter();
+		public StreamingFileSink.RowFormatBuilder<IN, BucketID> withBucketer(final Bucketer<IN, BucketID> bucketer) {
+			this.bucketer = Preconditions.checkNotNull(bucketer);
+			return this;
+		}
 
-		this.activeBuckets = new HashMap<>();
+		public StreamingFileSink.RowFormatBuilder<IN, BucketID> withRollingPolicy(final RollingPolicy<BucketID> policy) {
+			this.rollingPolicy = Preconditions.checkNotNull(policy);
+			return this;
+		}
 
-		// When resuming after a failure:
-		// 1) we get the max part counter used before in order to make sure that we do not overwrite valid data
-		// 2) we commit any pending files for previous checkpoints (previous to the last successful one)
-		// 3) we resume writing to the previous in-progress file of each bucket, and
-		// 4) if we receive multiple states for the same bucket, we merge them.
+		public <ID> StreamingFileSink.RowFormatBuilder<IN, ID> withBucketerAndPolicy(final Bucketer<IN, ID> bucketer, final RollingPolicy<ID> policy) {
+			@SuppressWarnings("unchecked")
+			StreamingFileSink.RowFormatBuilder<IN, ID> reInterpreted = (StreamingFileSink.RowFormatBuilder<IN, ID>) this;
+			reInterpreted.bucketer = Preconditions.checkNotNull(bucketer);
+			reInterpreted.rollingPolicy = Preconditions.checkNotNull(policy);
+			return reInterpreted;
+		}
 
-		final OperatorStateStore stateStore = context.getOperatorStateStore();
+		@VisibleForTesting
+		StreamingFileSink.RowFormatBuilder<IN, BucketID> withBucketFactory(final BucketFactory<IN, BucketID> factory) {
+			this.bucketFactory = Preconditions.checkNotNull(factory);
+			return this;
+		}
 
-		restoredBucketStates = stateStore.getListState(BUCKET_STATE_DESC);
-		restoredMaxCounters = stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);
+		/** Creates the actual sink. */
+		public StreamingFileSink<IN> build() {
+			return new StreamingFileSink<>(this, bucketCheckInterval);
+		}
 
-		if (context.isRestored()) {
-			final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-
-			LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
-
-			long maxCounter = 0L;
-			for (long partCounter: restoredMaxCounters.get()) {
-				maxCounter = Math.max(partCounter, maxCounter);
-			}
-			initMaxPartCounter = maxCounter;
-
-			for (byte[] recoveredState : restoredBucketStates.get()) {
-				final BucketState bucketState = SimpleVersionedSerialization.readVersionAndDeSerialize(
-						bucketStateSerializer, recoveredState);
-
-				final String bucketId = bucketState.getBucketId();
-
-				LOG.info("Recovered bucket for {}", bucketId);
-
-				final Bucket<IN> restoredBucket = bucketFactory.restoreBucket(
-						fileSystemWriter,
-						subtaskIndex,
-						initMaxPartCounter,
-						encoder,
-						bucketState
-				);
-
-				final Bucket<IN> existingBucket = activeBuckets.get(bucketId);
-				if (existingBucket == null) {
-					activeBuckets.put(bucketId, restoredBucket);
-				} else {
-					existingBucket.merge(restoredBucket);
-				}
-
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("{} idx {} restored state for bucket {}", getClass().getSimpleName(),
-							subtaskIndex, assembleBucketPath(bucketId));
-				}
-			}
+		@Override
+		Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException {
+			return new Buckets<>(
+					basePath,
+					bucketer,
+					bucketFactory,
+					new RowWisePartWriter.Factory<>(encoder),
+					rollingPolicy,
+					subtaskIndex);
 		}
 	}
 
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
+	/**
+	 * A builder for configuring the sink for bulk-encoding formats, e.g. Parquet/ORC.
+	 */
+	@PublicEvolving
+	public static class BulkFormatBuilder<IN, BucketID> extends StreamingFileSink.BucketsBuilder<IN, BucketID> {
 
-		processingTimeService = ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService();
-		long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
-		processingTimeService.registerTimer(currentProcessingTime + bucketCheckInterval, this);
-		this.bucketerContext = new BucketerContext();
-	}
+		private static final long serialVersionUID = 1L;
 
-	@Override
-	public void onProcessingTime(long timestamp) throws Exception {
-		final long currentTime = processingTimeService.getCurrentProcessingTime();
-		for (Bucket<IN> bucket : activeBuckets.values()) {
-			final PartFileInfo info = bucket.getInProgressPartInfo();
-			if (info != null && rollingPolicy.shouldRoll(info, currentTime)) {
-				bucket.closePartFile();
-			}
-		}
-		processingTimeService.registerTimer(timestamp + bucketCheckInterval, this);
-	}
+		private long bucketCheckInterval = 60L * 1000L;
 
-	@Override
-	public void invoke(IN value, Context context) throws Exception {
-		final long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
-		final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+		private final Path basePath;
+
+		private final BulkWriter.Factory<IN> writerFactory;
+
+		private Bucketer<IN, BucketID> bucketer;
 
-		// setting the values in the bucketer context
-		bucketerContext.update(context.timestamp(), context.currentWatermark(), currentProcessingTime);
-
-		final String bucketId = bucketer.getBucketId(value, bucketerContext);
-
-		Bucket<IN> bucket = activeBuckets.get(bucketId);
-		if (bucket == null) {
-			final Path bucketPath = assembleBucketPath(bucketId);
-			bucket = bucketFactory.getNewBucket(
-					fileSystemWriter,
-					subtaskIndex,
-					bucketId,
-					bucketPath,
-					initMaxPartCounter,
-					encoder);
-			activeBuckets.put(bucketId, bucket);
+		private BucketFactory<IN, BucketID> bucketFactory = new DefaultBucketFactory<>();
+
+		BulkFormatBuilder(Path basePath, BulkWriter.Factory<IN> writerFactory, Bucketer<IN, BucketID> bucketer) {
+			this.basePath = Preconditions.checkNotNull(basePath);
+			this.writerFactory = Preconditions.checkNotNull(writerFactory);
+			this.bucketer = Preconditions.checkNotNull(bucketer);
 		}
 
-		final PartFileInfo info = bucket.getInProgressPartInfo();
-		if (info == null || rollingPolicy.shouldRoll(info, currentProcessingTime)) {
-			bucket.rollPartFile(currentProcessingTime);
+		public StreamingFileSink.BulkFormatBuilder<IN, BucketID> withBucketCheckInterval(long interval) {
+			this.bucketCheckInterval = interval;
+			return this;
 		}
-		bucket.write(value, currentProcessingTime);
 
-		// we update the counter here because as buckets become inactive and
-		// get removed in the initializeState(), at the time we snapshot they
-		// may not be there to take them into account during checkpointing.
-		updateMaxPartCounter(bucket.getPartCounter());
-	}
+		public <ID> StreamingFileSink.BulkFormatBuilder<IN, ID> withBucketer(Bucketer<IN, ID> bucketer) {
+			@SuppressWarnings("unchecked")
+			StreamingFileSink.BulkFormatBuilder<IN, ID> reInterpreted = (StreamingFileSink.BulkFormatBuilder<IN, ID>) this;
+			reInterpreted.bucketer = Preconditions.checkNotNull(bucketer);
+			return reInterpreted;
+		}
 
-	@Override
-	public void close() throws Exception {
-		if (activeBuckets != null) {
-			activeBuckets.values().forEach(Bucket::dispose);
+		@VisibleForTesting
+		StreamingFileSink.BulkFormatBuilder<IN, BucketID> withBucketFactory(final BucketFactory<IN, BucketID> factory) {
+			this.bucketFactory = Preconditions.checkNotNull(factory);
+			return this;
 		}
-	}
 
-	private void initFileSystemWriter() throws IOException {
-		if (fileSystemWriter == null) {
-			fileSystemWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
-			bucketStateSerializer = new BucketStateSerializer(
-					fileSystemWriter.getResumeRecoverableSerializer(),
-					fileSystemWriter.getCommitRecoverableSerializer()
-			);
+		/** Creates the actual sink. */
+		public StreamingFileSink<IN> build() {
+			return new StreamingFileSink<>(this, bucketCheckInterval);
 		}
-	}
 
-	private void updateMaxPartCounter(long candidate) {
-		maxPartCounterUsed = Math.max(maxPartCounterUsed, candidate);
+		@Override
+		Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException {
+			return new Buckets<>(
+					basePath,
+					bucketer,
+					bucketFactory,
+					new BulkPartWriter.Factory<>(writerFactory),
+					new OnCheckpointRollingPolicy<>(),
+					subtaskIndex);
+		}
 	}
 
-	private Path assembleBucketPath(String bucketId) {
-		return new Path(basePath, bucketId);
+	// --------------------------- Sink Methods -----------------------------
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+		this.buckets = bucketsBuilder.createBuckets(subtaskIndex);
+
+		final OperatorStateStore stateStore = context.getOperatorStateStore();
+		bucketStates = stateStore.getListState(BUCKET_STATE_DESC);
+		maxPartCountersState = stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);
+
+		if (context.isRestored()) {
+			buckets.initializeState(bucketStates, maxPartCountersState);
+		}
 	}
 
-	/**
-	 * The {@link Bucketer.Context} exposed to the
-	 * {@link Bucketer#getBucketId(Object, Bucketer.Context)}
-	 * whenever a new incoming element arrives.
-	 */
-	private static class BucketerContext implements Bucketer.Context {
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		buckets.publishUpToCheckpoint(checkpointId);
+	}
 
-		@Nullable
-		private Long elementTimestamp;
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		Preconditions.checkState(bucketStates != null && maxPartCountersState != null, "sink has not been initialized");
 
-		private long currentWatermark;
+		bucketStates.clear();
+		maxPartCountersState.clear();
 
-		private long currentProcessingTime;
+		buckets.snapshotState(
+				context.getCheckpointId(),
+				context.getCheckpointTimestamp(),
+				bucketStates,
+				maxPartCountersState);
+	}
 
-		void update(@Nullable Long elementTimestamp, long currentWatermark, long currentProcessingTime) {
-			this.elementTimestamp = elementTimestamp;
-			this.currentWatermark = currentWatermark;
-			this.currentProcessingTime = currentProcessingTime;
-		}
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		this.processingTimeService = ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService();
+		long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
+		processingTimeService.registerTimer(currentProcessingTime + bucketCheckInterval, this);
+	}
 
-		@Override
-		public long currentProcessingTime() {
-			return currentProcessingTime;
-		}
+	@Override
+	public void onProcessingTime(long timestamp) throws Exception {
+		final long currentTime = processingTimeService.getCurrentProcessingTime();
+		buckets.onProcessingTime(currentTime);
+		processingTimeService.registerTimer(currentTime + bucketCheckInterval, this);
+	}
 
-		@Override
-		public long currentWatermark() {
-			return currentWatermark;
-		}
+	@Override
+	public void invoke(IN value, SinkFunction.Context context) throws Exception {
+		buckets.onElement(value, context);
+	}
 
-		@Override
-		@Nullable
-		public Long timestamp() {
-			return elementTimestamp;
-		}
+	@Override
+	public void close() throws Exception {
+		buckets.close();
 	}
 }
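
A minimal usage sketch of the new row-format builder shown in the diff above. The output path, the example class, and the surrounding DataStream job are illustrative assumptions and not part of this commit; only the builder calls (forRowFormat, withBucketCheckInterval, build) come from the code above.

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class RowFormatSinkExample {

	public static void attachSink(DataStream<String> events) {
		// Writes each element's toString(), newline-separated, into hourly buckets
		// (DateTimeBucketer is the builder's default); "/tmp/output" is a placeholder path.
		StreamingFileSink<String> sink = StreamingFileSink
				.forRowFormat(new Path("/tmp/output"), new SimpleStringEncoder<String>())
				.withBucketCheckInterval(60_000L) // how often the rolling policy is checked on processing time
				.build();

		events.addSink(sink);
	}
}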

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
index 5ffe152..d7b2013 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
@@ -19,22 +19,29 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 /**
  * A {@link Bucketer} that does not perform any
  * bucketing of files. All files are written to the base path.
  */
 @PublicEvolving
-public class BasePathBucketer<T> implements Bucketer<T> {
+public class BasePathBucketer<T> implements Bucketer<T, String> {
 
 	private static final long serialVersionUID = -6033643155550226022L;
 
 	@Override
-	public String getBucketId(T element, Context context) {
+	public String getBucketId(T element, Bucketer.Context context) {
 		return "";
 	}
 
 	@Override
+	public SimpleVersionedSerializer<String> getSerializer() {
+		// in the future this could be optimized, since the bucket id is always the empty string.
+		return SimpleVersionedStringSerializer.INSTANCE;
+	}
+
+	@Override
 	public String toString() {
 		return "BasePathBucketer";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java
index 5c30927..503e361 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java
@@ -19,6 +19,8 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 
 import javax.annotation.Nullable;
@@ -33,9 +35,16 @@ import java.io.Serializable;
  * <p>The {@code StreamingFileSink} can be writing to many buckets at a time, and it is responsible for managing
  * a set of active buckets. Whenever a new element arrives it will ask the {@code Bucketer} for the bucket the
  * element should fall in. The {@code Bucketer} can, for example, determine buckets based on system time.
+ *
+ * @param <IN> The type of input elements.
+ * @param <BucketID> The type of the object returned by {@link #getBucketId(Object, Bucketer.Context)}. It must have
+ *                  correct {@link #hashCode()} and {@link #equals(Object)} methods. In addition, the {@link Path}
+ *                  to the created bucket will be the result of the {@link #toString()} of this object, appended to
+ *                  the {@code basePath} specified in the
+ *                  {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink StreamingFileSink}.
  */
 @PublicEvolving
-public interface Bucketer<T> extends Serializable {
+public interface Bucketer<IN, BucketID> extends Serializable {
 
 	/**
 	 * Returns the identifier of the bucket the provided element should be put into.
@@ -48,13 +57,20 @@ public interface Bucketer<T> extends Serializable {
 	 * and the {@code base path} provided during the initialization of the
 	 * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink sink}.
 	 */
-	String getBucketId(T element, Context context);
+	BucketID getBucketId(IN element, Bucketer.Context context);
+
+	/**
+	 * @return A {@link SimpleVersionedSerializer} capable of serializing/deserializing bucket identifiers
+	 * of type {@code BucketID}, i.e. the type of the objects returned by
+	 * {@link #getBucketId(Object, Bucketer.Context)}.
+	 */
+	SimpleVersionedSerializer<BucketID> getSerializer();
 
 	/**
 	 * Context that the {@link Bucketer} can use for getting additional data about
 	 * an input record.
 	 *
-	 * <p>The context is only valid for the duration of a {@link Bucketer#getBucketId(Object, Context)} call.
+	 * <p>The context is only valid for the duration of a {@link Bucketer#getBucketId(Object, Bucketer.Context)} call.
 	 * Do not store the context and use afterwards!
 	 */
 	@PublicEvolving
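
A hedged sketch of a custom Bucketer against the widened interface above (the extra BucketID type parameter and the new getSerializer() method). The class name and the first-character bucketing scheme are illustrative only; the serializer reuses the SimpleVersionedStringSerializer added in this commit.

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer;

public class FirstCharBucketer implements Bucketer<String, String> {

	private static final long serialVersionUID = 1L;

	@Override
	public String getBucketId(String element, Bucketer.Context context) {
		// The returned value becomes the bucket sub-directory under the base path.
		return element.isEmpty() ? "empty" : element.substring(0, 1);
	}

	@Override
	public SimpleVersionedSerializer<String> getSerializer() {
		// String bucket ids can reuse the serializer introduced by this commit.
		return SimpleVersionedStringSerializer.INSTANCE;
	}
}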

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
index 515468c..eed0b79 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.text.SimpleDateFormat;
 import java.util.Date;
@@ -50,9 +51,9 @@ import java.util.Date;
  *
  */
 @PublicEvolving
-public class DateTimeBucketer<T> implements Bucketer<T> {
+public class DateTimeBucketer<IN> implements Bucketer<IN, String> {
 
-	private static final long serialVersionUID = 3284420879277893804L;
+	private static final long serialVersionUID = 1L;
 
 	private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
 
@@ -78,7 +79,7 @@ public class DateTimeBucketer<T> implements Bucketer<T> {
 	}
 
 	@Override
-	public String getBucketId(T element, Context context) {
+	public String getBucketId(IN element, Bucketer.Context context) {
 		if (dateFormatter == null) {
 			dateFormatter = new SimpleDateFormat(formatString);
 		}
@@ -86,6 +87,11 @@ public class DateTimeBucketer<T> implements Bucketer<T> {
 	}
 
 	@Override
+	public SimpleVersionedSerializer<String> getSerializer() {
+		return SimpleVersionedStringSerializer.INSTANCE;
+	}
+
+	@Override
 	public String toString() {
 		return "DateTimeBucketer{" +
 				"formatString='" + formatString + '\'' +

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java
new file mode 100644
index 0000000..d025af9
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A {@link SimpleVersionedSerializer} implementation for Strings.
+ */
+public final class SimpleVersionedStringSerializer implements SimpleVersionedSerializer<String> {
+
+	private static final Charset CHARSET = StandardCharsets.UTF_8;
+
+	public static final SimpleVersionedStringSerializer INSTANCE = new SimpleVersionedStringSerializer();
+
+	@Override
+	public int getVersion() {
+		return 1;
+	}
+
+	@Override
+	public byte[] serialize(String value) {
+		final byte[] serialized = value.getBytes(StandardCharsets.UTF_8);
+		final byte[] targetBytes = new byte[Integer.BYTES + serialized.length];
+
+		final ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN);
+		bb.putInt(serialized.length);
+		bb.put(serialized);
+		return targetBytes;
+	}
+
+	@Override
+	public String deserialize(int version, byte[] serialized) throws IOException {
+		switch (version) {
+			case 1:
+				return deserializeV1(serialized);
+			default:
+				throw new IOException("Unrecognized version or corrupt state: " + version);
+		}
+	}
+
+	private static String deserializeV1(byte[] serialized) {
+		final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+		final byte[] targetStringBytes = new byte[bb.getInt()];
+		bb.get(targetStringBytes);
+		return new String(targetStringBytes, CHARSET);
+	}
+
+	/**
+	 * Private constructor to prevent instantiation.
+	 * Access the serializer through the {@link #INSTANCE}.
+	 */
+	private SimpleVersionedStringSerializer() {}
+}
+
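
A quick round-trip sketch for the serializer above; the sample string and the class name are arbitrary, illustrative choices.

import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer;

import java.io.IOException;

public class StringSerializerRoundTrip {

	public static void main(String[] args) throws IOException {
		final SimpleVersionedStringSerializer serializer = SimpleVersionedStringSerializer.INSTANCE;

		// Serialize as length-prefixed UTF-8 bytes, as implemented above.
		final byte[] bytes = serializer.serialize("2018-07-20--17");

		// Deserialize with the current version (1); unknown versions throw an IOException.
		final String restored = serializer.deserialize(serializer.getVersion(), bytes);

		System.out.println(restored); // prints 2018-07-20--17
	}
}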

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
new file mode 100644
index 0000000..a9ff617
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * The default implementation of the {@link RollingPolicy}.
+ *
+ * <p>This policy rolls a part file if:
+ * <ol>
+ *     <li>there is no open part file,</li>
+ * 	   <li>the current part file has reached the maximum part size (by default 128MB),</li>
+ * 	   <li>the current part file is older than the rollover interval (by default 60 sec), or</li>
+ * 	   <li>the current part file has not been written to for longer than the allowed inactivity interval (by default 60 sec).</li>
+ * </ol>
+ */
+@PublicEvolving
+public final class DefaultRollingPolicy<BucketID> implements RollingPolicy<BucketID> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final long DEFAULT_INACTIVITY_INTERVAL = 60L * 1000L;
+
+	private static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L;
+
+	private static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L;
+
+	private final long partSize;
+
+	private final long rolloverInterval;
+
+	private final long inactivityInterval;
+
+	/**
+	 * Private constructor to avoid direct instantiation.
+	 */
+	private DefaultRollingPolicy(long partSize, long rolloverInterval, long inactivityInterval) {
+		Preconditions.checkArgument(partSize > 0L);
+		Preconditions.checkArgument(rolloverInterval > 0L);
+		Preconditions.checkArgument(inactivityInterval > 0L);
+
+		this.partSize = partSize;
+		this.rolloverInterval = rolloverInterval;
+		this.inactivityInterval = inactivityInterval;
+	}
+
+	@Override
+	public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
+		return false;
+	}
+
+	@Override
+	public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState) throws IOException {
+		if (partFileState == null) {
+			// this means that there is no currently open part file.
+			return true;
+		}
+
+		return partFileState.getSize() > partSize;
+	}
+
+	@Override
+	public boolean shouldRollOnProcessingTime(final PartFileInfo<BucketID> partFileState, final long currentTime) {
+		if (partFileState == null) {
+			// this means that there is no currently open part file.
+			return true;
+		}
+
+		if (currentTime - partFileState.getCreationTime() > rolloverInterval) {
+			return true;
+		}
+
+		return currentTime - partFileState.getLastUpdateTime() > inactivityInterval;
+	}
+
+	/**
+	 * Initiates the instantiation of a {@code DefaultRollingPolicy}.
+	 * To finalize it and have the actual policy, call {@code .build()}.
+	 */
+	public static DefaultRollingPolicy.PolicyBuilder create() {
+		return new DefaultRollingPolicy.PolicyBuilder();
+	}
+
+	/**
+	 * A helper class that holds the configuration properties for the {@link DefaultRollingPolicy}.
+	 */
+	@PublicEvolving
+	public static final class PolicyBuilder {
+
+		private long partSize = DEFAULT_MAX_PART_SIZE;
+
+		private long rolloverInterval = DEFAULT_ROLLOVER_INTERVAL;
+
+		private long inactivityInterval = DEFAULT_INACTIVITY_INTERVAL;
+
+		private PolicyBuilder() {}
+
+		/**
+		 * Sets the part size above which a part file will have to roll.
+		 * @param size the allowed part size.
+		 */
+		public DefaultRollingPolicy.PolicyBuilder withMaxPartSize(long size) {
+			Preconditions.checkState(size > 0L);
+			this.partSize = size;
+			return this;
+		}
+
+		/**
+		 * Sets the interval of allowed inactivity after which a part file will have to roll.
+		 * @param interval the allowed inactivity interval.
+		 */
+		public DefaultRollingPolicy.PolicyBuilder withInactivityInterval(long interval) {
+			Preconditions.checkState(interval > 0L);
+			this.inactivityInterval = interval;
+			return this;
+		}
+
+		/**
+		 * Sets the max time a part file can stay open before having to roll.
+		 * @param interval the desired rollover interval.
+		 */
+		public DefaultRollingPolicy.PolicyBuilder withRolloverInterval(long interval) {
+			Preconditions.checkState(interval > 0L);
+			this.rolloverInterval = interval;
+			return this;
+		}
+
+		/**
+		 * Creates the actual policy.
+		 */
+		public <BucketID> DefaultRollingPolicy<BucketID> build() {
+			return new DefaultRollingPolicy<>(partSize, rolloverInterval, inactivityInterval);
+		}
+	}
+}
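
The defaults above (128 MB part size, 60 s rollover, 60 s inactivity) can be overridden through the PolicyBuilder. A hedged sketch with illustrative thresholds; the resulting policy would be passed to RowFormatBuilder#withRollingPolicy.

import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies.DefaultRollingPolicy;

public class RollingPolicyExample {

	public static RollingPolicy<String> customPolicy() {
		// Illustrative thresholds only; any setter that is not called keeps its default.
		return DefaultRollingPolicy.create()
				.withMaxPartSize(256L * 1024L * 1024L)    // roll at 256 MB instead of the 128 MB default
				.withRolloverInterval(15L * 60L * 1000L)  // roll at least every 15 minutes
				.withInactivityInterval(5L * 60L * 1000L) // roll after 5 minutes without writes
				.build();
	}
}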

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
new file mode 100644
index 0000000..4361941
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies;
+
+import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+
+/**
+ * A {@link RollingPolicy} which rolls on every checkpoint.
+ */
+public class OnCheckpointRollingPolicy<BucketID> implements RollingPolicy<BucketID> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
+		return true;
+	}
+
+	@Override
+	public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState) {
+		return false;
+	}
+
+	@Override
+	public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
index 353ac00..3d5be63 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer;
 
 import org.junit.Assert;
 import org.junit.ClassRule;
@@ -60,20 +61,21 @@ public class BucketStateSerializerTest {
 
 		final Path testBucket = new Path(testFolder.getPath(), "test");
 
-		final BucketState bucketState = new BucketState(
+		final BucketState<String> bucketState = new BucketState<>(
 				"test", testBucket, Long.MAX_VALUE, null, new HashMap<>());
 
-		final SimpleVersionedSerializer<BucketState> serializer =
-				new BucketStateSerializer(
+		final SimpleVersionedSerializer<BucketState<String>> serializer =
+				new BucketStateSerializer<>(
 						writer.getResumeRecoverableSerializer(),
-						writer.getCommitRecoverableSerializer()
+						writer.getCommitRecoverableSerializer(),
+						SimpleVersionedStringSerializer.INSTANCE
 				);
 
 		byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
-		final BucketState recoveredState =  SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+		final BucketState<String> recoveredState =  SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
 
 		Assert.assertEquals(testBucket, recoveredState.getBucketPath());
-		Assert.assertNull(recoveredState.getCurrentInProgress());
+		Assert.assertNull(recoveredState.getInProgress());
 		Assert.assertTrue(recoveredState.getPendingPerCheckpoint().isEmpty());
 	}
 
@@ -90,13 +92,14 @@ public class BucketStateSerializerTest {
 
 		final RecoverableWriter.ResumeRecoverable current = stream.persist();
 
-		final BucketState bucketState = new BucketState(
+		final BucketState<String> bucketState = new BucketState<>(
 				"test", testBucket, Long.MAX_VALUE, current, new HashMap<>());
 
-		final SimpleVersionedSerializer<BucketState> serializer =
-				new BucketStateSerializer(
+		final SimpleVersionedSerializer<BucketState<String>> serializer =
+				new BucketStateSerializer<>(
 						writer.getResumeRecoverableSerializer(),
-						writer.getCommitRecoverableSerializer()
+						writer.getCommitRecoverableSerializer(),
+						SimpleVersionedStringSerializer.INSTANCE
 				);
 
 		final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
@@ -104,7 +107,7 @@ public class BucketStateSerializerTest {
 		// to simulate that everything is over for file.
 		stream.close();
 
-		final BucketState recoveredState =  SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+		final BucketState<String> recoveredState =  SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
 
 		Assert.assertEquals(testBucket, recoveredState.getBucketPath());
 
@@ -147,18 +150,19 @@ public class BucketStateSerializerTest {
 
 		final RecoverableWriter.ResumeRecoverable current = stream.persist();
 
-		final BucketState bucketState = new BucketState(
+		final BucketState<String> bucketState = new BucketState<>(
 				"test-2", bucketPath, Long.MAX_VALUE, current, commitRecoverables);
-		final SimpleVersionedSerializer<BucketState> serializer =
-				new BucketStateSerializer(
+		final SimpleVersionedSerializer<BucketState<String>> serializer =
+				new BucketStateSerializer<>(
 						writer.getResumeRecoverableSerializer(),
-						writer.getCommitRecoverableSerializer()
+						writer.getCommitRecoverableSerializer(),
+						SimpleVersionedStringSerializer.INSTANCE
 				);
 		stream.close();
 
 		byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
 
-		final BucketState recoveredState =  SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+		final BucketState<String> recoveredState =  SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
 
 		Assert.assertEquals(bucketPath, recoveredState.getBucketPath());
 
@@ -220,20 +224,21 @@ public class BucketStateSerializerTest {
 
 		final RecoverableWriter.ResumeRecoverable current = null;
 
-		final BucketState bucketState = new BucketState(
+		final BucketState<String> bucketState = new BucketState<>(
 				"", bucketPath, Long.MAX_VALUE, current, commitRecoverables);
 
-		final SimpleVersionedSerializer<BucketState> serializer = new BucketStateSerializer(
+		final SimpleVersionedSerializer<BucketState<String>> serializer = new BucketStateSerializer<>(
 				writer.getResumeRecoverableSerializer(),
-				writer.getCommitRecoverableSerializer()
+				writer.getCommitRecoverableSerializer(),
+				SimpleVersionedStringSerializer.INSTANCE
 		);
 
 		byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
 
-		final BucketState recoveredState =  SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+		final BucketState<String> recoveredState =  SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
 
 		Assert.assertEquals(bucketPath, recoveredState.getBucketPath());
-		Assert.assertNull(recoveredState.getCurrentInProgress());
+		Assert.assertNull(recoveredState.getInProgress());
 
 		final Map<Long, List<RecoverableWriter.CommitRecoverable>> recoveredRecoverables = recoveredState.getPendingPerCheckpoint();
 		Assert.assertEquals(5L, recoveredRecoverables.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
new file mode 100644
index 0000000..042ba4e
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer;
+
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+/**
+ * Tests for {@link Buckets}.
+ */
+public class BucketsTest {
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+	@Test
+	public void testContextPassingNormalExecution() throws Exception {
+		testCorrectPassingOfContext(1L, 2L, 3L);
+	}
+
+	@Test
+	public void testContextPassingNullTimestamp() throws Exception {
+		testCorrectPassingOfContext(null, 2L, 3L);
+	}
+
+	private void testCorrectPassingOfContext(Long timestamp, long watermark, long processingTime) throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+
+		final Long expectedTimestamp = timestamp;
+		final long expectedWatermark = watermark;
+		final long expectedProcessingTime = processingTime;
+
+		final Buckets<String, String> buckets = StreamingFileSink
+				.<String>forRowFormat(new Path(outDir.toURI()), new SimpleStringEncoder<>())
+				.withBucketer(new VarifyingBucketer(expectedTimestamp, expectedWatermark, expectedProcessingTime))
+				.createBuckets(2);
+
+		buckets.onElement("TEST", new SinkFunction.Context() {
+			@Override
+			public long currentProcessingTime() {
+				return expectedProcessingTime;
+			}
+
+			@Override
+			public long currentWatermark() {
+				return expectedWatermark;
+			}
+
+			@Override
+			public Long timestamp() {
+				return expectedTimestamp;
+			}
+		});
+	}
+
+	private static class VarifyingBucketer implements Bucketer<String, String> {
+
+		private static final long serialVersionUID = 7729086510972377578L;
+
+		private final Long expectedTimestamp;
+		private final long expectedWatermark;
+		private final long expectedProcessingTime;
+
+		VarifyingBucketer(
+				final Long expectedTimestamp,
+				final long expectedWatermark,
+				final long expectedProcessingTime
+		) {
+			this.expectedTimestamp = expectedTimestamp;
+			this.expectedWatermark = expectedWatermark;
+			this.expectedProcessingTime = expectedProcessingTime;
+		}
+
+		@Override
+		public String getBucketId(String element, Context context) {
+			final Long elementTimestamp = context.timestamp();
+			final long watermark = context.currentWatermark();
+			final long processingTime = context.currentProcessingTime();
+
+			Assert.assertEquals(expectedTimestamp, elementTimestamp);
+			Assert.assertEquals(expectedProcessingTime, processingTime);
+			Assert.assertEquals(expectedWatermark, watermark);
+
+			return element;
+		}
+
+		@Override
+		public SimpleVersionedSerializer<String> getSerializer() {
+			return SimpleVersionedStringSerializer.INSTANCE;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
new file mode 100644
index 0000000..7b6b82c
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * Tests for the {@link StreamingFileSink} with {@link BulkWriter}.
+ */
+public class BulkWriterTest extends TestLogger {
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+	@Test
+	public void testCustomBulkWriter() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+
+		// the bulk writer rolls on every checkpoint, so we know exactly when each part file closes
+		try (
+				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness =
+						TestUtils.createTestSinkWithBulkEncoder(
+								outDir,
+								1,
+								0,
+								10L,
+								new TestUtils.TupleToStringBucketer(),
+								new TestBulkWriterFactory(),
+								new DefaultBucketFactory<>())
+		) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			// this creates a new bucket "test1" and part-0-0
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
+			TestUtils.checkLocalFs(outDir, 1, 0);
+
+			// we take a checkpoint so we roll.
+			testHarness.snapshot(1L, 1L);
+
+			// these will close part-0-0 and open part-0-1
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L));
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L));
+
+			// we take a checkpoint so we roll again.
+			testHarness.snapshot(2L, 2L);
+
+			TestUtils.checkLocalFs(outDir, 2, 0);
+
+			Map<File, String> contents = TestUtils.getFileContentByPath(outDir);
+			int fileCounter = 0;
+			for (Map.Entry<File, String> fileContents : contents.entrySet()) {
+				if (fileContents.getKey().getName().contains(".part-0-0.inprogress")) {
+					fileCounter++;
+					Assert.assertEquals("test1@1\n", fileContents.getValue());
+				} else if (fileContents.getKey().getName().contains(".part-0-1.inprogress")) {
+					fileCounter++;
+					Assert.assertEquals("test1@2\ntest1@3\n", fileContents.getValue());
+				}
+			}
+			Assert.assertEquals(2L, fileCounter);
+
+			// we acknowledge the latest checkpoint, so everything should be published.
+			testHarness.notifyOfCompletedCheckpoint(2L);
+			TestUtils.checkLocalFs(outDir, 0, 2);
+		}
+	}
+
+	/**
+	 * A {@link BulkWriter} used for the tests.
+	 */
+	private static class TestBulkWriter implements BulkWriter<Tuple2<String, Integer>> {
+
+		private static final Charset CHARSET = StandardCharsets.UTF_8;
+
+		private final FSDataOutputStream stream;
+
+		TestBulkWriter(final FSDataOutputStream stream) {
+			this.stream = Preconditions.checkNotNull(stream);
+		}
+
+		@Override
+		public void addElement(Tuple2<String, Integer> element) throws IOException {
+			stream.write((element.f0 + '@' + element.f1 + '\n').getBytes(CHARSET));
+		}
+
+		@Override
+		public void flush() throws IOException {
+			stream.flush();
+		}
+
+		@Override
+		public void finish() throws IOException {
+			flush();
+		}
+	}
+
+	/**
+	 * A {@link BulkWriter.Factory} used for the tests.
+	 */
+	private static class TestBulkWriterFactory implements BulkWriter.Factory<Tuple2<String, Integer>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public BulkWriter<Tuple2<String, Integer>> create(FSDataOutputStream out) {
+			return new TestBulkWriter(out);
+		}
+	}
+}
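
TestBulkWriter above simply appends UTF-8 text, but the same addElement / flush / finish contract also covers formats that need a finalization step before the framework closes the part file. As a hedged sketch (not part of this patch), a writer that gzip-compresses one record per line could look roughly like the following, assuming, as the test does, that the sink owns the FSDataOutputStream and closes it after finish() returns, and that BulkWriter.Factory#create may declare IOException:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;

/** Sketch of a BulkWriter that gzip-compresses string records, one per line. */
class GzipStringBulkWriter implements BulkWriter<String> {

	private final GZIPOutputStream out;

	GzipStringBulkWriter(FSDataOutputStream stream) throws IOException {
		this.out = new GZIPOutputStream(stream);
	}

	@Override
	public void addElement(String element) throws IOException {
		out.write((element + '\n').getBytes(StandardCharsets.UTF_8));
	}

	@Override
	public void flush() throws IOException {
		out.flush();
	}

	@Override
	public void finish() throws IOException {
		// write the gzip trailer, but leave the underlying stream open; the sink closes it
		out.finish();
	}

	/** Factory mirroring TestBulkWriterFactory above. */
	static class Factory implements BulkWriter.Factory<String> {

		private static final long serialVersionUID = 1L;

		@Override
		public BulkWriter<String> create(FSDataOutputStream stream) throws IOException {
			return new GzipStringBulkWriter(stream);
		}
	}
}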

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
index b6f73ac..6e942e9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
@@ -18,20 +18,17 @@
 
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
-import org.apache.flink.api.common.serialization.Encoder;
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
-import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies.DefaultRollingPolicy;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.TestLogger;
 
-import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -39,9 +36,6 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -56,12 +50,13 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 	public void testClosingWithoutInput() throws Exception {
 		final File outDir = TEMP_FOLDER.newFolder();
 
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness =
-				createRescalingTestSink(outDir, 1, 0, 100L, 124L);
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.close();
+		try (
+			OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness =
+					TestUtils.createRescalingTestSink(outDir, 1, 0, 100L, 124L);
+		) {
+			testHarness.setup();
+			testHarness.open();
+		}
 	}
 
 	@Test
@@ -70,7 +65,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 		OperatorSubtaskState snapshot;
 
 		// we set the max bucket size to small so that we can know when it rolls
-		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = createRescalingTestSink(
+		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createRescalingTestSink(
 				outDir, 1, 0, 100L, 10L)) {
 
 			testHarness.setup();
@@ -78,7 +73,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 
 			// this creates a new bucket "test1" and part-0-0
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
-			checkLocalFs(outDir, 1, 0);
+			TestUtils.checkLocalFs(outDir, 1, 0);
 
 			// we take a checkpoint so that we keep the in-progress file offset.
 			snapshot = testHarness.snapshot(1L, 1L);
@@ -87,9 +82,9 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L));
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L));
 
-			checkLocalFs(outDir, 2, 0);
+			TestUtils.checkLocalFs(outDir, 2, 0);
 
-			Map<File, String> contents = getFileContentByPath(outDir);
+			Map<File, String> contents = TestUtils.getFileContentByPath(outDir);
 			int fileCounter = 0;
 			for (Map.Entry<File, String> fileContents : contents.entrySet()) {
 				if (fileContents.getKey().getName().contains(".part-0-0.inprogress")) {
@@ -103,7 +98,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 			Assert.assertEquals(2L, fileCounter);
 		}
 
-		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = createRescalingTestSink(
+		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createRescalingTestSink(
 				outDir, 1, 0, 100L, 10L)) {
 
 			testHarness.setup();
@@ -111,11 +106,11 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 			testHarness.open();
 
 			// the in-progress is the not cleaned up one and the pending is truncated and finalized
-			checkLocalFs(outDir, 2, 0);
+			TestUtils.checkLocalFs(outDir, 2, 0);
 
 			// now we go back to the first checkpoint so it should truncate part-0-0 and restart part-0-1
 			int fileCounter = 0;
-			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+			for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
 				if (fileContents.getKey().getName().contains(".part-0-0.inprogress")) {
 					// truncated
 					fileCounter++;
@@ -132,7 +127,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 4), 4L));
 
 			fileCounter = 0;
-			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+			for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
 				if (fileContents.getKey().getName().contains(".part-0-0.inprogress")) {
 					fileCounter++;
 					Assert.assertEquals("test1@1\ntest1@4\n", fileContents.getValue());
@@ -145,16 +140,16 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 			Assert.assertEquals(2L, fileCounter);
 
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 5), 5L));
-			checkLocalFs(outDir, 3, 0); // the previous part-0-1 in progress is simply ignored (random extension)
+			TestUtils.checkLocalFs(outDir, 3, 0); // the previous part-0-1 in progress is simply ignored (random extension)
 
 			testHarness.snapshot(2L, 2L);
 
 			// this will close the new part-0-1
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 6), 6L));
-			checkLocalFs(outDir, 3, 0);
+			TestUtils.checkLocalFs(outDir, 3, 0);
 
 			fileCounter = 0;
-			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+			for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
 				if (fileContents.getKey().getName().contains(".part-0-0.inprogress")) {
 					fileCounter++;
 					Assert.assertEquals("test1@1\ntest1@4\n", fileContents.getValue());
@@ -169,10 +164,10 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 
 			// this will publish part-0-0
 			testHarness.notifyOfCompletedCheckpoint(2L);
-			checkLocalFs(outDir, 2, 1);
+			TestUtils.checkLocalFs(outDir, 2, 1);
 
 			fileCounter = 0;
-			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+			for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
 				if (fileContents.getKey().getName().equals("part-0-0")) {
 					fileCounter++;
 					Assert.assertEquals("test1@1\ntest1@4\n", fileContents.getValue());
@@ -192,7 +187,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 		final File outDir = TEMP_FOLDER.newFolder();
 
 		// we set the max bucket size to small so that we can know when it rolls
-		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = createRescalingTestSink(
+		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createRescalingTestSink(
 				outDir, 1, 0, 100L, 10L)) {
 
 			testHarness.setup();
@@ -203,11 +198,11 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 			// these 2 create a new bucket "test1", with a .part-0-0.inprogress and also fill it
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L));
-			checkLocalFs(outDir, 1, 0);
+			TestUtils.checkLocalFs(outDir, 1, 0);
 
 			// this will open .part-0-1.inprogress
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L));
-			checkLocalFs(outDir, 2, 0);
+			TestUtils.checkLocalFs(outDir, 2, 0);
 
 			// we take a checkpoint so that we keep the in-progress file offset.
 			testHarness.snapshot(1L, 1L);
@@ -218,13 +213,13 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 			// and open and fill .part-0-2.inprogress
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 5), 5L));
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 6), 6L));
-			checkLocalFs(outDir, 3, 0);                    // nothing committed yet
+			TestUtils.checkLocalFs(outDir, 3, 0);                    // nothing committed yet
 
 			testHarness.snapshot(2L, 2L);
 
 			// open .part-0-3.inprogress
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 7), 7L));
-			checkLocalFs(outDir, 4, 0);
+			TestUtils.checkLocalFs(outDir, 4, 0);
 
 			// this will close the part file (time)
 			testHarness.setProcessingTime(101L);
@@ -232,10 +227,10 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 			testHarness.snapshot(3L, 3L);
 
 			testHarness.notifyOfCompletedCheckpoint(1L);							// the pending for checkpoint 1 are committed
-			checkLocalFs(outDir, 3, 1);
+			TestUtils.checkLocalFs(outDir, 3, 1);
 
 			int fileCounter = 0;
-			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+			for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
 				if (fileContents.getKey().getName().equals("part-0-0")) {
 					fileCounter++;
 					Assert.assertEquals("test1@1\ntest1@2\n", fileContents.getValue());
@@ -253,10 +248,10 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 			Assert.assertEquals(4L, fileCounter);
 
 			testHarness.notifyOfCompletedCheckpoint(3L);							// all the pending for checkpoint 2 and 3 are committed
-			checkLocalFs(outDir, 0, 4);
+			TestUtils.checkLocalFs(outDir, 0, 4);
 
 			fileCounter = 0;
-			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+			for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
 				if (fileContents.getKey().getName().equals("part-0-0")) {
 					fileCounter++;
 					Assert.assertEquals("test1@1\ntest1@2\n", fileContents.getValue());
@@ -280,7 +275,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 		final File outDir = TEMP_FOLDER.newFolder();
 
 		// we set a big bucket size so that it does not close by size, but by timers.
-		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = createRescalingTestSink(
+		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createRescalingTestSink(
 				outDir, 1, 0, 100L, 124L)) {
 
 			testHarness.setup();
@@ -290,10 +285,10 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test2", 1), 1L));
-			checkLocalFs(outDir, 2, 0);
+			TestUtils.checkLocalFs(outDir, 2, 0);
 
 			int bucketCounter = 0;
-			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+			for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
 				if (fileContents.getKey().getParentFile().getName().equals("test1")) {
 					bucketCounter++;
 				} else if (fileContents.getKey().getParentFile().getName().equals("test2")) {
@@ -303,10 +298,10 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 			Assert.assertEquals(2L, bucketCounter);					// verifies that we have 2 buckets, "test1" and "test2"
 
 			testHarness.setProcessingTime(101L);                                // put them in pending
-			checkLocalFs(outDir, 2, 0);
+			TestUtils.checkLocalFs(outDir, 2, 0);
 
 			testHarness.snapshot(0L, 0L);                // put them in pending for 0
-			checkLocalFs(outDir, 2, 0);
+			TestUtils.checkLocalFs(outDir, 2, 0);
 
 			// create another 2 buckets with 1 inprogress file each
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test3", 1), 1L));
@@ -315,13 +310,13 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 			testHarness.setProcessingTime(202L);                                // put them in pending
 
 			testHarness.snapshot(1L, 0L);                // put them in pending for 1
-			checkLocalFs(outDir, 4, 0);
+			TestUtils.checkLocalFs(outDir, 4, 0);
 
 			testHarness.notifyOfCompletedCheckpoint(0L);            // put the pending for 0 to the "committed" state
-			checkLocalFs(outDir, 2, 2);
+			TestUtils.checkLocalFs(outDir, 2, 2);
 
 			bucketCounter = 0;
-			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+			for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
 				if (fileContents.getKey().getParentFile().getName().equals("test1")) {
 					bucketCounter++;
 					Assert.assertEquals("part-0-0", fileContents.getKey().getName());
@@ -339,10 +334,10 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 			Assert.assertEquals(4L, bucketCounter);
 
 			testHarness.notifyOfCompletedCheckpoint(1L);            // put the pending for 1 to the "committed" state
-			checkLocalFs(outDir, 0, 4);
+			TestUtils.checkLocalFs(outDir, 0, 4);
 
 			bucketCounter = 0;
-			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+			for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
 				if (fileContents.getKey().getParentFile().getName().equals("test1")) {
 					bucketCounter++;
 					Assert.assertEquals("test1@1\n", fileContents.getValue());
@@ -367,9 +362,10 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 	public void testClosingOnSnapshot() throws Exception {
 		final File outDir = TEMP_FOLDER.newFolder();
 
-		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness =
-					createRescalingTestSink(outDir, 1, 0, 100L, 2L)) {
-
+		try (
+				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness =
+						TestUtils.createRescalingTestSink(outDir, 1, 0, 100L, 2L)
+		) {
 			testHarness.setup();
 			testHarness.open();
 
@@ -377,29 +373,29 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test2", 1), 1L));
-			checkLocalFs(outDir, 2, 0);
+			TestUtils.checkLocalFs(outDir, 2, 0);
 
 			// this is to check the inactivity threshold
 			testHarness.setProcessingTime(101L);
-			checkLocalFs(outDir, 2, 0);
+			TestUtils.checkLocalFs(outDir, 2, 0);
 
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test3", 1), 1L));
-			checkLocalFs(outDir, 3, 0);
+			TestUtils.checkLocalFs(outDir, 3, 0);
 
 			testHarness.snapshot(0L, 1L);
-			checkLocalFs(outDir, 3, 0);
+			TestUtils.checkLocalFs(outDir, 3, 0);
 
 			testHarness.notifyOfCompletedCheckpoint(0L);
-			checkLocalFs(outDir, 0, 3);
+			TestUtils.checkLocalFs(outDir, 0, 3);
 
 			testHarness.snapshot(1L, 0L);
 
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test4", 10), 10L));
-			checkLocalFs(outDir, 1, 3);
+			TestUtils.checkLocalFs(outDir, 1, 3);
 		}
 
 		// at close it is not moved to final.
-		checkLocalFs(outDir, 1, 3);
+		TestUtils.checkLocalFs(outDir, 1, 3);
 	}
 
 	@Test
@@ -408,11 +404,11 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 
 		OperatorSubtaskState mergedSnapshot;
 
-		// we set small file size so that the part file rolls.
+		// we set a small file size so that the part file rolls on every element.
 		try (
-				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = createRescalingTestSink(
+				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = TestUtils.createRescalingTestSink(
 						outDir, 2, 0, 100L, 10L);
-				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = createRescalingTestSink(
+				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = TestUtils.createRescalingTestSink(
 						outDir, 2, 1, 100L, 10L)
 		) {
 			testHarness1.setup();
@@ -422,16 +418,16 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 			testHarness2.open();
 
 			testHarness1.processElement(new StreamRecord<>(Tuple2.of("test1", 0), 0L));
-			checkLocalFs(outDir, 1, 0);
+			TestUtils.checkLocalFs(outDir, 1, 0);
 
 			testHarness2.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
 			testHarness2.processElement(new StreamRecord<>(Tuple2.of("test2", 1), 1L));
 
 			// all the files are in-progress
-			checkLocalFs(outDir, 3, 0);
+			TestUtils.checkLocalFs(outDir, 3, 0);
 
 			int counter = 0;
-			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+			for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
 				final String parentFilename = fileContents.getKey().getParentFile().getName();
 				final String inProgressFilename = fileContents.getKey().getName();
 
@@ -456,7 +452,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 		}
 
 		try (
-				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = createRescalingTestSink(
+				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createRescalingTestSink(
 						outDir, 1, 0, 100L, 10L)
 		) {
 			testHarness.setup();
@@ -464,13 +460,13 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 			testHarness.open();
 
 			// still everything in-progress but the in-progress for prev task 1 should be put in pending now
-			checkLocalFs(outDir, 3, 0);
+			TestUtils.checkLocalFs(outDir, 3, 0);
 
 			testHarness.snapshot(2L, 2L);
 			testHarness.notifyOfCompletedCheckpoint(2L);
 
 			int counter = 0;
-			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+			for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
 				final String parentFilename = fileContents.getKey().getParentFile().getName();
 				final String filename = fileContents.getKey().getName();
 
@@ -498,11 +494,18 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 		final TestBucketFactory first = new TestBucketFactory();
 		final TestBucketFactory second = new TestBucketFactory();
 
+		final RollingPolicy<String> rollingPolicy = DefaultRollingPolicy
+				.create()
+				.withMaxPartSize(2L)
+				.withRolloverInterval(100L)
+				.withInactivityInterval(100L)
+				.build();
+
 		try (
-				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = createCustomRescalingTestSink(
-						outDir, 2, 0, 100L, 2L, first, new SimpleStringEncoder<>());
-				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = createCustomRescalingTestSink(
-						outDir, 2, 1, 100L, 2L, second, new SimpleStringEncoder<>())
+				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = TestUtils.createCustomRescalingTestSink(
+						outDir, 2, 0, 10L, new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, first);
+				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = TestUtils.createCustomRescalingTestSink(
+						outDir, 2, 1, 10L, new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, second)
 		) {
 			testHarness1.setup();
 			testHarness1.open();
@@ -514,7 +517,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 			testHarness1.processElement(new StreamRecord<>(Tuple2.of("test1", 0), 0L));
 			testHarness1.processElement(new StreamRecord<>(Tuple2.of("test1", 0), 0L));
 			testHarness1.processElement(new StreamRecord<>(Tuple2.of("test1", 0), 0L));
-			checkLocalFs(outDir, 3, 0);
+			TestUtils.checkLocalFs(outDir, 3, 0);
 
 			// intentionally we snapshot them in the reverse order so that the states are shuffled
 			mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
@@ -527,10 +530,10 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 		final TestBucketFactory secondRecovered = new TestBucketFactory();
 
 		try (
-				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = createCustomRescalingTestSink(
-						outDir, 2, 0, 100L, 2L, firstRecovered, new SimpleStringEncoder<>());
-				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = createCustomRescalingTestSink(
-						outDir, 2, 1, 100L, 2L, secondRecovered, new SimpleStringEncoder<>())
+				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = TestUtils.createCustomRescalingTestSink(
+						outDir, 2, 0, 10L, new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, firstRecovered);
+				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = TestUtils.createCustomRescalingTestSink(
+						outDir, 2, 1, 10L, new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, secondRecovered)
 		) {
 			testHarness1.setup();
 			testHarness1.initializeState(mergedSnapshot);
@@ -540,7 +543,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 			testHarness1.processElement(new StreamRecord<>(Tuple2.of("test4", 0), 0L));
 
 			Assert.assertEquals(3L, firstRecovered.getInitialCounter());
-			checkLocalFs(outDir, 1, 3);
+			TestUtils.checkLocalFs(outDir, 1, 3);
 
 			testHarness2.setup();
 			testHarness2.initializeState(mergedSnapshot);
@@ -550,78 +553,26 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 			testHarness2.processElement(new StreamRecord<>(Tuple2.of("test2", 0), 0L));
 
 			Assert.assertEquals(3L, secondRecovered.getInitialCounter());
-			checkLocalFs(outDir, 2, 3);
+			TestUtils.checkLocalFs(outDir, 2, 3);
 		}
 	}
 
 	//////////////////////			Helper Methods			//////////////////////
 
-	private OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createRescalingTestSink(
-			File outDir,
-			int totalParallelism,
-			int taskIdx,
-			long inactivityInterval,
-			long partMaxSize) throws Exception {
-
-		return createCustomRescalingTestSink(
-				outDir,
-				totalParallelism,
-				taskIdx,
-				inactivityInterval,
-				partMaxSize,
-				new DefaultBucketFactory<>(),
-				(Encoder<Tuple2<String, Integer>>) (element, stream) -> {
-					stream.write((element.f0 + '@' + element.f1).getBytes(StandardCharsets.UTF_8));
-					stream.write('\n');
-				});
-	}
-
-	private OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createCustomRescalingTestSink(
-			File outDir,
-			int totalParallelism,
-			int taskIdx,
-			long inactivityInterval,
-			long partMaxSize,
-			BucketFactory<Tuple2<String, Integer>> factory,
-			Encoder<Tuple2<String, Integer>> writer) throws Exception {
-
-		StreamingFileSink<Tuple2<String, Integer>> sink = new StreamingFileSink<>(new Path(outDir.toURI()), factory)
-				.setBucketer(new Bucketer<Tuple2<String, Integer>>() {
-
-					private static final long serialVersionUID = -3086487303018372007L;
-
-					@Override
-					public String getBucketId(Tuple2<String, Integer> element, Context context) {
-						return element.f0;
-					}
-				})
-				.setEncoder(writer)
-				.setRollingPolicy(
-						DefaultRollingPolicy
-								.create()
-								.withMaxPartSize(partMaxSize)
-								.withRolloverInterval(inactivityInterval)
-								.withInactivityInterval(inactivityInterval)
-								.build())
-				.setBucketCheckInterval(10L);
-
-		return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, totalParallelism, taskIdx);
-	}
-
-	static class TestBucketFactory extends DefaultBucketFactory<Tuple2<String, Integer>> {
+	static class TestBucketFactory extends DefaultBucketFactory<Tuple2<String, Integer>, String> {
 
 		private static final long serialVersionUID = 2794824980604027930L;
 
 		private long initialCounter = -1L;
 
 		@Override
-		public Bucket<Tuple2<String, Integer>> getNewBucket(
-				RecoverableWriter fsWriter,
-				int subtaskIndex,
-				String bucketId,
-				Path bucketPath,
-				long initialPartCounter,
-				Encoder<Tuple2<String, Integer>> writer) throws IOException {
+		public Bucket<Tuple2<String, Integer>, String> getNewBucket(
+				final RecoverableWriter fsWriter,
+				final int subtaskIndex,
+				final String bucketId,
+				final Path bucketPath,
+				final long initialPartCounter,
+				final PartFileWriter.PartFileFactory<Tuple2<String, Integer>, String> partFileWriterFactory) {
 
 			this.initialCounter = initialPartCounter;
 
@@ -631,16 +582,16 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 					bucketId,
 					bucketPath,
 					initialPartCounter,
-					writer);
+					partFileWriterFactory);
 		}
 
 		@Override
-		public Bucket<Tuple2<String, Integer>> restoreBucket(
-				RecoverableWriter fsWriter,
-				int subtaskIndex,
-				long initialPartCounter,
-				Encoder<Tuple2<String, Integer>> writer,
-				BucketState bucketState) throws IOException {
+		public Bucket<Tuple2<String, Integer>, String> restoreBucket(
+				final RecoverableWriter fsWriter,
+				final int subtaskIndex,
+				final long initialPartCounter,
+				final PartFileWriter.PartFileFactory<Tuple2<String, Integer>, String> partFileWriterFactory,
+				final BucketState<String> bucketState) throws IOException {
 
 			this.initialCounter = initialPartCounter;
 
@@ -648,7 +599,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 					fsWriter,
 					subtaskIndex,
 					initialPartCounter,
-					writer,
+					partFileWriterFactory,
 					bucketState);
 		}
 
@@ -656,34 +607,4 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 			return initialCounter;
 		}
 	}
-
-	private static void checkLocalFs(File outDir, int expectedInProgress, int expectedCompleted) {
-		int inProgress = 0;
-		int finished = 0;
-
-		for (File file: FileUtils.listFiles(outDir, null, true)) {
-			if (file.getAbsolutePath().endsWith("crc")) {
-				continue;
-			}
-
-			if (file.toPath().getFileName().toString().startsWith(".")) {
-				inProgress++;
-			} else {
-				finished++;
-			}
-		}
-
-		Assert.assertEquals(expectedInProgress, inProgress);
-		Assert.assertEquals(expectedCompleted, finished);
-	}
-
-	private static Map<File, String> getFileContentByPath(File directory) throws IOException {
-		Map<File, String> contents = new HashMap<>(4);
-
-		final Collection<File> filesInBucket = FileUtils.listFiles(directory, null, true);
-		for (File file : filesInBucket) {
-			contents.put(file, FileUtils.readFileToString(file));
-		}
-		return contents;
-	}
 }
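
The row-format tests above expect part files with lines like "test1@1". The inline Encoder that used to produce them in createRescalingTestSink (now moved into TestUtils) boils down to the following sketch, written against the Encoder#encode(element, stream) contract visible in the removed lambda above; the class name is illustrative only:

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.java.tuple.Tuple2;

/** Sketch: encodes Tuple2 records as "f0@f1" lines, matching the expected file contents in the tests. */
class TupleToStringEncoder implements Encoder<Tuple2<String, Integer>> {

	private static final long serialVersionUID = 1L;

	@Override
	public void encode(Tuple2<String, Integer> element, OutputStream stream) throws IOException {
		stream.write((element.f0 + '@' + element.f1).getBytes(StandardCharsets.UTF_8));
		stream.write('\n');
	}
}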