You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/08/30 08:40:35 UTC

[2/2] flink git commit: [FLINK-4190] Generalise RollingSink to work with arbitrary buckets

[FLINK-4190] Generalise RollingSink to work with arbitrary buckets


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e99949a5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e99949a5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e99949a5

Branch: refs/heads/master
Commit: e99949a5cb976217b3c37b09705a40e4af009fe0
Parents: 8195001
Author: Josh <jo...@gmail.com>
Authored: Tue Jul 12 18:38:54 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Aug 30 10:39:52 2016 +0200

----------------------------------------------------------------------
 docs/dev/connectors/filesystem_sink.md          |   53 +-
 .../flink-connector-filesystem/pom.xml          |    9 +
 .../connectors/fs/AvroKeyValueSinkWriter.java   |    6 +-
 .../flink/streaming/connectors/fs/Bucketer.java |    3 +
 .../connectors/fs/DateTimeBucketer.java         |    2 +
 .../connectors/fs/NonRollingBucketer.java       |    6 +-
 .../streaming/connectors/fs/RollingSink.java    |    8 +-
 .../connectors/fs/SequenceFileWriter.java       |    3 +-
 .../connectors/fs/StreamWriterBase.java         |    3 +-
 .../flink/streaming/connectors/fs/Writer.java   |    3 +-
 .../fs/bucketing/BasePathBucketer.java          |   39 +
 .../connectors/fs/bucketing/Bucketer.java       |   47 +
 .../connectors/fs/bucketing/BucketingSink.java  | 1034 ++++++++++++++++++
 .../fs/bucketing/DateTimeBucketer.java          |  103 ++
 .../fs/RollingSinkFaultTolerance2ITCase.java    |    3 +
 .../fs/RollingSinkFaultToleranceITCase.java     |    3 +
 .../connectors/fs/RollingSinkITCase.java        |    3 +
 .../BucketingSinkFaultTolerance2ITCase.java     |  289 +++++
 .../BucketingSinkFaultToleranceITCase.java      |  282 +++++
 .../fs/bucketing/BucketingSinkTest.java         |  511 +++++++++
 .../util/OneInputStreamOperatorTestHarness.java |   12 +-
 21 files changed, 2388 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e99949a5/docs/dev/connectors/filesystem_sink.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/filesystem_sink.md b/docs/dev/connectors/filesystem_sink.md
index c6318e8..79cfe6a 100644
--- a/docs/dev/connectors/filesystem_sink.md
+++ b/docs/dev/connectors/filesystem_sink.md
@@ -23,7 +23,7 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This connector provides a Sink that writes rolling files to any filesystem supported by
+This connector provides a Sink that writes partitioned files to any filesystem supported by
 Hadoop FileSystem. To use this connector, add the
 following dependency to your project:
 
@@ -41,17 +41,17 @@ distribution. See
 for information about how to package the program with the libraries for
 cluster execution.
 
-#### Rolling File Sink
+#### Bucketing File Sink
 
-The rolling behaviour as well as the writing can be configured but we will get to that later.
-This is how you can create a default rolling sink:
+The bucketing behaviour as well as the writing can be configured but we will get to that later.
+This is how you can create a bucketing sink which, by default, sinks to rolling files that are split by time:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 DataStream<String> input = ...;
 
-input.addSink(new RollingSink<String>("/base/path"));
+input.addSink(new BucketingSink<String>("/base/path"));
 
 {% endhighlight %}
 </div>
@@ -59,27 +59,34 @@ input.addSink(new RollingSink<String>("/base/path"));
 {% highlight scala %}
 val input: DataStream[String] = ...
 
-input.addSink(new RollingSink("/base/path"))
+input.addSink(new BucketingSink[String]("/base/path"))
 
 {% endhighlight %}
 </div>
 </div>
 
-The only required parameter is the base path where the rolling files (buckets) will be
-stored. The sink can be configured by specifying a custom bucketer, writer and batch size.
-
-By default the rolling sink will use the pattern `"yyyy-MM-dd--HH"` to name the rolling buckets.
-This pattern is passed to `SimpleDateFormat` with the current system time to form a bucket path. A
-new bucket will be created whenever the bucket path changes. For example, if you have a pattern
-that contains minutes as the finest granularity you will get a new bucket every minute.
-Each bucket is itself a directory that contains several part files: Each parallel instance
-of the sink will create its own part file and when part files get too big the sink will also
-create a new part file next to the others. To specify a custom bucketer use `setBucketer()`
-on a `RollingSink`.
+The only required parameter is the base path where the buckets will be
+stored. The sink can be further configured by specifying a custom bucketer, writer and batch size.
+
+By default the bucketing sink will split by the current system time when elements arrive and will
+use the datetime pattern `"yyyy-MM-dd--HH"` to name the buckets. This pattern is passed to
+`SimpleDateFormat` with the current system time to form a bucket path. A new bucket will be created
+whenever a new date is encountered. For example, if you have a pattern that contains minutes as the
+finest granularity you will get a new bucket every minute. Each bucket is itself a directory that
+contains several part files: each parallel instance of the sink will create its own part file and
+when part files get too big the sink will also create a new part file next to the others. When a
+bucket becomes inactive, the open part file will be flushed and closed. A bucket is regarded as
+inactive when it hasn't been written to recently. By default, the sink checks for inactive buckets
+every minute, and closes any buckets which haven't been written to for over a minute. This
+behaviour can be configured with `setInactiveBucketCheckInterval()` and
+`setInactiveBucketThreshold()` on a `BucketingSink`.
+
+You can also specify a custom bucketer by using `setBucketer()` on a `BucketingSink`. If desired,
+the bucketer can use a property of the element or tuple to determine the bucket directory.
 
 The default writer is `StringWriter`. This will call `toString()` on the incoming elements
 and write them to part files, separated by newline. To specify a custom writer use `setWriter()`
-on a `RollingSink`. If you want to write Hadoop SequenceFiles you can use the provided
+on a `BucketingSink`. If you want to write Hadoop SequenceFiles you can use the provided
 `SequenceFileWriter` which can also be configured to use compression.
 
 The last configuration option is the batch size. This specifies when a part file should be closed
@@ -92,8 +99,8 @@ Example:
 {% highlight java %}
 DataStream<Tuple2<IntWritable,Text>> input = ...;
 
-RollingSink sink = new RollingSink<String>("/base/path");
-sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
+BucketingSink<Tuple2<IntWritable,Text>> sink = new BucketingSink<Tuple2<IntWritable,Text>>("/base/path");
+sink.setBucketer(new DateTimeBucketer<Tuple2<IntWritable,Text>>("yyyy-MM-dd--HHmm"));
 sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
 sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
 
@@ -105,8 +112,8 @@ input.addSink(sink);
 {% highlight scala %}
 val input: DataStream[Tuple2[IntWritable, Text]] = ...
 
-val sink = new RollingSink[String]("/base/path")
-sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
+val sink = new BucketingSink[Tuple2[IntWritable, Text]]("/base/path")
+sink.setBucketer(new DateTimeBucketer[Tuple2[IntWritable, Text]]("yyyy-MM-dd--HHmm"))
 sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
 sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
 
@@ -127,4 +134,4 @@ of the parallel sink instance and `count` is the running number of part files th
 because of the batch size.
 
 For in-depth information, please refer to the JavaDoc for
