You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/07/16 20:10:08 UTC

[3/3] flink git commit: [FLINK-9750] [DataStream API] Add new StreamingFileSink on top of the ResumableWriter.

[FLINK-9750] [DataStream API] Add new StreamingFileSink on top of the ResumableWriter.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bbc91eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bbc91eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bbc91eb

Branch: refs/heads/master
Commit: 0bbc91eb1b23fa535e28fa8403f8040db46ebf98
Parents: 975fdbe
Author: kkloudas <kk...@gmail.com>
Authored: Mon Jul 16 20:51:44 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jul 16 20:54:02 2018 +0200

----------------------------------------------------------------------
 .../flink/api/common/serialization/Encoder.java |  43 ++
 .../serialization/SimpleStringEncoder.java      |  69 ++
 .../core/io/SimpleVersionedSerialization.java   | 159 +++++
 .../core/io/SimpleVersionedSerializer.java      |  19 +-
 .../io/SimpleVersionedSerializationTest.java    | 159 +++++
 .../api/functions/sink/filesystem/Bucket.java   | 215 ++++++
 .../sink/filesystem/BucketFactory.java          |  49 ++
 .../functions/sink/filesystem/BucketState.java  |  95 +++
 .../sink/filesystem/BucketStateSerializer.java  | 166 +++++
 .../sink/filesystem/DefaultBucketFactory.java   |  69 ++
 .../sink/filesystem/DefaultRollingPolicy.java   | 142 ++++
 .../sink/filesystem/PartFileHandler.java        | 122 ++++
 .../functions/sink/filesystem/PartFileInfo.java |  54 ++
 .../sink/filesystem/RollingPolicy.java          |  40 ++
 .../sink/filesystem/StreamingFileSink.java      | 425 ++++++++++++
 .../filesystem/bucketers/BasePathBucketer.java  |  41 ++
 .../sink/filesystem/bucketers/Bucketer.java     |  80 +++
 .../filesystem/bucketers/DateTimeBucketer.java  |  94 +++
 .../filesystem/BucketStateSerializerTest.java   | 265 +++++++
 .../filesystem/LocalStreamingFileSinkTest.java  | 689 +++++++++++++++++++
 20 files changed, 2985 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-core/src/main/java/org/apache/flink/api/common/serialization/Encoder.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/Encoder.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/Encoder.java
