Posted to commits@flink.apache.org by se...@apache.org on 2018/07/20 17:05:19 UTC

[3/5] flink git commit: [FLINK-9903] [DataStream API] Refactor StreamingFileSink / add bulk encoders

[FLINK-9903] [DataStream API] Refactor StreamingFileSink / add bulk encoders

* Add support for bulk encoders.
* Expose more options in the rolling policy.
* Allow returning any object as bucket id from the bucketer.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b56c75ca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b56c75ca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b56c75ca

Branch: refs/heads/master
Commit: b56c75ca375049b1d2c80d2d0945ae1ae04eb39e
Parents: d309e61
Author: kkloudas <kk...@gmail.com>
Authored: Tue Jul 17 11:52:02 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jul 20 16:12:29 2018 +0200

----------------------------------------------------------------------
 .../api/common/serialization/BulkWriter.java    |  95 ++++
 .../core/fs/SafetyNetWrapperFileSystem.java     |   5 +
 .../api/functions/sink/filesystem/Bucket.java   |  48 +-
 .../sink/filesystem/BucketFactory.java          |  29 +-
 .../functions/sink/filesystem/BucketState.java  |  12 +-
 .../sink/filesystem/BucketStateSerializer.java  |  39 +-
 .../api/functions/sink/filesystem/Buckets.java  | 339 ++++++++++++++
 .../sink/filesystem/BulkPartWriter.java         | 110 +++++
 .../sink/filesystem/DefaultBucketFactory.java   |  35 +-
 .../sink/filesystem/DefaultRollingPolicy.java   | 142 ------
 .../sink/filesystem/PartFileHandler.java        | 122 -----
 .../functions/sink/filesystem/PartFileInfo.java |   4 +-
 .../sink/filesystem/PartFileWriter.java         | 141 ++++++
 .../sink/filesystem/RollingPolicy.java          |  20 +-
 .../sink/filesystem/RowWisePartWriter.java      |  96 ++++
 .../sink/filesystem/StreamingFileSink.java      | 457 ++++++++-----------
 .../filesystem/bucketers/BasePathBucketer.java  |  11 +-
 .../sink/filesystem/bucketers/Bucketer.java     |  22 +-
 .../filesystem/bucketers/DateTimeBucketer.java  |  12 +-
 .../SimpleVersionedStringSerializer.java        |  77 ++++
 .../rolling/policies/DefaultRollingPolicy.java  | 157 +++++++
 .../policies/OnCheckpointRollingPolicy.java     |  45 ++
 .../filesystem/BucketStateSerializerTest.java   |  47 +-
 .../functions/sink/filesystem/BucketsTest.java  | 119 +++++
 .../sink/filesystem/BulkWriterTest.java         | 144 ++++++
 .../filesystem/LocalStreamingFileSinkTest.java  | 267 ++++-------
 .../sink/filesystem/RollingPolicyTest.java      | 225 +++++++++
 .../functions/sink/filesystem/TestUtils.java    | 164 +++++++
 28 files changed, 2177 insertions(+), 807 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-core/src/main/java/org/apache/flink/api/common/serialization/BulkWriter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/BulkWriter.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/BulkWriter.java
new file mode 100644
index 0000000..44f5fbe
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/BulkWriter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * An encoder that encodes data in a bulk fashion, encoding many records at a time.
+ *
+ * <p>Examples of bulk encoding are most compressed formats, including formats like
+ * Parquet and ORC, which encode batches of records into blocks of column vectors.
+ *
+ * <p>The bulk encoder may be stateful and is bound to a single stream during its
+ * lifetime.
+ *
+ * @param <T> The type of the elements encoded through this encoder.
+ */
+@PublicEvolving
+public interface BulkWriter<T> {
+
+	/**
+	 * Adds an element to the encoder. The encoder may temporarily buffer the element,
+	 * or immediately write it to the stream.
+	 *
+	 * <p>It may be that adding this element fills up an internal buffer and causes the
+	 * encoding and flushing of a batch of internally buffered elements.
+	 *
+	 * @param element The element to add.
+	 * @throws IOException Thrown, if the element cannot be added to the encoder,
+	 *                     or if the output stream throws an exception.
+	 */
+	void addElement(T element) throws IOException;
+
+	/**
+	 * Flushes all intermediate buffered data to the output stream.
+	 * Note that frequent flushing may reduce the efficiency of the encoding.
+	 *
+	 * @throws IOException Thrown if the encoder cannot be flushed, or if the output
+	 *                     stream throws an exception.
+	 */
+	void flush() throws IOException;
+
+	/**
+	 * Finishes the writing. This must flush all internal buffers, finish the encoding,
+	 * and write any footers.
+	 *
+	 * <p>The writer is not expected to handle any more records via {@link #addElement(Object)} after
+	 * this method is called.
+	 *
+	 * <p><b>Important:</b> This method MUST NOT close the stream that the writer writes to.
+	 * Closing the stream is expected to happen through the invoker of this method afterwards.
+	 *
+	 * @throws IOException Thrown if the finalization fails.
+	 */
+	void finish() throws IOException;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A factory that creates a {@link BulkWriter}.
+	 * @param <T> The type of record to write.
+	 */
+	@FunctionalInterface
+	interface Factory<T> extends Serializable {
+
+		/**
+		 * Creates a writer that writes to the given stream.
+		 *
+		 * @param out The output stream to write the encoded data to.
+		 * @throws IOException Thrown if the writer cannot be opened, or if the output
+		 *                     stream throws an exception.
+		 */
+		BulkWriter<T> create(FSDataOutputStream out) throws IOException;
+	}
+}
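
For illustration, a minimal BulkWriter implementation could look like the sketch
below (a hypothetical newline-delimited String writer, not part of this commit).
Note that finish() flushes the remaining records but, per the contract above,
does not close the stream:

    import org.apache.flink.api.common.serialization.BulkWriter;
    import org.apache.flink.core.fs.FSDataOutputStream;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    /** Hypothetical example: buffers String records and writes them in batches. */
    class StringBatchWriter implements BulkWriter<String> {

        private final FSDataOutputStream out;
        private final StringBuilder buffer = new StringBuilder();

        StringBatchWriter(FSDataOutputStream out) {
            this.out = out;
        }

        @Override
        public void addElement(String element) throws IOException {
            buffer.append(element).append('\n');
            if (buffer.length() >= 4096) {
                // adding an element may fill the internal buffer and flush a batch
                flush();
            }
        }

        @Override
        public void flush() throws IOException {
            out.write(buffer.toString().getBytes(StandardCharsets.UTF_8));
            buffer.setLength(0);
        }

        @Override
        public void finish() throws IOException {
            flush(); // write what is left; the stream is deliberately NOT closed here
        }
    }

Since Factory is a functional interface, a factory for this writer can simply be
a constructor reference: BulkWriter.Factory<String> factory = StringBatchWriter::new;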

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
index 92b3a74c..04e6315 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
@@ -65,6 +65,11 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr
 	}
 
 	@Override
+	public RecoverableWriter createRecoverableWriter() throws IOException {
+		return unsafeFileSystem.createRecoverableWriter();
+	}
+
+	@Override
 	public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
 		return unsafeFileSystem.getFileBlockLocations(file, start, len);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index d9a6d75..3e2d22c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.serialization.Encoder;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.util.Preconditions;
@@ -30,6 +29,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * A bucket is the directory organization of the output of the {@link StreamingFileSink}.
@@ -39,17 +39,17 @@ import java.util.Map;
  * queried to see in which bucket this element should be written.
  */
 @PublicEvolving
-public class Bucket<IN> {
+public class Bucket<IN, BucketID> {
 
 	private static final String PART_PREFIX = "part";
 
-	private final String bucketId;
+	private final BucketID bucketId;
 
 	private final Path bucketPath;
 
 	private final int subtaskIndex;
 
-	private final Encoder<IN> encoder;
+	private final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory;
 
 	private final RecoverableWriter fsWriter;
 
@@ -57,7 +57,7 @@ public class Bucket<IN> {
 
 	private long partCounter;
 
-	private PartFileHandler<IN> currentPart;
+	private PartFileWriter<IN, BucketID> currentPart;
 
 	private List<RecoverableWriter.CommitRecoverable> pending;
 
@@ -68,10 +68,10 @@ public class Bucket<IN> {
 			RecoverableWriter fsWriter,
 			int subtaskIndex,
 			long initialPartCounter,
-			Encoder<IN> writer,
-			BucketState bucketstate) throws IOException {
+			PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
+			BucketState<BucketID> bucketState) throws IOException {
 
-		this(fsWriter, subtaskIndex, bucketstate.getBucketId(), bucketstate.getBucketPath(), initialPartCounter, writer);
+		this(fsWriter, subtaskIndex, bucketState.getBucketId(), bucketState.getBucketPath(), initialPartCounter, partFileFactory);
 
 		// the constructor must have already initialized the filesystem writer
 		Preconditions.checkState(fsWriter != null);
@@ -79,15 +79,15 @@ public class Bucket<IN> {
 		// we try to resume the previous in-progress file, if the filesystem
 		// supports such an operation. If not, we just commit the file and start fresh.
 
-		final RecoverableWriter.ResumeRecoverable resumable = bucketstate.getCurrentInProgress();
+		final RecoverableWriter.ResumeRecoverable resumable = bucketState.getInProgress();
 		if (resumable != null) {
-			currentPart = PartFileHandler.resumeFrom(
-					bucketId, fsWriter, resumable, bucketstate.getCreationTime());
+			currentPart = partFileFactory.resumeFrom(
+					bucketId, fsWriter, resumable, bucketState.getCreationTime());
 		}
 
 		// we commit pending files for previous checkpoints to the last successful one
 		// (the one we are recovering from)
-		for (List<RecoverableWriter.CommitRecoverable> commitables: bucketstate.getPendingPerCheckpoint().values()) {
+		for (List<RecoverableWriter.CommitRecoverable> commitables: bucketState.getPendingPerCheckpoint().values()) {
 			for (RecoverableWriter.CommitRecoverable commitable: commitables) {
 				fsWriter.recoverForCommit(commitable).commitAfterRecovery();
 			}
@@ -100,26 +100,26 @@ public class Bucket<IN> {
 	public Bucket(
 			RecoverableWriter fsWriter,
 			int subtaskIndex,
-			String bucketId,
+			BucketID bucketId,
 			Path bucketPath,
 			long initialPartCounter,
-			Encoder<IN> writer) {
+			PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory) {
 
 		this.fsWriter = Preconditions.checkNotNull(fsWriter);
 		this.subtaskIndex = subtaskIndex;
 		this.bucketId = Preconditions.checkNotNull(bucketId);
 		this.bucketPath = Preconditions.checkNotNull(bucketPath);
 		this.partCounter = initialPartCounter;
-		this.encoder = Preconditions.checkNotNull(writer);
+		this.partFileFactory = Preconditions.checkNotNull(partFileFactory);
 
 		this.pending = new ArrayList<>();
 	}
 
-	public PartFileInfo getInProgressPartInfo() {
+	public PartFileInfo<BucketID> getInProgressPartInfo() {
 		return currentPart;
 	}
 
-	public String getBucketId() {
+	public BucketID getBucketId() {
 		return bucketId;
 	}
 
@@ -137,18 +137,18 @@ public class Bucket<IN> {
 
 	void write(IN element, long currentTime) throws IOException {
 		Preconditions.checkState(currentPart != null, "bucket has been closed");
-		currentPart.write(element, encoder, currentTime);
+		currentPart.write(element, currentTime);
 	}
 
 	void rollPartFile(final long currentTime) throws IOException {
 		closePartFile();
-		currentPart = PartFileHandler.openNew(bucketId, fsWriter, getNewPartPath(), currentTime);
+		currentPart = partFileFactory.openNew(bucketId, fsWriter, getNewPartPath(), currentTime);
 		partCounter++;
 	}
 
-	void merge(final Bucket<IN> bucket) throws IOException {
+	void merge(final Bucket<IN, BucketID> bucket) throws IOException {
 		Preconditions.checkNotNull(bucket);
-		Preconditions.checkState(bucket.getBucketPath().equals(getBucketPath()));
+		Preconditions.checkState(Objects.equals(bucket.getBucketPath(), bucketPath));
 
 		// there should be no pending files in the "to-merge" states.
 		Preconditions.checkState(bucket.pending.isEmpty());
@@ -176,7 +176,7 @@ public class Bucket<IN> {
 		}
 	}
 
-	public void commitUpToCheckpoint(long checkpointId) throws IOException {
+	public void onCheckpointAcknowledgment(long checkpointId) throws IOException {
 		Preconditions.checkNotNull(fsWriter);
 
 		Iterator<Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>>> it =
@@ -193,7 +193,7 @@ public class Bucket<IN> {
 		}
 	}
 
-	public BucketState snapshot(long checkpointId) throws IOException {
+	public BucketState<BucketID> onCheckpoint(long checkpointId) throws IOException {
 		RecoverableWriter.ResumeRecoverable resumable = null;
 		long creationTime = Long.MAX_VALUE;
 
@@ -206,7 +206,7 @@ public class Bucket<IN> {
 			pendingPerCheckpoint.put(checkpointId, pending);
 			pending = new ArrayList<>();
 		}
-		return new BucketState(bucketId, bucketPath, creationTime, resumable, pendingPerCheckpoint);
+		return new BucketState<>(bucketId, bucketPath, creationTime, resumable, pendingPerCheckpoint);
 	}
 
 	private Path getNewPartPath() {

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
index 88f3c1a..0c6b587 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.Encoder;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
 
@@ -30,20 +29,20 @@ import java.io.Serializable;
  * A factory able to create {@link Bucket buckets} for the {@link StreamingFileSink}.
  */
 @Internal
-public interface BucketFactory<IN> extends Serializable {
+interface BucketFactory<IN, BucketID> extends Serializable {
 
-	Bucket<IN> getNewBucket(
-			RecoverableWriter fsWriter,
-			int subtaskIndex,
-			String bucketId,
-			Path bucketPath,
-			long initialPartCounter,
-			Encoder<IN> writer) throws IOException;
+	Bucket<IN, BucketID> getNewBucket(
+			final RecoverableWriter fsWriter,
+			final int subtaskIndex,
+			final BucketID bucketId,
+			final Path bucketPath,
+			final long initialPartCounter,
+			final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory) throws IOException;
 
-	Bucket<IN> restoreBucket(
-			RecoverableWriter fsWriter,
-			int subtaskIndex,
-			long initialPartCounter,
-			Encoder<IN> writer,
-			BucketState bucketstate) throws IOException;
+	Bucket<IN, BucketID> restoreBucket(
+			final RecoverableWriter fsWriter,
+			final int subtaskIndex,
+			final long initialPartCounter,
+			final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+			final BucketState<BucketID> bucketState) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
index 5ebc46c..bb49e3a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
@@ -32,9 +32,9 @@ import java.util.Map;
  * The state of the {@link Bucket} that is to be checkpointed.
  */
 @Internal
-public class BucketState {
+public class BucketState<BucketID> {
 
-	private final String bucketId;
+	private final BucketID bucketId;
 
 	/**
 	 * The base path for the bucket, i.e. the directory where all the part files are stored.
@@ -59,10 +59,10 @@ public class BucketState {
 	private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint;
 
 	public BucketState(
-			final String bucketId,
+			final BucketID bucketId,
 			final Path bucketPath,
 			final long creationTime,
-			final @Nullable RecoverableWriter.ResumeRecoverable inProgress,
+			@Nullable final RecoverableWriter.ResumeRecoverable inProgress,
 			final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint
 	) {
 		this.bucketId = Preconditions.checkNotNull(bucketId);
@@ -72,7 +72,7 @@ public class BucketState {
 		this.pendingPerCheckpoint = Preconditions.checkNotNull(pendingPerCheckpoint);
 	}
 
-	public String getBucketId() {
+	public BucketID getBucketId() {
 		return bucketId;
 	}
 
@@ -85,7 +85,7 @@ public class BucketState {
 	}
 
 	@Nullable
-	public RecoverableWriter.ResumeRecoverable getCurrentInProgress() {
+	public RecoverableWriter.ResumeRecoverable getInProgress() {
 		return inProgress;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
index a167ec9..cf9b805 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
@@ -42,7 +41,7 @@ import java.util.Map.Entry;
  * A {@code SimpleVersionedSerializer} used to serialize the {@link BucketState BucketState}.
  */
 @Internal
-class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
+class BucketStateSerializer<BucketID> implements SimpleVersionedSerializer<BucketState<BucketID>> {
 
 	private static final int MAGIC_NUMBER = 0x1e764b79;
 
@@ -50,12 +49,16 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
 
 	private final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer;
 
-	public BucketStateSerializer(
-			final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer,
-			final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer) {
+	private final SimpleVersionedSerializer<BucketID> bucketIdSerializer;
 
+	BucketStateSerializer(
+			final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer,
+			final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer,
+			final SimpleVersionedSerializer<BucketID> bucketIdSerializer
+	) {
 		this.resumableSerializer = Preconditions.checkNotNull(resumableSerializer);
 		this.commitableSerializer = Preconditions.checkNotNull(commitableSerializer);
+		this.bucketIdSerializer = Preconditions.checkNotNull(bucketIdSerializer);
 	}
 
 	@Override
@@ -64,7 +67,7 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
 	}
 
 	@Override
-	public byte[] serialize(BucketState state) throws IOException {
+	public byte[] serialize(BucketState<BucketID> state) throws IOException {
 		DataOutputSerializer out = new DataOutputSerializer(256);
 		out.writeInt(MAGIC_NUMBER);
 		serializeV1(state, out);
@@ -72,7 +75,7 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
 	}
 
 	@Override
-	public BucketState deserialize(int version, byte[] serialized) throws IOException {
+	public BucketState<BucketID> deserialize(int version, byte[] serialized) throws IOException {
 		switch (version) {
 			case 1:
 				DataInputDeserializer in = new DataInputDeserializer(serialized);
@@ -84,13 +87,13 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
 	}
 
 	@VisibleForTesting
-	void serializeV1(BucketState state, DataOutputView out) throws IOException {
-		out.writeUTF(state.getBucketId());
+	void serializeV1(BucketState<BucketID> state, DataOutputView out) throws IOException {
+		SimpleVersionedSerialization.writeVersionAndSerialize(bucketIdSerializer, state.getBucketId(), out);
 		out.writeUTF(state.getBucketPath().toString());
 		out.writeLong(state.getCreationTime());
 
 		// put the current open part file
-		final RecoverableWriter.ResumeRecoverable currentPart = state.getCurrentInProgress();
+		final RecoverableWriter.ResumeRecoverable currentPart = state.getInProgress();
 		if (currentPart != null) {
 			out.writeBoolean(true);
 			SimpleVersionedSerialization.writeVersionAndSerialize(resumableSerializer, currentPart, out);
@@ -100,19 +103,19 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
 		}
 
 		// put the map of pending files per checkpoint
-		final Map<Long, List<CommitRecoverable>> pendingCommitters = state.getPendingPerCheckpoint();
+		final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommitters = state.getPendingPerCheckpoint();
 
 		// manually keep the version here to save some bytes
 		out.writeInt(commitableSerializer.getVersion());
 
 		out.writeInt(pendingCommitters.size());
-		for (Entry<Long, List<CommitRecoverable>> resumablesForCheckpoint : pendingCommitters.entrySet()) {
-			List<CommitRecoverable> resumables = resumablesForCheckpoint.getValue();
+		for (Entry<Long, List<RecoverableWriter.CommitRecoverable>> resumablesForCheckpoint : pendingCommitters.entrySet()) {
+			List<RecoverableWriter.CommitRecoverable> resumables = resumablesForCheckpoint.getValue();
 
 			out.writeLong(resumablesForCheckpoint.getKey());
 			out.writeInt(resumables.size());
 
-			for (CommitRecoverable resumable : resumables) {
+			for (RecoverableWriter.CommitRecoverable resumable : resumables) {
 				byte[] serialized = commitableSerializer.serialize(resumable);
 				out.writeInt(serialized.length);
 				out.write(serialized);
@@ -121,8 +124,8 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
 	}
 
 	@VisibleForTesting
-	BucketState deserializeV1(DataInputView in) throws IOException {
-		final String bucketId = in.readUTF();
+	BucketState<BucketID> deserializeV1(DataInputView in) throws IOException {
+		final BucketID bucketId = SimpleVersionedSerialization.readVersionAndDeSerialize(bucketIdSerializer, in);
 		final String bucketPathStr = in.readUTF();
 		final long creationTime = in.readLong();
 
@@ -140,7 +143,7 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
 			final long checkpointId = in.readLong();
 			final int noOfResumables = in.readInt();
 
-			final ArrayList<RecoverableWriter.CommitRecoverable> resumables = new ArrayList<>(noOfResumables);
+			final List<RecoverableWriter.CommitRecoverable> resumables = new ArrayList<>(noOfResumables);
 			for (int j = 0; j < noOfResumables; j++) {
 				final byte[] bytes = new byte[in.readInt()];
 				in.readFully(bytes);
@@ -149,7 +152,7 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
 			resumablesPerCheckpoint.put(checkpointId, resumables);
 		}
 
-		return new BucketState(
+		return new BucketState<>(
 				bucketId,
 				new Path(bucketPathStr),
 				creationTime,
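
The bucket id is now (de)serialized through the pluggable bucketIdSerializer,
which is what allows arbitrary BucketID types in the serialized state above. As
a hypothetical sketch (not part of this commit), a SimpleVersionedSerializer for
plain Integer bucket ids could look like:

    import org.apache.flink.core.io.SimpleVersionedSerializer;

    import java.io.IOException;
    import java.nio.ByteBuffer;

    /** Hypothetical example: a versioned serializer for Integer bucket ids. */
    class IntBucketIdSerializer implements SimpleVersionedSerializer<Integer> {

        @Override
        public int getVersion() {
            return 1;
        }

        @Override
        public byte[] serialize(Integer bucketId) throws IOException {
            return ByteBuffer.allocate(4).putInt(bucketId).array();
        }

        @Override
        public Integer deserialize(int version, byte[] serialized) throws IOException {
            if (version != 1) {
                throw new IOException("Unrecognized version: " + version);
            }
            return ByteBuffer.wrap(serialized).getInt();
        }
    }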

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
new file mode 100644
index 0000000..6afba17
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * The manager of the different active buckets in the {@link StreamingFileSink}.
+ *
+ * <p>This class is responsible for all bucket-related operations, and the actual
+ * {@link StreamingFileSink} simply plugs the functionality offered by this
+ * class into the lifecycle of the operator.
+ *
+ * @param <IN> The type of input elements.
+ * @param <BucketID> The type of ids for the buckets, as returned by the {@link Bucketer}.
+ */
+public class Buckets<IN, BucketID> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(Buckets.class);
+
+	// ------------------------ configuration fields --------------------------
+
+	private final Path basePath;
+
+	private final BucketFactory<IN, BucketID> bucketFactory;
+
+	private final Bucketer<IN, BucketID> bucketer;
+
+	private final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory;
+
+	private final RollingPolicy<BucketID> rollingPolicy;
+
+	// --------------------------- runtime fields -----------------------------
+
+	private final int subtaskIndex;
+
+	private final Buckets.BucketerContext bucketerContext;
+
+	private final Map<BucketID, Bucket<IN, BucketID>> activeBuckets;
+
+	private long initMaxPartCounter;
+
+	private long maxPartCounterUsed;
+
+	private final RecoverableWriter fileSystemWriter;
+
+	// --------------------------- State Related Fields -----------------------------
+
+	private final BucketStateSerializer<BucketID> bucketStateSerializer;
+
+	/**
+	 * A constructor creating a new, empty bucket manager.
+	 *
+	 * @param basePath The base path for our buckets.
+	 * @param bucketer The {@link Bucketer} provided by the user.
+	 * @param bucketFactory The {@link BucketFactory} to be used to create buckets.
+	 * @param partFileWriterFactory The {@link PartFileWriter.PartFileFactory} to be used when writing data.
+	 * @param rollingPolicy The {@link RollingPolicy} as specified by the user.
+	 */
+	Buckets(
+			final Path basePath,
+			final Bucketer<IN, BucketID> bucketer,
+			final BucketFactory<IN, BucketID> bucketFactory,
+			final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+			final RollingPolicy<BucketID> rollingPolicy,
+			final int subtaskIndex) throws IOException {
+
+		this.basePath = Preconditions.checkNotNull(basePath);
+		this.bucketer = Preconditions.checkNotNull(bucketer);
+		this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
+		this.partFileWriterFactory = Preconditions.checkNotNull(partFileWriterFactory);
+		this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
+		this.subtaskIndex = subtaskIndex;
+
+		this.activeBuckets = new HashMap<>();
+		this.bucketerContext = new Buckets.BucketerContext();
+
+		this.fileSystemWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
+		this.bucketStateSerializer = new BucketStateSerializer<>(
+				fileSystemWriter.getResumeRecoverableSerializer(),
+				fileSystemWriter.getCommitRecoverableSerializer(),
+				bucketer.getSerializer()
+		);
+
+		this.initMaxPartCounter = 0L;
+		this.maxPartCounterUsed = 0L;
+	}
+
+	/**
+	 * Initializes the state after recovery from a failure.
+	 * @param bucketStates the state holding the recovered states of the active buckets.
+	 * @param partCounterState the state holding the max previously used part counters.
+	 * @throws Exception
+	 */
+	void initializeState(final ListState<byte[]> bucketStates, final ListState<Long> partCounterState) throws Exception {
+
+		// When resuming after a failure:
+		// 1) we get the max part counter used before in order to make sure that we do not overwrite valid data
+		// 2) we commit any pending files for previous checkpoints (previous to the last successful one)
+		// 3) we resume writing to the previous in-progress file of each bucket, and
+		// 4) if we receive multiple states for the same bucket, we merge them.
+
+		// get the max counter
+		long maxCounter = 0L;
+		for (long partCounter: partCounterState.get()) {
+			maxCounter = Math.max(partCounter, maxCounter);
+		}
+		initMaxPartCounter = maxCounter;
+
+		// get the restored buckets
+		for (byte[] recoveredState : bucketStates.get()) {
+			final BucketState<BucketID> bucketState = SimpleVersionedSerialization.readVersionAndDeSerialize(
+					bucketStateSerializer, recoveredState);
+
+			final BucketID bucketId = bucketState.getBucketId();
+
+			LOG.info("Recovered bucket for {}", bucketId);
+
+			final Bucket<IN, BucketID> restoredBucket = bucketFactory.restoreBucket(
+					fileSystemWriter,
+					subtaskIndex,
+					initMaxPartCounter,
+					partFileWriterFactory,
+					bucketState
+			);
+
+			final Bucket<IN, BucketID> existingBucket = activeBuckets.get(bucketId);
+			if (existingBucket == null) {
+				activeBuckets.put(bucketId, restoredBucket);
+			} else {
+				existingBucket.merge(restoredBucket);
+			}
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("{} idx {} restored state for bucket {}", getClass().getSimpleName(),
+						subtaskIndex, assembleBucketPath(bucketId));
+			}
+		}
+	}
+
+	void publishUpToCheckpoint(long checkpointId) throws IOException {
+		final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
+				activeBuckets.entrySet().iterator();
+
+		while (activeBucketIt.hasNext()) {
+			Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
+			bucket.onCheckpointAcknowledgment(checkpointId);
+
+			if (!bucket.isActive()) {
+				// We've dealt with all the pending files and the writer for this bucket is not currently open.
+				// Therefore this bucket is currently inactive and we can remove it from our state.
+				activeBucketIt.remove();
+			}
+		}
+	}
+
+	void snapshotState(
+			final long checkpointId,
+			final long checkpointTimestamp,
+			final ListState<byte[]> bucketStates,
+			final ListState<Long> partCounterState) throws Exception {
+
+		Preconditions.checkState(
+				fileSystemWriter != null && bucketStateSerializer != null,
+				"sink has not been initialized"
+		);
+
+		for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
+			final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
+
+			if (info != null &&
+					(rollingPolicy.shouldRollOnCheckpoint(info) ||
+					rollingPolicy.shouldRollOnEvent(info) ||
+					rollingPolicy.shouldRollOnProcessingTime(info, checkpointTimestamp))
+			) {
+				// we also check here so that we do not have to always
+				// wait for the "next" element to arrive.
+				bucket.closePartFile();
+			}
+
+			final BucketState<BucketID> bucketState = bucket.onCheckpoint(checkpointId);
+			bucketStates.add(SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer, bucketState));
+		}
+
+		partCounterState.add(maxPartCounterUsed);
+	}
+
+	/**
+	 * Called on every incoming element to write it to its final location.
+	 * @param value the element itself.
+	 * @param context the {@link SinkFunction.Context context} available to the sink function.
+	 * @throws Exception
+	 */
+	void onElement(IN value, SinkFunction.Context context) throws Exception {
+		final long currentProcessingTime = context.currentProcessingTime();
+
+		// setting the values in the bucketer context
+		bucketerContext.update(
+				context.timestamp(),
+				context.currentWatermark(),
+				currentProcessingTime);
+
+		final BucketID bucketId = bucketer.getBucketId(value, bucketerContext);
+
+		Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
+		if (bucket == null) {
+			final Path bucketPath = assembleBucketPath(bucketId);
+			bucket = bucketFactory.getNewBucket(
+					fileSystemWriter,
+					subtaskIndex,
+					bucketId,
+					bucketPath,
+					initMaxPartCounter,
+					partFileWriterFactory);
+			activeBuckets.put(bucketId, bucket);
+		}
+
+		final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
+		if (info == null || rollingPolicy.shouldRollOnEvent(info)) {
+			bucket.rollPartFile(currentProcessingTime);
+		}
+		bucket.write(value, currentProcessingTime);
+
+		// we update the counter here because buckets can become inactive and be
+		// removed in publishUpToCheckpoint(), so at snapshot time they may no
+		// longer be there to be taken into account during checkpointing.
+		updateMaxPartCounter(bucket.getPartCounter());
+	}
+
+	void onProcessingTime(long timestamp) throws Exception {
+		for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
+			final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
+			if (info != null && rollingPolicy.shouldRollOnProcessingTime(info, timestamp)) {
+				bucket.closePartFile();
+			}
+		}
+	}
+
+	void close() {
+		if (activeBuckets != null) {
+			activeBuckets.values().forEach(Bucket::dispose);
+		}
+	}
+
+	/**
+	 * Assembles the final bucket {@link Path} that will be used for the provided bucket in the
+	 * underlying filesystem.
+	 * @param bucketId the id of the bucket as returned by the {@link Bucketer}.
+	 * @return The resulting path.
+	 */
+	private Path assembleBucketPath(BucketID bucketId) {
+		return new Path(basePath, bucketId.toString());
+	}
+
+	/**
+	 * Updates the state keeping track of the maximum used part
+	 * counter across all local active buckets.
+	 * @param candidate the part counter that will potentially replace the current {@link #maxPartCounterUsed}.
+	 */
+	private void updateMaxPartCounter(long candidate) {
+		maxPartCounterUsed = Math.max(maxPartCounterUsed, candidate);
+	}
+
+	/**
+	 * The {@link Bucketer.Context} exposed to the
+	 * {@link Bucketer#getBucketId(Object, Bucketer.Context)} method
+	 * whenever a new incoming element arrives.
+	 */
+	private static final class BucketerContext implements Bucketer.Context {
+
+		@Nullable
+		private Long elementTimestamp;
+
+		private long currentWatermark;
+
+		private long currentProcessingTime;
+
+		private BucketerContext() {
+			this.elementTimestamp = null;
+			this.currentWatermark = Long.MIN_VALUE;
+			this.currentProcessingTime = Long.MIN_VALUE;
+		}
+
+		void update(@Nullable Long element, long watermark, long processingTime) {
+			this.elementTimestamp = element;
+			this.currentWatermark = watermark;
+			this.currentProcessingTime = processingTime;
+		}
+
+		@Override
+		public long currentProcessingTime() {
+			return currentProcessingTime;
+		}
+
+		@Override
+		public long currentWatermark() {
+			return currentWatermark;
+		}
+
+		@Override
+		@Nullable
+		public Long timestamp() {
+			return elementTimestamp;
+		}
+	}
+}
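
The Bucketer used above maps each element to a BucketID via
getBucketId(value, context) and supplies the matching id serializer via
getSerializer(). A hypothetical custom bucketer that buckets by the hour of the
element timestamp might look as follows (assuming the
SimpleVersionedStringSerializer added by this commit exposes a singleton
INSTANCE):

    import org.apache.flink.core.io.SimpleVersionedSerializer;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer;

    /** Hypothetical example: buckets elements by the hour of their timestamp. */
    class HourBucketer<T> implements Bucketer<T, String> {

        private static final long serialVersionUID = 1L;

        @Override
        public String getBucketId(T element, Bucketer.Context context) {
            final Long timestamp = context.timestamp();
            final long time = timestamp != null ? timestamp : context.currentProcessingTime();
            return "hour-" + (time / (60L * 60L * 1000L));
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            // assumed singleton of the string serializer added in this commit
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }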

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
new file mode 100644
index 0000000..558b1bf
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A {@link PartFileWriter} for bulk-encoding formats that use a {@link BulkWriter}.
+ * This also implements the {@link PartFileInfo}.
+ */
+@Internal
+final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
+
+	private final BulkWriter<IN> writer;
+
+	private BulkPartWriter(
+			final BucketID bucketId,
+			final RecoverableFsDataOutputStream currentPartStream,
+			final BulkWriter<IN> writer,
+			final long creationTime) {
+		super(bucketId, currentPartStream, creationTime);
+		this.writer = Preconditions.checkNotNull(writer);
+	}
+
+	@Override
+	void write(IN element, long currentTime) throws IOException {
+		writer.addElement(element);
+		markWrite(currentTime);
+	}
+
+	@Override
+	RecoverableWriter.ResumeRecoverable persist() {
+		throw new UnsupportedOperationException("Bulk Part Writers do not support \"pause and resume\" operations.");
+	}
+
+	@Override
+	RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
+		writer.flush();
+		writer.finish();
+		return super.closeForCommit();
+	}
+
+	/**
+	 * A factory that creates {@link BulkPartWriter BulkPartWriters}.
+	 * @param <IN> The type of input elements.
+	 * @param <BucketID> The type of ids for the buckets, as returned by the {@link Bucketer}.
+	 */
+	static class Factory<IN, BucketID> implements PartFileWriter.PartFileFactory<IN, BucketID> {
+
+		private final BulkWriter.Factory<IN> writerFactory;
+
+		Factory(BulkWriter.Factory<IN> writerFactory) {
+			this.writerFactory = writerFactory;
+		}
+
+		@Override
+		public PartFileWriter<IN, BucketID> resumeFrom(
+				final BucketID bucketId,
+				final RecoverableWriter fileSystemWriter,
+				final RecoverableWriter.ResumeRecoverable resumable,
+				final long creationTime) throws IOException {
+
+			Preconditions.checkNotNull(fileSystemWriter);
+			Preconditions.checkNotNull(resumable);
+
+			final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable);
+			final BulkWriter<IN> writer = writerFactory.create(stream);
+			return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
+		}
+
+		@Override
+		public PartFileWriter<IN, BucketID> openNew(
+				final BucketID bucketId,
+				final RecoverableWriter fileSystemWriter,
+				final Path path,
+				final long creationTime) throws IOException {
+
+			Preconditions.checkNotNull(fileSystemWriter);
+			Preconditions.checkNotNull(path);
+
+			final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path);
+			final BulkWriter<IN> writer = writerFactory.create(stream);
+			return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
+		}
+	}
+}
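
At the user-facing level, a BulkWriter.Factory is handed to the refactored
StreamingFileSink (see StreamingFileSink.java in this commit), which wires it
through the BulkPartWriter.Factory above. A hedged usage sketch, assuming the
builder-style forBulkFormat entry point introduced by this refactoring:

    import org.apache.flink.api.common.serialization.BulkWriter;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    class BulkSinkExample {

        static StreamingFileSink<String> createSink() {
            // StringBatchWriter is the hypothetical BulkWriter sketched earlier
            BulkWriter.Factory<String> writerFactory = StringBatchWriter::new;

            return StreamingFileSink
                    .forBulkFormat(new Path("hdfs:///tmp/output"), writerFactory)
                    .build();
        }
    }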

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
index 795ba74..532138f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.Encoder;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
 
@@ -29,18 +28,18 @@ import java.io.IOException;
  * A factory returning {@link Bucket buckets}.
  */
 @Internal
-public class DefaultBucketFactory<IN> implements BucketFactory<IN> {
+class DefaultBucketFactory<IN, BucketID> implements BucketFactory<IN, BucketID> {
 
-	private static final long serialVersionUID = 3372881359208513357L;
+	private static final long serialVersionUID = 1L;
 
 	@Override
-	public Bucket<IN> getNewBucket(
-			RecoverableWriter fsWriter,
-			int subtaskIndex,
-			String bucketId,
-			Path bucketPath,
-			long initialPartCounter,
-			Encoder<IN> writer) throws IOException {
+	public Bucket<IN, BucketID> getNewBucket(
+			final RecoverableWriter fsWriter,
+			final int subtaskIndex,
+			final BucketID bucketId,
+			final Path bucketPath,
+			final long initialPartCounter,
+			final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory) {
 
 		return new Bucket<>(
 				fsWriter,
@@ -48,22 +47,22 @@ public class DefaultBucketFactory<IN> implements BucketFactory<IN> {
 				bucketId,
 				bucketPath,
 				initialPartCounter,
-				writer);
+				partFileWriterFactory);
 	}
 
 	@Override
-	public Bucket<IN> restoreBucket(
-			RecoverableWriter fsWriter,
-			int subtaskIndex,
-			long initialPartCounter,
-			Encoder<IN> writer,
-			BucketState bucketState) throws IOException {
+	public Bucket<IN, BucketID> restoreBucket(
+			final RecoverableWriter fsWriter,
+			final int subtaskIndex,
+			final long initialPartCounter,
+			final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+			final BucketState<BucketID> bucketState) throws IOException {
 
 		return new Bucket<>(
 				fsWriter,
 				subtaskIndex,
 				initialPartCounter,
-				writer,
+				partFileWriterFactory,
 				bucketState);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java
deleted file mode 100644
index 026ac70..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink.filesystem;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * The default implementation of the {@link RollingPolicy}.
- *
- * <p>This policy rolls a part file if:
- * <ol>
- *     <li>there is no open part file,</li>
- * 	   <li>the current file has reached the maximum bucket size (by default 128MB),</li>
- * 	   <li>the current file is older than the roll over interval (by default 60 sec), or</li>
- * 	   <li>the current file has not been written to for more than the allowed inactivityTime (by default 60 sec).</li>
- * </ol>
- */
-@PublicEvolving
-public final class DefaultRollingPolicy implements RollingPolicy {
-
-	private static final long serialVersionUID = 1318929857047767030L;
-
-	private static final long DEFAULT_INACTIVITY_INTERVAL = 60L * 1000L;
-
-	private static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L;
-
-	private static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L;
-
-	private final long partSize;
-
-	private final long rolloverInterval;
-
-	private final long inactivityInterval;
-
-	/**
-	 * Private constructor to avoid direct instantiation.
-	 */
-	private DefaultRollingPolicy(long partSize, long rolloverInterval, long inactivityInterval) {
-		Preconditions.checkArgument(partSize > 0L);
-		Preconditions.checkArgument(rolloverInterval > 0L);
-		Preconditions.checkArgument(inactivityInterval > 0L);
-
-		this.partSize = partSize;
-		this.rolloverInterval = rolloverInterval;
-		this.inactivityInterval = inactivityInterval;
-	}
-
-	@Override
-	public boolean shouldRoll(final PartFileInfo state, final long currentTime) throws IOException {
-		if (state == null) {
-			// this means that there is no currently open part file.
-			return true;
-		}
-
-		if (state.getSize() > partSize) {
-			return true;
-		}
-
-		if (currentTime - state.getCreationTime() > rolloverInterval) {
-			return true;
-		}
-
-		return currentTime - state.getLastUpdateTime() > inactivityInterval;
-	}
-
-	/**
-	 * Initiates the instantiation of a {@link DefaultRollingPolicy}.
-	 * To finalize it and have the actual policy, call {@code .create()}.
-	 */
-	public static PolicyBuilder create() {
-		return new PolicyBuilder();
-	}
-
-	/**
-	 * A helper class that holds the configuration properties for the {@link DefaultRollingPolicy}.
-	 */
-	@PublicEvolving
-	public static class PolicyBuilder {
-
-		private long partSize = DEFAULT_MAX_PART_SIZE;
-
-		private long rolloverInterval = DEFAULT_ROLLOVER_INTERVAL;
-
-		private long inactivityInterval = DEFAULT_INACTIVITY_INTERVAL;
-
-		/**
-		 * Sets the part size above which a part file will have to roll.
-		 * @param size the allowed part size.
-		 */
-		public PolicyBuilder withMaxPartSize(long size) {
-			Preconditions.checkState(size > 0L);
-			this.partSize = size;
-			return this;
-		}
-
-		/**
-		 * Sets the interval of allowed inactivity after which a part file will have to roll.
-		 * @param interval the allowed inactivity interval.
-		 */
-		public PolicyBuilder withInactivityInterval(long interval) {
-			Preconditions.checkState(interval > 0L);
-			this.inactivityInterval = interval;
-			return this;
-		}
-
-		/**
-		 * Sets the max time a part file can stay open before having to roll.
-		 * @param interval the desired rollover interval.
-		 */
-		public PolicyBuilder withRolloverInterval(long interval) {
-			Preconditions.checkState(interval > 0L);
-			this.rolloverInterval = interval;
-			return this;
-		}
-
-		/**
-		 * Creates the actual policy.
-		 */
-		public DefaultRollingPolicy build() {
-			return new DefaultRollingPolicy(partSize, rolloverInterval, inactivityInterval);
-		}
-	}
-}
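
The monolithic shouldRoll(...) of the deleted policy above is replaced by
per-trigger callbacks: shouldRollOnCheckpoint, shouldRollOnEvent and
shouldRollOnProcessingTime (see their use in Buckets.snapshotState and the new
rolling/policies package in the diffstat). A hypothetical policy against the new
interface, assuming the signatures inferred from those call sites, might look
like:

    import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
    import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;

    import java.io.IOException;

    /** Hypothetical example: rolls on every checkpoint, or above 64 MB part size. */
    class SizeOrCheckpointRollingPolicy<BucketID> implements RollingPolicy<BucketID> {

        private static final long serialVersionUID = 1L;

        private static final long MAX_PART_SIZE = 1024L * 1024L * 64L;

        @Override
        public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
            return true; // always cut the in-progress part file on a checkpoint
        }

        @Override
        public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState) throws IOException {
            return partFileState.getSize() >= MAX_PART_SIZE;
        }

        @Override
        public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
            return false; // this sketch never rolls on time alone
        }
    }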

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java
deleted file mode 100644
index 10fd12b..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink.filesystem;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.Encoder;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * A handler for the currently open part file in a specific {@link Bucket}.
- * This also implements the {@link PartFileInfo}.
- */
-@Internal
-class PartFileHandler<IN> implements PartFileInfo {
-
-	private final String bucketId;
-
-	private final long creationTime;
-
-	private final RecoverableFsDataOutputStream currentPartStream;
-
-	private long lastUpdateTime;
-
-	private PartFileHandler(
-			final String bucketId,
-			final RecoverableFsDataOutputStream currentPartStream,
-			final long creationTime) {
-
-		Preconditions.checkArgument(creationTime >= 0L);
-		this.bucketId = Preconditions.checkNotNull(bucketId);
-		this.currentPartStream = Preconditions.checkNotNull(currentPartStream);
-		this.creationTime = creationTime;
-		this.lastUpdateTime = creationTime;
-	}
-
-	public static <IN> PartFileHandler<IN> resumeFrom(
-			final String bucketId,
-			final RecoverableWriter fileSystemWriter,
-			final RecoverableWriter.ResumeRecoverable resumable,
-			final long creationTime) throws IOException {
-		Preconditions.checkNotNull(bucketId);
-		Preconditions.checkNotNull(fileSystemWriter);
-		Preconditions.checkNotNull(resumable);
-
-		final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable);
-		return new PartFileHandler<>(bucketId, stream, creationTime);
-	}
-
-	public static <IN> PartFileHandler<IN> openNew(
-			final String bucketId,
-			final RecoverableWriter fileSystemWriter,
-			final Path path,
-			final long creationTime) throws IOException {
-		Preconditions.checkNotNull(bucketId);
-		Preconditions.checkNotNull(fileSystemWriter);
-		Preconditions.checkNotNull(path);
-
-		final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path);
-		return new PartFileHandler<>(bucketId, stream, creationTime);
-	}
-
-	void write(IN element, Encoder<IN> encoder, long currentTime) throws IOException {
-		encoder.encode(element, currentPartStream);
-		this.lastUpdateTime = currentTime;
-	}
-
-	RecoverableWriter.ResumeRecoverable persist() throws IOException {
-		return currentPartStream.persist();
-	}
-
-	RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
-		return currentPartStream.closeForCommit().getRecoverable();
-	}
-
-	void dispose() {
-		// we can suppress exceptions here, because we do not rely on close() to
-		// flush or persist any data
-		IOUtils.closeQuietly(currentPartStream);
-	}
-
-	@Override
-	public String getBucketId() {
-		return bucketId;
-	}
-
-	@Override
-	public long getCreationTime() {
-		return creationTime;
-	}
-
-	@Override
-	public long getSize() throws IOException {
-		return currentPartStream.getPos();
-	}
-
-	@Override
-	public long getLastUpdateTime() {
-		return lastUpdateTime;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
index 9c3d047..5e72ea0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
@@ -29,13 +29,13 @@ import java.io.IOException;
  * should roll the part file or not.
  */
 @PublicEvolving
-public interface PartFileInfo {
+public interface PartFileInfo<BucketID> {
 
 	/**
 	 * @return The bucket identifier of the current buffer, as returned by the
 	 * {@link Bucketer#getBucketId(Object, Bucketer.Context)}.
 	 */
-	String getBucketId();
+	BucketID getBucketId();
 
 	/**
 	 * @return The creation time (in ms) of the currently open part file.
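 
With the new type parameter, rolling logic can inspect the open part file
through whatever bucket id type the Bucketer produces. A minimal sketch of
consuming the generified interface (the helper name and the 60s/128MB
thresholds are illustrative only, not part of this commit; assumes imports
of PartFileInfo and java.io.IOException):

    static <BucketID> boolean isStale(PartFileInfo<BucketID> info, long nowMillis) throws IOException {
        // getLastUpdateTime() and getSize() are defined on PartFileInfo
        return nowMillis - info.getLastUpdateTime() > 60_000L
                || info.getSize() > 128L * 1024L * 1024L;
    }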

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
new file mode 100644
index 0000000..662454b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * An abstract writer for the currently open part file in a specific {@link Bucket}.
+ *
+ * <p>Currently, there are two subclasses of this class:
+ * <ol>
+ *     <li>One for row-wise formats: the {@link RowWisePartWriter}.</li>
+ *     <li>One for bulk encoding formats: the {@link BulkPartWriter}.</li>
+ * </ol>
+ *
+ * <p>This class also implements the {@link PartFileInfo} interface.
+ */
+@Internal
+abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
+
+	private final BucketID bucketId;
+
+	private final long creationTime;
+
+	protected final RecoverableFsDataOutputStream currentPartStream;
+
+	private long lastUpdateTime;
+
+	protected PartFileWriter(
+			final BucketID bucketId,
+			final RecoverableFsDataOutputStream currentPartStream,
+			final long creationTime) {
+
+		Preconditions.checkArgument(creationTime >= 0L);
+		this.bucketId = Preconditions.checkNotNull(bucketId);
+		this.currentPartStream = Preconditions.checkNotNull(currentPartStream);
+		this.creationTime = creationTime;
+		this.lastUpdateTime = creationTime;
+	}
+
+	abstract void write(IN element, long currentTime) throws IOException;
+
+	RecoverableWriter.ResumeRecoverable persist() throws IOException {
+		return currentPartStream.persist();
+	}
+
+	RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
+		return currentPartStream.closeForCommit().getRecoverable();
+	}
+
+	void dispose() {
+		// we can suppress exceptions here, because we do not rely on close() to
+		// flush or persist any data
+		IOUtils.closeQuietly(currentPartStream);
+	}
+
+	void markWrite(long now) {
+		this.lastUpdateTime = now;
+	}
+
+	@Override
+	public BucketID getBucketId() {
+		return bucketId;
+	}
+
+	@Override
+	public long getCreationTime() {
+		return creationTime;
+	}
+
+	@Override
+	public long getSize() throws IOException {
+		return currentPartStream.getPos();
+	}
+
+	@Override
+	public long getLastUpdateTime() {
+		return lastUpdateTime;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * An interface for factories that create the different {@link PartFileWriter writers}.
+	 */
+	interface PartFileFactory<IN, BucketID> {
+
+		/**
+		 * Used, upon recovery from a failure, to restore a {@link PartFileWriter writer} for the part file that was in progress.
+		 * @param bucketId the id of the bucket this writer is writing to.
+		 * @param fileSystemWriter the filesystem-specific writer to use when writing to the filesystem.
+		 * @param resumable the state of the stream we are resuming.
+		 * @param creationTime the creation time of the stream.
+		 * @return the recovered {@link PartFileWriter writer}.
+		 * @throws IOException if the stream could not be recovered.
+		 */
+		PartFileWriter<IN, BucketID> resumeFrom(
+			final BucketID bucketId,
+			final RecoverableWriter fileSystemWriter,
+			final RecoverableWriter.ResumeRecoverable resumable,
+			final long creationTime) throws IOException;
+
+		/**
+		 * Used to create a new {@link PartFileWriter writer}.
+		 * @param bucketId the id of the bucket this writer is writing to.
+		 * @param fileSystemWriter the filesystem-specific writer to use when writing to the filesystem.
+		 * @param path the path of the part file this writer will write to.
+		 * @param creationTime the creation time of the stream.
+		 * @return the new {@link PartFileWriter writer}.
+		 * @throws IOException if the part file could not be opened.
+		 */
+		PartFileWriter<IN, BucketID> openNew(
+			final BucketID bucketId,
+			final RecoverableWriter fileSystemWriter,
+			final Path path,
+			final long creationTime) throws IOException;
+	}
+}
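 
The factory is what the bucket code uses to open a fresh part file, or to
resume the one that was in progress when a checkpoint was taken. Since
PartFileWriter and its factory are @Internal and package-private, the
following is only a sketch of the intended call pattern, as it would look
inside the filesystem-sink package (the helper name is hypothetical, and
'resumable' is non-null only when restoring from a checkpoint):

    static <IN, BucketID> PartFileWriter<IN, BucketID> openOrResume(
            final PartFileWriter.PartFileFactory<IN, BucketID> factory,
            final BucketID bucketId,
            final RecoverableWriter fsWriter,
            final Path path,
            final RecoverableWriter.ResumeRecoverable resumable,
            final long now) throws IOException {
        return resumable != null
                ? factory.resumeFrom(bucketId, fsWriter, resumable, now)
                : factory.openNew(bucketId, fsWriter, path, now);
    }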

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
index 936377e..24c38aa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
@@ -28,13 +28,27 @@ import java.io.Serializable;
  * rolls its currently open part file and opens a new one.
  */
 @PublicEvolving
-public interface RollingPolicy extends Serializable {
+public interface RollingPolicy<BucketID> extends Serializable {
 
 	/**
-	 * Determines if the in-progress part file for a bucket should roll.
+	 * Determines if the in-progress part file for a bucket should roll when a checkpoint is taken.
+	 * @param partFileState the state of the currently open part file of the bucket.
+	 * @return {@code true} if the part file should roll, {@code false} otherwise.
+	 */
+	boolean shouldRollOnCheckpoint(final PartFileInfo<BucketID> partFileState) throws IOException;
+
+	/**
+	 * Determines if the in-progress part file for a bucket should roll based on its current state, e.g. its size.
+	 * @param partFileState the state of the currently open part file of the bucket.
+	 * @return {@code true} if the part file should roll, {@code false} otherwise.
+	 */
+	boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState) throws IOException;
+
+	/**
+	 * Determines if the in-progress part file for a bucket should roll based on a time condition.
 	 * @param partFileState the state of the currently open part file of the bucket.
 	 * @param currentTime the current processing time.
 	 * @return {@code true} if the part file should roll, {@code false} otherwise.
 	 */
-	boolean shouldRoll(final PartFileInfo partFileState, final long currentTime) throws IOException;
+	boolean shouldRollOnProcessingTime(final PartFileInfo<BucketID> partFileState, final long currentTime) throws IOException;
 }
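 
To make the split-out contract concrete, a minimal custom policy might roll
every part file on checkpoints and additionally once it grows past a size
threshold. A sketch (the class name and the 128MB limit are illustrative,
not shipped with this commit):

    import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
    import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;

    import java.io.IOException;

    public class SizeOrCheckpointRollingPolicy<BucketID> implements RollingPolicy<BucketID> {

        private static final long MAX_PART_SIZE_BYTES = 128L * 1024L * 1024L;

        @Override
        public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
            return true; // roll whenever a checkpoint is taken
        }

        @Override
        public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState) throws IOException {
            // consulted on each incoming element; here we roll on size
            return partFileState.getSize() >= MAX_PART_SIZE_BYTES;
        }

        @Override
        public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
            return false; // no time-based rolling in this sketch
        }
    }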

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
new file mode 100644
index 0000000..0b00b43
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A {@link PartFileWriter} for row-wise formats that use an {@link Encoder}.
+ * This also implements the {@link PartFileInfo} interface.
+ */
+@Internal
+final class RowWisePartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
+
+	private final Encoder<IN> encoder;
+
+	private RowWisePartWriter(
+			final BucketID bucketId,
+			final RecoverableFsDataOutputStream currentPartStream,
+			final Encoder<IN> encoder,
+			final long creationTime) {
+		super(bucketId, currentPartStream, creationTime);
+		this.encoder = Preconditions.checkNotNull(encoder);
+	}
+
+	@Override
+	void write(IN element, long currentTime) throws IOException {
+		encoder.encode(element, currentPartStream);
+		markWrite(currentTime);
+	}
+
+	/**
+	 * A factory that creates {@link RowWisePartWriter RowWisePartWriters}.
+	 * @param <IN> The type of input elements.
+	 * @param <BucketID> The type of ids for the buckets, as returned by the {@link Bucketer}.
+	 */
+	static class Factory<IN, BucketID> implements PartFileWriter.PartFileFactory<IN, BucketID> {
+
+		private final Encoder<IN> encoder;
+
+		Factory(Encoder<IN> encoder) {
+			this.encoder = encoder;
+		}
+
+		@Override
+		public PartFileWriter<IN, BucketID> resumeFrom(
+				final BucketID bucketId,
+				final RecoverableWriter fileSystemWriter,
+				final RecoverableWriter.ResumeRecoverable resumable,
+				final long creationTime) throws IOException {
+
+			Preconditions.checkNotNull(fileSystemWriter);
+			Preconditions.checkNotNull(resumable);
+
+			final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable);
+			return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
+		}
+
+		@Override
+		public PartFileWriter<IN, BucketID> openNew(
+				final BucketID bucketId,
+				final RecoverableWriter fileSystemWriter,
+				final Path path,
+				final long creationTime) throws IOException {
+
+			Preconditions.checkNotNull(fileSystemWriter);
+			Preconditions.checkNotNull(path);
+
+			final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path);
+			return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
+		}
+	}
+}
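 
The row-wise writer simply delegates each element to the user-provided
Encoder, which appends bytes to the in-progress part file's stream. A
minimal Encoder sketch (the class name is illustrative; with the
refactored StreamingFileSink, such an encoder would typically be handed
to the sink's row-format builder):

    import org.apache.flink.api.common.serialization.Encoder;

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;

    public class NewlineDelimitedEncoder implements Encoder<String> {

        @Override
        public void encode(String element, OutputStream stream) throws IOException {
            // called once per record by RowWisePartWriter#write(...)
            stream.write(element.getBytes(StandardCharsets.UTF_8));
            stream.write('\n');
        }
    }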