-[RollingSink](http://flink.apache.org/docs/latest/api/java/org/apache/flink/streaming/connectors/fs/RollingSink.html).
+[BucketingSink](http://flink.apache.org/docs/latest/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html).

http://git-wip-us.apache.org/repos/asf/flink/blob/e99949a5/flink-streaming-connectors/flink-connector-filesystem/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/pom.xml b/flink-streaming-connectors/flink-connector-filesystem/pom.xml
index 7aacb67..5712856 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/pom.xml
+++ b/flink-streaming-connectors/flink-connector-filesystem/pom.xml
@@ -97,6 +97,15 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+
+		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-hdfs</artifactId>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/e99949a5/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
index a8919c3..3e3c86b 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
@@ -46,8 +46,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 <pre>
 Usage:
 {@code
-		RollingSink<Tuple2<Long , Long>> sink = new RollingSink<Tuple2<Long , Long>>("/tmp/path");
-		sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd/HH/mm/"));
+		BucketingSink<Tuple2<Long, Long>> sink = new BucketingSink<Tuple2<Long, Long>>("/tmp/path");
+		sink.setBucketer(new DateTimeBucketer<Tuple2<Long, Long>>("yyyy-MM-dd/HH/mm/"));
 		sink.setPendingSuffix(".avro");
 		Map<String,String> properties = new HashMap<>();
 		Schema longSchema = Schema.create(Type.LONG);
@@ -58,7 +58,7 @@ Usage:
 		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, Boolean.toString(true));
 		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);
 		
-		sink.setWriter(new AvroSinkWriter<Long , Long>(properties));
+		sink.setWriter(new AvroSinkWriter<Long, Long>(properties));
 		sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB,
 }
 </pre>

http://git-wip-us.apache.org/repos/asf/flink/blob/e99949a5/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
index 913da97..24ad6ab 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
@@ -30,7 +30,10 @@ import java.io.Serializable;
  * a new element arrives it will ask the {@code Bucketer} if a new bucket should be started and
  * the old one closed. The {@code Bucketer} can, for example, decide to start new buckets
  * based on system time.
+ *
+ * @deprecated use {@link org.apache.flink.streaming.connectors.fs.bucketing.Bucketer} instead.
  */