new file mode 100644
index 0000000..f60ddc9
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/Encoder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * A {@link Encoder} is used by the streaming file sink to perform the actual writing
+ * of the incoming elements to the files in a bucket.
+ *
+ * @param <IN> The type of the elements that are being written by the sink.
+ */
+@PublicEvolving
+public interface Encoder<IN> extends Serializable {
+
+	/**
+	 * Writes one element to the bucket file.
+	 * @param element the element to be written.
+	 * @param stream the stream to write the element to.
+	 */
+	void encode(IN element, OutputStream stream) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java
new file mode 100644
index 0000000..e50229b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+/**
+ * A simple {@link Encoder} that uses {@code toString()} on the input elements and
+ * writes them to the output bucket file separated by newline.
+ *
+ * @param <IN> The type of the elements that are being written by the sink.
+ */
+@PublicEvolving
+public class SimpleStringEncoder<IN> implements Encoder<IN> {
+
+	private static final long serialVersionUID = -6865107843734614452L;
+
+	private String charsetName;
+
+	private transient Charset charset;
+
+	/**
+	 * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
+	 * strings to bytes.
+	 */
+	public SimpleStringEncoder() {
+		this("UTF-8");
+	}
+
+	/**
+	 * Creates a new {@code StringWriter} that uses the given charset to convert
+	 * strings to bytes.
+	 *
+	 * @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
+	 */
+	public SimpleStringEncoder(String charsetName) {
+		this.charsetName = charsetName;
+	}
+
+	@Override
+	public void encode(IN element, OutputStream stream) throws IOException {
+		if (charset == null) {
+			charset = Charset.forName(charsetName);
+		}
+
+		stream.write(element.toString().getBytes(charset));
+		stream.write('\n');
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
new file mode 100644
index 0000000..8bead11
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Simple serialization / deserialization methods for the {@link SimpleVersionedSerializer}.
+ */
+@PublicEvolving
+public class SimpleVersionedSerialization {
+
+	/**
+	 * Serializes the version and datum into a stream.
+	 *
+	 * <p>Data serialized via this method can be deserialized via
+	 * {@link #readVersionAndDeSerialize(SimpleVersionedSerializer, DataInputView)}.
+	 *
+	 * <p>The first four bytes will be occupied by the version, as returned by
+	 * {@link SimpleVersionedSerializer#getVersion()}. The remaining bytes will be the serialized
+	 * datum, as produced by {@link SimpleVersionedSerializer#serialize(Object)}, plus its length.
+	 * The resulting array will hence be eight bytes larger than the serialized datum.
+	 *
+	 * @param serializer The serializer to serialize the datum with.
+	 * @param datum The datum to serialize.
+	 * @param out The stream to serialize to.
+	 */
+	public static <T> void writeVersionAndSerialize(SimpleVersionedSerializer<T> serializer, T datum, DataOutputView out) throws IOException {
+		checkNotNull(serializer, "serializer");
+		checkNotNull(datum, "datum");
+		checkNotNull(out, "out");
+
+		final byte[] data = serializer.serialize(datum);
+
+		out.writeInt(serializer.getVersion());
+		out.writeInt(data.length);
+		out.write(data);
+	}
+
+	/**
+	 * Deserializes the version and datum from a stream.
+	 *
+	 * <p>This method deserializes data serialized via
+	 * {@link #writeVersionAndSerialize(SimpleVersionedSerializer, Object, DataOutputView)}.
+	 *
+	 * <p>The first four bytes will be interpreted as the version. The next four bytes will be
+	 * interpreted as the length of the datum bytes, then length-many bytes will be read.
+	 * Finally, the datum is deserialized via the {@link SimpleVersionedSerializer#deserialize(int, byte[])}
+	 * method.
+	 *
+	 * @param serializer The serializer to serialize the datum with.
+	 * @param in The stream to deserialize from.
+	 */
+	public static <T> T readVersionAndDeSerialize(SimpleVersionedSerializer<T> serializer, DataInputView in) throws IOException {
+		checkNotNull(serializer, "serializer");
+		checkNotNull(in, "in");
+
+		final int version = in.readInt();
+		final int length = in.readInt();
+		final byte[] data = new byte[length];
+		in.readFully(data);
+
+		return serializer.deserialize(version, data);
+	}
+
+	/**
+	 * Serializes the version and datum into a byte array. The first four bytes will be occupied by
+	 * the version (as returned by {@link SimpleVersionedSerializer#getVersion()}),
+	 * written in <i>big-endian</i> encoding. The remaining bytes will be the serialized
+	 * datum, as produced by {@link SimpleVersionedSerializer#serialize(Object)}. The resulting array
+	 * will hence be four bytes larger than the serialized datum.
+	 *
+	 * <p>Data serialized via this method can be deserialized via
+	 * {@link #readVersionAndDeSerialize(SimpleVersionedSerializer, byte[])}.
+	 *
+	 * @param serializer The serializer to serialize the datum with.
+	 * @param datum The datum to serialize.
+	 *
+	 * @return A byte array containing the serialized version and serialized datum.
+	 *
+	 * @throws IOException Exceptions from the {@link SimpleVersionedSerializer#serialize(Object)}
+	 *                     method are forwarded.
+	 */
+	public static <T> byte[] writeVersionAndSerialize(SimpleVersionedSerializer<T> serializer, T datum) throws IOException {
+		checkNotNull(serializer, "serializer");
+		checkNotNull(datum, "datum");
+
+		final byte[] data = serializer.serialize(datum);
+		final byte[] versionAndData = new byte[data.length + 4];
+
+		final int version = serializer.getVersion();
+		versionAndData[0] = (byte) (version >> 24);
+		versionAndData[1] = (byte) (version >> 16);
+		versionAndData[2] = (byte) (version >>  8);
+		versionAndData[3] = (byte)  version;
+
+		// move the data to the array
+		System.arraycopy(data, 0, versionAndData, 4, data.length);
+
+		return versionAndData;
+	}
+
+	/**
+	 * Deserializes the version and datum from a byte array. The first four bytes will be read as
+	 * the version, in <i>big-endian</i> encoding. The remaining bytes will be passed to the serializer
+	 * for deserialization, via {@link SimpleVersionedSerializer#deserialize(int, byte[])}.
+	 *
+	 * @param serializer The serializer to deserialize the datum with.
+	 * @param bytes The bytes to deserialize from.
+	 *
+	 * @return The deserialized datum.
+	 *
+	 * @throws IOException Exceptions from the {@link SimpleVersionedSerializer#deserialize(int, byte[])}
+	 *                     method are forwarded.
+	 */
+	public static <T> T readVersionAndDeSerialize(SimpleVersionedSerializer<T> serializer, byte[] bytes) throws IOException {
+		checkNotNull(serializer, "serializer");
+		checkNotNull(bytes, "bytes");
+		checkArgument(bytes.length >= 4, "byte array below minimum length (4 bytes)");
+
+		final byte[] dataOnly = Arrays.copyOfRange(bytes, 4, bytes.length);
+		final int version =
+				((bytes[0] & 0xff) << 24) |
+						((bytes[1] & 0xff) << 16) |
+						((bytes[2] & 0xff) <<  8) |
+						(bytes[3] & 0xff);
+
+		return serializer.deserialize(version, dataOnly);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** Utility class, not meant to be instantiated. */
+	private SimpleVersionedSerialization() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
index 4dfeea2..07ba12d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
@@ -24,11 +24,11 @@ import java.io.IOException;
 
 /**
  * A simple serializer interface for versioned serialization.
- * 
+ *
  * <p>The serializer has a version (returned by {@link #getVersion()}) which can be attached
  * to the serialized data. When the serializer evolves, the version can be used to identify
  * with which prior version the data was serialized.
- * 
+ *
  * <pre>{@code
  * MyType someObject = ...;
  * SimpleVersionedSerializer<MyType> serializer = ...;
@@ -37,13 +37,13 @@ import java.io.IOException;
  * int version = serializer.getVersion();
  *
  * MyType deserialized = serializer.deserialize(version, serializedData);
- * 
+ *
  * byte[] someOldData = ...;
  * int oldVersion = ...;
  * MyType deserializedOldObject = serializer.deserialize(oldVersion, someOldData);
- * 
+ *
  * }</pre>
- * 
+ *
  * @param <E> The data type serialized / deserialized by this serializer.
  */
 @Internal
@@ -51,7 +51,7 @@ public interface SimpleVersionedSerializer<E> extends Versioned {
 
 	/**
 	 * Gets the version with which this serializer serializes.
-	 * 
+	 *
 	 * @return The version of the serialization schema.
 	 */
 	@Override
@@ -61,10 +61,9 @@ public interface SimpleVersionedSerializer<E> extends Versioned {
 	 * Serializes the given object. The serialization is assumed to correspond to the
 	 * current serialization version (as returned by {@link #getVersion()}.
 	 *
-	 * 
 	 * @param obj The object to serialize.
 	 * @return The serialized data (bytes).
-	 * 
+	 *
 	 * @throws IOException Thrown, if the serialization fails.
 	 */
 	byte[] serialize(E obj) throws IOException;
@@ -72,11 +71,11 @@ public interface SimpleVersionedSerializer<E> extends Versioned {
 	/**
 	 * De-serializes the given data (bytes) which was serialized with the scheme of the
 	 * indicated version.
-	 * 
+	 *
 	 * @param version The version in which the data was serialized
 	 * @param serialized The serialized data
 	 * @return The deserialized object
-	 * 
+	 *
 	 * @throws IOException Thrown, if the deserialization fails.
 	 */
 	E deserialize(int version, byte[] serialized) throws IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java b/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
new file mode 100644
index 0000000..89a6b27
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for the {@link SimpleVersionedSerialization} class.
+ */
+public class SimpleVersionedSerializationTest {
+
+	@Test
+	public void testStreamSerializationRoundTrip() throws IOException {
+		final SimpleVersionedSerializer<String> utfEncoder = new SimpleVersionedSerializer<String>() {
+
+			private static final int VERSION = Integer.MAX_VALUE / 2; // version should occupy many bytes
+
+			@Override
+			public int getVersion() {
+				return VERSION;
+			}
+
+			@Override
+			public byte[] serialize(String str) throws IOException {
+				return str.getBytes(StandardCharsets.UTF_8);
+			}
+
+			@Override
+			public String deserialize(int version, byte[] serialized) throws IOException {
+				assertEquals(VERSION, version);
+				return new String(serialized, StandardCharsets.UTF_8);
+			}
+		};
+
+		final String testString = "dugfakgs";
+		final DataOutputSerializer out = new DataOutputSerializer(32);
+		SimpleVersionedSerialization.writeVersionAndSerialize(utfEncoder, testString, out);
+
+		final DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
+		final String deserialized = SimpleVersionedSerialization.readVersionAndDeSerialize(utfEncoder, in);
+		assertEquals(testString, deserialized);
+	}
+
+	@Test
+	public void testStreamSerializeEmpty() throws IOException {
+		final String testString = "beeeep!";
+
+		SimpleVersionedSerializer<String> emptySerializer = new SimpleVersionedSerializer<String>() {
+
+			@Override
+			public int getVersion() {
+				return 42;
+			}
+
+			@Override
+			public byte[] serialize(String obj) throws IOException {
+				return new byte[0];
+			}
+
+			@Override
+			public String deserialize(int version, byte[] serialized) throws IOException {
+				assertEquals(42, version);
+				assertEquals(0, serialized.length);
+				return testString;
+			}
+		};
+
+		final DataOutputSerializer out = new DataOutputSerializer(32);
+		SimpleVersionedSerialization.writeVersionAndSerialize(emptySerializer, "abc", out);
+
+		final DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
+		assertEquals(testString, SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, in));
+	}
+
+	@Test
+	public void testSerializationRoundTrip() throws IOException {
+		final SimpleVersionedSerializer<String> utfEncoder = new SimpleVersionedSerializer<String>() {
+
+			private static final int VERSION = Integer.MAX_VALUE / 2; // version should occupy many bytes
+
+			@Override
+			public int getVersion() {
+				return VERSION;
+			}
+
+			@Override
+			public byte[] serialize(String str) throws IOException {
+				return str.getBytes(StandardCharsets.UTF_8);
+			}
+
+			@Override
+			public String deserialize(int version, byte[] serialized) throws IOException {
+				assertEquals(VERSION, version);
+				return new String(serialized, StandardCharsets.UTF_8);
+			}
+		};
+
+		final String testString = "dugfakgs";
+		byte[] serialized = SimpleVersionedSerialization.writeVersionAndSerialize(utfEncoder, testString);
+
+		final String deserialized = SimpleVersionedSerialization.readVersionAndDeSerialize(utfEncoder, serialized);
+		assertEquals(testString, deserialized);
+	}
+
+	@Test
+	public void testSerializeEmpty() throws IOException {
+		final String testString = "beeeep!";
+
+		SimpleVersionedSerializer<String> emptySerializer = new SimpleVersionedSerializer<String>() {
+
+			@Override
+			public int getVersion() {
+				return 42;
+			}
+
+			@Override
+			public byte[] serialize(String obj) throws IOException {
+				return new byte[0];
+			}
+
+			@Override
+			public String deserialize(int version, byte[] serialized) throws IOException {
+				assertEquals(42, version);
+				assertEquals(0, serialized.length);
+				return testString;
+			}
+		};
+
+		byte[] serialized = SimpleVersionedSerialization.writeVersionAndSerialize(emptySerializer, "abc");
+		assertNotNull(serialized);
+
+		assertEquals(testString, SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, serialized));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
new file mode 100644
index 0000000..d9a6d75
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A bucket is the directory organization of the output of the {@link StreamingFileSink}.
+ *
+ * <p>For each incoming  element in the {@code BucketingSink}, the user-specified
+ * {@link org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer Bucketer} is
+ * queried to see in which bucket this element should be written to.
+ */
+@PublicEvolving
+public class Bucket<IN> {
+
+	private static final String PART_PREFIX = "part";
+
+	private final String bucketId;
+
+	private final Path bucketPath;
+
+	private final int subtaskIndex;
+
+	private final Encoder<IN> encoder;
+
+	private final RecoverableWriter fsWriter;
+
+	private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint = new HashMap<>();
+
+	private long partCounter;
+
+	private PartFileHandler<IN> currentPart;
+
+	private List<RecoverableWriter.CommitRecoverable> pending;
+
+	/**
+	 * Constructor to restore a bucket from checkpointed state.
+	 */
+	public Bucket(
+			RecoverableWriter fsWriter,
+			int subtaskIndex,
+			long initialPartCounter,
+			Encoder<IN> writer,
+			BucketState bucketstate) throws IOException {
+
+		this(fsWriter, subtaskIndex, bucketstate.getBucketId(), bucketstate.getBucketPath(), initialPartCounter, writer);
+
+		// the constructor must have already initialized the filesystem writer
+		Preconditions.checkState(fsWriter != null);
+
+		// we try to resume the previous in-progress file, if the filesystem
+		// supports such operation. If not, we just commit the file and start fresh.
+
+		final RecoverableWriter.ResumeRecoverable resumable = bucketstate.getCurrentInProgress();
+		if (resumable != null) {
+			currentPart = PartFileHandler.resumeFrom(
+					bucketId, fsWriter, resumable, bucketstate.getCreationTime());
+		}
+
+		// we commit pending files for previous checkpoints to the last successful one
+		// (from which we are recovering from)
+		for (List<RecoverableWriter.CommitRecoverable> commitables: bucketstate.getPendingPerCheckpoint().values()) {
+			for (RecoverableWriter.CommitRecoverable commitable: commitables) {
+				fsWriter.recoverForCommit(commitable).commitAfterRecovery();
+			}
+		}
+	}
+
+	/**
+	 * Constructor to create a new empty bucket.
+	 */
+	public Bucket(
+			RecoverableWriter fsWriter,
+			int subtaskIndex,
+			String bucketId,
+			Path bucketPath,
+			long initialPartCounter,
+			Encoder<IN> writer) {
+
+		this.fsWriter = Preconditions.checkNotNull(fsWriter);
+		this.subtaskIndex = subtaskIndex;
+		this.bucketId = Preconditions.checkNotNull(bucketId);
+		this.bucketPath = Preconditions.checkNotNull(bucketPath);
+		this.partCounter = initialPartCounter;
+		this.encoder = Preconditions.checkNotNull(writer);
+
+		this.pending = new ArrayList<>();
+	}
+
+	public PartFileInfo getInProgressPartInfo() {
+		return currentPart;
+	}
+
+	public String getBucketId() {
+		return bucketId;
+	}
+
+	public Path getBucketPath() {
+		return bucketPath;
+	}
+
+	public long getPartCounter() {
+		return partCounter;
+	}
+
+	public boolean isActive() {
+		return currentPart != null || !pending.isEmpty() || !pendingPerCheckpoint.isEmpty();
+	}
+
+	void write(IN element, long currentTime) throws IOException {
+		Preconditions.checkState(currentPart != null, "bucket has been closed");
+		currentPart.write(element, encoder, currentTime);
+	}
+
+	void rollPartFile(final long currentTime) throws IOException {
+		closePartFile();
+		currentPart = PartFileHandler.openNew(bucketId, fsWriter, getNewPartPath(), currentTime);
+		partCounter++;
+	}
+
+	void merge(final Bucket<IN> bucket) throws IOException {
+		Preconditions.checkNotNull(bucket);
+		Preconditions.checkState(bucket.getBucketPath().equals(getBucketPath()));
+
+		// there should be no pending files in the "to-merge" states.
+		Preconditions.checkState(bucket.pending.isEmpty());
+		Preconditions.checkState(bucket.pendingPerCheckpoint.isEmpty());
+
+		RecoverableWriter.CommitRecoverable commitable = bucket.closePartFile();
+		if (commitable != null) {
+			pending.add(commitable);
+		}
+	}
+
+	RecoverableWriter.CommitRecoverable closePartFile() throws IOException {
+		RecoverableWriter.CommitRecoverable commitable = null;
+		if (currentPart != null) {
+			commitable = currentPart.closeForCommit();
+			pending.add(commitable);
+			currentPart = null;
+		}
+		return commitable;
+	}
+
+	public void dispose() {
+		if (currentPart != null) {
+			currentPart.dispose();
+		}
+	}
+
+	public void commitUpToCheckpoint(long checkpointId) throws IOException {
+		Preconditions.checkNotNull(fsWriter);
+
+		Iterator<Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>>> it =
+				pendingPerCheckpoint.entrySet().iterator();
+
+		while (it.hasNext()) {
+			Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>> entry = it.next();
+			if (entry.getKey() <= checkpointId) {
+				for (RecoverableWriter.CommitRecoverable commitable : entry.getValue()) {
+					fsWriter.recoverForCommit(commitable).commit();
+				}
+				it.remove();
+			}
+		}
+	}
+
+	public BucketState snapshot(long checkpointId) throws IOException {
+		RecoverableWriter.ResumeRecoverable resumable = null;
+		long creationTime = Long.MAX_VALUE;
+
+		if (currentPart != null) {
+			resumable = currentPart.persist();
+			creationTime = currentPart.getCreationTime();
+		}
+
+		if (!pending.isEmpty()) {
+			pendingPerCheckpoint.put(checkpointId, pending);
+			pending = new ArrayList<>();
+		}
+		return new BucketState(bucketId, bucketPath, creationTime, resumable, pendingPerCheckpoint);
+	}
+
+	private Path getNewPartPath() {
+		return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
new file mode 100644
index 0000000..88f3c1a
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A factory able to create {@link Bucket buckets} for the {@link StreamingFileSink}.
+ */
+@Internal
+public interface BucketFactory<IN> extends Serializable {
+
+	Bucket<IN> getNewBucket(
+			RecoverableWriter fsWriter,
+			int subtaskIndex,
+			String bucketId,
+			Path bucketPath,
+			long initialPartCounter,
+			Encoder<IN> writer) throws IOException;
+
+	Bucket<IN> restoreBucket(
+			RecoverableWriter fsWriter,
+			int subtaskIndex,
+			long initialPartCounter,
+			Encoder<IN> writer,
+			BucketState bucketstate) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
new file mode 100644
index 0000000..5ebc46c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The state of the {@link Bucket} that is to be checkpointed.
+ */
+@Internal
+public class BucketState {
+
+	private final String bucketId;
+
+	/**
+	 * The base path for the bucket, i.e. the directory where all the part files are stored.
+	 */
+	private final Path bucketPath;
+
+	/**
+	 * The creation time of the currently open part file, or {@code Long.MAX_VALUE} if there is no open part file.
+	 */
+	private final long creationTime;
+
+	/**
+	 * A {@link RecoverableWriter.ResumeRecoverable} for the currently open part file, or null
+	 * if there is no currently open part file.
+	 */
+	@Nullable
+	private final RecoverableWriter.ResumeRecoverable inProgress;
+
+	/**
+	 * The {@link RecoverableWriter.CommitRecoverable files} pending to be committed, organized by checkpoint id.
+	 */
+	private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint;
+
+	public BucketState(
+			final String bucketId,
+			final Path bucketPath,
+			final long creationTime,
+			final @Nullable RecoverableWriter.ResumeRecoverable inProgress,
+			final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint
+	) {
+		this.bucketId = Preconditions.checkNotNull(bucketId);
+		this.bucketPath = Preconditions.checkNotNull(bucketPath);
+		this.creationTime = creationTime;
+		this.inProgress = inProgress;
+		this.pendingPerCheckpoint = Preconditions.checkNotNull(pendingPerCheckpoint);
+	}
+
+	public String getBucketId() {
+		return bucketId;
+	}
+
+	public Path getBucketPath() {
+		return bucketPath;
+	}
+
+	public long getCreationTime() {
+		return creationTime;
+	}
+
+	@Nullable
+	public RecoverableWriter.ResumeRecoverable getCurrentInProgress() {
+		return inProgress;
+	}
+
+	public Map<Long, List<RecoverableWriter.CommitRecoverable>> getPendingPerCheckpoint() {
+		return pendingPerCheckpoint;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
new file mode 100644
index 0000000..a167ec9
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * A {@code SimpleVersionedSerializer} used to serialize the {@link BucketState BucketState}.
+ */
+@Internal
+class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
+
+	private static final int MAGIC_NUMBER = 0x1e764b79;
+
+	private final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer;
+
+	private final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer;
+
+	public BucketStateSerializer(
+			final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer,
+			final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer) {
+
+		this.resumableSerializer = Preconditions.checkNotNull(resumableSerializer);
+		this.commitableSerializer = Preconditions.checkNotNull(commitableSerializer);
+	}
+
+	@Override
+	public int getVersion() {
+		return 1;
+	}
+
+	@Override
+	public byte[] serialize(BucketState state) throws IOException {
+		DataOutputSerializer out = new DataOutputSerializer(256);
+		out.writeInt(MAGIC_NUMBER);
+		serializeV1(state, out);
+		return out.getCopyOfBuffer();
+	}
+
+	@Override
+	public BucketState deserialize(int version, byte[] serialized) throws IOException {
+		switch (version) {
+			case 1:
+				DataInputDeserializer in = new DataInputDeserializer(serialized);
+				validateMagicNumber(in);
+				return deserializeV1(in);
+			default:
+				throw new IOException("Unrecognized version or corrupt state: " + version);
+		}
+	}
+
+	@VisibleForTesting
+	void serializeV1(BucketState state, DataOutputView out) throws IOException {
+		out.writeUTF(state.getBucketId());
+		out.writeUTF(state.getBucketPath().toString());
+		out.writeLong(state.getCreationTime());
+
+		// put the current open part file
+		final RecoverableWriter.ResumeRecoverable currentPart = state.getCurrentInProgress();
+		if (currentPart != null) {
+			out.writeBoolean(true);
+			SimpleVersionedSerialization.writeVersionAndSerialize(resumableSerializer, currentPart, out);
+		}
+		else {
+			out.writeBoolean(false);
+		}
+
+		// put the map of pending files per checkpoint
+		final Map<Long, List<CommitRecoverable>> pendingCommitters = state.getPendingPerCheckpoint();
+
+		// manually keep the version here to safe some bytes
+		out.writeInt(commitableSerializer.getVersion());
+
+		out.writeInt(pendingCommitters.size());
+		for (Entry<Long, List<CommitRecoverable>> resumablesForCheckpoint : pendingCommitters.entrySet()) {
+			List<CommitRecoverable> resumables = resumablesForCheckpoint.getValue();
+
+			out.writeLong(resumablesForCheckpoint.getKey());
+			out.writeInt(resumables.size());
+
+			for (CommitRecoverable resumable : resumables) {
+				byte[] serialized = commitableSerializer.serialize(resumable);
+				out.writeInt(serialized.length);
+				out.write(serialized);
+			}
+		}
+	}
+
+	@VisibleForTesting
+	BucketState deserializeV1(DataInputView in) throws IOException {
+		final String bucketId = in.readUTF();
+		final String bucketPathStr = in.readUTF();
+		final long creationTime = in.readLong();
+
+		// then get the current resumable stream
+		RecoverableWriter.ResumeRecoverable current = null;
+		if (in.readBoolean()) {
+			current = SimpleVersionedSerialization.readVersionAndDeSerialize(resumableSerializer, in);
+		}
+
+		final int committableVersion = in.readInt();
+		final int numCheckpoints = in.readInt();
+		final HashMap<Long, List<RecoverableWriter.CommitRecoverable>> resumablesPerCheckpoint = new HashMap<>(numCheckpoints);
+
+		for (int i = 0; i < numCheckpoints; i++) {
+			final long checkpointId = in.readLong();
+			final int noOfResumables = in.readInt();
+
+			final ArrayList<RecoverableWriter.CommitRecoverable> resumables = new ArrayList<>(noOfResumables);
+			for (int j = 0; j < noOfResumables; j++) {
+				final byte[] bytes = new byte[in.readInt()];
+				in.readFully(bytes);
+				resumables.add(commitableSerializer.deserialize(committableVersion, bytes));
+			}
+			resumablesPerCheckpoint.put(checkpointId, resumables);
+		}
+
+		return new BucketState(
+				bucketId,
+				new Path(bucketPathStr),
+				creationTime,
+				current,
+				resumablesPerCheckpoint);
+	}
+
+	private static void validateMagicNumber(DataInputView in) throws IOException {
+		final int magicNumber = in.readInt();
+		if (magicNumber != MAGIC_NUMBER) {
+			throw new IOException(String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
new file mode 100644
index 0000000..795ba74
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+
+import java.io.IOException;
+
+/**
+ * A factory returning {@link Bucket buckets}.
+ */
+@Internal
+public class DefaultBucketFactory<IN> implements BucketFactory<IN> {
+
+	private static final long serialVersionUID = 3372881359208513357L;
+
+	@Override
+	public Bucket<IN> getNewBucket(
+			RecoverableWriter fsWriter,
+			int subtaskIndex,
+			String bucketId,
+			Path bucketPath,
+			long initialPartCounter,
+			Encoder<IN> writer) throws IOException {
+
+		return new Bucket<>(
+				fsWriter,
+				subtaskIndex,
+				bucketId,
+				bucketPath,
+				initialPartCounter,
+				writer);
+	}
+
+	@Override
+	public Bucket<IN> restoreBucket(
+			RecoverableWriter fsWriter,
+			int subtaskIndex,
+			long initialPartCounter,
+			Encoder<IN> writer,
+			BucketState bucketState) throws IOException {
+
+		return new Bucket<>(
+				fsWriter,
+				subtaskIndex,
+				initialPartCounter,
+				writer,
+				bucketState);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java
new file mode 100644
index 0000000..026ac70
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * The default implementation of the {@link RollingPolicy}.
+ *
+ * <p>This policy rolls a part file if:
+ * <ol>
+ *     <li>there is no open part file,</li>
+ * 	   <li>the current file has reached the maximum bucket size (by default 128MB),</li>
+ * 	   <li>the current file is older than the roll over interval (by default 60 sec), or</li>
+ * 	   <li>the current file has not been written to for more than the allowed inactivityTime (by default 60 sec).</li>
+ * </ol>
+ */
+@PublicEvolving
+public final class DefaultRollingPolicy implements RollingPolicy {
+
+	private static final long serialVersionUID = 1318929857047767030L;
+
+	private static final long DEFAULT_INACTIVITY_INTERVAL = 60L * 1000L;
+
+	private static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L;
+
+	private static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L;
+
+	private final long partSize;
+
+	private final long rolloverInterval;
+
+	private final long inactivityInterval;
+
+	/**
+	 * Private constructor to avoid direct instantiation.
+	 */
+	private DefaultRollingPolicy(long partSize, long rolloverInterval, long inactivityInterval) {
+		Preconditions.checkArgument(partSize > 0L);
+		Preconditions.checkArgument(rolloverInterval > 0L);
+		Preconditions.checkArgument(inactivityInterval > 0L);
+
+		this.partSize = partSize;
+		this.rolloverInterval = rolloverInterval;
+		this.inactivityInterval = inactivityInterval;
+	}
+
+	@Override
+	public boolean shouldRoll(final PartFileInfo state, final long currentTime) throws IOException {
+		if (state == null) {
+			// this means that there is no currently open part file.
+			return true;
+		}
+
+		if (state.getSize() > partSize) {
+			return true;
+		}
+
+		if (currentTime - state.getCreationTime() > rolloverInterval) {
+			return true;
+		}
+
+		return currentTime - state.getLastUpdateTime() > inactivityInterval;
+	}
+
+	/**
+	 * Initiates the instantiation of a {@link DefaultRollingPolicy}.
+	 * To finalize it and have the actual policy, call {@code .create()}.
+	 */
+	public static PolicyBuilder create() {
+		return new PolicyBuilder();
+	}
+
+	/**
+	 * A helper class that holds the configuration properties for the {@link DefaultRollingPolicy}.
+	 */
+	@PublicEvolving
+	public static class PolicyBuilder {
+
+		private long partSize = DEFAULT_MAX_PART_SIZE;
+
+		private long rolloverInterval = DEFAULT_ROLLOVER_INTERVAL;
+
+		private long inactivityInterval = DEFAULT_INACTIVITY_INTERVAL;
+
+		/**
+		 * Sets the part size above which a part file will have to roll.
+		 * @param size the allowed part size.
+		 */
+		public PolicyBuilder withMaxPartSize(long size) {
+			Preconditions.checkState(size > 0L);
+			this.partSize = size;
+			return this;
+		}
+
+		/**
+		 * Sets the interval of allowed inactivity after which a part file will have to roll.
+		 * @param interval the allowed inactivity interval.
+		 */
+		public PolicyBuilder withInactivityInterval(long interval) {
+			Preconditions.checkState(interval > 0L);
+			this.inactivityInterval = interval;
+			return this;
+		}
+
+		/**
+		 * Sets the max time a part file can stay open before having to roll.
+		 * @param interval the desired rollover interval.
+		 */
+		public PolicyBuilder withRolloverInterval(long interval) {
+			Preconditions.checkState(interval > 0L);
+			this.rolloverInterval = interval;
+			return this;
+		}
+
+		/**
+		 * Creates the actual policy.
+		 */
+		public DefaultRollingPolicy build() {
+			return new DefaultRollingPolicy(partSize, rolloverInterval, inactivityInterval);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java
new file mode 100644
index 0000000..10fd12b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A handler for the currently open part file in a specific {@link Bucket}.
+ * This also implements the {@link PartFileInfo}.
+ */
+@Internal
+class PartFileHandler<IN> implements PartFileInfo {
+
+	private final String bucketId;
+
+	private final long creationTime;
+
+	private final RecoverableFsDataOutputStream currentPartStream;
+
+	private long lastUpdateTime;
+
+	private PartFileHandler(
+			final String bucketId,
+			final RecoverableFsDataOutputStream currentPartStream,
+			final long creationTime) {
+
+		Preconditions.checkArgument(creationTime >= 0L);
+		this.bucketId = Preconditions.checkNotNull(bucketId);
+		this.currentPartStream = Preconditions.checkNotNull(currentPartStream);
+		this.creationTime = creationTime;
+		this.lastUpdateTime = creationTime;
+	}
+
+	public static <IN> PartFileHandler<IN> resumeFrom(
+			final String bucketId,
+			final RecoverableWriter fileSystemWriter,
+			final RecoverableWriter.ResumeRecoverable resumable,
+			final long creationTime) throws IOException {
+		Preconditions.checkNotNull(bucketId);
+		Preconditions.checkNotNull(fileSystemWriter);
+		Preconditions.checkNotNull(resumable);
+
+		final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable);
+		return new PartFileHandler<>(bucketId, stream, creationTime);
+	}
+
+	public static <IN> PartFileHandler<IN> openNew(
+			final String bucketId,
+			final RecoverableWriter fileSystemWriter,
+			final Path path,
+			final long creationTime) throws IOException {
+		Preconditions.checkNotNull(bucketId);
+		Preconditions.checkNotNull(fileSystemWriter);
+		Preconditions.checkNotNull(path);
+
+		final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path);
+		return new PartFileHandler<>(bucketId, stream, creationTime);
+	}
+
+	void write(IN element, Encoder<IN> encoder, long currentTime) throws IOException {
+		encoder.encode(element, currentPartStream);
+		this.lastUpdateTime = currentTime;
+	}
+
+	RecoverableWriter.ResumeRecoverable persist() throws IOException {
+		return currentPartStream.persist();
+	}
+
+	RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
+		return currentPartStream.closeForCommit().getRecoverable();
+	}
+
+	void dispose() {
+		// we can suppress exceptions here, because we do not rely on close() to
+		// flush or persist any data
+		IOUtils.closeQuietly(currentPartStream);
+	}
+
+	@Override
+	public String getBucketId() {
+		return bucketId;
+	}
+
+	@Override
+	public long getCreationTime() {
+		return creationTime;
+	}
+
+	@Override
+	public long getSize() throws IOException {
+		return currentPartStream.getPos();
+	}
+
+	@Override
+	public long getLastUpdateTime() {
+		return lastUpdateTime;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
new file mode 100644
index 0000000..9c3d047
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+
+import java.io.IOException;
+
+/**
+ * An interface exposing the information concerning the current (open) part file
+ * that is necessary to the {@link RollingPolicy} in order to determine if it
+ * should roll the part file or not.
+ */
+@PublicEvolving
+public interface PartFileInfo {
+
+	/**
+	 * @return The bucket identifier of the current buffer, as returned by the
+	 * {@link Bucketer#getBucketId(Object, Bucketer.Context)}.
+	 */
+	String getBucketId();
+
+	/**
+	 * @return The creation time (in ms) of the currently open part file.
+	 */
+	long getCreationTime();
+
+	/**
+	 * @return The size of the currently open part file.
+	 */
+	long getSize() throws IOException;
+
+	/**
+	 * @return The last time (in ms) the currently open part file was written to.
+	 */
+	long getLastUpdateTime();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
new file mode 100644
index 0000000..936377e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The policy based on which a {@link Bucket} in the {@link StreamingFileSink}
+ * rolls its currently open part file and opens a new one.
+ */
+@PublicEvolving
+public interface RollingPolicy extends Serializable {
+
+	/**
+	 * Determines if the in-progress part file for a bucket should roll.
+	 * @param partFileState the state of the currently open part file of the bucket.
+	 * @param currentTime the current processing time.
+	 * @return {@code True} if the part file should roll, {@link false} otherwise.
+	 */
+	boolean shouldRoll(final PartFileInfo partFileState, final long currentTime) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
new file mode 100644
index 0000000..b6fff03
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Sink that emits its input elements to {@link FileSystem} files within buckets. This is
+ * integrated with the checkpointing mechanism to provide exactly once semantics.
+ *
+ *
+ * <p>When creating the sink a {@code basePath} must be specified. The base directory contains
+ * one directory for every bucket. The bucket directories themselves contain several part files,
+ * with at least one for each parallel subtask of the sink which is writing data to that bucket.
+ * These part files contain the actual output data.
+ *
+ *
+ * <p>The sink uses a {@link Bucketer} to determine in which bucket directory each element should
+ * be written to inside the base directory. The {@code Bucketer} can, for example, use time or
+ * a property of the element to determine the bucket directory. The default {@code Bucketer} is a
+ * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify
+ * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}.
+ *
+ *
+ * <p>The filenames of the part files contain the part prefix, "part-", the parallel subtask index of the sink
+ * and a rolling counter. For example the file {@code "part-1-17"} contains the data from
+ * {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask.
+ * Part files roll based on the user-specified {@link RollingPolicy}. By default, a {@link DefaultRollingPolicy}
+ * is used.
+ *
+ * <p>In some scenarios, the open buckets are required to change based on time. In these cases, the user
+ * can specify a {@code bucketCheckInterval} (by default 1m) and the sink will check periodically and roll
+ * the part file if the specified rolling policy says so.
+ *
+ * <p>Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}.
+ * The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once
+ * semantics and fault-tolerance. The part file that is currently being written to is {@code in-progress}. Once
+ * a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently
+ * pending files will be moved to {@code finished}.
+ *
+ *
+ * <p>If case of a failure, and in order to guarantee exactly-once semantics, the sink should roll back to the state it
+ * had when that last successful checkpoint occurred. To this end, when restoring, the restored files in {@code pending}
+ * state are transferred into the {@code finished} state while any {@code in-progress} files are rolled back, so that
+ * they do not contain data that arrived after the checkpoint from which we restore.
+ *
+ * <p><b>NOTE:</b>
+ * <ol>
+ *     <li>
+ *         If checkpointing is not enabled the pending files will never be moved to the finished state.
+ *     </li>
+ *     <li>
+ *         The part files are written using an instance of {@link Encoder}. By default, a
+ *         {@link SimpleStringEncoder} is used, which writes the result of {@code toString()} for
+ *         every element, separated by newlines. You can configure the writer using the
+ *         {@link #setEncoder(Encoder)}.
+ *     </li>
+ * </ol>
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+@PublicEvolving
+public class StreamingFileSink<IN>
+		extends RichSinkFunction<IN>
+		implements CheckpointedFunction, CheckpointListener, ProcessingTimeCallback {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(StreamingFileSink.class);
+
+	// -------------------------- state descriptors ---------------------------
+
+	private static final ListStateDescriptor<byte[]> BUCKET_STATE_DESC =
+			new ListStateDescriptor<>("bucket-states", BytePrimitiveArraySerializer.INSTANCE);
+
+	private static final ListStateDescriptor<Long> MAX_PART_COUNTER_STATE_DESC =
+			new ListStateDescriptor<>("max-part-counter", LongSerializer.INSTANCE);
+
+	// ------------------------ configuration fields --------------------------
+
+	private final Path basePath;
+
+	private final BucketFactory<IN> bucketFactory;
+
+	private long bucketCheckInterval = 60L * 1000L;
+
+	private Bucketer<IN> bucketer;
+
+	private Encoder<IN> encoder;
+
+	private RollingPolicy rollingPolicy;
+
+	// --------------------------- runtime fields -----------------------------
+
+	private transient BucketerContext bucketerContext;
+
+	private transient RecoverableWriter fileSystemWriter;
+
+	private transient ProcessingTimeService processingTimeService;
+
+	private transient Map<String, Bucket<IN>> activeBuckets;
+
+	//////////////////			State Related Fields			/////////////////////
+
+	private transient BucketStateSerializer bucketStateSerializer;
+
+	private transient ListState<byte[]> restoredBucketStates;
+
+	private transient ListState<Long> restoredMaxCounters;
+
+	private transient long initMaxPartCounter;
+
+	private transient long maxPartCounterUsed;
+
+	/**
+	 * Creates a new {@code StreamingFileSink} that writes files to the given base directory.
+	 *
+	 * <p>This uses a {@link DateTimeBucketer} as {@link Bucketer} and a {@link SimpleStringEncoder} as a writer.
+	 *
+	 * @param basePath The directory to which to write the bucket files.
+	 */
+	public StreamingFileSink(Path basePath) {
+		this(basePath, new DefaultBucketFactory<>());
+	}
+
+	@VisibleForTesting
+	StreamingFileSink(Path basePath, BucketFactory<IN> bucketFactory) {
+		this.basePath = Preconditions.checkNotNull(basePath);
+		this.bucketer = new DateTimeBucketer<>();
+		this.encoder = new SimpleStringEncoder<>();
+		this.rollingPolicy = DefaultRollingPolicy.create().build();
+		this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
+	}
+
+	public StreamingFileSink<IN> setEncoder(Encoder<IN> encoder) {
+		this.encoder = Preconditions.checkNotNull(encoder);
+		return this;
+	}
+
+	public StreamingFileSink<IN> setBucketer(Bucketer<IN> bucketer) {
+		this.bucketer = Preconditions.checkNotNull(bucketer);
+		return this;
+	}
+
+	public StreamingFileSink<IN> setBucketCheckInterval(long interval) {
+		this.bucketCheckInterval = interval;
+		return this;
+	}
+
+	public StreamingFileSink<IN> setRollingPolicy(RollingPolicy policy) {
+		this.rollingPolicy = Preconditions.checkNotNull(policy);
+		return this;
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		final Iterator<Map.Entry<String, Bucket<IN>>> activeBucketIt =
+				activeBuckets.entrySet().iterator();
+
+		while (activeBucketIt.hasNext()) {
+			Bucket<IN> bucket = activeBucketIt.next().getValue();
+			bucket.commitUpToCheckpoint(checkpointId);
+
+			if (!bucket.isActive()) {
+				// We've dealt with all the pending files and the writer for this bucket is not currently open.
+				// Therefore this bucket is currently inactive and we can remove it from our state.
+				activeBucketIt.remove();
+			}
+		}
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		Preconditions.checkState(
+				restoredBucketStates != null && fileSystemWriter != null && bucketStateSerializer != null,
+				"sink has not been initialized");
+
+		restoredBucketStates.clear();
+		for (Bucket<IN> bucket : activeBuckets.values()) {
+
+			final PartFileInfo info = bucket.getInProgressPartInfo();
+			final long checkpointTimestamp = context.getCheckpointTimestamp();
+
+			if (info != null && rollingPolicy.shouldRoll(info, checkpointTimestamp)) {
+				// we also check here so that we do not have to always
+				// wait for the "next" element to arrive.
+				bucket.closePartFile();
+			}
+
+			final BucketState bucketState = bucket.snapshot(context.getCheckpointId());
+			restoredBucketStates.add(SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer, bucketState));
+		}
+
+		restoredMaxCounters.clear();
+		restoredMaxCounters.add(maxPartCounterUsed);
+	}
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		initFileSystemWriter();
+
+		this.activeBuckets = new HashMap<>();
+
+		// When resuming after a failure:
+		// 1) we get the max part counter used before in order to make sure that we do not overwrite valid data
+		// 2) we commit any pending files for previous checkpoints (previous to the last successful one)
+		// 3) we resume writing to the previous in-progress file of each bucket, and
+		// 4) if we receive multiple states for the same bucket, we merge them.
+
+		final OperatorStateStore stateStore = context.getOperatorStateStore();
+
+		restoredBucketStates = stateStore.getListState(BUCKET_STATE_DESC);
+		restoredMaxCounters = stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);
+
+		if (context.isRestored()) {
+			final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+
+			LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
+
+			long maxCounter = 0L;
+			for (long partCounter: restoredMaxCounters.get()) {
+				maxCounter = Math.max(partCounter, maxCounter);
+			}
+			initMaxPartCounter = maxCounter;
+
+			for (byte[] recoveredState : restoredBucketStates.get()) {
+				final BucketState bucketState = SimpleVersionedSerialization.readVersionAndDeSerialize(
+						bucketStateSerializer, recoveredState);
+
+				final String bucketId = bucketState.getBucketId();
+
+				LOG.info("Recovered bucket for {}", bucketId);
+
+				final Bucket<IN> restoredBucket = bucketFactory.restoreBucket(
+						fileSystemWriter,
+						subtaskIndex,
+						initMaxPartCounter,
+						encoder,
+						bucketState
+				);
+
+				final Bucket<IN> existingBucket = activeBuckets.get(bucketId);
+				if (existingBucket == null) {
+					activeBuckets.put(bucketId, restoredBucket);
+				} else {
+					existingBucket.merge(restoredBucket);
+				}
+
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("{} idx {} restored state for bucket {}", getClass().getSimpleName(),
+							subtaskIndex, assembleBucketPath(bucketId));
+				}
+			}
+		}
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+
+		processingTimeService = ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService();
+		long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
+		processingTimeService.registerTimer(currentProcessingTime + bucketCheckInterval, this);
+		this.bucketerContext = new BucketerContext();
+	}
+
+	@Override
+	public void onProcessingTime(long timestamp) throws Exception {
+		final long currentTime = processingTimeService.getCurrentProcessingTime();
+		for (Bucket<IN> bucket : activeBuckets.values()) {
+			final PartFileInfo info = bucket.getInProgressPartInfo();
+			if (info != null && rollingPolicy.shouldRoll(info, currentTime)) {
+				bucket.closePartFile();
+			}
+		}
+		processingTimeService.registerTimer(timestamp + bucketCheckInterval, this);
+	}
+
+	@Override
+	public void invoke(IN value, Context context) throws Exception {
+		final long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
+		final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+
+		// setting the values in the bucketer context
+		bucketerContext.update(context.timestamp(), currentProcessingTime, context.currentWatermark());
+
+		final String bucketId = bucketer.getBucketId(value, bucketerContext);
+
+		Bucket<IN> bucket = activeBuckets.get(bucketId);
+		if (bucket == null) {
+			final Path bucketPath = assembleBucketPath(bucketId);
+			bucket = bucketFactory.getNewBucket(
+					fileSystemWriter,
+					subtaskIndex,
+					bucketId,
+					bucketPath,
+					initMaxPartCounter,
+					encoder);
+			activeBuckets.put(bucketId, bucket);
+		}
+
+		final PartFileInfo info = bucket.getInProgressPartInfo();
+		if (info == null || rollingPolicy.shouldRoll(info, currentProcessingTime)) {
+			bucket.rollPartFile(currentProcessingTime);
+		}
+		bucket.write(value, currentProcessingTime);
+
+		// we update the counter here because as buckets become inactive and
+		// get removed in the initializeState(), at the time we snapshot they
+		// may not be there to take them into account during checkpointing.
+		updateMaxPartCounter(bucket.getPartCounter());
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (activeBuckets != null) {
+			activeBuckets.values().forEach(Bucket::dispose);
+		}
+	}
+
+	private void initFileSystemWriter() throws IOException {
+		if (fileSystemWriter == null) {
+			fileSystemWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
+			bucketStateSerializer = new BucketStateSerializer(
+					fileSystemWriter.getResumeRecoverableSerializer(),
+					fileSystemWriter.getCommitRecoverableSerializer()
+			);
+		}
+	}
+
+	private void updateMaxPartCounter(long candidate) {
+		maxPartCounterUsed = Math.max(maxPartCounterUsed, candidate);
+	}
+
+	private Path assembleBucketPath(String bucketId) {
+		return new Path(basePath, bucketId);
+	}
+
+	/**
+	 * The {@link Bucketer.Context} exposed to the
+	 * {@link Bucketer#getBucketId(Object, Bucketer.Context)}
+	 * whenever a new incoming element arrives.
+	 */
+	private static class BucketerContext implements Bucketer.Context {
+
+		@Nullable
+		private Long elementTimestamp;
+
+		private long currentWatermark;
+
+		private long currentProcessingTime;
+
+		void update(@Nullable Long elementTimestamp, long currentWatermark, long currentProcessingTime) {
+			this.elementTimestamp = elementTimestamp;
+			this.currentWatermark = currentWatermark;
+			this.currentProcessingTime = currentProcessingTime;
+		}
+
+		@Override
+		public long currentProcessingTime() {
+			return currentProcessingTime;
+		}
+
+		@Override
+		public long currentWatermark() {
+			return currentWatermark;
+		}
+
+		@Override
+		@Nullable
+		public Long timestamp() {
+			return elementTimestamp;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
new file mode 100644
index 0000000..5ffe152
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A {@link Bucketer} that does not perform any
+ * bucketing of files. All files are written to the base path.
+ */
+@PublicEvolving
+public class BasePathBucketer<T> implements Bucketer<T> {
+
+	private static final long serialVersionUID = -6033643155550226022L;
+
+	@Override
+	public String getBucketId(T element, Context context) {
+		return "";
+	}
+
+	@Override
+	public String toString() {
+		return "BasePathBucketer";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java
new file mode 100644
index 0000000..5c30927
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+/**
+ * A bucketer is used with a {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink}
+ * to determine the {@link org.apache.flink.streaming.api.functions.sink.filesystem.Bucket} each incoming element
+ * should be put into.
+ *
+ * <p>The {@code StreamingFileSink} can be writing to many buckets at a time, and it is responsible for managing
+ * a set of active buckets. Whenever a new element arrives it will ask the {@code Bucketer} for the bucket the
+ * element should fall in. The {@code Bucketer} can, for example, determine buckets based on system time.
+ */
+@PublicEvolving
+public interface Bucketer<T> extends Serializable {
+
+	/**
+	 * Returns the identifier of the bucket the provided element should be put into.
+	 * @param element The current element being processed.
+	 * @param context The {@link SinkFunction.Context context} used by the
+	 *                {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink sink}.
+	 *
+	 * @return A string representing the identifier of the bucket the element should be put into.
+	 * This actual path to the bucket will result from the concatenation of the returned string
+	 * and the {@code base path} provided during the initialization of the
+	 * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink sink}.
+	 */
+	String getBucketId(T element, Context context);
+
+	/**
+	 * Context that the {@link Bucketer} can use for getting additional data about
+	 * an input record.
+	 *
+	 * <p>The context is only valid for the duration of a {@link Bucketer#getBucketId(Object, Context)} call.
+	 * Do not store the context and use afterwards!
+	 */
+	@PublicEvolving
+	interface Context {
+
+		/**
+		 * Returns the current processing time.
+		 */
+		long currentProcessingTime();
+
+		/**
+		 * Returns the current event-time watermark.
+		 */
+		long currentWatermark();
+
+		/**
+		 * Returns the timestamp of the current input record or
+		 * {@code null} if the element does not have an assigned timestamp.
+		 */
+		@Nullable
+		Long timestamp();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
new file mode 100644
index 0000000..515468c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * A {@link Bucketer} that assigns to buckets based on current system time.
+ *
+ *
+ * <p>The {@code DateTimeBucketer} will create directories of the following form:
+ * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path
+ * that was specified as a base path when creating the
+ * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink}.
+ * The {@code dateTimePath} is determined based on the current system time and the
+ * user provided format string.
+ *
+ *
+ * <p>{@link SimpleDateFormat} is used to derive a date string from the current system time and
+ * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
+ * files will have a granularity of hours.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ *     Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH");
+ * }</pre>
+ *
+ * <p>This will create for example the following bucket path:
+ * {@code /base/1976-12-31-14/}
+ *
+ */
+@PublicEvolving
+public class DateTimeBucketer<T> implements Bucketer<T> {
+
+	private static final long serialVersionUID = 3284420879277893804L;
+
+	private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
+
+	private final String formatString;
+
+	private transient SimpleDateFormat dateFormatter;
+
+	/**
+	 * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}.
+	 */
+	public DateTimeBucketer() {
+		this(DEFAULT_FORMAT_STRING);
+	}
+
+	/**
+	 * Creates a new {@code DateTimeBucketer} with the given date/time format string.
+	 *
+	 * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine
+	 *                     the bucket path.
+	 */
+	public DateTimeBucketer(String formatString) {
+		this.formatString = formatString;
+	}
+
+	@Override
+	public String getBucketId(T element, Context context) {
+		if (dateFormatter == null) {
+			dateFormatter = new SimpleDateFormat(formatString);
+		}
+		return dateFormatter.format(new Date(context.currentProcessingTime()));
+	}
+
+	@Override
+	public String toString() {
+		return "DateTimeBucketer{" +
+				"formatString='" + formatString + '\'' +
+				'}';
+	}
+}