You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/18 16:03:57 UTC

[1/4] flink git commit: [FLINK-2583] [hdfs connector] Add Stream Sink For Rolling HDFS Files

Repository: flink
Updated Branches:
  refs/heads/master 057109488 -> b234b0b16


http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
new file mode 100644
index 0000000..e0592e9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import com.google.common.collect.Sets;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link org.apache.flink.streaming.connectors.fs.RollingSink}.
+ *
+ * <p>
+ * This test only verifies the exactly once behaviour of the sink. Another test tests the
+ * rolling behaviour.
+ */
+public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBase {
+
+	final long NUM_STRINGS = 16_000;
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static MiniDFSCluster hdfsCluster;
+	private static org.apache.hadoop.fs.FileSystem dfs;
+	private static String hdfsURI;
+
+	private static String outPath;
+
+
+
+	/**
+	 * Starts the HDFS mini cluster with a fresh temporary folder as its base directory and
+	 * derives the file system handle, the cluster URI and the output path from it.
+	 */
+	@BeforeClass
+	public static void createHDFS() throws IOException {
+		final File baseDir = tempFolder.newFolder();
+
+		final Configuration hdfsConf = new Configuration();
+		hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+
+		hdfsCluster = new MiniDFSCluster.Builder(hdfsConf).build();
+		dfs = hdfsCluster.getFileSystem();
+
+		final String host = hdfsCluster.getURI().getHost();
+		final int port = hdfsCluster.getNameNodePort();
+		hdfsURI = "hdfs://" + host + ":" + port + "/";
+		outPath = hdfsURI + "/string-non-rolling-out";
+	}
+
+	/** Shuts the HDFS mini cluster down, if one was started. */
+	@AfterClass
+	public static void destroyHDFS() {
+		if (hdfsCluster == null) {
+			return;
+		}
+		hdfsCluster.shutdown();
+	}
+
+
+	/**
+	 * Assembles the job under test: a checkpointed string source, a mapper that throws once,
+	 * and a {@link RollingSink} that writes into a single (non-rolling) bucket.
+	 */
+	@Override
+	public void testProgram(StreamExecutionEnvironment env) {
+		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+
+		final int parallelism = 6;
+
+		env.enableCheckpointing(200);
+		env.setParallelism(parallelism);
+		env.disableOperatorChaining();
+
+		DataStream<String> generated = env
+				.addSource(new StringGeneratingSourceFunction(NUM_STRINGS))
+				.startNewChain();
+
+		DataStream<String> passedThrough = generated.map(new OnceFailingIdentityMapper(NUM_STRINGS));
+
+		RollingSink<String> rollingSink = new RollingSink<String>(outPath)
+				.setBucketer(new NonRollingBucketer())
+				.setBatchSize(10000)
+				.setValidLengthPrefix("")
+				.setPendingPrefix("");
+
+		passedThrough.addSink(rollingSink);
+	}
+
+	/**
+	 * Reads back every data file below the output path and verifies that each of the
+	 * {@code NUM_STRINGS} messages was written exactly once.
+	 *
+	 * @throws Exception if listing or reading the output files fails
+	 */
+	@Override
+	public void postSubmit() throws Exception {
+		// We read the files and verify that we have read all the strings. If a valid-length
+		// file exists we only read the file to that point. (This test should work with
+		// FileSystems that support truncate() and with others as well.)
+
+		Pattern messageRegex = Pattern.compile("message (\\d*)");
+
+		// Keep a set of the message IDs that we read. The size must equal the read count and
+		// the NUM_STRINGS. If numRead is bigger than the size of the set we have seen some
+		// elements twice.
+		Set<Integer> readNumbers = Sets.newHashSet();
+		int numRead = 0;
+
+		RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(
+				outPath), true);
+
+		while (files.hasNext()) {
+			LocatedFileStatus file = files.next();
+			Path filePath = file.getPath();
+
+			if (filePath.toString().endsWith(".valid-length")) {
+				// marker files are evaluated together with the data file they belong to
+				continue;
+			}
+
+			int validLength = (int) file.getLen();
+			Path validLengthPath = filePath.suffix(".valid-length");
+			if (dfs.exists(validLengthPath)) {
+				// close the marker stream even if reading or parsing the length fails
+				try (FSDataInputStream lengthStream = dfs.open(validLengthPath)) {
+					validLength = Integer.parseInt(lengthStream.readUTF());
+				}
+				System.out.println("VALID LENGTH: " + validLength);
+			}
+
+			// only the first validLength bytes of the data file count as written
+			byte[] buffer = new byte[validLength];
+			try (FSDataInputStream inStream = dfs.open(filePath)) {
+				inStream.readFully(0, buffer, 0, validLength);
+			}
+
+			try (BufferedReader br = new BufferedReader(
+					new InputStreamReader(new ByteArrayInputStream(buffer)))) {
+				String line = br.readLine();
+				while (line != null) {
+					Matcher matcher = messageRegex.matcher(line);
+					if (!matcher.matches()) {
+						Assert.fail("Read line does not match expected pattern.");
+					}
+					numRead++;
+					readNumbers.add(Integer.parseInt(matcher.group(1)));
+					line = br.readLine();
+				}
+			}
+		}
+
+		// Verify that we read all strings (at-least-once)
+		Assert.assertEquals(NUM_STRINGS, readNumbers.size());
+
+		// Verify that we don't have duplicates (boom!, exactly-once)
+		Assert.assertEquals(NUM_STRINGS, numRead);
+	}
+
+	/**
+	 * Identity mapper that injects a failure into the job: the first instance whose element
+	 * count reaches its randomly chosen failure position throws an exception. The static
+	 * flag then keeps the other instances (and the restarted ones) from throwing again.
+	 */
+	private static class OnceFailingIdentityMapper extends RichMapFunction<String, String> {
+		private static final long serialVersionUID = 1L;
+
+		// static, so the "already failed" information survives the restart of the job
+		private static volatile boolean hasFailed = false;
+
+		private final long numElements;
+
+		private long failurePos;
+		private long count;
+
+
+		OnceFailingIdentityMapper(long numElements) {
+			this.numElements = numElements;
+		}
+
+		/**
+		 * Picks the failure position uniformly at random between 70% and 90% of the
+		 * per-subtask share of elements and resets the element counter.
+		 */
+		@Override
+		public void open(org.apache.flink.configuration.Configuration parameters) throws IOException {
+			int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+			long failurePosMin = (long) (0.7 * numElements / parallelism);
+			long failurePosMax = (long) (0.9 * numElements / parallelism);
+			long range = failurePosMax - failurePosMin;
+
+			// nextLong() % range lies in (-range, range); without abs() the position could
+			// fall below failurePosMin. An empty window would otherwise divide by zero.
+			long offset = range > 0 ? Math.abs(new Random().nextLong() % range) : 0;
+
+			failurePos = failurePosMin + offset;
+			count = 0;
+		}
+
+		/**
+		 * Returns the value unchanged, or throws once the failure position is reached and no
+		 * instance has failed before.
+		 */
+		@Override
+		public String map(String value) throws Exception {
+			count++;
+			if (!hasFailed && count >= failurePos) {
+				hasFailed = true;
+				throw new Exception("Test Failure");
+			}
+
+			return value;
+		}
+	}
+
+	/**
+	 * Parallel source that emits the strings {@code "message <id>"}. Each subtask starts at
+	 * its own subtask index and advances by the number of parallel subtasks, so the ids
+	 * emitted by all subtasks together cover {@code 0 .. numElements - 1} without overlap.
+	 * The id of the next message is the checkpointed state.
+	 */
+	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
+			implements CheckpointedAsynchronously<Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final long numElements;
+
+		// id of the next message to emit; snapshotted and restored as the source state
+		private int index;
+
+		private volatile boolean isRunning = true;
+
+
+		StringGeneratingSourceFunction(long numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			final Object lockingObject = ctx.getCheckpointLock();
+
+			final int step = getRuntimeContext().getNumberOfParallelSubtasks();
+
+			// a zero index is treated as "no progress yet": begin at this subtask's offset
+			if (index == 0) {
+				index = getRuntimeContext().getIndexOfThisSubtask();
+			}
+
+			while (isRunning && index < numElements) {
+
+				Thread.sleep(1);
+				// emit and advance while holding the checkpoint lock, presumably so that a
+				// snapshot never observes an emitted element without the advanced index
+				synchronized (lockingObject) {
+					ctx.collect("message " + index);
+					index += step;
+				}
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+			return index;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+			index = state;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
new file mode 100644
index 0000000..008b4b6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -0,0 +1,503 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.taskmanager.MultiShotLatch;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * Tests for {@link RollingSink}. These
+ * tests test the different output methods as well as the rolling feature using a manual clock
+ * that increases time in lockstep with element computation using latches.
+ *
+ * <p>
+ * This only tests the rolling behaviour of the sink. There is a separate ITCase that verifies
+ * exactly once behaviour.
+ */
+public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static MiniDFSCluster hdfsCluster;
+	private static org.apache.hadoop.fs.FileSystem dfs;
+	private static String hdfsURI;
+
+
+	/**
+	 * Starts the HDFS mini cluster with a fresh temporary folder as its base directory and
+	 * derives the file system handle and the cluster URI from it.
+	 */
+	@BeforeClass
+	public static void createHDFS() throws IOException {
+		final File baseDir = tempFolder.newFolder();
+
+		final Configuration hdfsConf = new Configuration();
+		hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+
+		hdfsCluster = new MiniDFSCluster.Builder(hdfsConf).build();
+		dfs = hdfsCluster.getFileSystem();
+
+		final String host = hdfsCluster.getURI().getHost();
+		final int port = hdfsCluster.getNameNodePort();
+		hdfsURI = "hdfs://" + host + ":" + port + "/";
+	}
+
+	/**
+	 * Shuts the HDFS mini cluster down. The cluster is still {@code null} when
+	 * {@link #createHDFS()} failed before assigning it; the guard keeps that original
+	 * failure from being masked by a NullPointerException thrown here.
+	 */
+	@AfterClass
+	public static void destroyHDFS() {
+		if (hdfsCluster != null) {
+			hdfsCluster.shutdown();
+		}
+	}
+
+	/**
+	 * This tests {@link StringWriter} with
+	 * non-rolling output.
+	 *
+	 * <p>
+	 * Every element is broadcast to both subtasks, {@link OddEvenFilter} keeps the even
+	 * elements on subtask 0 and the odd ones on subtask 1, and each subtask's part file is
+	 * expected to contain its messages in order.
+	 */
+	@Test
+	public void testNonRollingStringWriter() throws Exception {
+		final int NUM_ELEMENTS = 20;
+		final int PARALLELISM = 2;
+		final String outPath = hdfsURI + "/string-non-rolling-out";
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+				.broadcast()
+				.filter(new OddEvenFilter());
+
+		RollingSink<String> sink = new RollingSink<String>(outPath)
+				.setBucketer(new NonRollingBucketer())
+				.setPartPrefix("part")
+				.setPendingPrefix("")
+				.setPendingSuffix("");
+
+		source
+				.map(new MapFunction<Tuple2<Integer,String>, String>() {
+					private static final long serialVersionUID = 1L;
+					@Override
+					public String map(Tuple2<Integer, String> value) throws Exception {
+						return value.f1;
+					}
+				})
+				.addSink(sink);
+
+		env.execute("RollingSink String Write Test");
+
+		// part-0-0 holds the even messages of subtask 0, part-1-0 the odd ones of subtask 1
+		for (int subtask = 0; subtask < PARALLELISM; subtask++) {
+			Path partFile = new Path(outPath + "/part-" + subtask + "-0");
+
+			// try-with-resources closes the stream even when an assertion fails
+			try (BufferedReader br = new BufferedReader(new InputStreamReader(dfs.open(partFile)))) {
+				for (int i = subtask; i < NUM_ELEMENTS; i += PARALLELISM) {
+					String line = br.readLine();
+					Assert.assertEquals("message #" + i, line);
+				}
+			}
+		}
+	}
+
+	/**
+	 * This tests {@link SequenceFileWriter}
+	 * with non-rolling output and without compression.
+	 */
+	@Test
+	public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception {
+		final int NUM_ELEMENTS = 20;
+		final int PARALLELISM = 2;
+		final String outPath = hdfsURI + "/seq-no-comp-non-rolling-out";
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+				.broadcast()
+				.filter(new OddEvenFilter());
+
+		DataStream<Tuple2<IntWritable, Text>> mapped =  source.map(new MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception {
+				return Tuple2.of(new IntWritable(value.f0), new Text(value.f1));
+			}
+		});
+
+
+		RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
+				.setWriter(new SequenceFileWriter<IntWritable, Text>())
+				.setBucketer(new NonRollingBucketer())
+				.setPartPrefix("part")
+				.setPendingPrefix("")
+				.setPendingSuffix("");
+
+		mapped.addSink(sink);
+
+		env.execute("RollingSink String Write Test");
+
+		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
+
+		SequenceFile.Reader reader = new SequenceFile.Reader(inStream,
+				1000,
+				0,
+				100000,
+				new Configuration());
+
+		IntWritable intWritable = new IntWritable();
+		Text txt = new Text();
+
+		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+			reader.next(intWritable, txt);
+			Assert.assertEquals(i, intWritable.get());
+			Assert.assertEquals("message #" + i, txt.toString());
+		}
+
+		reader.close();
+		inStream.close();
+
+		inStream = dfs.open(new Path(outPath + "/part-1-0"));
+
+		reader = new SequenceFile.Reader(inStream,
+				1000,
+				0,
+				100000,
+				new Configuration());
+
+		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+			reader.next(intWritable, txt);
+			Assert.assertEquals(i, intWritable.get());
+			Assert.assertEquals("message #" + i, txt.toString());
+		}
+
+		reader.close();
+		inStream.close();
+	}
+
+	/**
+	 * This tests {@link SequenceFileWriter}
+	 * with non-rolling output but with compression.
+	 */
+	@Test
+	public void testNonRollingSequenceFileWithCompressionWriter() throws Exception {
+		final int NUM_ELEMENTS = 20;
+		final int PARALLELISM = 2;
+		final String outPath = hdfsURI + "/seq-non-rolling-out";
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+				.broadcast()
+				.filter(new OddEvenFilter());
+
+		DataStream<Tuple2<IntWritable, Text>> mapped =  source.map(new MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception {
+				return Tuple2.of(new IntWritable(value.f0), new Text(value.f1));
+			}
+		});
+
+
+		RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
+				.setWriter(new SequenceFileWriter<IntWritable, Text>("Default", SequenceFile.CompressionType.BLOCK))
+				.setBucketer(new NonRollingBucketer())
+				.setPartPrefix("part")
+				.setPendingPrefix("")
+				.setPendingSuffix("");
+
+		mapped.addSink(sink);
+
+		env.execute("RollingSink String Write Test");
+
+		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
+
+		SequenceFile.Reader reader = new SequenceFile.Reader(inStream,
+				1000,
+				0,
+				100000,
+				new Configuration());
+
+		IntWritable intWritable = new IntWritable();
+		Text txt = new Text();
+
+		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+			reader.next(intWritable, txt);
+			Assert.assertEquals(i, intWritable.get());
+			Assert.assertEquals("message #" + i, txt.toString());
+		}
+
+		reader.close();
+		inStream.close();
+
+		inStream = dfs.open(new Path(outPath + "/part-1-0"));
+
+		reader = new SequenceFile.Reader(inStream,
+				1000,
+				0,
+				100000,
+				new Configuration());
+
+		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+			reader.next(intWritable, txt);
+			Assert.assertEquals(i, intWritable.get());
+			Assert.assertEquals("message #" + i, txt.toString());
+		}
+
+		reader.close();
+		inStream.close();
+	}
+
+	// we use this to synchronize the clock changes to elements being processed
+	final static MultiShotLatch latch1 = new MultiShotLatch();
+	final static MultiShotLatch latch2 = new MultiShotLatch();
+
+	/**
+	 * This uses {@link org.apache.flink.streaming.connectors.fs.DateTimeBucketer} to
+	 * produce rolling files. The clock of DateTimeBucketer is set to
+	 * {@link ModifyableClock} to keep the time in lockstep with the processing of elements using
+	 * latches.
+	 */
+	@Test
+	public void testDateTimeRollingStringWriter() throws Exception {
+		final int NUM_ELEMENTS = 20;
+		final int PARALLELISM = 2;
+		final String outPath = hdfsURI + "/rolling-out";
+		DateTimeBucketer.setClock(new ModifyableClock());
+		ModifyableClock.setCurrentTime(0);
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+
+
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new WaitingTestSourceFunction(
+				NUM_ELEMENTS))
+				.broadcast();
+
+		// the parallel flatMap is chained to the sink, so when it has seen 5 elements it can
+		// fire the latch
+		DataStream<String> mapped = source
+				.flatMap(new RichFlatMapFunction<Tuple2<Integer, String>, String>() {
+					private static final long serialVersionUID = 1L;
+
+					int count = 0;
+					@Override
+					public void flatMap(Tuple2<Integer, String> value,
+							Collector<String> out) throws Exception {
+						out.collect(value.f1);
+						count++;
+						if (count >= 5) {
+							if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
+								latch1.trigger();
+							} else {
+								latch2.trigger();
+							}
+							count = 0;
+						}
+					}
+
+				});
+
+		RollingSink<String> sink = new RollingSink<String>(outPath)
+				.setBucketer(new DateTimeBucketer("ss"))
+				.setPartPrefix("part")
+				.setPendingPrefix("")
+				.setPendingSuffix("");
+
+		mapped.addSink(sink);
+
+		env.execute("RollingSink String Write Test");
+
+		RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(outPath), true);
+
+		// we should have 8 rolling files, 4 time intervals and parallelism of 2
+		int numFiles = 0;
+		while (files.hasNext()) {
+			LocatedFileStatus file = files.next();
+			numFiles++;
+			if (file.getPath().toString().contains("rolling-out/00")) {
+				FSDataInputStream inStream = dfs.open(file.getPath());
+
+				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+				for (int i = 0; i < 5; i++) {
+					String line = br.readLine();
+					Assert.assertEquals("message #" + i, line);
+				}
+
+				inStream.close();
+			} else if (file.getPath().toString().contains("rolling-out/05")) {
+				FSDataInputStream inStream = dfs.open(file.getPath());
+
+				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+				for (int i = 5; i < 10; i++) {
+					String line = br.readLine();
+					Assert.assertEquals("message #" + i, line);
+				}
+
+				inStream.close();
+			} else if (file.getPath().toString().contains("rolling-out/10")) {
+				FSDataInputStream inStream = dfs.open(file.getPath());
+
+				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+				for (int i = 10; i < 15; i++) {
+					String line = br.readLine();
+					Assert.assertEquals("message #" + i, line);
+				}
+
+				inStream.close();
+			} else if (file.getPath().toString().contains("rolling-out/15")) {
+				FSDataInputStream inStream = dfs.open(file.getPath());
+
+				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+				for (int i = 15; i < 20; i++) {
+					String line = br.readLine();
+					Assert.assertEquals("message #" + i, line);
+				}
+
+				inStream.close();
+			} else {
+				Assert.fail("File " + file + " does not match any expected roll pattern.");
+			}
+		}
+
+		Assert.assertEquals(8, numFiles);
+	}
+
+
+	private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean running = true;
+
+		private final int numElements;
+
+		public TestSourceFunction(int numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
+			for (int i = 0; i < numElements && running; i++) {
+				ctx.collect(Tuple2.of(i, "message #" + i));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+
+	/**
+	 * This waits on the two multi-shot latches. The latches are triggered in a parallel
+	 * flatMap inside the test topology.
+	 */
+	private static class WaitingTestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean running = true;
+
+		private final int numElements;
+
+		public WaitingTestSourceFunction(int numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
+			for (int i = 0; i < numElements && running; i++) {
+				if (i % 5 == 0 && i > 0) {
+					// update the clock after "five seconds", so we get 20 seconds in total
+					// with 5 elements in each time window
+					latch1.await();
+					latch2.await();
+					ModifyableClock.setCurrentTime(i * 1000);
+				}
+				ctx.collect(Tuple2.of(i, "message #" + i));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+
+	/**
+	 * Filter that lets subtask 0 keep the tuples whose key has remainder 0 modulo 2 and
+	 * every other subtask keep those whose key has remainder 1.
+	 */
+	public static class OddEvenFilter extends RichFilterFunction<Tuple2<Integer, String>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean filter(Tuple2<Integer, String> value) throws Exception {
+			final boolean firstSubtask = getRuntimeContext().getIndexOfThisSubtask() == 0;
+			final int wantedRemainder = firstSubtask ? 0 : 1;
+			return value.f0 % 2 == wantedRemainder;
+		}
+	}
+
+	/**
+	 * {@link Clock} whose time never advances on its own: it reports whatever value was
+	 * last passed to {@link #setCurrentTime(long)}. The value is static and therefore
+	 * shared by all instances.
+	 */
+	public static class ModifyableClock implements Clock {
+
+		// volatile: written by the source thread, read wherever the clock is queried
+		private static volatile long currentTime = 0;
+
+		/** Sets the time that every instance reports from now on. */
+		public static void setCurrentTime(long currentTime) {
+			ModifyableClock.currentTime = currentTime;
+		}
+
+		/** Returns the most recently set time. */
+		@Override
+		public long currentTimeMillis() {
+			return ModifyableClock.currentTime;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..fe60d94
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
index 4e9730f..142d7c9 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
@@ -43,4 +43,23 @@ under the License.
 		<module>flink-connector-twitter</module>
 	</modules>
 
+	<!-- See main pom.xml for explanation of profiles -->
+	<profiles>
+		<profile>
+			<id>hadoop-2</id>
+			<activation>
+				<property>
+					<!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
+					<!--hadoop2--><name>!hadoop.profile</name>
+				</property>
+			</activation>
+			<modules>
+				<!-- Include the flink-connector-filesystem module only for Hadoop 2.
+				 	The HDFS minicluster interfaces changed between the two versions.
+				 -->
+				<module>flink-connector-filesystem</module>
+			</modules>
+		</profile>
+	</profiles>
+
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 4f8ec18..d7cc1f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -39,6 +39,7 @@ import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.io.TextOutputFormat;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
@@ -1102,6 +1103,11 @@ public class DataStream<T> {
 		// read the output type of the input Transform to coax out errors about MissingTypeInfo
 		transformation.getOutputType();
 
+		// configure the type if needed
+		if (sinkFunction instanceof InputTypeConfigurable) {
+			((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig() );
+		}
+
 		StreamSink<T> sinkOperator = new StreamSink<T>(clean(sinkFunction));
 
 		DataStreamSink<T> sink = new DataStreamSink<T>(this, sinkOperator);

http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 20e19a5..ac5c1d6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -83,7 +83,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 	/**
 	 * Implementations are expected to provide test here to verify the correct behavior.
 	 */
-	abstract public void postSubmit();
+	abstract public void postSubmit() throws Exception ;
 
 	/**
 	 * Runs the following program the test program defined in {@link #testProgram(StreamExecutionEnvironment)}


[2/4] flink git commit: [FLINK-2583] [hdfs connector] Add Stream Sink For Rolling HDFS Files

Posted by se...@apache.org.
[FLINK-2583] [hdfs connector] Add Stream Sink For Rolling HDFS Files

This is also integrated with Checkpointing to provide exactly-once semantics.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35dcceb9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35dcceb9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35dcceb9

Branch: refs/heads/master
Commit: 35dcceb9ebc3a8a47b8b8aeb2c4e1e2d453767f4
Parents: 0571094
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Aug 31 10:01:38 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 18 12:17:16 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    | 111 +++
 .../runtime/taskmanager/MultiShotLatch.java     |  57 ++
 .../flink/runtime/taskmanager/OneShotLatch.java |  21 +-
 .../flink-connector-filesystem/pom.xml          | 112 +++
 .../flink/streaming/connectors/fs/Bucketer.java |  52 ++
 .../flink/streaming/connectors/fs/Clock.java    |  32 +
 .../connectors/fs/DateTimeBucketer.java         | 124 +++
 .../connectors/fs/NonRollingBucketer.java       |  43 +
 .../streaming/connectors/fs/RollingSink.java    | 900 +++++++++++++++++++
 .../connectors/fs/SequenceFileWriter.java       | 160 ++++
 .../streaming/connectors/fs/StringWriter.java   | 103 +++
 .../streaming/connectors/fs/SystemClock.java    |  28 +
 .../flink/streaming/connectors/fs/Writer.java   |  64 ++
 .../src/main/resources/log4j.properties         |  27 +
 .../fs/RollingSinkFaultTolerance2ITCase.java    | 290 ++++++
 .../fs/RollingSinkFaultToleranceITCase.java     | 285 ++++++
 .../connectors/fs/RollingSinkITCase.java        | 503 +++++++++++
 .../src/test/resources/log4j-test.properties    |  27 +
 .../src/test/resources/logback-test.xml         |  30 +
 .../flink-streaming-connectors/pom.xml          |  19 +
 .../streaming/api/datastream/DataStream.java    |   6 +
 .../StreamFaultToleranceTestBase.java           |   2 +-
 22 files changed, 2992 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 081c602..c3a3ef6 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -1526,6 +1526,7 @@ Currently these systems are supported:
 
  * [Apache Kafka](https://kafka.apache.org/) (sink/source)
  * [Elasticsearch](https://elastic.co/) (sink)
+ * [Hadoop FileSystem](http://hadoop.apache.org) (sink)
  * [RabbitMQ](http://www.rabbitmq.com/) (sink/source)
  * [Twitter Streaming API](https://dev.twitter.com/docs/streaming-apis) (source)
 
@@ -1828,6 +1829,116 @@ More about information about Elasticsearch can be found [here](https://elastic.c
 
 [Back to top](#top)
 
+### Hadoop FileSystem
+
+This connector provides a Sink that writes rolling files to any filesystem supported by
+Hadoop FileSystem. To use this connector, add the
+following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-filesystem</artifactId>
+  <version>{{site.version}}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here](cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
+for information about how to package the program with the libraries for
+cluster execution.
+
+#### Rolling File Sink
+
+The rolling behaviour as well as the writing can be configured but we will get to that later.
+This is how you can create a default rolling sink:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+input.addSink(new RollingSink<String>("/base/path"));
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+input.addSink(new RollingSink("/base/path"))
+
+{% endhighlight %}
+</div>
+</div>
+
+The only required parameter is the base path where the rolling files (buckets) will be
+stored. The sink can be configured by specifying a custom bucketer, writer and batch size.
+
+By default the rolling sink will use the pattern `"yyyy-MM-dd--HH"` to name the rolling buckets.
+This pattern is passed to `SimpleDateFormat` with the current system time to form a bucket path. A
+new bucket will be created whenever the bucket path changes. For example, if you have a pattern
+that contains minutes as the finest granularity you will get a new bucket every minute.
+Each bucket is itself a directory that contains several part files: Each parallel instance
+of the sink will create its own part file and when part files get too big the sink will also
+create a new part file next to the others. To specify a custom bucketer use `setBucketer()`
+on a `RollingSink`.
+
+The default writer is `StringWriter`. This will call `toString()` on the incoming elements
+and write them to part files, separated by newline. To specify a custom writer use `setWriter()`
+on a `RollingSink`. If you want to write Hadoop SequenceFiles you can use the provided
+`SequenceFileWriter` which can also be configured to use compression.
+
+The last configuration option is the batch size. This specifies when a part file should be closed
+and a new one started. (The default part file size is 384 MB).
+
+Example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple2<IntWritable,Text>> input = ...;
+
+RollingSink sink = new RollingSink<String>("/base/path");
+sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
+sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
+sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB
+
+input.addSink(sink);
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[Tuple2[IntWritable, Text]] = ...
+
+val sink = new RollingSink[String]("/base/path")
+sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
+sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
+sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB
+
+input.addSink(sink)
+
+{% endhighlight %}
+</div>
+</div>
+
+This will create a sink that writes to bucket files that follow this schema:
+
+```
+/base/path/{date-time}/part-{parallel-task}-{count}
+```
+
+Where `date-time` is the string that we get from the date/time format, `parallel-task` is the index
+of the parallel sink instance and `count` is the running number of part files that were created
+because of the batch size. 
+
+For in-depth information, please refer to the JavaDoc for 
+[RollingSink](http://flink.apache.org/docs/latest/api/java/org/apache/flink/streaming/connectors/fs/RollingSink.html).
+
+[Back to top](#top)
+
 ### RabbitMQ
 
 This connector provides access to data streams from [RabbitMQ](http://www.rabbitmq.com/). To use this connector, add the following dependency to your project:

http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/MultiShotLatch.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/MultiShotLatch.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/MultiShotLatch.java
new file mode 100644
index 0000000..7c55419
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/MultiShotLatch.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+/**
+ * Latch for synchronizing parts of code in tests. In contrast to
+ * {@link org.apache.flink.runtime.taskmanager.OneShotLatch} this will reset the state once
+ * {@link #await()} returns.
+ *
+ * <p>
+ * A part of the code that should only run after other code calls {@link #await()}. The call
+ * will only return once the other part is finished and calls {@link #trigger()}.
+ */
+public final class MultiShotLatch {
+	
+	private final Object lock = new Object();
+	
+	private volatile boolean triggered;
+
+	/**
+	 * Fires the latch. Code that is blocked on {@link #await()} will now return.
+	 */
+	public void trigger() {
+		synchronized (lock) {
+			triggered = true;
+			lock.notifyAll();
+		}
+	}
+
+	/**
+	 * Waits until {@link #trigger()} is called.
+	 */
+	public void await() throws InterruptedException {
+		synchronized (lock) {
+			while (!triggered) {
+				lock.wait();
+			}
+			triggered = false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/OneShotLatch.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/OneShotLatch.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/OneShotLatch.java
index 97ec2ad..504ccca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/OneShotLatch.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/OneShotLatch.java
@@ -18,19 +18,34 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-final class OneShotLatch {
+/**
+ * Latch for synchronizing parts of code in tests. Once the latch has fired, all subsequent calls to
+ * {@link #await()} will return immediately.
+ *
+ * <p>
+ * A part of the code that should only run after other code calls {@link #await()}. The call
+ * will only return once the other part is finished and calls {@link #trigger()}.
+ */
+public final class OneShotLatch {
 	
 	private final Object lock = new Object();
 	
 	private boolean triggered;
-	
+
+	/**
+	 * Fires the latch. Code that is blocked on {@link #await()} will now return.
+	 */
 	public void trigger() {
 		synchronized (lock) {
 			triggered = true;
 			lock.notifyAll();
 		}
 	}
-	
+
+	/**
+	 * Waits until {@link #trigger()} is called. Once {@code trigger()} has been called this
+	 * call will always return immediately.
+	 */
 	public void await() throws InterruptedException {
 		synchronized (lock) {
 			while (!triggered) {

http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/pom.xml
new file mode 100644
index 0000000..e0319b1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/pom.xml
@@ -0,0 +1,112 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-connectors-parent</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-filesystem</artifactId>
+	<name>flink-connector-filesystem</name>
+
+	<packaging>jar</packaging>
+
+	<!--
+		This is a Hadoop2 only flink module.
+	-->
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>${shading-artifact.name}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime</artifactId>
+			<scope>test</scope>
+			<type>test-jar</type>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
+			<scope>test</scope>
+			<type>test-jar</type>
+			<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+			<scope>test</scope>
+			<type>test-jar</type>
+			<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
+		</dependency>
+
+	</dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
new file mode 100644
index 0000000..913da97
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.Serializable;
+
+/**
+ * A bucketer is used with a {@link RollingSink}
+ * to put emitted elements into rolling files.
+ *
+ * <p>
+ * The {@code RollingSink} has one active bucket that it is writing to at a time. Whenever
+ * a new element arrives it will ask the {@code Bucketer} if a new bucket should be started and
+ * the old one closed. The {@code Bucketer} can, for example, decide to start new buckets
+ * based on system time.
+ */
+public interface Bucketer extends Serializable {
+
+	/**
+	 * Returns {@code true} when a new bucket should be started.
+	 *
+	 * @param currentBucketPath The bucket {@code Path} that is currently being used.
+	 */
+	boolean shouldStartNewBucket(Path basePath, Path currentBucketPath);
+
+	/**
+	 * Returns the {@link Path} of a new bucket file.
+	 *
+	 * @param basePath The base path containing all the buckets.
+	 *
+	 * @return The complete new {@code Path} of the new bucket. This should include the {@code basePath}
+	 *      and also the {@code subtaskIndex} to avoid clashes with parallel sinks.
+	 */
+	Path getNextBucketPath(Path basePath);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
new file mode 100644
index 0000000..152c75a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+/**
+ * A clock that can provide the current time.
+ *
+ * <p>
+ * Normally this would be system time, but for testing a custom {@code Clock} can be provided.
+ */
+public interface Clock {
+
+	/**
+	 * Return the current system time in milliseconds.
+	 */
+	public long currentTimeMillis();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
new file mode 100644
index 0000000..0be40f5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * A {@link Bucketer} that assigns to buckets based on current system time.
+ *
+ * <p>
+ * The {@code DateTimeBucketer} will create directories of the following form:
+ * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path
+ * that was specified as a base path when creating the
+ * {@link RollingSink}. The {@code dateTimePath}
+ * is determined based on the current system time and the user provided format string.
+ *
+ * <p>
+ * {@link SimpleDateFormat} is used to derive a date string from the current system time and
+ * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
+ * files will have a granularity of hours.
+ *
+ *
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ *     Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH");
+ * }</pre>
+ *
+ * This will create for example the following bucket path:
+ * {@code /base/1976-12-31--14/}
+ *
+ */
+public class DateTimeBucketer implements Bucketer {
+
+	private static Logger LOG = LoggerFactory.getLogger(DateTimeBucketer.class);
+
+	private static final long serialVersionUID = 1L;
+
+	private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
+
+	// We have this so that we can manually set it for tests.
+	private static Clock clock = new SystemClock();
+
+	private final String formatString;
+
+	private transient SimpleDateFormat dateFormatter;
+
+	/**
+	 * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}.
+	 */
+	public DateTimeBucketer() {
+		this(DEFAULT_FORMAT_STRING);
+	}
+
+	/**
+	 * Creates a new {@code DateTimeBucketer} with the given date/time format string.
+	 *
+	 * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine
+	 *                     the bucket path.
+	 */
+	public DateTimeBucketer(String formatString) {
+		this.formatString = formatString;
+
+		this.dateFormatter = new SimpleDateFormat(formatString);
+	}
+
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+
+		this.dateFormatter = new SimpleDateFormat(formatString);
+	}
+
+
+	@Override
+	public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
+		String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis()));
+		return !(new Path(basePath, newDateTimeString).equals(currentBucketPath));
+	}
+
+	@Override
+	public Path getNextBucketPath(Path basePath) {
+		String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis()));
+		return new Path(basePath + "/" + newDateTimeString);
+	}
+
+	@Override
+	public String toString() {
+		return "DateTimeBucketer{" +
+				"formatString='" + formatString + '\'' +
+				'}';
+	}
+
+	/**
+	 * This sets the internal {@link Clock} implementation. This method should only be used for testing.
+	 *
+	 * @param newClock The new clock to set.
+	 */
+	public static void setClock(Clock newClock) {
+		clock = newClock;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
new file mode 100644
index 0000000..1307d11
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A {@link org.apache.flink.streaming.connectors.fs.Bucketer} that does not perform any
+ * rolling of files. All files are written to the base path.
+ */
+public class NonRollingBucketer implements Bucketer {
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
+		return false;
+	}
+
+	@Override
+	public Path getNextBucketPath(Path basePath) {
+		return basePath;
+	}
+
+	@Override
+	public String toString() {
+		return "NonRollingBucketer";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
new file mode 100644
index 0000000..c705767
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -0,0 +1,900 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Sink that emits its input elements to rolling {@link org.apache.hadoop.fs.FileSystem} files. This
+ * is integrated with the checkpointing mechanism to provide exactly-once semantics.
+ *
+ * <p>
+ * When creating the sink a {@code basePath} must be specified. The base directory contains
+ * one directory for every bucket. The bucket directories themselves contain several part files.
+ * These contain the actual written data.
+ *
+ * <p>
+ * The sink uses a {@link Bucketer} to determine the name of bucket directories inside the
+ * base directory. Whenever the {@code Bucketer} returns a different directory name than
+ * it returned before the sink will close the current part files inside that bucket
+ * and start the new bucket directory. The default bucketer is a {@link DateTimeBucketer} with
+ * date format string {@code "yyyy-MM-dd--HH"}. You can specify a custom {@code Bucketer}
+ * using {@link #setBucketer(Bucketer)}. For example, use
+ * {@link org.apache.flink.streaming.connectors.fs.NonRollingBucketer} if you don't want to have
+ * buckets but still write part files in a fault-tolerant way.
+ *
+ * <p>
+ * The filenames of the part files contain the part prefix, the parallel subtask index of the sink
+ * and a rolling counter, for example {@code "part-1-17"}. Per default the part prefix is
+ * {@code "part"} but this can be
+ * configured using {@link #setPartPrefix(String)}. When a part file becomes bigger
+ * than the batch size the current part file is closed, the part counter is increased and
+ * a new part file is created. The batch size defaults to {@code 384MB}, this can be configured
+ * using {@link #setBatchSize(long)}.
+ *
+ * <p>
+ * Part files can be in one of three states: in-progress, pending or finished. The reason for this
+ * is how the sink works together with the checkpointing mechanism to provide exactly-once semantics
+ * and fault-tolerance. The part file that is currently being written to is in-progress. Once
+ * a part file is closed for writing it becomes pending. When a checkpoint is successful the
+ * currently pending files will be moved to finished. If a failure occurs the pending files
+ * will be deleted to reset state to the last checkpoint. The data in in-progress files will
+ * also have to be rolled back. If the {@code FileSystem} supports the {@code truncate} call
+ * this will be used to reset the file back to a previous state. If not, a special file
+ * with the same name as the part file and the suffix {@code ".valid-length"} will be written
+ * that contains the length up to which the file contains valid data. When reading the file
+ * it must be ensured that it is only read up to that point. The prefixes and suffixes for
+ * the different file states and valid-length files can be configured, for example with
+ * {@link #setPendingSuffix(String)}.
+ *
+ * <p>
+ * Note: If checkpointing is not enabled the pending files will never be moved to the finished state.
+ * In that case, the pending suffix/prefix can be set to {@code ""} to make the sink work
+ * in a non-fault-tolerant way but still provide output without prefixes and suffixes.
+ *
+ * <p>
+ * The part files are written using an instance of {@link Writer}. By default
+ * {@link org.apache.flink.streaming.connectors.fs.StringWriter} is used, which writes the result
+ * of {@code toString()} for every element, separated by newlines. You can configure the writer
+ * using {@link #setWriter(Writer)}. For example,
+ * {@link org.apache.flink.streaming.connectors.fs.SequenceFileWriter} can be used to write
+ * Hadoop {@code SequenceFiles}.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ *     new RollingSink<Tuple2<IntWritable, Text>>(outPath)
+ *         .setWriter(new SequenceFileWriter<IntWritable, Text>())
+ *         .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
+ * }</pre>
+ *
+ * This will create a sink that writes to {@code SequenceFiles} and rolls every minute.
+ *
+ * @see org.apache.flink.streaming.connectors.fs.DateTimeBucketer
+ * @see StringWriter
+ * @see SequenceFileWriter
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConfigurable, Checkpointed<RollingSink.BucketState>, CheckpointNotifier {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(RollingSink.class);
+
+
+	// --------------------------------------------------------------------------------------------
+	//  User configuration values
+	// --------------------------------------------------------------------------------------------
+	// These are initialized with some defaults but are meant to be changeable by the user
+
+	// The DEFAULT_* values are true constants, so they are static: they are shared by all
+	// instances and are not written out with every serialized sink.
+
+	/**
+	 * The default maximum size of part files (384 MB).
+	 *
+	 * 6 times the default block size
+	 */
+	private static final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L;
+
+	/**
+	 * This is used for part files that we are writing to but which were not yet confirmed
+	 * by a checkpoint.
+	 */
+	private static final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";
+
+	/**
+	 * See above, but for prefix.
+	 */
+	private static final String DEFAULT_IN_PROGRESS_PREFIX = "_";
+
+	/**
+	 * This is used for part files that we are not writing to but which are not yet confirmed by
+	 * checkpoint.
+	 */
+	private static final String DEFAULT_PENDING_SUFFIX = ".pending";
+
+	/**
+	 * See above, but for prefix.
+	 */
+	private static final String DEFAULT_PENDING_PREFIX = "_";
+
+	/**
+	 * When truncate() is not supported on the used FileSystem we instead write a
+	 * file along the part file with this ending that contains the length up to which
+	 * the part file is valid.
+	 */
+	private static final String DEFAULT_VALID_SUFFIX = ".valid-length";
+
+	/**
+	 * See above, but for prefix.
+	 */
+	private static final String DEFAULT_VALID_PREFIX = "_";
+
+	/**
+	 * The default prefix for part files.
+	 */
+	private static final String DEFAULT_PART_PREFIX = "part";
+
+	/**
+	 * The base {@code Path} that stores all rolling bucket directories.
+	 */
+	private final String basePath;
+
+	/**
+	 * The {@code Bucketer} that is used to determine the path of bucket directories.
+	 */
+	private Bucketer bucketer;
+
+	/**
+	 * We have a template and call duplicate() for each parallel writer in open() to get the actual
+	 * writer that is used for the part files.
+	 */
+	private Writer<T> writerTemplate;
+
+	/**
+	 * The actual writer that we use for writing the part files.
+	 */
+	private Writer<T> writer;
+
+	/**
+	 * Maximum size of part files. If files exceed this we close and create a new one in the same
+	 * bucket directory.
+	 */
+	private long batchSize;
+
+	/**
+	 * If this is true we remove any leftover in-progress/pending files when the sink is opened.
+	 *
+	 * <p>
+	 * This should only be set to false if using the sink without checkpoints, to not remove
+	 * the files already in the directory.
+	 */
+	private boolean cleanupOnOpen = true;
+
+	// These are the actually configured prefixes/suffixes
+	private String inProgressSuffix = DEFAULT_IN_PROGRESS_SUFFIX;
+	private String inProgressPrefix = DEFAULT_IN_PROGRESS_PREFIX;
+
+	private String pendingSuffix = DEFAULT_PENDING_SUFFIX;
+	private String pendingPrefix = DEFAULT_PENDING_PREFIX;
+
+	private String validLengthSuffix = DEFAULT_VALID_SUFFIX;
+	private String validLengthPrefix = DEFAULT_VALID_PREFIX;
+
+	private String partPrefix = DEFAULT_PART_PREFIX;
+
+	/**
+	 * The part file that we are currently writing to. This is the final name of the part file;
+	 * the in-progress and pending names are derived from it by adding the configured
+	 * prefix and suffix.
+	 */
+	private transient Path currentPartPath;
+
+	/**
+	 * The bucket directory that we are currently filling.
+	 */
+	private transient Path currentBucketDirectory;
+
+	// --------------------------------------------------------------------------------------------
+	//  Internal fields (not configurable by user)
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * The {@code FSDataOutputStream} for the current part file, or {@code null} if no part file
+	 * is currently open.
+	 */
+	private transient FSDataOutputStream outStream;
+
+	/**
+	 * Our subtask index, retrieved from the {@code RuntimeContext} in {@link #open}.
+	 */
+	private transient int subtaskIndex;
+
+	/**
+	 * For counting the part files inside a bucket directory. Part files follow the pattern
+	 * {@code "{part-prefix}-{subtask}-{count}"}. When creating new part files we increase the counter.
+	 */
+	private transient int partCounter;
+
+	/**
+	 * We use reflection to get the hflush method or use sync as a fallback.
+	 * The idea for this and the code comes from the Flume HDFS Sink.
+	 */
+	private transient Method refHflushOrSync;
+
+	/**
+	 * We use reflection to get the .truncate() method, this is only available starting with
+	 * Hadoop 2.7. This is {@code null} if truncate is not available.
+	 */
+	private transient Method refTruncate;
+
+	/**
+	 * The state object that is handled by Flink from snapshot/restore. In there we store the
+	 * current part file path, the valid length of the in-progress files and pending part files.
+	 */
+	private transient BucketState bucketState;
+
+	/**
+	 * Creates a new {@code RollingSink} that writes files to the given base directory.
+	 *
+	 * <p>
+	 * This uses a {@link DateTimeBucketer} as bucketer and a {@link StringWriter} as writer.
+	 * The maximum part file size is set to 384 MB.
+	 *
+	 * @param basePath The directory to which to write the bucket files.
+	 */
+	public RollingSink(String basePath) {
+		this.basePath = basePath;
+		this.bucketer = new DateTimeBucketer();
+		this.batchSize = DEFAULT_BATCH_SIZE;
+		this.writerTemplate = new StringWriter<>();
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+		if (this.writerTemplate instanceof InputTypeConfigurable) {
+			((InputTypeConfigurable) writerTemplate).setInputType(type, executionConfig);
+		}
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+
+		subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+		partCounter = 0;
+
+		this.writer = writerTemplate.duplicate();
+
+		if (bucketState == null) {
+			bucketState = new BucketState();
+		}
+
+		FileSystem fs = new Path(basePath).getFileSystem(new org.apache.hadoop.conf.Configuration());
+		refTruncate = reflectTruncate(fs);
+
+		// delete pending/in-progress files that might be left if we fail while
+		// no checkpoint has yet been done
+		try {
+			if (fs.exists(new Path(basePath)) && cleanupOnOpen) {
+				RemoteIterator<LocatedFileStatus> bucketFiles = fs.listFiles(new Path(basePath), true);
+
+				while (bucketFiles.hasNext()) {
+					LocatedFileStatus file = bucketFiles.next();
+					if (file.getPath().toString().endsWith(pendingSuffix)) {
+						// only delete files that contain our subtask index
+						if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex)) {
+							LOG.debug("Deleting leftover pending file {}", file.getPath().toString());
+							fs.delete(file.getPath(), true);
+						}
+					}
+					if (file.getPath().toString().endsWith(inProgressSuffix)) {
+						// only delete files that contain our subtask index
+						if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex)) {
+							LOG.debug("Deleting leftover in-progress file {}", file.getPath().toString());
+							fs.delete(file.getPath(), true);
+						}
+					}
+				}
+			}
+		} catch (IOException e) {
+			LOG.error("Error while deleting leftover pending/in-progress files: {}", e);
+			throw new RuntimeException("Error while deleting leftover pending/in-progress files.", e);
+		}
+	}
+
+	/**
+	 * Closes the sink by closing the current part file, see {@code closeCurrentPartFile()}.
+	 */
+	@Override
+	public void close() throws Exception {
+		// NOTE(review): the disabled lines would clear the thread's interrupt flag before closing
+		// the part file and restore it afterwards. They are kept for reference only.
+//		boolean interrupted = Thread.interrupted();
+		closeCurrentPartFile();
+
+//		if (interrupted) {
+//			Thread.currentThread().interrupt();
+//		}
+	}
+
+	/**
+	 * Writes one element using the writer. Before writing, a new part file is opened if
+	 * {@code shouldRoll()} says so.
+	 *
+	 * @param value The element to write.
+	 */
+	@Override
+	public void invoke(T value) throws Exception {
+
+		if (shouldRoll()) {
+			openNewPartFile();
+		}
+
+		writer.write(value);
+	}
+
+	/**
+	 * Determines whether we should change the bucket file we are writing to.
+	 *
+	 * <p>
+	 * This will roll if no file was created yet, if the file size is larger than the specified size
+	 * or if the {@code Bucketer} determines that we should roll.
+	 */
+	private boolean shouldRoll() throws IOException {
+		boolean shouldRoll = false;
+		if (outStream == null) {
+			shouldRoll = true;
+			LOG.debug("RollingSink {} starting new initial bucket. ", subtaskIndex);
+		}
+		if (bucketer.shouldStartNewBucket(new Path(basePath), currentBucketDirectory)) {
+			shouldRoll = true;
+			LOG.debug("RollingSink {} starting new bucket because {} said we should. ", subtaskIndex, bucketer);
+			// we will retrieve a new bucket base path in openNewPartFile so reset the part counter
+			partCounter = 0;
+		}
+		if (outStream != null) {
+			long writePosition = outStream.getPos();
+			if (outStream != null && writePosition > batchSize) {
+				shouldRoll = true;
+				LOG.debug(
+						"RollingSink {} starting new bucket because file position {} is above batch size {}.",
+						subtaskIndex,
+						writePosition,
+						batchSize);
+			}
+		}
+		return shouldRoll;
+	}
+
+	/**
+	 * Switches to a fresh part file.
+	 *
+	 * <p>
+	 * The current part file is closed first. Then the bucket directory is requested from the
+	 * {@code Bucketer} (and created if it changed), the next free part file name for this
+	 * subtask is determined and the corresponding in-progress file is created and handed to
+	 * the writer.
+	 */
+	private void openNewPartFile() throws Exception {
+		closeCurrentPartFile();
+
+		org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+		FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
+
+		Path nextBucketDirectory = bucketer.getNextBucketPath(new Path(basePath));
+		boolean bucketChanged = !nextBucketDirectory.equals(currentBucketDirectory);
+		if (bucketChanged) {
+			currentBucketDirectory = nextBucketDirectory;
+			try {
+				boolean created = fs.mkdirs(currentBucketDirectory);
+				if (created) {
+					LOG.debug("Created new bucket directory: {}", currentBucketDirectory);
+				}
+			} catch (IOException e) {
+				throw new RuntimeException("Could not create base path for new rolling file.", e);
+			}
+		}
+
+		// Probe part names until neither the final file nor its pending variant exists.
+		// This should work since there is only one parallel subtask that tries names with
+		// our subtask id. Otherwise we would run into concurrency issues here.
+		while (true) {
+			currentPartPath = new Path(currentBucketDirectory, partPrefix + "-" + subtaskIndex + "-" + partCounter);
+			Path pendingCandidate =
+					new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix);
+			if (!fs.exists(currentPartPath) && !fs.exists(pendingCandidate)) {
+				break;
+			}
+			partCounter++;
+		}
+
+		// increase, so we don't have to check for this name next time
+		partCounter++;
+
+		LOG.debug("Next part path is {}", currentPartPath.toString());
+
+		String inProgressName = inProgressPrefix + currentPartPath.getName();
+		Path inProgressPath = new Path(currentPartPath.getParent(), inProgressName).suffix(inProgressSuffix);
+
+		outStream = fs.create(inProgressPath, false);
+
+		// We do the reflection here since this is the first time that we have a FSDataOutputStream
+		if (refHflushOrSync == null) {
+			refHflushOrSync = reflectHflushOrSync(outStream);
+		}
+
+		writer.open(outStream);
+	}
+
+	/**
+	 * Closes the current part file.
+	 *
+	 * <p>
+	 * This closes the writer and the output stream, moves the current in-progress part file to
+	 * a pending file and adds it to the list of pending files in our bucket state.
+	 *
+	 * <p>
+	 * Calling this again without opening a new part file in between is harmless: the part
+	 * path is forgotten once the file was handed over, so it is neither renamed nor
+	 * registered a second time.
+	 */
+	private void closeCurrentPartFile() throws Exception {
+		if (writer != null) {
+			writer.close();
+		}
+
+		if (outStream != null) {
+			hflushOrSync(outStream);
+			outStream.close();
+			outStream = null;
+		}
+		if (currentPartPath != null) {
+			Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix);
+			Path pendingPath = new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix);
+			FileSystem fs = inProgressPath.getFileSystem(new org.apache.hadoop.conf.Configuration());
+			LOG.debug("Moving in-progress bucket {} to pending file {}",
+					inProgressPath,
+					pendingPath);
+			if (!fs.rename(inProgressPath, pendingPath)) {
+				// rename() reports some failures only through its return value; make them visible
+				LOG.warn("Could not move in-progress file {} to pending file {}",
+						inProgressPath,
+						pendingPath);
+			}
+			this.bucketState.pendingFiles.add(currentPartPath.toString());
+			// The file now belongs to the pending list. Forget the path so that a repeated call
+			// does not rename it again or add a duplicate entry to the pending files.
+			currentPartPath = null;
+		}
+	}
+
+	/**
+	 * If hflush is available in this version of HDFS, then this method calls
+	 * hflush, else it calls sync.
+	 * @param os - The stream to flush/sync
+	 * @throws java.io.IOException
+	 *
+	 * <p>
+	 * Note: This code comes from Flume
+	 */
+	protected void hflushOrSync(FSDataOutputStream os) throws IOException {
+		try {
+			// At this point the refHflushOrSync cannot be null,
+			// since register method would have thrown if it was.
+			this.refHflushOrSync.invoke(os);
+		} catch (InvocationTargetException e) {
+			String msg = "Error while trying to hflushOrSync!";
+			LOG.error(msg + " " + e.getCause());
+			Throwable cause = e.getCause();
+			if(cause != null && cause instanceof IOException) {
+				throw (IOException)cause;
+			}
+			throw new RuntimeException(msg, e);
+		} catch (Exception e) {
+			String msg = "Error while trying to hflushOrSync!";
+			LOG.error(msg + " " + e);
+			throw new RuntimeException(msg, e);
+		}
+	}
+
+	/**
+	 * Looks up the flush method of the given stream via reflection: {@code hflush} if the
+	 * stream class has it, {@code sync} otherwise.
+	 *
+	 * <p>
+	 * Note: This code comes from Flume
+	 *
+	 * @param os The stream whose class is inspected, may be {@code null}.
+	 * @return The method to use for flushing, or {@code null} if {@code os} is {@code null}.
+	 * @throws RuntimeException if the stream class has neither method
+	 */
+	private Method reflectHflushOrSync(FSDataOutputStream os) {
+		if (os == null) {
+			return null;
+		}
+
+		Class<?> streamClass = os.getClass();
+		try {
+			return streamClass.getMethod("hflush");
+		} catch (NoSuchMethodException hflushMissing) {
+			LOG.debug("HFlush not found. Will use sync() instead");
+		}
+
+		try {
+			return streamClass.getMethod("sync");
+		} catch (Exception syncMissing) {
+			String msg = "Neither hflush not sync were found. That seems to be a problem!";
+			LOG.error(msg);
+			throw new RuntimeException(msg, syncMissing);
+		}
+	}
+
+	/**
+	 * Gets the truncate() call using reflection and verifies that it actually works by
+	 * truncating a small temporary file.
+	 *
+	 * <p>
+	 * The temporary file is created inside the base path, because that is the location the
+	 * sink has to be able to write to anyway. It is deleted again before this method returns.
+	 *
+	 * <p>
+	 * Note: This code comes from Flume
+	 *
+	 * @param fs The file system to inspect, may be {@code null}.
+	 * @return The truncate method, or {@code null} if {@code fs} is {@code null}, if the file
+	 *         system has no such method or if invoking it on the test file failed.
+	 * @throws RuntimeException if the test file could not be created or deleted
+	 */
+	private Method reflectTruncate(FileSystem fs) {
+		if (fs == null) {
+			return null;
+		}
+
+		Method m;
+		try {
+			m = fs.getClass().getMethod("truncate", Path.class, long.class);
+		} catch (NoSuchMethodException ex) {
+			LOG.debug("Truncate not found. Will write a file with suffix '{}' " +
+					" and prefix '{}' to specify how many bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
+			return null;
+		}
+
+		// verify that truncate actually works
+		Path testPath = new Path(basePath, UUID.randomUUID().toString());
+		// try-with-resources makes sure the stream is closed even if the write fails
+		try (FSDataOutputStream outputStream = fs.create(testPath)) {
+			outputStream.writeUTF("hello");
+		} catch (IOException e) {
+			LOG.error("Could not create file for checking if truncate works.", e);
+			throw new RuntimeException("Could not create file for checking if truncate works.", e);
+		}
+
+		try {
+			// the length parameter of truncate is a long
+			m.invoke(fs, testPath, 2L);
+		} catch (IllegalAccessException | InvocationTargetException e) {
+			LOG.debug("Truncate is not supported.", e);
+			m = null;
+		}
+
+		try {
+			fs.delete(testPath, false);
+		} catch (IOException e) {
+			LOG.error("Could not delete truncate test file.", e);
+			throw new RuntimeException("Could not delete truncate test file.", e);
+		}
+		return m;
+	}
+
+
+	/**
+	 * Moves the pending files of all checkpoints up to and including the given checkpoint to
+	 * their final location and drops those checkpoints from the bucket state.
+	 *
+	 * @param checkpointId The id of the checkpoint that was completed.
+	 */
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		synchronized (bucketState.pendingFilesPerCheckpoint) {
+			Set<Long> handledCheckpoints = Sets.newHashSet();
+
+			for (Map.Entry<Long, List<String>> pendingEntry : bucketState.pendingFilesPerCheckpoint.entrySet()) {
+				Long pastCheckpointId = pendingEntry.getKey();
+				if (pastCheckpointId > checkpointId) {
+					// belongs to a later checkpoint that is not yet confirmed
+					continue;
+				}
+
+				LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId);
+				// All the pending files are buckets that have been completed but are waiting to be renamed
+				// to their final name
+				for (String filename : pendingEntry.getValue()) {
+					Path finalPath = new Path(filename);
+					String pendingName = pendingPrefix + finalPath.getName();
+					Path pendingPath = new Path(finalPath.getParent(), pendingName).suffix(pendingSuffix);
+
+					FileSystem fs = pendingPath.getFileSystem(new org.apache.hadoop.conf.Configuration());
+					fs.rename(pendingPath, finalPath);
+					LOG.debug(
+							"Moving pending file {} to final location after complete checkpoint {}.",
+							pendingPath,
+							pastCheckpointId);
+				}
+				handledCheckpoints.add(pastCheckpointId);
+			}
+
+			// removal is deferred until after the iteration so the map is not modified while iterating
+			for (Long handled : handledCheckpoints) {
+				bucketState.pendingFilesPerCheckpoint.remove(handled);
+			}
+		}
+	}
+
+	/**
+	 * Creates the state for a checkpoint.
+	 *
+	 * <p>
+	 * The writer and the output stream are flushed. If a part file is open, its path and
+	 * current write position are recorded. The pending files collected since the last
+	 * snapshot are stored under the given checkpoint id and a new, empty list is started.
+	 *
+	 * @param checkpointId The id of the checkpoint.
+	 * @param checkpointTimestamp The timestamp of the checkpoint, not used here.
+	 * @return The bucket state of this sink.
+	 */
+	@Override
+	public BucketState snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		if (writer != null) {
+			writer.flush();
+		}
+
+		final BucketState state = bucketState;
+		if (outStream != null) {
+			hflushOrSync(outStream);
+			state.currentFile = currentPartPath.toString();
+			state.currentFileValidLength = outStream.getPos();
+		}
+
+		final List<String> pendingSinceLastSnapshot = state.pendingFiles;
+		synchronized (state.pendingFilesPerCheckpoint) {
+			state.pendingFilesPerCheckpoint.put(checkpointId, pendingSinceLastSnapshot);
+		}
+		state.pendingFiles = Lists.newArrayList();
+		return state;
+	}
+
+	@Override
+	public void restoreState(BucketState state) {
+		bucketState = state;
+		// we can clean all the pending files since they where renamed to final files
+		// after this checkpoint was successfull
+		bucketState.pendingFiles.clear();
+		FileSystem fs = null;
+		try {
+			fs = new Path(basePath).getFileSystem(new org.apache.hadoop.conf.Configuration());
+		} catch (IOException e) {
+			LOG.error("Error while creating FileSystem in checkpoint restore.", e);
+			throw new RuntimeException("Error while creating FileSystem in checkpoint restore.", e);
+		}
+		if (bucketState.currentFile != null) {
+			// We were writing to a file when the last checkpoint occured. This file can either
+			// be still in-progress or became a pending file at some point after the checkpoint.
+			// Either way, we have to truncate it back to a valid state (or write a .valid-length)
+			// file that specifies up to which length it is valid and rename it to the final name
+			// before starting a new bucket file.
+			Path partPath = new Path(bucketState.currentFile);
+			try {
+				Path partPendingPath = new Path(partPath.getParent(), pendingPrefix + partPath.getName()).suffix(
+						pendingSuffix);
+				Path partInProgressPath = new Path(partPath.getParent(), inProgressPrefix + partPath.getName()).suffix(inProgressSuffix);
+
+				if (fs.exists(partPendingPath)) {
+					LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
+					// has been moved to pending in the mean time, rename to final location
+					fs.rename(partPendingPath, partPath);
+				} else if (fs.exists(partInProgressPath)) {
+					LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
+					// it was still in progress, rename to final path
+					fs.rename(partInProgressPath, partPath);
+				} else {
+					LOG.error("In-Progress file {} was neither moved to pending nor is still in progress.", bucketState.currentFile);
+					throw new RuntimeException("In-Progress file " + bucketState.currentFile+ " " +
+							"was neither moved to pending nor is still in progress.");
+				}
+
+				refTruncate = reflectTruncate(fs);
+				// truncate it or write a ".valid-length" file to specify up to which point it is valid
+				if (refTruncate != null) {
+					LOG.debug("Truncating {} to valid length {}", partPath, bucketState.currentFileValidLength);
+					refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength);
+				} else {
+					LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
+					Path validLengthFilePath = new Path(partPath.getParent(), validLengthPrefix + partPath.getName()).suffix(validLengthSuffix);
+					FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
+					lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
+					lengthFileOut.close();
+				}
+
+				// invalidate in the state object
+				bucketState.currentFile = null;
+				bucketState.currentFileValidLength = -1;
+			} catch (IOException e) {
+				LOG.error("Error while restoring RollingSink state.", e);
+				throw new RuntimeException("Error while restoring RollingSink state.", e);
+			} catch (InvocationTargetException | IllegalAccessException e) {
+				LOG.error("Cound not invoke truncate.", e);
+				throw new RuntimeException("Could not invoke truncate.", e);
+			}
+		}
+
+		LOG.debug("Clearing pending/in-progress files.");
+
+		// Move files that are confirmed by a checkpoint but did not get moved to final location
+		// because the checkpoint notification did not happen before a failure
+
+		Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
+		LOG.debug("Moving pending files to final location on restore.");
+		for (Long pastCheckpointId : pastCheckpointIds) {
+			// All the pending files are buckets that have been completed but are waiting to be renamed
+			// to their final name
+			for (String filename : bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) {
+				Path finalPath = new Path(filename);
+				Path pendingPath = new Path(finalPath.getParent(), pendingPrefix + finalPath.getName()).suffix(pendingSuffix);
+
+				try {
+					if (fs.exists(pendingPath)) {
+						LOG.debug(
+								"Moving pending file {} to final location after complete checkpoint {}.",
+								pendingPath,
+								pastCheckpointId);
+						fs.rename(pendingPath, finalPath);
+					}
+				} catch (IOException e) {
+					LOG.error("Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
+					throw new RuntimeException("Error while renaming pending file " + pendingPath+ " to final path " + finalPath, e);
+				}
+			}
+		}
+		bucketState.pendingFiles.clear();
+		synchronized (bucketState.pendingFilesPerCheckpoint) {
+			bucketState.pendingFilesPerCheckpoint.clear();
+		}
+
+		// we need to get this here since open() has not yet been called
+		int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+		// delete pending files
+		try {
+
+			RemoteIterator<LocatedFileStatus> bucketFiles = fs.listFiles(new Path(basePath), true);
+
+			while (bucketFiles.hasNext()) {
+				LocatedFileStatus file = bucketFiles.next();
+				if (file.getPath().toString().endsWith(pendingSuffix)) {
+					// only delete files that contain our subtask index
+					if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex)) {
+						fs.delete(file.getPath(), true);
+					}
+				}
+				if (file.getPath().toString().endsWith(inProgressSuffix)) {
+					// only delete files that contain our subtask index
+					if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex)) {
+						LOG.debug("Deleting in-progress file {}", file.getPath().toString());
+						fs.delete(file.getPath(), true);
+					}
+				}
+			}
+		} catch (IOException e) {
+			LOG.error("Error while deleting old pending files: {}", e);
+			throw new RuntimeException("Error while deleting old pending files.", e);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Setters for User configuration values
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Sets the maximum part file size in bytes.
+	 *
+	 * <p>
+	 * When a bucket part file becomes larger than this size a new bucket part file is started and
+	 * the old one is closed. The name of the bucket files depends on the {@link Bucketer}.
+	 *
+	 * @param batchSize The bucket part file size in bytes.
+	 * @return This sink, to allow chaining of setter calls.
+	 */
+	public RollingSink<T> setBatchSize(long batchSize) {
+		this.batchSize = batchSize;
+		return this;
+	}
+
+	/**
+	 * Sets the {@link Bucketer} to use for determining the bucket files to write to.
+	 *
+	 * @param bucketer The bucketer to use.
+	 * @return This sink, to allow chaining of setter calls.
+	 */
+	public RollingSink<T> setBucketer(Bucketer bucketer) {
+		this.bucketer = bucketer;
+		return this;
+	}
+
+	/**
+	 * Sets the {@link Writer} to be used for writing the incoming elements to bucket files.
+	 * The given writer serves as a template, it is duplicated when the sink is opened.
+	 *
+	 * @param writer The {@code Writer} to use.
+	 * @return This sink, to allow chaining of setter calls.
+	 */
+	public RollingSink<T> setWriter(Writer<T> writer) {
+		this.writerTemplate = writer;
+		return this;
+	}
+
+	/**
+	 * Sets the suffix of in-progress part files. The default is {@code ".in-progress"}.
+	 *
+	 * @param inProgressSuffix The suffix to use.
+	 * @return This sink, to allow chaining of setter calls.
+	 */
+	public RollingSink<T> setInProgressSuffix(String inProgressSuffix) {
+		this.inProgressSuffix = inProgressSuffix;
+		return this;
+	}
+
+	/**
+	 * Sets the prefix of in-progress part files. The default is {@code "_"}.
+	 *
+	 * @param inProgressPrefix The prefix to use.
+	 * @return This sink, to allow chaining of setter calls.
+	 */
+	public RollingSink<T> setInProgressPrefix(String inProgressPrefix) {
+		this.inProgressPrefix = inProgressPrefix;
+		return this;
+	}
+
+	/**
+	 * Sets the suffix of pending part files. The default is {@code ".pending"}.
+	 *
+	 * @param pendingSuffix The suffix to use.
+	 * @return This sink, to allow chaining of setter calls.
+	 */
+	public RollingSink<T> setPendingSuffix(String pendingSuffix) {
+		this.pendingSuffix = pendingSuffix;
+		return this;
+	}
+
+	/**
+	 * Sets the prefix of pending part files. The default is {@code "_"}.
+	 *
+	 * @param pendingPrefix The prefix to use.
+	 * @return This sink, to allow chaining of setter calls.
+	 */
+	public RollingSink<T> setPendingPrefix(String pendingPrefix) {
+		this.pendingPrefix = pendingPrefix;
+		return this;
+	}
+
+	/**
+	 * Sets the suffix of valid-length files. The default is {@code ".valid-length"}.
+	 *
+	 * @param validLengthSuffix The suffix to use.
+	 * @return This sink, to allow chaining of setter calls.
+	 */
+	public RollingSink<T> setValidLengthSuffix(String validLengthSuffix) {
+		this.validLengthSuffix = validLengthSuffix;
+		return this;
+	}
+
+	/**
+	 * Sets the prefix of valid-length files. The default is {@code "_"}.
+	 *
+	 * @param validLengthPrefix The prefix to use.
+	 * @return This sink, to allow chaining of setter calls.
+	 */
+	public RollingSink<T> setValidLengthPrefix(String validLengthPrefix) {
+		this.validLengthPrefix = validLengthPrefix;
+		return this;
+	}
+
+	/**
+	 * Sets the prefix of part files. The default is {@code "part"}.
+	 *
+	 * @param partPrefix The prefix to use.
+	 * @return This sink, to allow chaining of setter calls.
+	 */
+	public RollingSink<T> setPartPrefix(String partPrefix) {
+		this.partPrefix = partPrefix;
+		return this;
+	}
+
+	/**
+	 * Disable cleanup of leftover in-progress/pending files when the sink is opened.
+	 *
+	 * <p>
+	 * This should only be disabled if using the sink without checkpoints, to not remove
+	 * the files already in the directory.
+	 *
+	 * @return This sink, to allow chaining of setter calls.
+	 */
+	public RollingSink<T> disableCleanupOnOpen() {
+		this.cleanupOnOpen = false;
+		return this;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Internal Classes
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * This is used for keeping track of the current in-progress files and files that we mark
+	 * for moving from pending to final location after we get a checkpoint-complete notification.
+	 */
+	static final class BucketState implements Serializable {
+		private static final long serialVersionUID = 1L;
+
+		/**
+		 * The file that was in-progress when the last checkpoint occurred, or {@code null}
+		 * if there was none.
+		 */
+		String currentFile = null;
+
+		/**
+		 * The valid length of the in-progress file at the time of the last checkpoint,
+		 * {@code -1} if there is no such file.
+		 */
+		long currentFileValidLength = -1;
+
+		/**
+		 * Pending files that accumulated since the last checkpoint.
+		 */
+		List<String> pendingFiles = Lists.newArrayList();
+
+		/**
+		 * When doing a checkpoint we move the pending files since the last checkpoint to this map
+		 * with the id of the checkpoint. When we get the checkpoint-complete notification we move
+		 * pending files of completed checkpoints to their final location.
+		 */
+		final Map<Long, List<String>> pendingFilesPerCheckpoint = Maps.newHashMap();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
new file mode 100644
index 0000000..928d96e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+
+import java.io.IOException;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link SequenceFile SequenceFiles}.
+ * The input to the {@link RollingSink} must
+ * be a {@link org.apache.flink.api.java.tuple.Tuple2} of two Hadopo
+ * {@link org.apache.hadoop.io.Writable Writables}.
+ *
+ * @param <K> The type of the first tuple field.
+ * @param <V> The type of the second tuple field.
+ */
+public class SequenceFileWriter<K extends Writable, V extends Writable> implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
+	private static final long serialVersionUID = 1L;
+
+	private final String compressionCodecName;
+
+	private SequenceFile.CompressionType compressionType;
+
+	private transient FSDataOutputStream outputStream;
+
+	private transient SequenceFile.Writer writer;
+
+	private Class<K> keyClass;
+
+	private Class<V> valueClass;
+
+	/**
+	 * Creates a new {@code SequenceFileWriter} that writes sequence files without compression.
+	 */
+	public SequenceFileWriter() {
+		this("None", SequenceFile.CompressionType.NONE);
+	}
+
+	/**
+	 * Creates a new {@code SequenceFileWriter} that writes sequence with the given
+	 * compression codec and compression type.
+	 *
+	 * @param compressionCodecName Name of a Hadoop Compression Codec.
+	 * @param compressionType The compression type to use.
+	 */
+	public SequenceFileWriter(String compressionCodecName,
+			SequenceFile.CompressionType compressionType) {
+		this.compressionCodecName = compressionCodecName;
+		this.compressionType = compressionType;
+	}
+
+	@Override
+	public void open(FSDataOutputStream outStream) throws IOException {
+		if (outputStream != null) {
+			throw new IllegalStateException("SequenceFileWriter has already been opened.");
+		}
+		if (keyClass == null) {
+			throw new IllegalStateException("Key Class has not been initialized.");
+		}
+		if (valueClass == null) {
+			throw new IllegalStateException("Value Class has not been initialized.");
+		}
+
+		this.outputStream = outStream;
+
+		CompressionCodec codec = null;
+
+		if (!compressionCodecName.equals("None")) {
+			CompressionCodecFactory codecFactory = new CompressionCodecFactory(new Configuration());
+			codec = codecFactory.getCodecByName(compressionCodecName);
+			if (codec == null) {
+				throw new RuntimeException("Codec " + compressionCodecName + " not found.");
+			}
+		}
+
+		// the non-deprecated constructor syntax is only available in recent hadoop versions...
+		writer = SequenceFile.createWriter(new Configuration(),
+				outStream,
+				keyClass,
+				valueClass,
+				compressionType,
+				codec);
+	}
+
+	@Override
+	public void flush() throws IOException {
+	}
+
+	@Override
+	public void close() throws IOException {
+		if (writer != null) {
+			writer.close();
+		}
+		writer = null;
+		outputStream = null;
+	}
+
+	@Override
+	public void write(Tuple2<K, V> element) throws IOException {
+		if (outputStream == null) {
+			throw new IllegalStateException("SequenceFileWriter has not been opened.");
+		}
+		writer.append(element.f0, element.f1);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+		if (!type.isTupleType()) {
+			throw new IllegalArgumentException("Input TypeInformation is not a tuple type.");
+		}
+
+		TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>) type;
+
+		if (tupleType.getArity() != 2) {
+			throw new IllegalArgumentException("Input TypeInformation must be a Tuple2 type.");
+		}
+
+		TypeInformation<K> keyType = tupleType.getTypeAt(0);
+		TypeInformation<V> valueType = tupleType.getTypeAt(1);
+
+		this.keyClass = keyType.getTypeClass();
+		this.valueClass = valueType.getTypeClass();
+	}
+
+	@Override
+	public Writer<Tuple2<K, V>> duplicate() {
+		SequenceFileWriter<K, V> result = new SequenceFileWriter<>(compressionCodecName, compressionType);
+		result.keyClass = keyClass;
+		result.valueClass = valueClass;
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
new file mode 100644
index 0000000..ad0ab46
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.IllegalCharsetNameException;
+import java.nio.charset.UnsupportedCharsetException;
+
+/**
+ * A {@link Writer} that uses {@code toString()} on the input elements and writes them to
+ * the output bucket file separated by newline.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class StringWriter<T> implements Writer<T> {
+	private static final long serialVersionUID = 1L;
+
+	/** The stream of the currently open bucket file, {@code null} while the writer is closed. */
+	private transient FSDataOutputStream outputStream;
+
+	/** Name of the charset; kept as a string and resolved to {@link #charset} in {@code open()}. */
+	private String charsetName;
+
+	/** The charset resolved from {@link #charsetName} in {@code open()}. */
+	private transient Charset charset;
+
+	/**
+	 * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
+	 * strings to bytes.
+	 */
+	public StringWriter() {
+		this("UTF-8");
+	}
+
+	/**
+	 * Creates a new {@code StringWriter} that uses the given charset to convert
+	 * strings to bytes.
+	 *
+	 * @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
+	 */
+	public StringWriter(String charsetName) {
+		this.charsetName = charsetName;
+	}
+
+	/**
+	 * Resolves the configured charset and remembers the stream of the newly opened bucket file.
+	 *
+	 * @param outStream The stream of the newly opened file.
+	 * @throws IllegalStateException If the writer is already open.
+	 * @throws IOException If the configured charset name is illegal or not supported.
+	 */
+	@Override
+	public void open(FSDataOutputStream outStream) throws IOException {
+		if (outputStream != null) {
+			throw new IllegalStateException("StringWriter has already been opened.");
+		}
+
+		// Resolve the charset before storing the stream: if the lookup fails, the writer must
+		// not look "open", otherwise write() would run with a null charset.
+		try {
+			this.charset = Charset.forName(charsetName);
+		}
+		catch (IllegalCharsetNameException e) {
+			throw new IOException("The charset " + charsetName + " is not valid.", e);
+		}
+		catch (UnsupportedCharsetException e) {
+			throw new IOException("The charset " + charsetName + " is not supported.", e);
+		}
+
+		this.outputStream = outStream;
+	}
+
+	/**
+	 * Does nothing; elements are handed to the output stream directly in {@code write()}.
+	 */
+	@Override
+	public void flush() throws IOException {
+	}
+
+	/**
+	 * Forgets the output stream without closing it, so that the writer can be opened again.
+	 */
+	@Override
+	public void close() throws IOException {
+		outputStream = null;
+	}
+
+	/**
+	 * Writes {@code element.toString()}, encoded with the configured charset, followed by
+	 * a newline character.
+	 *
+	 * @param element The element to write.
+	 * @throws IllegalStateException If the writer has not been opened.
+	 * @throws IOException If writing to the output stream fails.
+	 */
+	@Override
+	public void write(T element) throws IOException {
+		if (outputStream == null) {
+			throw new IllegalStateException("StringWriter has not been opened.");
+		}
+		outputStream.write(element.toString().getBytes(charset));
+		outputStream.write('\n');
+	}
+
+	/**
+	 * Creates a closed copy of this writer that uses the same charset.
+	 */
+	@Override
+	public Writer<T> duplicate() {
+		// pass the charset on; the no-arg constructor would silently fall back to UTF-8
+		return new StringWriter<>(charsetName);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
new file mode 100644
index 0000000..2bab8cf
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+/**
+ * {@link Clock} implementation that reports the time of the JVM's system clock, as returned
+ * by {@code System.currentTimeMillis()}.
+ */
+public class SystemClock implements Clock {
+	@Override
+	public long currentTimeMillis() {
+		final long now = System.currentTimeMillis();
+		return now;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
new file mode 100644
index 0000000..98cad32
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Strategy that a {@link RollingSink} uses to perform the actual writing of elements
+ * to its bucket files.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public interface Writer<T> extends Serializable {
+
+	/**
+	 * Prepares the {@code Writer} for a bucket file that has just been opened.
+	 * Implementations set up their internal per-bucket state here.
+	 *
+	 * @param outStream The {@link org.apache.hadoop.fs.FSDataOutputStream} for the newly opened file.
+	 */
+	void open(FSDataOutputStream outStream) throws IOException;
+
+	/**
+	 * Writes a single element to the bucket file.
+	 */
+	void write(T element) throws IOException;
+
+	/**
+	 * Pushes out any data that the {@code Writer} still holds internally.
+	 */
+	void flush() throws IOException;
+
+	/**
+	 * Closes the {@code Writer}. Only state held by the {@code Writer} itself is to be
+	 * closed; the {@code FSDataOutputStream} that was passed to the {@link #open} method
+	 * must not be closed by this method.
+	 */
+	void close() throws IOException;
+
+	/**
+	 * Creates a duplicate of this {@code Writer}. Duplication is used to obtain one
+	 * {@code Writer} for each parallel instance of the sink.
+	 */
+	Writer<T> duplicate();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/resources/log4j.properties b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/resources/log4j.properties
new file mode 100644
index 0000000..fe60d94
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/35dcceb9/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
new file mode 100644
index 0000000..9c70ed2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
@@ -0,0 +1,290 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.connectors.fs;
+
+import com.google.common.collect.Sets;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+* Tests for {@link RollingSink}.
+*
+* <p>
+* This test only verifies the exactly once behaviour of the sink. Another test tests the
+* rolling behaviour.
+*
+* <p>
+* This differs from RollingSinkFaultToleranceITCase in that the checkpoint interval is extremely
+* high. This provokes the case that the sink restarts without any checkpoint having been performed.
+* This tests the initial cleanup of pending/in-progress files.
+*/
+public class RollingSinkFaultTolerance2ITCase extends StreamFaultToleranceTestBase {
+
+	final long NUM_STRINGS = 16_000;
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static MiniDFSCluster hdfsCluster;
+	private static org.apache.hadoop.fs.FileSystem dfs;
+	private static String hdfsURI;
+
+	private static String outPath;
+
+
+
+	@BeforeClass
+	public static void createHDFS() throws IOException {
+		Configuration conf = new Configuration();
+
+		File dataDir = tempFolder.newFolder();
+
+		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
+		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+		hdfsCluster = builder.build();
+
+		dfs = hdfsCluster.getFileSystem();
+
+		hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
+
+		outPath = hdfsURI + "/string-non-rolling-out-no-checkpoint";
+
+	}
+
+	@AfterClass
+	public static void destroyHDFS() {
+		if (hdfsCluster != null) {
+			hdfsCluster.shutdown();
+		}
+	}
+
+
+	@Override
+	public void testProgram(StreamExecutionEnvironment env) {
+		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+
+		int PARALLELISM = 6;
+
+		env.enableCheckpointing(Long.MAX_VALUE);
+		env.setParallelism(PARALLELISM);
+		env.disableOperatorChaining();
+
+		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain();
+
+		DataStream<String> mapped = stream
+				.map(new OnceFailingIdentityMapper(NUM_STRINGS));
+
+		RollingSink<String> sink = new RollingSink<String>(outPath)
+				.setBucketer(new NonRollingBucketer())
+				.setBatchSize(5000)
+				.setValidLengthPrefix("")
+				.setPendingPrefix("");
+
+		mapped.addSink(sink);
+
+	}
+
+	@Override
+	public void postSubmit() throws Exception {
+		// We read the files and verify that we have read all the strings. If a valid-length
+		// file exists we only read the file to that point. (This test should work with
+		// FileSystems that support truncate() and with others as well.)
+
+		Pattern messageRegex = Pattern.compile("message (\\d*)");
+
+		// Keep a set of the message IDs that we read. The size must equal the read count and
+		// the NUM_STRINGS. If numRead is bigger than the size of the set we have seen some
+		// elements twice.
+		Set<Integer> readNumbers = Sets.newHashSet();
+		int numRead = 0;
+
+		RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(
+				outPath), true);
+
+		while (files.hasNext()) {
+			LocatedFileStatus file = files.next();
+
+			if (!file.getPath().toString().endsWith(".valid-length")) {
+				int validLength = (int) file.getLen();
+				if (dfs.exists(file.getPath().suffix(".valid-length"))) {
+					FSDataInputStream inStream = dfs.open(file.getPath().suffix(".valid-length"));
+					String validLengthString = inStream.readUTF();
+					validLength = Integer.parseInt(validLengthString);
+					System.out.println("VALID LENGTH: " + validLength);
+				}
+				FSDataInputStream inStream = dfs.open(file.getPath());
+				byte[] buffer = new byte[validLength];
+				inStream.readFully(0, buffer, 0, validLength);
+				inStream.close();
+
+				ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
+
+				InputStreamReader inStreamReader = new InputStreamReader(bais);
+				BufferedReader br = new BufferedReader(inStreamReader);
+
+				String line = br.readLine();
+				while (line != null) {
+					Matcher matcher = messageRegex.matcher(line);
+					if (matcher.matches()) {
+						numRead++;
+						int messageId = Integer.parseInt(matcher.group(1));
+						readNumbers.add(messageId);
+					} else {
+						Assert.fail("Read line does not match expected pattern.");
+					}
+					line = br.readLine();
+				}
+				br.close();
+				inStreamReader.close();
+				bais.close();
+			}
+		}
+
+		// Verify that we read all strings (at-least-once)
+		Assert.assertEquals(NUM_STRINGS, readNumbers.size());
+
+		// Verify that we don't have duplicates (boom!, exactly-once)
+		Assert.assertEquals(NUM_STRINGS, numRead);
+	}
+
+	private static class OnceFailingIdentityMapper extends RichMapFunction<String, String> {
+		private static final long serialVersionUID = 1L;
+
+		private static volatile boolean hasFailed = false;
+
+		private final long numElements;
+
+		private long failurePos;
+		private long count;
+
+
+		OnceFailingIdentityMapper(long numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void open(org.apache.flink.configuration.Configuration parameters) throws IOException {
+			long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+			long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+
+			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
+			count = 0;
+		}
+
+		@Override
+		public String map(String value) throws Exception {
+			count++;
+			if (!hasFailed && count >= failurePos) {
+				hasFailed = true;
+				throw new Exception("Test Failure");
+			}
+
+			return value;
+		}
+	}
+
+	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
+			implements CheckpointedAsynchronously<Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final long numElements;
+
+		private int index;
+
+		private volatile boolean isRunning = true;
+
+
+		StringGeneratingSourceFunction(long numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			final Object lockingObject = ctx.getCheckpointLock();
+
+			final int step = getRuntimeContext().getNumberOfParallelSubtasks();
+
+			if (index == 0) {
+				index = getRuntimeContext().getIndexOfThisSubtask();
+			}
+
+			while (isRunning && index < numElements) {
+
+				Thread.sleep(1);
+				synchronized (lockingObject) {
+					ctx.collect("message " + index);
+					index += step;
+				}
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+
+		private static String randomString(StringBuilder bld, Random rnd) {
+			final int len = rnd.nextInt(10) + 5;
+
+			for (int i = 0; i < len; i++) {
+				char next = (char) (rnd.nextInt(20000) + 33);
+				bld.append(next);
+			}
+
+			return bld.toString();
+		}
+
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+			return index;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+			index = state;
+		}
+	}
+}


[4/4] flink git commit: [FLINK-2643] [build] Change Travis Build Matrix to Include Recent Hadoop Versions

Posted by se...@apache.org.
[FLINK-2643] [build] Change Travis Build Matrix to Include Recent Hadoop Versions

This removes 2.0.0-alpha. There are currently bugs that prevent 2.3.0
and 2.7.0 from being used.

This closes #1084


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b234b0b1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b234b0b1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b234b0b1

Branch: refs/heads/master
Commit: b234b0b16d01c0f843ae6031337a8a2a3ba16d5b
Parents: e31a4d8
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Sep 11 13:40:43 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 18 12:17:39 2015 +0200

----------------------------------------------------------------------
 .travis.yml | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b234b0b1/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 792afb7..181a131 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -19,9 +19,9 @@ matrix:
     - jdk: "oraclejdk7" # this will also deploy a uberjar to s3 at some point
       env: PROFILE="-Dhadoop.profile=1"
     - jdk: "openjdk7"
-      env: PROFILE="-P!include-yarn -Dhadoop.version=2.0.0-alpha"
-    - jdk: "oraclejdk7"
-      env: PROFILE="-Dhadoop.version=2.2.0"
+      env: PROFILE="-Dhadoop.version=2.4.0"
+    - jdk: "oraclejdk8"
+      env: PROFILE="-Dhadoop.version=2.5.0 -Dmaven.javadoc.skip=true"
     - jdk: "oraclejdk8"
       env: PROFILE="-Dhadoop.version=2.6.0 -Dscala-2.11 -Pinclude-tez -Dmaven.javadoc.skip=true"
 


[3/4] flink git commit: [hotfix] [build] Fix curator deps, Exclude Curator deps in Tests

Posted by se...@apache.org.
[hotfix] [build] Fix curator deps, Exclude Curator deps in Tests

Tests would fail with version conflicts: the test classpath includes the
original curator dependencies even though we shade them away in the
final build result.

This also fixes dependency management entries for curator dependencies.
We shade it, therefore it cannot be in the dependency management
section in the root pom.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e31a4d8a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e31a4d8a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e31a4d8a

Branch: refs/heads/master
Commit: e31a4d8a32448641a95f48edb1d85db6d9a0a257
Parents: 35dcceb
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 9 17:29:04 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 18 12:17:31 2015 +0200

----------------------------------------------------------------------
 flink-runtime/pom.xml                                  | 12 ++++++++++++
 .../flink-connector-kafka/pom.xml                      |  2 +-
 flink-test-utils/pom.xml                               |  1 +
 flink-tests/pom.xml                                    | 10 ++++++++++
 flink-yarn/pom.xml                                     |  1 +
 pom.xml                                                | 13 -------------
 6 files changed, 25 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e31a4d8a/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 5b8d0ca..ca3486b 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -201,6 +201,8 @@ under the License.
 		<dependency>
 			<groupId>org.apache.curator</groupId>
 			<artifactId>curator-test</artifactId>
+			<version>${curator.version}</version>
+			<scope>test</scope>
 		</dependency>
 
 	</dependencies>
@@ -338,6 +340,11 @@ under the License.
 						<exclude>**/TestData.java</exclude>
 						<exclude>**/TestInstanceListener.java</exclude>
 					</excludes>
+					<classpathDependencyExcludes>
+						<classpathDependencyExclude>org.apache.curator:curator-recipes</classpathDependencyExclude>
+						<classpathDependencyExclude>org.apache.curator:curator-client</classpathDependencyExclude>
+						<classpathDependencyExclude>org.apache.curator:curator-framework</classpathDependencyExclude>
+					</classpathDependencyExcludes>
 					<systemPropertyVariables>
 						<log.level>WARN</log.level>
 					</systemPropertyVariables>
@@ -350,6 +357,11 @@ under the License.
 					<excludes>
 						<exclude>**/TestData.java</exclude>
 					</excludes>
+					<classpathDependencyExcludes>
+						<classpathDependencyExclude>org.apache.curator:curator-recipes</classpathDependencyExclude>
+						<classpathDependencyExclude>org.apache.curator:curator-client</classpathDependencyExclude>
+						<classpathDependencyExclude>org.apache.curator:curator-framework</classpathDependencyExclude>
+					</classpathDependencyExcludes>
 					<systemPropertyVariables>
 						<log.level>WARN</log.level>
 					</systemPropertyVariables>

http://git-wip-us.apache.org/repos/asf/flink/blob/e31a4d8a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml
index 98d7f0d..528fc88 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml
@@ -101,7 +101,7 @@ under the License.
 		<dependency>
 			<groupId>org.apache.curator</groupId>
 			<artifactId>curator-test</artifactId>
-			<version>2.7.1</version>
+			<version>${curator.version}</version>
 			<scope>test</scope>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e31a4d8a/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils/pom.xml b/flink-test-utils/pom.xml
index 7c1ac3b..c8c3d15 100644
--- a/flink-test-utils/pom.xml
+++ b/flink-test-utils/pom.xml
@@ -93,6 +93,7 @@ under the License.
 		<dependency>
 			<groupId>org.apache.curator</groupId>
 			<artifactId>curator-test</artifactId>
+			<version>${curator.version}</version>
 			<scope>compile</scope>
 		</dependency>
 	</dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/e31a4d8a/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 1f5bfda..b4bddef 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -312,6 +312,11 @@ under the License.
 					<excludes>
 						<exclude>**/*TestBase*.class</exclude>
 					</excludes>
+					<classpathDependencyExcludes>
+						<classpathDependencyExclude>org.apache.curator:curator-recipes</classpathDependencyExclude>
+						<classpathDependencyExclude>org.apache.curator:curator-client</classpathDependencyExclude>
+						<classpathDependencyExclude>org.apache.curator:curator-framework</classpathDependencyExclude>
+					</classpathDependencyExcludes>
 					<reuseForks>false</reuseForks>
 				</configuration>
 			</plugin>
@@ -322,6 +327,11 @@ under the License.
 					<systemPropertyVariables>
 						<log.level>WARN</log.level>
 					</systemPropertyVariables>
+					<classpathDependencyExcludes>
+						<classpathDependencyExclude>org.apache.curator:curator-recipes</classpathDependencyExclude>
+						<classpathDependencyExclude>org.apache.curator:curator-client</classpathDependencyExclude>
+						<classpathDependencyExclude>org.apache.curator:curator-framework</classpathDependencyExclude>
+					</classpathDependencyExcludes>
 				</configuration>
 			</plugin>
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/e31a4d8a/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
index 56795c4..f851b8c 100644
--- a/flink-yarn/pom.xml
+++ b/flink-yarn/pom.xml
@@ -60,6 +60,7 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils</artifactId>
 			<version>${project.version}</version>
+			<scope>test</scope>
 		</dependency>
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e31a4d8a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6220ac8..082ece0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -336,19 +336,6 @@ under the License.
 			</dependency>
 
 			<dependency>
-				<groupId>org.apache.curator</groupId>
-				<artifactId>curator-recipes</artifactId>
-				<version>${curator.version}</version>
-			</dependency>
-
-			<dependency>
-				<groupId>org.apache.curator</groupId>
-				<artifactId>curator-test</artifactId>
-				<version>${curator.version}</version>
-				<scope>test</scope>
-			</dependency>
-
-			<dependency>
 				<groupId>org.apache.zookeeper</groupId>
 				<artifactId>zookeeper</artifactId>
 				<version>${zookeeper.version}</version>