+@Deprecated
 public interface Bucketer extends Serializable {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/e99949a5/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
index 0be40f5..0df8998 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
@@ -52,7 +52,9 @@ import java.util.Date;
  * This will create for example the following bucket path:
  * {@code /base/1976-12-31-14/}
  *
+ * @deprecated use {@link org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer} instead.
  */
+@Deprecated
 public class DateTimeBucketer implements Bucketer {
 
 	private static Logger LOG = LoggerFactory.getLogger(DateTimeBucketer.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/e99949a5/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
index 1307d11..6854596 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
@@ -17,12 +17,16 @@
  */
 package org.apache.flink.streaming.connectors.fs;
 
+import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer;
 import org.apache.hadoop.fs.Path;
 
 /**
- * A {@link org.apache.flink.streaming.connectors.fs.Bucketer} that does not perform any
+ * A {@link Bucketer} that does not perform any
  * rolling of files. All files are written to the base path.
+ *
+ * @deprecated use {@link BasePathBucketer} instead.
  */
+@Deprecated
 public class NonRollingBucketer implements Bucketer {
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e99949a5/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index 799e908..738857f 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -63,7 +64,7 @@ import java.util.UUID;
  * and start the new bucket directory. The default bucketer is a {@link DateTimeBucketer} with
  * date format string {@code ""yyyy-MM-dd--HH"}. You can specify a custom {@code Bucketer}
  * using {@link #setBucketer(Bucketer)}. For example, use
- * {@link org.apache.flink.streaming.connectors.fs.NonRollingBucketer} if you don't want to have
+ * {@link NonRollingBucketer} if you don't want to have
  * buckets but still write part files in a fault-tolerant way.
  *
  * <p>
@@ -114,12 +115,15 @@ import java.util.UUID;
  *
  * This will create a sink that writes to {@code SequenceFiles} and rolls every minute.
  *
- * @see org.apache.flink.streaming.connectors.fs.DateTimeBucketer
+ * @see DateTimeBucketer
  * @see StringWriter
  * @see SequenceFileWriter
  *
  * @param <T> Type of the elements emitted by this sink
+ *
+ * @deprecated use {@link BucketingSink} instead.
  */
+@Deprecated
 public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConfigurable, Checkpointed<RollingSink.BucketState>, CheckpointListener {
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e99949a5/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
index c71e97f..08c0d0a 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -36,7 +37,7 @@ import java.io.IOException;
 
 /**
  * A {@link Writer} that writes the bucket files as Hadoop {@link SequenceFile SequenceFiles}.
- * The input to the {@link RollingSink} must
+ * The input to the {@link BucketingSink} must
  * be a {@link org.apache.flink.api.java.tuple.Tuple2} of two Hadopo
  * {@link org.apache.hadoop.io.Writable Writables}.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/e99949a5/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
index 0fd85e2..140246f 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.fs;
 
+import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -32,7 +33,7 @@ import java.lang.reflect.Method;
  */
 public abstract class StreamWriterBase<T> implements Writer<T> {
 
-	private static Logger LOG = LoggerFactory.getLogger(RollingSink.class);
+	private static Logger LOG = LoggerFactory.getLogger(BucketingSink.class);
 
 	/**
 	 * The {@code FSDataOutputStream} for the current part file.

http://git-wip-us.apache.org/repos/asf/flink/blob/e99949a5/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
index e7aaaa7..c3b4cb6 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.fs;
 
+import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -25,7 +26,7 @@ import java.io.Serializable;
 
 /**
  * An implementation of {@code Writer} is used in conjunction with a
- * {@link RollingSink} to perform the actual
+ * {@link BucketingSink} to perform the actual
  * writing to the bucket files.
  *
  * @param <T> The type of the elements that are being written by the sink.

http://git-wip-us.apache.org/repos/asf/flink/blob/e99949a5/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
new file mode 100644
index 0000000..0bf14b3
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.flink.streaming.connectors.fs.Clock;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A {@link Bucketer} that does not perform any
+ * bucketing of files. All files are written to the base path.
+ */
+public class BasePathBucketer<T> implements Bucketer<T> {
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public Path getBucketPath(Clock clock, Path basePath, T element) {
+		return basePath;
+	}
+
+	@Override
+	public String toString() {
+		return "BasePathBucketer";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e99949a5/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
new file mode 100644
index 0000000..86aa9f3
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.flink.streaming.connectors.fs.Clock;
+import org.apache.hadoop.fs.Path;
+
+import java.io.Serializable;
+
+/**
+ * A bucketer is used with a {@link BucketingSink}
+ * to put emitted elements into rolling files.
+ *
+ * <p>
+ * The {@code BucketingSink} can be writing to many buckets at a time, and it is responsible for managing
+ * a set of active buckets. Whenever a new element arrives it will ask the {@code Bucketer} for the bucket
+ * path the element should fall in. The {@code Bucketer} can, for example, determine buckets based on
+ * system time.
+ */
+public interface Bucketer<T> extends Serializable {
+	/**
+	 * Returns the {@link Path} of a bucket file.
+	 *
+	 * @param basePath The base path containing all the buckets.
+	 * @param element The current element being processed.
+	 *
+	 * @return The complete {@code Path} of the bucket which the provided element should fall in. This
+	 * should include the {@code basePath} and also the {@code subtaskIndex} to avoid clashes with
+	 * parallel sinks.
+	 */
+	Path getBucketPath(Clock clock, Path basePath, T element);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e99949a5/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
new file mode 100644
index 0000000..1e05c0d
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -0,0 +1,1034 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.fs.Clock;
+import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
+import org.apache.flink.streaming.connectors.fs.StringWriter;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.Iterator;
+
+/**
+ * Sink that emits its input elements to {@link org.apache.hadoop.fs.FileSystem} files within
+ * buckets. This is integrated with the checkpointing mechanism to provide exactly once semantics.
+ *
+ * <p>
+ * When creating the sink a {@code basePath} must be specified. The base directory contains
+ * one directory for every bucket. The bucket directories themselves contain several part files.
+ * These contain the actual written data.
+ *
+ * <p>
+ * The sink uses a {@link Bucketer} to determine the bucket directories that elements should
+ * be written to inside the
+ * base directory. The {@code Bucketer} can, for example, use time or a property of the element
+ * to determine the bucket directory. The default bucketer is a {@link DateTimeBucketer}
+ * with date format string {@code "yyyy-MM-dd--HH"}. You can specify a custom {@code Bucketer}
+ * using {@link #setBucketer(Bucketer)}. For example, use
+ * {@link BasePathBucketer} if you don't want to have buckets but still write part files in a
+ * fault-tolerant way.
+ *
+ * <p>
+ * The filenames of the part files contain the part prefix, the parallel subtask index of the sink
+ * and a rolling counter, for example {@code "part-1-17"}. Per default the part prefix is
+ * {@code "part"} but this can be configured using {@link #setPartPrefix(String)}. When a part file
+ * becomes bigger than the batch size the current part file is closed, the part counter is increased and
+ * a new part file is created. The batch size defaults to {@code 384MB}, this can be configured
+ * using {@link #setBatchSize(long)}.
+ *
+ * <p>
+ * In some scenarios, the buckets being written to change over time. In these cases, the sink needs to
+ * determine when a bucket has become inactive, in order to flush and close the part file. A bucket is
+ * regarded inactive if it hasn't been written to recently. To support this there are two configurable
+ * settings: the frequency to check for inactivity, configured by
+ * {@link #setInactiveBucketCheckInterval(long)} and the minimum amount of time a bucket must not be
+ * written to before it should be regarded inactive, configured by
+ * {@link #setInactiveBucketThreshold(long)}. Both of these parameters default to {@code 60,000 ms}.
+ *
+ * <p>
+ * Part files can be in one of three states: in-progress, pending or finished. The reason for this
+ * is how the sink works together with the checkpointing mechanism to provide exactly-once semantics
+ * and fault-tolerance. The part file that is currently being written to is in-progress. Once
+ * a part file is closed for writing it becomes pending. When a checkpoint is successful the
+ * currently pending files will be moved to finished. If a failure occurs the pending files
+ * will be deleted to reset state to the last checkpoint. The data in in-progress files will
+ * also have to be rolled back. If the {@code FileSystem} supports the {@code truncate} call
+ * this will be used to reset the file back to a previous state. If not, a special file
+ * with the same name as the part file and the suffix {@code ".valid-length"} will be written
+ * that contains the length up to which the file contains valid data. When reading the file
+ * it must be ensured that it is only read up to that point. The prefixes and suffixes for
+ * the different file states and valid-length files can be configured, for example with
+ * {@link #setPendingSuffix(String)}.
+ *
+ * <p>
+ * Note: If checkpointing is not enabled the pending files will never be moved to the finished state.
+ * In that case, the pending suffix/prefix can be set to {@code ""} to make the sink work
+ * in a non-fault-tolerant way but still provide output without prefixes and suffixes.
+ *
+ * <p>
+ * The part files are written using an instance of {@link Writer}. By default
+ * {@link org.apache.flink.streaming.connectors.fs.StringWriter} is used, which writes the result
+ * of {@code toString()} for every element, separated by newlines. You can configure the writer
+ * using {@link #setWriter(Writer)}. For example,
+ * {@link org.apache.flink.streaming.connectors.fs.SequenceFileWriter} can be used to write
+ * Hadoop {@code SequenceFiles}.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ *     new BucketingSink<Tuple2<IntWritable, Text>>(outPath)
+ *         .setWriter(new SequenceFileWriter<IntWritable, Text>())
+ *         .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
+ * }</pre>
+ *
+ * This will create a sink that writes to {@code SequenceFiles} and rolls every minute.
+ *
+ * @see DateTimeBucketer
+ * @see StringWriter
+ * @see SequenceFileWriter
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public class BucketingSink<T>
+		extends RichSinkFunction<T>
+		implements InputTypeConfigurable, Checkpointed<BucketingSink.State<T>>, CheckpointListener, Triggerable {
+	private static final long serialVersionUID = 1L;
+
+	private static Logger LOG = LoggerFactory.getLogger(BucketingSink.class);
+
+	// --------------------------------------------------------------------------------------------
+	//  User configuration values
+	// --------------------------------------------------------------------------------------------
+	// These are initialized with some defaults but are meant to be changeable by the user
+
+	/**
+	 * The default maximum size of part files.
+	 *
+	 * 6 times the default block size
+	 */
+	private final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L;
+
+	/**
+	 * The default time between checks for inactive buckets.
+	 */
+	private final long DEFAULT_INACTIVE_BUCKET_CHECK_INTERVAL_MS = 60 * 1000L;
+
+	/**
+	 * The default threshold for marking a bucket as inactive and closing its part files.
+	 * Buckets which haven't been written to for at least this period of time become inactive.
+	 */
+	private final long DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS = 60 * 1000L;
+
+	/**
+	 * This is used for part files that we are writing to but which were not yet confirmed
+	 * by a checkpoint.
+	 */
+	private final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";
+
+	/**
+	 * See above, but for prefix
+	 */
+	private final String DEFAULT_IN_PROGRESS_PREFIX = "_";
+
+	/**
+	 * This is used for part files that we are not writing to but which are not yet confirmed by
+	 * checkpoint.
+	 */
+	private final String DEFAULT_PENDING_SUFFIX = ".pending";
+
+	/**
+	 * See above, but for prefix.
+	 */
+	private final String DEFAULT_PENDING_PREFIX = "_";
+
+	/**
+	 * When truncate() is not supported on the used FileSystem we instead write a
+	 * file along the part file with this ending that contains the length up to which
+	 * the part file is valid.
+	 */
+	private final String DEFAULT_VALID_SUFFIX = ".valid-length";
+
+	/**
+	 * See above, but for prefix.
+	 */
+	private final String DEFAULT_VALID_PREFIX = "_";
+
+	/**
+	 * The default prefix for part files.
+	 */
+	private final String DEFAULT_PART_REFIX = "part";
+
+	/**
+	 * The default timeout for asynchronous operations such as recoverLease and truncate. In
+	 * milliseconds.
+	 */
+	private final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000;
+
+
+	/**
+	 * The base {@code Path} that stores all bucket directories.
+	 */
+	private final String basePath;
+
+	/**
+	 * The {@code Bucketer} that is used to determine the path of bucket directories.
+	 */
+	private Bucketer<T> bucketer;
+
+	/**
+	 * We have a template and call duplicate() for each parallel writer in open() to get the actual
+	 * writer that is used for the part files.
+	 */
+	private Writer<T> writerTemplate;
+
+	/**
+	 * If this is true we remove any leftover in-progress/pending files when the sink is opened.
+	 *
+	 * <p>
+	 * This should only be set to false if using the sink without checkpoints, to not remove
+	 * the files already in the directory.
+	 */
+	private boolean cleanupOnOpen = true;
+
+	private long batchSize = DEFAULT_BATCH_SIZE;
+	private long inactiveBucketCheckInterval = DEFAULT_INACTIVE_BUCKET_CHECK_INTERVAL_MS;
+	private long inactiveBucketThreshold = DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS;
+
+	// These are the actually configured prefixes/suffixes
+	private String inProgressSuffix = DEFAULT_IN_PROGRESS_SUFFIX;
+	private String inProgressPrefix = DEFAULT_IN_PROGRESS_PREFIX;
+
+	private String pendingSuffix = DEFAULT_PENDING_SUFFIX;
+	private String pendingPrefix = DEFAULT_PENDING_PREFIX;
+
+	private String validLengthSuffix = DEFAULT_VALID_SUFFIX;
+	private String validLengthPrefix= DEFAULT_VALID_PREFIX;
+
+	private String partPrefix = DEFAULT_PART_REFIX;
+
+	/**
+	 * The timeout for asynchronous operations such as recoverLease and truncate. In
+	 * milliseconds.
+	 */
+	private long asyncTimeout = DEFAULT_ASYNC_TIMEOUT_MS;
+
+	// --------------------------------------------------------------------------------------------
+	//  Internal fields (not configurable by user)
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Our subtask index, retrieved from the {@code RuntimeContext} in {@link #open}.
+	 */
+	private transient int subtaskIndex;
+
+	/**
+	 * We use reflection to get the .truncate() method, this is only available starting with
+	 * Hadoop 2.7
+	 */
+	private transient Method refTruncate;
+
+	/**
+	 * The state object that is handled by flink from snapshot/restore. In there we store state for
+	 * every open bucket: the current part file path, the valid length of the in-progress files and
+	 * pending part files.
+	 */
+	private transient State<T> state;
+
+	private transient org.apache.hadoop.conf.Configuration hadoopConf;
+
+	private transient Clock clock;
+
+	/**
+	 * Creates a new {@code BucketingSink} that writes files to the given base directory.
+	 *
+	 * <p>
+	 * This uses a {@link DateTimeBucketer} as bucketer and a {@link StringWriter} as writer.
+	 * The maximum bucket size is set to 384 MB.
+	 *
+	 * @param basePath The directory to which to write the bucket files.
+	 */
+	public BucketingSink(String basePath) {
+		this.basePath = basePath;
+		this.bucketer = new DateTimeBucketer<T>();
+		this.writerTemplate = new StringWriter<>();
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+		if (this.writerTemplate instanceof InputTypeConfigurable) {
+			((InputTypeConfigurable) writerTemplate).setInputType(type, executionConfig);
+		}
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+
+		subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+
+		state = new State<T>();
+
+		Path baseDirectory = new Path(basePath);
+		hadoopConf = HadoopFileSystem.getHadoopConfiguration();
+		FileSystem fs = baseDirectory.getFileSystem(hadoopConf);
+		refTruncate = reflectTruncate(fs);
+
+		long currentProcessingTime =
+				((StreamingRuntimeContext) getRuntimeContext()).getCurrentProcessingTime();
+
+		checkForInactiveBuckets(currentProcessingTime);
+
+		((StreamingRuntimeContext) getRuntimeContext()).registerTimer(
+				currentProcessingTime + inactiveBucketCheckInterval, this);
+
+		this.clock = new Clock() {
+			@Override
+			public long currentTimeMillis() {
+				return ((StreamingRuntimeContext) getRuntimeContext()).getCurrentProcessingTime();
+			}
+		};
+
+		// delete pending/in-progress files that might be left if we fail while
+		// no checkpoint has yet been done
+		try {
+			if (fs.exists(baseDirectory) && cleanupOnOpen) {
+				RemoteIterator<LocatedFileStatus> bucketFiles = fs.listFiles(baseDirectory, true);
+
+				while (bucketFiles.hasNext()) {
+					LocatedFileStatus file = bucketFiles.next();
+					if (file.getPath().toString().endsWith(pendingSuffix)) {
+						// only delete files that contain our subtask index
+						if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex + "-")) {
+							LOG.debug("(OPEN) Deleting leftover pending file {}", file.getPath().toString());
+							fs.delete(file.getPath(), true);
+						}
+					}
+					if (file.getPath().toString().endsWith(inProgressSuffix)) {
+						// only delete files that contain our subtask index
+						if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex + "-")) {
+							LOG.debug("(OPEN) Deleting leftover in-progress file {}", file.getPath().toString());
+							fs.delete(file.getPath(), true);
+						}
+					}
+				}
+			}
+		} catch (IOException e) {
+			LOG.error("Error while deleting leftover pending/in-progress files: {}", e);
+			throw new RuntimeException("Error while deleting leftover pending/in-progress files.", e);
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
+			closeCurrentPartFile(entry.getValue());
+		}
+	}
+
+	@Override
+	public void invoke(T value) throws Exception {
+		Path bucketPath = bucketer.getBucketPath(clock, new Path(basePath), value);
+
+		long currentProcessingTime =
+				((StreamingRuntimeContext) getRuntimeContext()).getCurrentProcessingTime();
+
+		if (!state.hasBucketState(bucketPath)) {
+			state.addBucketState(bucketPath, new BucketState<T>(currentProcessingTime));
+		}
+
+		BucketState<T> bucketState = state.getBucketState(bucketPath);
+
+		if (shouldRoll(bucketState)) {
+			openNewPartFile(bucketPath, bucketState);
+		}
+
+		bucketState.writer.write(value);
+		bucketState.lastWrittenToTime = currentProcessingTime;
+	}
+
+	/**
+	 * Determines whether we should change the file we are writing to within a given bucket.
+	 *
+	 * <p>
+	 * This will roll if no file was created yet or if the file size is larger than the specified size.
+	 */
+	private boolean shouldRoll(BucketState<T> bucketState) throws IOException {
+		boolean shouldRoll = false;
+		if (!bucketState.isWriterOpen) {
+			shouldRoll = true;
+			LOG.debug("BucketingSink {} starting new bucket.", subtaskIndex);
+		} else {
+			long writePosition = bucketState.writer.getPos();
+			if (writePosition > batchSize) {
+				shouldRoll = true;
+				LOG.debug(
+					"BucketingSink {} starting new bucket because file position {} is above batch size {}.",
+					subtaskIndex,
+					writePosition,
+					batchSize);
+			}
+		}
+		return shouldRoll;
+	}
+
+	@Override
+	public void trigger(long timestamp) throws Exception {
+		long currentProcessingTime =
+				((StreamingRuntimeContext) getRuntimeContext()).getCurrentProcessingTime();
+
+		checkForInactiveBuckets(currentProcessingTime);
+
+		((StreamingRuntimeContext) getRuntimeContext()).registerTimer(
+				currentProcessingTime + inactiveBucketCheckInterval, this);
+	}
+
+	/**
+	 * Checks for inactive buckets, and closes them. This enables in-progress files to be moved to
+	 * the pending state and finalised on the next checkpoint.
+	 */
+	private void checkForInactiveBuckets(long currentProcessingTime) throws Exception {
+
+		synchronized (state.bucketStates) {
+			for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
+				if (entry.getValue().lastWrittenToTime < currentProcessingTime - inactiveBucketThreshold) {
+					LOG.debug("BucketingSink {} closing bucket due to inactivity of over {} ms.",
+						subtaskIndex, inactiveBucketThreshold);
+					closeCurrentPartFile(entry.getValue());
+				}
+			}
+		}
+	}
+
+	/**
+	 * Opens a new part file.
+	 *
+	 * <p>
+	 * This closes the bucket's current part file, if any, and opens a new part file inside the given bucket path.
+	 */
+	private void openNewPartFile(Path bucketPath, BucketState<T> bucketState) throws Exception {
+		closeCurrentPartFile(bucketState);
+
+		FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
+
+		if (!fs.exists(bucketPath)) {
+			try {
+				if (fs.mkdirs(bucketPath)) {
+					LOG.debug("Created new bucket directory: {}", bucketPath);
+				}
+			} catch (IOException e) {
+				throw new RuntimeException("Could not create new bucket path.", e);
+			}
+		}
+
+		Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
+
+		// This should work since there is only one parallel subtask that tries names with
+		// our subtask id. Otherwise we would run into concurrency issues here.
+		while (fs.exists(partPath) || fs.exists(new Path(partPath.getParent(), pendingPrefix + partPath.getName()).suffix(pendingSuffix))) {
+			bucketState.partCounter++;
+			partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
+		}
+
+		// increase, so we don't have to check for this name next time
+		bucketState.partCounter++;
+
+		LOG.debug("Next part path is {}", partPath.toString());
+		bucketState.currentFile = partPath.toString();
+
+		Path inProgressPath = new Path(partPath.getParent(), inProgressPrefix + partPath.getName()).suffix(inProgressSuffix);
+
+		// If we don't already have a writer for this bucket, create one
+		if (bucketState.writer == null) {
+			bucketState.writer = writerTemplate.duplicate();
+		}
+
+		bucketState.writer.open(fs, inProgressPath);
+		bucketState.isWriterOpen = true;
+	}
+
+	/**
+	 * Closes the current part file.
+	 *
+	 * <p>
+	 * This moves the current in-progress part file to a pending file and adds it to the list
+	 * of pending files in our bucket state.
+	 */
+	private void closeCurrentPartFile(BucketState<T> bucketState) throws Exception {
+		if (bucketState.isWriterOpen) {
+			bucketState.writer.close();
+			bucketState.isWriterOpen = false;
+		}
+
+		if (bucketState.currentFile != null) {
+			Path currentPartPath = new Path(bucketState.currentFile);
+			Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix);
+			Path pendingPath = new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix);
+			FileSystem fs = inProgressPath.getFileSystem(hadoopConf);
+			fs.rename(inProgressPath, pendingPath);
+			LOG.debug("Moving in-progress bucket {} to pending file {}",
+				inProgressPath,
+				pendingPath);
+			bucketState.pendingFiles.add(currentPartPath.toString());
+			bucketState.currentFile = null;
+		}
+	}
+
+	/**
+	 * Gets the truncate() call using reflection.
+	 *
+	 * <p>
+	 * Note: This code comes from Flume
+	 */
+	private Method reflectTruncate(FileSystem fs) {
+		Method m = null;
+		if(fs != null) {
+			Class<?> fsClass = fs.getClass();
+			try {
+				m = fsClass.getMethod("truncate", Path.class, long.class);
+			} catch (NoSuchMethodException ex) {
+				LOG.debug("Truncate not found. Will write a file with suffix '{}' " +
+					" and prefix '{}' to specify how many bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
+				return null;
+			}
+
+
+			// verify that truncate actually works
+			FSDataOutputStream outputStream;
+			Path testPath = new Path(UUID.randomUUID().toString());
+			try {
+				outputStream = fs.create(testPath);
+				outputStream.writeUTF("hello");
+				outputStream.close();
+			} catch (IOException e) {
+				LOG.error("Could not create file for checking if truncate works.", e);
+				throw new RuntimeException("Could not create file for checking if truncate works.", e);
+			}
+
+
+			try {
+				m.invoke(fs, testPath, 2);
+			} catch (IllegalAccessException | InvocationTargetException e) {
+				LOG.debug("Truncate is not supported.", e);
+				m = null;
+			}
+
+			try {
+				fs.delete(testPath, false);
+			} catch (IOException e) {
+				LOG.error("Could not delete truncate test file.", e);
+				throw new RuntimeException("Could not delete truncate test file.", e);
+			}
+		}
+		return m;
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		synchronized (state.bucketStates) {
+			Iterator<Map.Entry<String, BucketState<T>>> it = state.bucketStates.entrySet().iterator();
+			while (it.hasNext()) {
+				BucketState<T> bucketState = it.next().getValue();
+				synchronized (bucketState.pendingFilesPerCheckpoint) {
+					Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
+					Set<Long> checkpointsToRemove = new HashSet<>();
+					for (Long pastCheckpointId : pastCheckpointIds) {
+						if (pastCheckpointId <= checkpointId) {
+							LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId);
+							// All the pending files are buckets that have been completed but are waiting to be renamed
+							// to their final name
+							for (String filename : bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) {
+								Path finalPath = new Path(filename);
+								Path pendingPath = new Path(finalPath.getParent(),
+									pendingPrefix + finalPath.getName()).suffix(pendingSuffix);
+
+								FileSystem fs = pendingPath.getFileSystem(hadoopConf);
+								fs.rename(pendingPath, finalPath);
+								LOG.debug(
+									"Moving pending file {} to final location having completed checkpoint {}.",
+									pendingPath,
+									pastCheckpointId);
+							}
+							checkpointsToRemove.add(pastCheckpointId);
+						}
+					}
+					if (!bucketState.isWriterOpen && bucketState.pendingFiles.isEmpty()) {
+						// We've dealt with all the pending files and the writer for this bucket is not currently open.
+						// Therefore this bucket is currently inactive and we can remove it from our state.
+						it.remove();
+					} else {
+						for (Long toRemove : checkpointsToRemove) {
+							bucketState.pendingFilesPerCheckpoint.remove(toRemove);
+						}
+					}
+				}
+			}
+		}
+	}
+
+	@Override
+	public State<T> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		synchronized (state.bucketStates) {
+			for (BucketState<T> bucketState : state.bucketStates.values()) {
+				if (bucketState.isWriterOpen) {
+					long pos = bucketState.writer.flush();
+					bucketState.currentFileValidLength = pos;
+				}
+				synchronized (bucketState.pendingFilesPerCheckpoint) {
+					bucketState.pendingFilesPerCheckpoint.put(checkpointId, bucketState.pendingFiles);
+				}
+				bucketState.pendingFiles = new ArrayList<>();
+			}
+		}
+		return state;
+	}
+
+	@Override
+	public void restoreState(State<T> state) {
+		this.state = state;
+
+		FileSystem fs;
+		try {
+			fs = new Path(basePath).getFileSystem(HadoopFileSystem.getHadoopConfiguration());
+		} catch (IOException e) {
+			LOG.error("Error while creating FileSystem in checkpoint restore.", e);
+			throw new RuntimeException("Error while creating FileSystem in checkpoint restore.", e);
+		}
+
+		for (BucketState<T> bucketState : state.bucketStates.values()) {
+			// we can clean all the pending files since they were renamed to final files
+			// after this checkpoint was successful
+			bucketState.pendingFiles.clear();
+
+			if (bucketState.currentFile != null) {
+				// We were writing to a file when the last checkpoint occurred. This file can either
+				// be still in-progress or became a pending file at some point after the checkpoint.
+				// Either way, we have to truncate it back to a valid state (or write a .valid-length
+				// file that specifies up to which length it is valid) and rename it to the final name
+				// before starting a new bucket file.
+				Path partPath = new Path(bucketState.currentFile);
+				try {
+					Path partPendingPath = new Path(partPath.getParent(), pendingPrefix + partPath.getName()).suffix(
+						pendingSuffix);
+					Path partInProgressPath = new Path(partPath.getParent(), inProgressPrefix + partPath.getName()).suffix(inProgressSuffix);
+
+					if (fs.exists(partPendingPath)) {
+						LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
+						// has been moved to pending in the mean time, rename to final location
+						fs.rename(partPendingPath, partPath);
+					} else if (fs.exists(partInProgressPath)) {
+						LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
+						// it was still in progress, rename to final path
+						fs.rename(partInProgressPath, partPath);
+					} else if (fs.exists(partPath)) {
+						LOG.debug("In-Progress file {} was already moved to final location {}.", bucketState.currentFile, partPath);
+					} else {
+						LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " +
+							"it was moved to final location by a previous snapshot restore", bucketState.currentFile);
+					}
+
+					refTruncate = reflectTruncate(fs);
+					// truncate it or write a ".valid-length" file to specify up to which point it is valid
+					if (refTruncate != null) {
+						LOG.debug("Truncating {} to valid length {}", partPath, bucketState.currentFileValidLength);
+						// some-one else might still hold the lease from a previous try, we are
+						// recovering, after all ...
+						if (fs instanceof DistributedFileSystem) {
+							DistributedFileSystem dfs = (DistributedFileSystem) fs;
+							LOG.debug("Trying to recover file lease {}", partPath);
+							dfs.recoverLease(partPath);
+							boolean isclosed = dfs.isFileClosed(partPath);
+							StopWatch sw = new StopWatch();
+							sw.start();
+							while (!isclosed) {
+								if (sw.getTime() > asyncTimeout) {
+									break;
+								}
+								try {
+									Thread.sleep(500);
+								} catch (InterruptedException e1) {
+									// ignore it
+								}
+								isclosed = dfs.isFileClosed(partPath);
+							}
+						}
+						Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength);
+						if (!truncated) {
+							LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath);
+
+							// we must wait for the asynchronous truncate operation to complete
+							StopWatch sw = new StopWatch();
+							sw.start();
+							long newLen = fs.getFileStatus(partPath).getLen();
+							while (newLen != bucketState.currentFileValidLength) {
+								if (sw.getTime() > asyncTimeout) {
+									break;
+								}
+								try {
+									Thread.sleep(500);
+								} catch (InterruptedException e1) {
+									// ignore it
+								}
+								newLen = fs.getFileStatus(partPath).getLen();
+							}
+							if (newLen != bucketState.currentFileValidLength) {
+								throw new RuntimeException("Truncate did not truncate to right length. Should be " + bucketState.currentFileValidLength + " is " + newLen + ".");
+							}
+						}
+
+					} else {
+						LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
+						Path validLengthFilePath = new Path(partPath.getParent(), validLengthPrefix + partPath.getName()).suffix(validLengthSuffix);
+						if (!fs.exists(validLengthFilePath)) {
+							FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
+							lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
+							lengthFileOut.close();
+						}
+					}
+
+					// Now that we've restored the bucket to a valid state, reset the current file info
+					bucketState.currentFile = null;
+					bucketState.currentFileValidLength = -1;
+				} catch (IOException e) {
+					LOG.error("Error while restoring BucketingSink state.", e);
+					throw new RuntimeException("Error while restoring BucketingSink state.", e);
+				} catch (InvocationTargetException | IllegalAccessException e) {
+					LOG.error("Cound not invoke truncate.", e);
+					throw new RuntimeException("Could not invoke truncate.", e);
+				}
+			}
+
+			LOG.debug("Clearing pending/in-progress files.");
+
+			// Move files that are confirmed by a checkpoint but did not get moved to final location
+			// because the checkpoint notification did not happen before a failure
+
+			Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
+			LOG.debug("Moving pending files to final location on restore.");
+			for (Long pastCheckpointId : pastCheckpointIds) {
+				// All the pending files are buckets that have been completed but are waiting to be renamed
+				// to their final name
+				for (String filename : bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) {
+					Path finalPath = new Path(filename);
+					Path pendingPath = new Path(finalPath.getParent(), pendingPrefix + finalPath.getName()).suffix(pendingSuffix);
+
+					try {
+						if (fs.exists(pendingPath)) {
+							LOG.debug("(RESTORE) Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId);
+							fs.rename(pendingPath, finalPath);
+						}
+					} catch (IOException e) {
+						LOG.error("(RESTORE) Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
+						throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e);
+					}
+				}
+			}
+
+			synchronized (bucketState.pendingFilesPerCheckpoint) {
+				bucketState.pendingFilesPerCheckpoint.clear();
+			}
+		}
+
+		// we need to get this here since open() has not yet been called
+		int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+		// delete pending files
+		try {
+
+			RemoteIterator<LocatedFileStatus> bucketFiles = fs.listFiles(new Path(basePath), true);
+
+			while (bucketFiles.hasNext()) {
+				LocatedFileStatus file = bucketFiles.next();
+				if (file.getPath().toString().endsWith(pendingSuffix)) {
+					// only delete files that contain our subtask index
+					if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex + "-")) {
+						LOG.debug("(RESTORE) Deleting pending file {}", file.getPath().toString());
+						fs.delete(file.getPath(), true);
+					}
+				}
+				if (file.getPath().toString().endsWith(inProgressSuffix)) {
+					// only delete files that contain our subtask index
+					if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex + "-")) {
+						LOG.debug("(RESTORE) Deleting in-progress file {}", file.getPath().toString());
+						fs.delete(file.getPath(), true);
+					}
+				}
+			}
+		} catch (IOException e) {
+			LOG.error("Error while deleting old pending files: {}", e);
+			throw new RuntimeException("Error while deleting old pending files.", e);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Setters for User configuration values
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Sets the maximum size of a bucket part file in bytes.
+	 *
+	 * <p>When a bucket part file becomes larger than this size a new bucket part file is started
+	 * and the old one is closed. The name of the bucket files depends on the {@link Bucketer}.
+	 *
+	 * @param batchSize The maximum size of a single bucket part file, in bytes.
+	 * @return This sink, so that setter calls can be chained.
+	 */
+	public BucketingSink<T> setBatchSize(long batchSize) {
+		this.batchSize = batchSize;
+		return this;
+	}
+
+	/**
+	 * Sets the default time between two checks for inactive buckets; returns this sink for chaining.
+	 *
+	 * @param interval The time between two consecutive checks, in milliseconds.
+	 */
+	public BucketingSink<T> setInactiveBucketCheckInterval(long interval) {
+		this.inactiveBucketCheckInterval = interval;
+		return this;
+	}
+
+	/**
+	 * Sets the default threshold for marking a bucket as inactive and closing its part files.
+	 * Buckets which haven't been written to for at least this period of time become inactive.
+	 *
+	 * @param threshold The period without writes after which a bucket is inactive, in milliseconds.
+	 */
+	public BucketingSink<T> setInactiveBucketThreshold(long threshold) {
+		this.inactiveBucketThreshold = threshold;
+		return this;
+	}
+
+	/**
+	 * Sets the {@link Bucketer} to use for determining the bucket files to write to.
+	 *
+	 * @param bucketer The bucketer to use; it replaces the bucketer that was set before.
+	 */
+	public BucketingSink<T> setBucketer(Bucketer<T> bucketer) {
+		this.bucketer = bucketer;
+		return this;
+	}
+
+	/**
+	 * Sets the {@link Writer} to be used for writing the incoming elements to bucket files.
+	 *
+	 * @param writer The {@code Writer} to use; it is stored as the writer template of this sink.
+	 */
+	public BucketingSink<T> setWriter(Writer<T> writer) {
+		this.writerTemplate = writer;
+		return this;
+	}
+
+	/**
+	 * Sets the suffix of in-progress part files. The default is {@code ".in-progress"}.
+	 */
+	public BucketingSink<T> setInProgressSuffix(String inProgressSuffix) {
+		this.inProgressSuffix = inProgressSuffix;
+		return this;
+	}
+
+	/**
+	 * Sets the prefix of in-progress part files and returns this sink. The default is {@code "_"}.
+	 */
+	public BucketingSink<T> setInProgressPrefix(String inProgressPrefix) {
+		this.inProgressPrefix = inProgressPrefix;
+		return this;
+	}
+
+	/**
+	 * Sets the suffix that pending part files carry after their final file name. The default is {@code ".pending"}.
+	 */
+	public BucketingSink<T> setPendingSuffix(String pendingSuffix) {
+		this.pendingSuffix = pendingSuffix;
+		return this;
+	}
+
+	/**
+	 * Sets the prefix that pending part files carry before their final file name. The default is {@code "_"}.
+	 */
+	public BucketingSink<T> setPendingPrefix(String pendingPrefix) {
+		this.pendingPrefix = pendingPrefix;
+		return this;
+	}
+
+	/**
+	 * Sets the suffix of valid-length files and returns this sink. The default is {@code ".valid-length"}.
+	 */
+	public BucketingSink<T> setValidLengthSuffix(String validLengthSuffix) {
+		this.validLengthSuffix = validLengthSuffix;
+		return this;
+	}
+
+	/**
+	 * Sets the prefix of valid-length files and returns this sink. The default is {@code "_"}.
+	 */
+	public BucketingSink<T> setValidLengthPrefix(String validLengthPrefix) {
+		this.validLengthPrefix = validLengthPrefix;
+		return this;
+	}
+
+	/**
+	 * Sets the prefix of part files, which are named {@code "{part-prefix}-{subtask}-{count}"}. The default is {@code "part"}.
+	 */
+	public BucketingSink<T> setPartPrefix(String partPrefix) {
+		this.partPrefix = partPrefix;
+		return this;
+	}
+
+	/**
+	 * Disables the cleanup of leftover in-progress/pending files when the sink is opened.
+	 *
+	 * <p>
+	 * Cleanup should only be disabled if the sink is used without checkpoints, so that files
+	 * already in the directory are not removed. Returns this sink so that calls can be chained.
+	 */
+	public BucketingSink<T> disableCleanupOnOpen() {
+		this.cleanupOnOpen = false;
+		return this;
+	}
+
+	/**
+	 * Sets the default timeout for asynchronous operations such as recoverLease and truncate.
+	 *
+	 * @param timeout The timeout for such operations, in milliseconds.
+	 */
+	public BucketingSink<T> setAsyncTimeout(long timeout) {
+		this.asyncTimeout = timeout;
+		return this;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Internal Classes
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * This is used during snapshot/restore to keep track of in-progress buckets: one {@link BucketState} per bucket.
+	 */
+	static final class State<T> implements Serializable {
+		private static final long serialVersionUID = 1L;
+
+		/** For every bucket directory (key, the bucket path as a string), we maintain a bucket state (value). */
+		final Map<String, BucketState<T>> bucketStates = new HashMap<>();
+
+		/** Returns whether a state is stored for the given bucket path; synchronizes on {@link #bucketStates}. */
+		boolean hasBucketState(Path bucketPath) {
+			synchronized (bucketStates) {
+				return bucketStates.containsKey(bucketPath.toString());
+			}
+		}
+
+		/** Stores the state for the given bucket path, replacing any state stored for that path before. */
+		void addBucketState(Path bucketPath, BucketState<T> state) {
+			synchronized (bucketStates) {
+				bucketStates.put(bucketPath.toString(), state);
+			}
+		}
+
+		/** Returns the state stored for the given bucket path, or {@code null} if the map has none. */
+		BucketState<T> getBucketState(Path bucketPath) {
+			synchronized (bucketStates) {
+				return bucketStates.get(bucketPath.toString());
+			}
+		}
+	}
+
+	/**
+	 * This is used for keeping track of the current in-progress buckets and files that we mark
+	 * for moving from pending to final location after we get a checkpoint-complete notification.
+	 */
+	static final class BucketState<T> implements Serializable {
+		private static final long serialVersionUID = 1L;
+
+		/**
+		 * The file that was in-progress when the last checkpoint occurred. Initially {@code null}.
+		 */
+		String currentFile = null;
+
+		/**
+		 * The valid length of the in-progress file at the time of the last checkpoint. Initially {@code -1}.
+		 */
+		long currentFileValidLength = -1;
+
+		/**
+		 * The time this bucket was last written to.
+		 */
+		long lastWrittenToTime;
+
+		/**
+		 * Pending files that accumulated since the last checkpoint.
+		 */
+		List<String> pendingFiles = new ArrayList<>();
+
+		/**
+		 * When doing a checkpoint we move the pending files since the last checkpoint to this map
+		 * with the id of the checkpoint. When we get the checkpoint-complete notification we move
+		 * pending files of completed checkpoints to their final location.
+		 */
+		final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
+
+		/**
+		 * For counting the part files inside a bucket directory. Part files follow the pattern
+		 * {@code "{part-prefix}-{subtask}-{count}"}. When creating new part files we increase the counter.
+		 */
+		private transient int partCounter;
+
+		/** Tracks if the writer is currently opened or closed. Transient, so not serialized with this state. */
+		private transient boolean isWriterOpen = false;
+
+		/** The actual writer that we use for writing the part files. Transient, so not serialized with this state. */
+		private transient Writer<T> writer;
+
+		/**
+		 * Creates the state of a bucket that has no current file and no pending files yet.
+		 * @param lastWrittenToTime The time this bucket was last written to.
+		 */
+		public BucketState(long lastWrittenToTime) {
+			this.lastWrittenToTime = lastWrittenToTime;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e99949a5/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
new file mode 100644
index 0000000..d47eed1
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.flink.streaming.connectors.fs.Clock;
+import org.apache.flink.streaming.connectors.fs.SystemClock;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * A {@link Bucketer} that assigns to buckets based on the current time of the given {@link Clock}.
+ *
+ * <p>
+ * The {@code DateTimeBucketer} will create directories of the following form:
+ * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path
+ * that was specified as a base path when creating the
+ * {@link BucketingSink}. The {@code dateTimePath}
+ * is determined based on the current time and the user provided format string.
+ *
+ * <p>
+ * {@link SimpleDateFormat} is used to derive a date string from the current time and
+ * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
+ * files will have a granularity of hours.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ *     Bucketer<String> buck = new DateTimeBucketer<>("yyyy-MM-dd--HH");
+ * }</pre>
+ *
+ * This will create for example the following bucket path:
+ * {@code /base/1976-12-31--14/}
+ */
+public class DateTimeBucketer<T> implements Bucketer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
+
+	private final String formatString;
+
+	private transient SimpleDateFormat dateFormatter;
+
+	/**
+	 * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}.
+	 */
+	public DateTimeBucketer() {
+		this(DEFAULT_FORMAT_STRING);
+	}
+
+	/**
+	 * Creates a new {@code DateTimeBucketer} with the given date/time format string.
+	 *
+	 * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine
+	 *                     the bucket path.
+	 */
+	public DateTimeBucketer(String formatString) {
+		this.formatString = formatString;
+
+		this.dateFormatter = new SimpleDateFormat(formatString);
+	}
+
+	/** Re-creates the transient date formatter from the format string after deserialization. */
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+
+		this.dateFormatter = new SimpleDateFormat(formatString);
+	}
+
+	/** Returns {@code basePath/<formatted time>} for the clock's current time; the element itself is not used. */
+	@Override
+	public Path getBucketPath(Clock clock, Path basePath, T element) {
+		String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis()));
+		return new Path(basePath + "/" + newDateTimeString);
+	}
+
+	@Override
+	public String toString() {
+		return "DateTimeBucketer{" +
+				"formatString='" + formatString + '\'' +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e99949a5/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
index e516f50..2b93721 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
@@ -60,7 +60,10 @@ import static org.junit.Assert.assertTrue;
 * This differs from RollingSinkFaultToleranceITCase in that the checkpoint interval is extremely
 * high. This provokes the case that the sink restarts without any checkpoint having been performed.
 * This tests the initial cleanup of pending/in-progress files.
+*
+* @deprecated should be removed with the {@link RollingSink}.
 */
+@Deprecated
 public class RollingSinkFaultTolerance2ITCase extends StreamFaultToleranceTestBase {
 
 	final long NUM_STRINGS = 16_000;

http://git-wip-us.apache.org/repos/asf/flink/blob/e99949a5/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
index 334e761..9c39237 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
@@ -55,7 +55,10 @@ import static org.junit.Assert.assertTrue;
  * <p>
  * This test only verifies the exactly once behaviour of the sink. Another test tests the
  * rolling behaviour.
+ *
+ * @deprecated should be removed with the {@link RollingSink}.
  */
+@Deprecated
 public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBase {
 
 	final long NUM_STRINGS = 16_000;

http://git-wip-us.apache.org/repos/asf/flink/blob/e99949a5/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index dd2fc26..7ee75c1 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -73,7 +73,10 @@ import java.util.Map;
  * <p>
  * This only tests the rolling behaviour of the sink. There is a separate ITCase that verifies
  * exactly once behaviour.
+ *
+ * @deprecated should be removed with the {@link RollingSink}.
  */
+@Deprecated
 public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 
 	@ClassRule

http://git-wip-us.apache.org/repos/asf/flink/blob/e99949a5/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultTolerance2ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultTolerance2ITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultTolerance2ITCase.java
new file mode 100644
index 0000000..e2ac4d3
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultTolerance2ITCase.java
@@ -0,0 +1,289 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import com.google.common.collect.Sets;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
+import org.apache.flink.util.NetUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.BufferedReader;
+import java.util.Random;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+* Tests for {@link BucketingSink}.
+*
+* <p>
+* This test only verifies the exactly once behaviour of the sink. Another test tests the
+* rolling behaviour.
+*
+* <p>
+* This differs from BucketingSinkFaultToleranceITCase in that the checkpoint interval is extremely
+* high. This provokes the case that the sink restarts without any checkpoint having been performed.
+* This tests the initial cleanup of pending/in-progress files.
+*/
+public class BucketingSinkFaultTolerance2ITCase extends StreamFaultToleranceTestBase {
+
+	final long NUM_STRINGS = 16_000;
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static MiniDFSCluster hdfsCluster;
+	private static org.apache.hadoop.fs.FileSystem dfs;
+
+	private static String outPath;
+
+
+
+	/** Starts a {@link MiniDFSCluster} in a temporary folder and derives the sink output path from it. */
+	@BeforeClass
+	public static void createHDFS() throws IOException {
+		File hdfsBaseDir = tempFolder.newFolder();
+
+		Configuration hdfsConf = new Configuration();
+		hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsBaseDir.getAbsolutePath());
+		hdfsCluster = new MiniDFSCluster.Builder(hdfsConf).build();
+		dfs = hdfsCluster.getFileSystem();
+
+		// the sink of this test writes below this HDFS directory, and postSubmit() reads it back
+		String hostAndPort = NetUtils.hostAndPortToUrlString(
+				hdfsCluster.getURI().getHost(),
+				hdfsCluster.getNameNodePort());
+		outPath = "hdfs://" + hostAndPort + "/string-non-rolling-out-no-checkpoint";
+	}
+
+	/** Shuts down the HDFS mini cluster, if one was started (the field is {@code null} otherwise). */
+	@AfterClass
+	public static void destroyHDFS() {
+		if (hdfsCluster != null) {
+			hdfsCluster.shutdown();
+		}
+	}
+
+	/** Builds the job under test: string source, then a mapper that fails once, then a {@link BucketingSink}. */
+	@Override
+	public void testProgram(StreamExecutionEnvironment env) {
+		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+
+		final int parallelism = 12;
+		// interval of Long.MAX_VALUE: the job is meant to restart without any checkpoint having been performed
+		env.enableCheckpointing(Long.MAX_VALUE);
+		env.setParallelism(parallelism);
+		env.disableOperatorChaining();
+
+		DataStream<String> failingOnce = env
+				.addSource(new StringGeneratingSourceFunction(NUM_STRINGS))
+				.startNewChain()
+				.map(new OnceFailingIdentityMapper(NUM_STRINGS));
+
+		BucketingSink<String> sink = new BucketingSink<String>(outPath);
+		sink.setBucketer(new BasePathBucketer<String>());
+		sink.setBatchSize(5000);
+		sink.setValidLengthPrefix("");
+		sink.setPendingPrefix("");
+
+		failingOnce.addSink(sink);
+	}
+
+	/**
+	 * Reads all files below the output path and verifies that every string was written exactly once.
+	 * If a valid-length file exists for a part file, only that many bytes of the part file are read.
+	 * (This test should work with FileSystems that support truncate() and with others as well.)
+	 *
+	 * @throws Exception if a file cannot be read or a valid-length file does not contain a number.
+	 */
+	@Override
+	public void postSubmit() throws Exception {
+		Pattern messageRegex = Pattern.compile("message (\\d*)");
+
+		// Keep a set of the message IDs that we read. The size must equal the read count and
+		// the NUM_STRINGS. If numRead is bigger than the size of the set we have seen some
+		// elements twice.
+		Set<Integer> readNumbers = Sets.newHashSet();
+		int numRead = 0;
+
+		RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(
+				outPath), true);
+
+		while (files.hasNext()) {
+			LocatedFileStatus file = files.next();
+
+			// valid-length files hold no messages; they are consulted when their part file is read
+			if (!file.getPath().toString().endsWith(".valid-length")) {
+				int validLength = (int) file.getLen();
+				if (dfs.exists(file.getPath().suffix(".valid-length"))) {
+					// try-with-resources: the stream is closed even if reading or parsing fails
+					try (FSDataInputStream lengthStream = dfs.open(file.getPath().suffix(".valid-length"))) {
+						validLength = Integer.parseInt(lengthStream.readUTF());
+					}
+					System.out.println("VALID LENGTH: " + validLength);
+				}
+				byte[] buffer = new byte[validLength];
+				try (FSDataInputStream inStream = dfs.open(file.getPath())) {
+					inStream.readFully(0, buffer, 0, validLength);
+				}
+
+				// closing the BufferedReader also closes the wrapped reader and byte stream
+				try (BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(buffer)))) {
+					String line = br.readLine();
+					while (line != null) {
+						Matcher matcher = messageRegex.matcher(line);
+						if (matcher.matches()) {
+							numRead++;
+							int messageId = Integer.parseInt(matcher.group(1));
+							readNumbers.add(messageId);
+						} else {
+							Assert.fail("Read line does not match expected pattern.");
+						}
+						line = br.readLine();
+					}
+				}
+			}
+		}
+
+		// Verify that we read all strings (at-least-once)
+		Assert.assertEquals(NUM_STRINGS, readNumbers.size());
+
+		// Verify that we don't have duplicates (boom!, exactly-once)
+		Assert.assertEquals(NUM_STRINGS, numRead);
+	}
+
+	/** Identity mapper that throws a test exception the first time any instance reaches its failure position. */
+	private static class OnceFailingIdentityMapper extends RichMapFunction<String, String> {
+		private static final long serialVersionUID = 1L;
+
+		private static volatile boolean hasFailed = false;
+
+		private final long numElements;
+
+		private long failurePos;
+		private long count;
+
+		OnceFailingIdentityMapper(long numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void open(org.apache.flink.configuration.Configuration parameters) throws IOException {
+			long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+			long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+			// nextLong() % range can be negative (and fails for range 0); scaling a [0, 1) double stays in [min, max)
+			failurePos = failurePosMin + (long) (new Random().nextDouble() * (failurePosMax - failurePosMin));
+			count = 0;
+		}
+
+		@Override
+		public String map(String value) throws Exception {
+			count++;
+			if (!hasFailed && count >= failurePos) {
+				hasFailed = true;
+				throw new Exception("Test Failure");
+			}
+
+			return value;
+		}
+	}
+
+	/**
+	 * Parallel source that emits the strings {@code "message i"} for indices below {@code numElements}.
+	 *
+	 * <p>Each subtask starts at its own subtask index and advances by the parallelism, so that the
+	 * subtasks together cover every index. The next index to emit is the checkpointed state.
+	 */
+	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
+			implements CheckpointedAsynchronously<Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final long numElements;
+
+		/** Index of the next message to emit. */
+		private int index;
+
+		private volatile boolean isRunning = true;
+
+		StringGeneratingSourceFunction(long numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			final Object lockingObject = ctx.getCheckpointLock();
+
+			final int step = getRuntimeContext().getNumberOfParallelSubtasks();
+
+			// index is still 0 when no state was restored: start at this subtask's own offset
+			if (index == 0) {
+				index = getRuntimeContext().getIndexOfThisSubtask();
+			}
+
+			while (isRunning && index < numElements) {
+
+				Thread.sleep(1);
+				// emit and advance while holding the checkpoint lock, so both happen in one step
+				synchronized (lockingObject) {
+					ctx.collect("message " + index);
+					index += step;
+				}
+			}
+		}
+
+		/** Makes the emit loop in {@code run} stop before its next iteration. */
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+
+		/** Returns the index of the next message to emit as the state of this source. */
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+			return index;
+		}
+
+		/** Continues emitting from the index that was stored in the restored state. */
+		@Override
+		public void restoreState(Integer state) {
+			index = state;
+		}
+	}
+}