Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:29 UTC

[39/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
new file mode 100644
index 0000000..80ae294
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -0,0 +1,991 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData.StringType;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.AvroKeyValue;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for {@link RollingSink}. These tests cover the different output methods as well as
+ * the rolling feature, using a manual clock that advances in lockstep with element
+ * processing, coordinated via latches.
+ *
+ * <p>
+ * This class only tests the rolling behaviour of the sink. A separate ITCase verifies the
+ * exactly-once behaviour.
+ *
+ * @deprecated should be removed with the {@link RollingSink}.
+ */
+@Deprecated
+public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(RollingSinkITCase.class);
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	protected static MiniDFSCluster hdfsCluster;
+	protected static org.apache.hadoop.fs.FileSystem dfs;
+	protected static String hdfsURI;
+	protected static Configuration conf = new Configuration();
+
+	protected static File dataDir;
+
+	@BeforeClass
+	public static void createHDFS() throws IOException {
+
+		LOG.info("In RollingSinkITCase: Starting MiniDFSCluster ");
+
+		dataDir = tempFolder.newFolder();
+
+		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
+		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+		hdfsCluster = builder.build();
+
+		dfs = hdfsCluster.getFileSystem();
+
+		hdfsURI = "hdfs://"
+				+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+				+ "/";
+	}
+
+	@AfterClass
+	public static void destroyHDFS() {
+		LOG.info("In RollingSinkITCase: tearing down MiniDFSCluster ");
+		hdfsCluster.shutdown();
+	}
+
+	/**
+	 * This tests {@link StringWriter} with
+	 * non-rolling output.
+	 */
+	@Test
+	public void testNonRollingStringWriter() throws Exception {
+		final int NUM_ELEMENTS = 20;
+		final int PARALLELISM = 2;
+		final String outPath = hdfsURI + "/string-non-rolling-out";
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+				.broadcast()
+				.filter(new OddEvenFilter());
+
+		RollingSink<String> sink = new RollingSink<String>(outPath)
+				.setBucketer(new NonRollingBucketer())
+				.setPartPrefix("part")
+				.setPendingPrefix("")
+				.setPendingSuffix("");
+
+		source
+				.map(new MapFunction<Tuple2<Integer,String>, String>() {
+					private static final long serialVersionUID = 1L;
+					@Override
+					public String map(Tuple2<Integer, String> value) throws Exception {
+						return value.f1;
+					}
+				})
+				.addSink(sink);
+
+		env.execute("RollingSink String Write Test");
+
+		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
+
+		BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+			String line = br.readLine();
+			Assert.assertEquals("message #" + i, line);
+		}
+
+		inStream.close();
+
+		inStream = dfs.open(new Path(outPath + "/part-1-0"));
+
+		br = new BufferedReader(new InputStreamReader(inStream));
+
+		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+			String line = br.readLine();
+			Assert.assertEquals("message #" + i, line);
+		}
+
+		inStream.close();
+	}
+
+	/**
+	 * This tests {@link SequenceFileWriter}
+	 * with non-rolling output and without compression.
+	 */
+	@Test
+	public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception {
+		final int NUM_ELEMENTS = 20;
+		final int PARALLELISM = 2;
+		final String outPath = hdfsURI + "/seq-no-comp-non-rolling-out";
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+				.broadcast()
+				.filter(new OddEvenFilter());
+
+		DataStream<Tuple2<IntWritable, Text>> mapped =  source.map(new MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception {
+				return Tuple2.of(new IntWritable(value.f0), new Text(value.f1));
+			}
+		});
+
+
+		RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
+				.setWriter(new SequenceFileWriter<IntWritable, Text>())
+				.setBucketer(new NonRollingBucketer())
+				.setPartPrefix("part")
+				.setPendingPrefix("")
+				.setPendingSuffix("");
+
+		mapped.addSink(sink);
+
+		env.execute("RollingSink String Write Test");
+
+		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
+
+		SequenceFile.Reader reader = new SequenceFile.Reader(inStream,
+				1000,
+				0,
+				100000,
+				new Configuration());
+
+		IntWritable intWritable = new IntWritable();
+		Text txt = new Text();
+
+		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+			reader.next(intWritable, txt);
+			Assert.assertEquals(i, intWritable.get());
+			Assert.assertEquals("message #" + i, txt.toString());
+		}
+
+		reader.close();
+		inStream.close();
+
+		inStream = dfs.open(new Path(outPath + "/part-1-0"));
+
+		reader = new SequenceFile.Reader(inStream,
+				1000,
+				0,
+				100000,
+				new Configuration());
+
+		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+			reader.next(intWritable, txt);
+			Assert.assertEquals(i, intWritable.get());
+			Assert.assertEquals("message #" + i, txt.toString());
+		}
+
+		reader.close();
+		inStream.close();
+	}
+
+	/**
+	 * This tests {@link SequenceFileWriter}
+	 * with non-rolling output but with compression.
+	 */
+	@Test
+	public void testNonRollingSequenceFileWithCompressionWriter() throws Exception {
+		final int NUM_ELEMENTS = 20;
+		final int PARALLELISM = 2;
+		final String outPath = hdfsURI + "/seq-non-rolling-out";
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+				.broadcast()
+				.filter(new OddEvenFilter());
+
+		DataStream<Tuple2<IntWritable, Text>> mapped =  source.map(new MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception {
+				return Tuple2.of(new IntWritable(value.f0), new Text(value.f1));
+			}
+		});
+
+
+		RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
+				.setWriter(new SequenceFileWriter<IntWritable, Text>("Default", SequenceFile.CompressionType.BLOCK))
+				.setBucketer(new NonRollingBucketer())
+				.setPartPrefix("part")
+				.setPendingPrefix("")
+				.setPendingSuffix("");
+
+		mapped.addSink(sink);
+
+		env.execute("RollingSink String Write Test");
+
+		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
+
+		SequenceFile.Reader reader = new SequenceFile.Reader(inStream,
+				1000,
+				0,
+				100000,
+				new Configuration());
+
+		IntWritable intWritable = new IntWritable();
+		Text txt = new Text();
+
+		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+			reader.next(intWritable, txt);
+			Assert.assertEquals(i, intWritable.get());
+			Assert.assertEquals("message #" + i, txt.toString());
+		}
+
+		reader.close();
+		inStream.close();
+
+		inStream = dfs.open(new Path(outPath + "/part-1-0"));
+
+		reader = new SequenceFile.Reader(inStream,
+				1000,
+				0,
+				100000,
+				new Configuration());
+
+		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+			reader.next(intWritable, txt);
+			Assert.assertEquals(i, intWritable.get());
+			Assert.assertEquals("message #" + i, txt.toString());
+		}
+
+		reader.close();
+		inStream.close();
+	}
+	
+	
+	/**
+	 * This tests {@link AvroKeyValueSinkWriter}
+	 * with non-rolling output and without compression.
+	 */
+	@Test
+	public void testNonRollingAvroKeyValueWithoutCompressionWriter() throws Exception {
+		final int NUM_ELEMENTS = 20;
+		final int PARALLELISM = 2;
+		final String outPath = hdfsURI + "/avro-kv-no-comp-non-rolling-out";
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+				.broadcast()
+				.filter(new OddEvenFilter());
+
+
+		Map<String, String> properties = new HashMap<>();
+		Schema keySchema = Schema.create(Type.INT);
+		Schema valueSchema = Schema.create(Type.STRING);
+		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString());
+		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString());
+		RollingSink<Tuple2<Integer, String>> sink = new RollingSink<Tuple2<Integer, String>>(outPath)
+				.setWriter(new AvroKeyValueSinkWriter<Integer, String>(properties))
+				.setBucketer(new NonRollingBucketer())
+				.setPartPrefix("part")
+				.setPendingPrefix("")
+				.setPendingSuffix("");
+
+		source.addSink(sink);
+
+		env.execute("RollingSink Avro KeyValue Writer Test");
+
+		GenericData.setStringType(valueSchema, StringType.String);
+		Schema elementSchema = AvroKeyValue.getSchema(keySchema, valueSchema);
+
+		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
+		SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<GenericRecord>(elementSchema);
+		DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader);
+		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+			AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next());
+			int key = wrappedEntry.getKey().intValue();
+			Assert.assertEquals(i, key);
+			String value = wrappedEntry.getValue();
+			Assert.assertEquals("message #" + i, value);
+		}
+
+		dataFileStream.close();
+		inStream.close();
+
+		inStream = dfs.open(new Path(outPath + "/part-1-0"));
+		dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader);
+
+		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+			AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next());
+			int key = wrappedEntry.getKey().intValue();
+			Assert.assertEquals(i, key);
+			String value = wrappedEntry.getValue();
+			Assert.assertEquals("message #" + i, value);
+		}
+
+		dataFileStream.close();
+		inStream.close();
+	}
+	
+	/**
+	 * This tests {@link AvroKeyValueSinkWriter}
+	 * with non-rolling output and with compression.
+	 */
+	@Test
+	public void testNonRollingAvroKeyValueWithCompressionWriter() throws Exception {
+		final int NUM_ELEMENTS = 20;
+		final int PARALLELISM = 2;
+		final String outPath = hdfsURI + "/avro-kv-no-comp-non-rolling-out";
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+				.broadcast()
+				.filter(new OddEvenFilter());
+
+
+		Map<String, String> properties = new HashMap<>();
+		Schema keySchema = Schema.create(Type.INT);
+		Schema valueSchema = Schema.create(Type.STRING);
+		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString());
+		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString());
+		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, String.valueOf(true));
+		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);
+		RollingSink<Tuple2<Integer, String>> sink = new RollingSink<Tuple2<Integer, String>>(outPath)
+				.setWriter(new AvroKeyValueSinkWriter<Integer, String>(properties))
+				.setBucketer(new NonRollingBucketer())
+				.setPartPrefix("part")
+				.setPendingPrefix("")
+				.setPendingSuffix("");
+
+		source.addSink(sink);
+
+		env.execute("RollingSink Avro KeyValue Writer Test");
+
+		GenericData.setStringType(valueSchema, StringType.String);
+		Schema elementSchema = AvroKeyValue.getSchema(keySchema, valueSchema);
+
+		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
+		SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<GenericRecord>(elementSchema);
+		DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader);
+		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+			AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next());
+			int key = wrappedEntry.getKey().intValue();
+			Assert.assertEquals(i, key);
+			String value = wrappedEntry.getValue();
+			Assert.assertEquals("message #" + i, value);
+		}
+
+		dataFileStream.close();
+		inStream.close();
+
+		inStream = dfs.open(new Path(outPath + "/part-1-0"));
+		dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader);
+
+		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+			AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next());
+			int key = wrappedEntry.getKey().intValue();
+			Assert.assertEquals(i, key);
+			String value = wrappedEntry.getValue();
+			Assert.assertEquals("message #" + i, value);
+		}
+
+		dataFileStream.close();
+		inStream.close();
+	}
+
+
+	/**
+	 * This tests a user-defined HDFS configuration.
+	 */
+	@Test
+	public void testUserDefinedConfiguration() throws Exception {
+		final int NUM_ELEMENTS = 20;
+		final int PARALLELISM = 2;
+		final String outPath = hdfsURI + "/string-non-rolling-with-config";
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+			.broadcast()
+			.filter(new OddEvenFilter());
+
+		Configuration conf = new Configuration();
+		conf.set("io.file.buffer.size", "40960");
+		RollingSink<String> sink = new RollingSink<String>(outPath)
+			.setFSConfig(conf)
+			.setWriter(new StreamWriterWithConfigCheck<String>("io.file.buffer.size", "40960"))
+			.setBucketer(new NonRollingBucketer())
+			.setPartPrefix("part")
+			.setPendingPrefix("")
+			.setPendingSuffix("");
+
+		source
+			.map(new MapFunction<Tuple2<Integer,String>, String>() {
+				private static final long serialVersionUID = 1L;
+				@Override
+				public String map(Tuple2<Integer, String> value) throws Exception {
+					return value.f1;
+				}
+			})
+			.addSink(sink);
+
+		env.execute("RollingSink with configuration Test");
+
+		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
+
+		BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+			String line = br.readLine();
+			Assert.assertEquals("message #" + i, line);
+		}
+
+		inStream.close();
+
+		inStream = dfs.open(new Path(outPath + "/part-1-0"));
+
+		br = new BufferedReader(new InputStreamReader(inStream));
+
+		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+			String line = br.readLine();
+			Assert.assertEquals("message #" + i, line);
+		}
+
+		inStream.close();
+	}
+
+	// we use these latches to synchronize the clock changes with the elements being processed
+	final static MultiShotLatch latch1 = new MultiShotLatch();
+	final static MultiShotLatch latch2 = new MultiShotLatch();
+
+	/**
+	 * This uses {@link org.apache.flink.streaming.connectors.fs.DateTimeBucketer} to
+	 * produce rolling files. The clock of the DateTimeBucketer is set to a
+	 * {@link ModifyableClock} so that time advances in lockstep with the processing of elements,
+	 * coordinated via latches.
+	 */
+	@Test
+	public void testDateTimeRollingStringWriter() throws Exception {
+		final int NUM_ELEMENTS = 20;
+		final int PARALLELISM = 2;
+		final String outPath = hdfsURI + "/rolling-out";
+		DateTimeBucketer.setClock(new ModifyableClock());
+		ModifyableClock.setCurrentTime(0);
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+
+
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new WaitingTestSourceFunction(
+				NUM_ELEMENTS))
+				.broadcast();
+
+		// the parallel flatMap is chained to the sink, so when it has seen 5 elements it can
+		// fire the latch
+		DataStream<String> mapped = source
+				.flatMap(new RichFlatMapFunction<Tuple2<Integer, String>, String>() {
+					private static final long serialVersionUID = 1L;
+
+					int count = 0;
+					@Override
+					public void flatMap(Tuple2<Integer, String> value,
+							Collector<String> out) throws Exception {
+						out.collect(value.f1);
+						count++;
+						if (count >= 5) {
+							if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
+								latch1.trigger();
+							} else {
+								latch2.trigger();
+							}
+							count = 0;
+						}
+					}
+
+				});
+
+		RollingSink<String> sink = new RollingSink<String>(outPath)
+				.setBucketer(new DateTimeBucketer("ss"))
+				.setPartPrefix("part")
+				.setPendingPrefix("")
+				.setPendingSuffix("");
+
+		mapped.addSink(sink);
+
+		env.execute("RollingSink String Write Test");
+
+		RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(outPath), true);
+
+		// we should have 8 rolling files: 4 time intervals times a parallelism of 2
+		int numFiles = 0;
+		while (files.hasNext()) {
+			LocatedFileStatus file = files.next();
+			numFiles++;
+			if (file.getPath().toString().contains("rolling-out/00")) {
+				FSDataInputStream inStream = dfs.open(file.getPath());
+
+				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+				for (int i = 0; i < 5; i++) {
+					String line = br.readLine();
+					Assert.assertEquals("message #" + i, line);
+				}
+
+				inStream.close();
+			} else if (file.getPath().toString().contains("rolling-out/05")) {
+				FSDataInputStream inStream = dfs.open(file.getPath());
+
+				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+				for (int i = 5; i < 10; i++) {
+					String line = br.readLine();
+					Assert.assertEquals("message #" + i, line);
+				}
+
+				inStream.close();
+			} else if (file.getPath().toString().contains("rolling-out/10")) {
+				FSDataInputStream inStream = dfs.open(file.getPath());
+
+				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+				for (int i = 10; i < 15; i++) {
+					String line = br.readLine();
+					Assert.assertEquals("message #" + i, line);
+				}
+
+				inStream.close();
+			} else if (file.getPath().toString().contains("rolling-out/15")) {
+				FSDataInputStream inStream = dfs.open(file.getPath());
+
+				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+				for (int i = 15; i < 20; i++) {
+					String line = br.readLine();
+					Assert.assertEquals("message #" + i, line);
+				}
+
+				inStream.close();
+			} else {
+				Assert.fail("File " + file + " does not match any expected roll pattern.");
+			}
+		}
+
+		Assert.assertEquals(8, numFiles);
+	}
+
+	private static final String PART_PREFIX = "part";
+	private static final String PENDING_SUFFIX = ".pending";
+	private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+	private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+	@Test
+	public void testBucketStateTransitions() throws Exception {
+		final File outDir = tempFolder.newFolder();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSink(outDir, 1, 0);
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.setProcessingTime(0L);
+
+		// we have a bucket size of 5 bytes, so each record will get its own bucket,
+		// i.e. the bucket should roll after every record.
+
+		testHarness.processElement(new StreamRecord<>("test1", 1L));
+		testHarness.processElement(new StreamRecord<>("test2", 1L));
+		checkFs(outDir, 1, 1 ,0, 0);
+
+		testHarness.processElement(new StreamRecord<>("test3", 1L));
+		checkFs(outDir, 1, 2, 0, 0);
+
+		testHarness.snapshot(0, 0);
+		checkFs(outDir, 1, 2, 0, 0);
+
+		testHarness.notifyOfCompletedCheckpoint(0);
+		checkFs(outDir, 1, 0, 2, 0);
+
+		OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
+
+		testHarness.close();
+		checkFs(outDir, 0, 1, 2, 0);
+
+		testHarness = createRescalingTestSink(outDir, 1, 0);
+		testHarness.setup();
+		testHarness.initializeState(snapshot);
+		testHarness.open();
+		checkFs(outDir, 0, 0, 3, 1);
+
+		snapshot = testHarness.snapshot(2, 0);
+
+		testHarness.processElement(new StreamRecord<>("test4", 10));
+		checkFs(outDir, 1, 0, 3, 1);
+
+		testHarness = createRescalingTestSink(outDir, 1, 0);
+		testHarness.setup();
+		testHarness.initializeState(snapshot);
+		testHarness.open();
+
+		// the in-progress file remains as we do not clean up now
+		checkFs(outDir, 1, 0, 3, 1);
+
+		testHarness.close();
+
+		// at close it is not moved to final because it is not part of the
+		// current task's state; it was just a leftover that was not cleaned up.
+		checkFs(outDir, 1, 0, 3, 1);
+	}
+
+	@Test
+	public void testScalingDown() throws Exception {
+		final File outDir = tempFolder.newFolder();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 3, 0);
+		testHarness1.setup();
+		testHarness1.open();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 3, 1);
+		testHarness2.setup();
+		testHarness2.open();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2);
+		testHarness3.setup();
+		testHarness3.open();
+
+		testHarness1.processElement(new StreamRecord<>("test1", 0L));
+		checkFs(outDir, 1, 0, 0, 0);
+
+		testHarness2.processElement(new StreamRecord<>("test2", 0L));
+		testHarness2.processElement(new StreamRecord<>("test3", 0L));
+		testHarness2.processElement(new StreamRecord<>("test4", 0L));
+		testHarness2.processElement(new StreamRecord<>("test5", 0L));
+		testHarness2.processElement(new StreamRecord<>("test6", 0L));
+		checkFs(outDir, 2, 4, 0, 0);
+
+		testHarness3.processElement(new StreamRecord<>("test7", 0L));
+		testHarness3.processElement(new StreamRecord<>("test8", 0L));
+		checkFs(outDir, 3, 5, 0, 0);
+
+		// we intentionally snapshot them in non-ascending order so that the states are shuffled
+		OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
+			testHarness3.snapshot(0, 0),
+			testHarness1.snapshot(0, 0),
+			testHarness2.snapshot(0, 0)
+		);
+
+		// with the above state reshuffling, we expect testHarness4 to take the
+		// state of the previous testHarness3 and testHarness1 while testHarness5
+		// will take that of the previous testHarness1
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness4 = createRescalingTestSink(outDir, 2, 0);
+		testHarness4.setup();
+		testHarness4.initializeState(mergedSnapshot);
+		testHarness4.open();
+
+		// we do not have a length file for part-2-0 because bucket part-2-0
+		// was not "in-progress", but "pending" (its full content is valid).
+		checkFs(outDir, 1, 4, 3, 2);
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness5 = createRescalingTestSink(outDir, 2, 1);
+		testHarness5.setup();
+		testHarness5.initializeState(mergedSnapshot);
+		testHarness5.open();
+
+		checkFs(outDir, 0, 0, 8, 3);
+	}
+
+	@Test
+	public void testScalingUp() throws Exception {
+		final File outDir = tempFolder.newFolder();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 2, 0);
+		testHarness1.setup();
+		testHarness1.open();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 2, 0);
+		testHarness2.setup();
+		testHarness2.open();
+
+		testHarness1.processElement(new StreamRecord<>("test1", 0L));
+		testHarness1.processElement(new StreamRecord<>("test2", 0L));
+
+		checkFs(outDir, 1, 1, 0, 0);
+
+		testHarness2.processElement(new StreamRecord<>("test3", 0L));
+		testHarness2.processElement(new StreamRecord<>("test4", 0L));
+		testHarness2.processElement(new StreamRecord<>("test5", 0L));
+
+		checkFs(outDir, 2, 3, 0, 0);
+
+		// we intentionally snapshot them in reverse order so that the states are shuffled
+		OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
+			testHarness2.snapshot(0, 0),
+			testHarness1.snapshot(0, 0)
+		);
+
+		testHarness1 = createRescalingTestSink(outDir, 3, 0);
+		testHarness1.setup();
+		testHarness1.initializeState(mergedSnapshot);
+		testHarness1.open();
+
+		checkFs(outDir, 1, 1, 3, 1);
+
+		testHarness2 = createRescalingTestSink(outDir, 3, 1);
+		testHarness2.setup();
+		testHarness2.initializeState(mergedSnapshot);
+		testHarness2.open();
+
+		checkFs(outDir, 0, 0, 5, 2);
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2);
+		testHarness3.setup();
+		testHarness3.initializeState(mergedSnapshot);
+		testHarness3.open();
+
+		checkFs(outDir, 0, 0, 5, 2);
+
+		testHarness1.processElement(new StreamRecord<>("test6", 0));
+		testHarness2.processElement(new StreamRecord<>("test6", 0));
+		testHarness3.processElement(new StreamRecord<>("test6", 0));
+
+		// 3 for the different tasks
+		checkFs(outDir, 3, 0, 5, 2);
+
+		testHarness1.snapshot(1, 0);
+		testHarness2.snapshot(1, 0);
+		testHarness3.snapshot(1, 0);
+
+		testHarness1.close();
+		testHarness2.close();
+		testHarness3.close();
+
+		checkFs(outDir, 0, 3, 5, 2);
+	}
+
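+	/**
+	 * Creates a test harness around a {@link RollingSink} with a batch size of 5 bytes, so that
+	 * every record written by the rescaling tests rolls into its own part file.
+	 */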
+	private OneInputStreamOperatorTestHarness<String, Object> createRescalingTestSink(
+		File outDir, int totalParallelism, int taskIdx) throws Exception {
+
+		RollingSink<String> sink = new RollingSink<String>(outDir.getAbsolutePath())
+			.setWriter(new StringWriter<String>())
+			.setBatchSize(5)
+			.setPartPrefix(PART_PREFIX)
+			.setInProgressPrefix("")
+			.setPendingPrefix("")
+			.setValidLengthPrefix("")
+			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
+			.setPendingSuffix(PENDING_SUFFIX)
+			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+		return createTestSink(sink, totalParallelism, taskIdx);
+	}
+
+	private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink(
+		RollingSink<T> sink, int totalParallelism, int taskIdx) throws Exception {
+		return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, totalParallelism, taskIdx);
+	}
+
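+	/**
+	 * Counts the files below {@code outDir} by suffix (ignoring checksum files) and asserts the
+	 * expected number of in-progress, pending, completed and valid-length files.
+	 */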
+	private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
+		int inProg = 0;
+		int pend = 0;
+		int compl = 0;
+		int val = 0;
+
+		for (File file: FileUtils.listFiles(outDir, null, true)) {
+			if (file.getAbsolutePath().endsWith("crc")) {
+				continue;
+			}
+			String path = file.getPath();
+			if (path.endsWith(IN_PROGRESS_SUFFIX)) {
+				inProg++;
+			} else if (path.endsWith(PENDING_SUFFIX)) {
+				pend++;
+			} else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
+				val++;
+			} else if (path.contains(PART_PREFIX)) {
+				compl++;
+			}
+		}
+
+		Assert.assertEquals(inprogress, inProg);
+		Assert.assertEquals(pending, pend);
+		Assert.assertEquals(completed, compl);
+		Assert.assertEquals(valid, val);
+	}
+
+	private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean running = true;
+
+		private final int numElements;
+
+		public TestSourceFunction(int numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
+			for (int i = 0; i < numElements && running; i++) {
+				ctx.collect(Tuple2.of(i, "message #" + i));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+
+	/**
+	 * This waits on the two multi-shot latches. The latches are triggered in a parallel
+	 * flatMap inside the test topology.
+	 */
+	private static class WaitingTestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean running = true;
+
+		private final int numElements;
+
+		public WaitingTestSourceFunction(int numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
+			for (int i = 0; i < numElements && running; i++) {
+				if (i % 5 == 0 && i > 0) {
+					// update the clock after "five seconds", so we get 20 seconds in total
+					// with 5 elements in each time window
+					latch1.await();
+					latch2.await();
+					ModifyableClock.setCurrentTime(i * 1000);
+				}
+				ctx.collect(Tuple2.of(i, "message #" + i));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+
+
+	private static class StreamWriterWithConfigCheck<T> extends StringWriter<T> {
+		private String key;
+		private String expect;
+		public StreamWriterWithConfigCheck(String key, String expect) {
+			this.key = key;
+			this.expect = expect;
+		}
+
+		@Override
+		public void open(FileSystem fs, Path path) throws IOException {
+			super.open(fs, path);
+			Assert.assertEquals(expect, fs.getConf().get(key));
+		}
+
+		@Override
+		public Writer<T> duplicate() {
+			return new StreamWriterWithConfigCheck<>(key, expect);
+		}
+	}
+
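+	/**
+	 * Routes elements with even keys to subtask 0 and elements with odd keys to subtask 1, so
+	 * that each parallel subtask receives a deterministic half of the broadcast input.
+	 */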
+	public static class OddEvenFilter extends RichFilterFunction<Tuple2<Integer, String>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean filter(Tuple2<Integer, String> value) throws Exception {
+			if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
+				return value.f0 % 2 == 0;
+			} else {
+				return value.f0 % 2 == 1;
+			}
+		}
+	}
+
+	public static class ModifyableClock implements Clock {
+
+		private static volatile long currentTime = 0;
+
+		public static void setCurrentTime(long currentTime) {
+			ModifyableClock.currentTime = currentTime;
+		}
+
+		@Override
+		public long currentTimeMillis() {
+			return currentTime;
+		}
+	}
+}
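
For readers skimming the diff, the tests above reduce to the following usage pattern. This is a
minimal, illustrative sketch of how such a sink is typically wired into a job, assuming the
RollingSink/DateTimeBucketer builder API exercised above; the output path, bucket format, batch
size and job name are placeholder values.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
    import org.apache.flink.streaming.connectors.fs.RollingSink;

    public class RollingSinkExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // "hdfs:///path/to/output" is a placeholder base path
            RollingSink<String> sink = new RollingSink<String>("hdfs:///path/to/output")
                    .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HH")) // one bucket per hour
                    .setPartPrefix("part")
                    .setBatchSize(1024 * 1024 * 64);                     // roll part files at ~64 MB

            env.fromElements("a", "b", "c").addSink(sink);
            env.execute("RollingSink example");
        }
    }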

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
new file mode 100644
index 0000000..eb12d07
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.SecureTestEnvironment;
+import org.apache.flink.test.util.TestingSecurityContext;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.NetUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.VersionInfo;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
+
+/**
+ * Tests for the {@link RollingSink} in a secure (Kerberized) environment, implemented as an
+ * extension of {@link RollingSinkITCase}.
+ * Note: only executed for Hadoop versions 3.x.x and above.
+ */
+public class RollingSinkSecuredITCase extends RollingSinkITCase {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(RollingSinkSecuredITCase.class);
+
+	/**
+	 * Skips all tests if the Hadoop version doesn't match.
+	 * We can't run this test class until HDFS-9213 is fixed, which allows a secure DataNode
+	 * to bind to non-privileged ports for testing.
+	 * For now, we skip this test class for Hadoop versions below 3.x.x.
+	 */
+	private static void skipIfHadoopVersionIsNotAppropriate() {
+		// Skips all tests if the Hadoop version doesn't match
+		String hadoopVersionString = VersionInfo.getVersion();
+		String[] split = hadoopVersionString.split("\\.");
+		if (split.length != 3) {
+			throw new IllegalStateException("Hadoop version was not of format 'X.X.X': " + hadoopVersionString);
+		}
+		Assume.assumeTrue(
+			// check whether we're running Hadoop version >= 3.x.x
+			Integer.parseInt(split[0]) >= 3
+		);
+	}
+
+	/*
+	 * Override the superclass' static methods to avoid creating MiniDFS and MiniFlink with the
+	 * wrong configuration and in the wrong order for the secure cluster.
+	 */
+	@BeforeClass
+	public static void setup() throws Exception {}
+
+	@AfterClass
+	public static void teardown() throws Exception {}
+
+	@BeforeClass
+	public static void createHDFS() throws IOException {}
+
+	@AfterClass
+	public static void destroyHDFS() {}
+
+	@BeforeClass
+	public static void startSecureCluster() throws Exception {
+
+		skipIfHadoopVersionIsNotAppropriate();
+
+		LOG.info("starting secure cluster environment for testing");
+
+		dataDir = tempFolder.newFolder();
+
+		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
+
+		SecureTestEnvironment.prepare(tempFolder);
+
+		populateSecureConfigurations();
+
+		Configuration flinkConfig = new Configuration();
+		flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,
+				SecureTestEnvironment.getTestKeytab());
+		flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
+				SecureTestEnvironment.getHadoopServicePrincipal());
+
+		SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig);
+		ctx.setHadoopConfiguration(conf);
+		try {
+			TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
+		} catch (Exception e) {
+			throw new RuntimeException("Exception occurred while setting up secure test context. Reason: {}", e);
+		}
+
+		File hdfsSiteXML = new File(dataDir.getAbsolutePath() + "/hdfs-site.xml");
+
+		FileWriter writer = new FileWriter(hdfsSiteXML);
+		conf.writeXml(writer);
+		writer.flush();
+		writer.close();
+
+		Map<String, String> map = new HashMap<String, String>(System.getenv());
+		map.put("HADOOP_CONF_DIR", hdfsSiteXML.getParentFile().getAbsolutePath());
+		TestBaseUtils.setEnv(map);
+
+
+		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+		builder.checkDataNodeAddrConfig(true);
+		builder.checkDataNodeHostConfig(true);
+		hdfsCluster = builder.build();
+
+		dfs = hdfsCluster.getFileSystem();
+
+		hdfsURI = "hdfs://"
+				+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+				+ "/";
+
+		startSecureFlinkClusterWithRecoveryModeEnabled();
+	}
+
+	@AfterClass
+	public static void teardownSecureCluster() throws Exception {
+		LOG.info("tearing down secure cluster environment");
+
+		TestStreamEnvironment.unsetAsContext();
+		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+
+		if (hdfsCluster != null) {
+			hdfsCluster.shutdown();
+		}
+
+		SecureTestEnvironment.cleanup();
+	}
+
+	private static void populateSecureConfigurations() {
+
+		String dataTransferProtection = "authentication";
+
+		SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
+		conf.set(DFS_NAMENODE_USER_NAME_KEY, SecureTestEnvironment.getHadoopServicePrincipal());
+		conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, SecureTestEnvironment.getTestKeytab());
+		conf.set(DFS_DATANODE_USER_NAME_KEY, SecureTestEnvironment.getHadoopServicePrincipal());
+		conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, SecureTestEnvironment.getTestKeytab());
+		conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, SecureTestEnvironment.getHadoopServicePrincipal());
+
+		conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+
+		conf.set("dfs.data.transfer.protection", dataTransferProtection);
+
+		conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTP_ONLY.name());
+
+		conf.set(DFS_ENCRYPT_DATA_TRANSFER_KEY, "false");
+
+		conf.setInt("dfs.datanode.socket.write.timeout", 0);
+
+		/*
+		 * We are setting the port numbers to privileged ports - see HDFS-9213.
+		 * This requires the user to have root privileges to bind to the ports.
+		 * If the Java process is not running as root, use the command below (Ubuntu)
+		 * to grant it the privilege needed for bind() to work:
+		 * setcap 'cap_net_bind_service=+ep' /path/to/java
+		 */
+		conf.set(DFS_DATANODE_ADDRESS_KEY, "localhost:1002");
+		conf.set(DFS_DATANODE_HOST_NAME_KEY, "localhost");
+		conf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "localhost:1003");
+	}
+
+	private static void startSecureFlinkClusterWithRecoveryModeEnabled() {
+		try {
+			LOG.info("Starting Flink and ZK in secure mode");
+
+			dfs.mkdirs(new Path("/flink/checkpoints"));
+			dfs.mkdirs(new Path("/flink/recovery"));
+
+			org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration();
+
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
+			config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
+			config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+			config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+			config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
+			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery");
+			config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");
+
+			SecureTestEnvironment.populateFlinkSecureConfigurations(config);
+
+			cluster = TestBaseUtils.startCluster(config, false);
+			TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
+
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	/* For secure cluster testing, it is enough to run only one test; the test methods below are
+	 * overridden as no-ops to keep the overall build time minimal.
+	 */
+	@Override
+	public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception {}
+
+	@Override
+	public void testNonRollingSequenceFileWithCompressionWriter() throws Exception {}
+
+	@Override
+	public void testNonRollingAvroKeyValueWithoutCompressionWriter() throws Exception {}
+
+	@Override
+	public void testNonRollingAvroKeyValueWithCompressionWriter() throws Exception {}
+
+	@Override
+	public void testDateTimeRollingStringWriter() throws Exception {}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
new file mode 100644
index 0000000..54703a3
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import com.google.common.collect.Sets;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
+import org.apache.flink.util.NetUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.BufferedReader;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link BucketingSink}.
+ *
+ * <p>
+ * This test only verifies the exactly-once behaviour of the sink. A separate test verifies the
+ * rolling behaviour.
+ */
+public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestBase {
+
+	final long NUM_STRINGS = 16_000;
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static MiniDFSCluster hdfsCluster;
+	private static org.apache.hadoop.fs.FileSystem dfs;
+
+	private static String outPath;
+
+	private static final String PENDING_SUFFIX = ".pending";
+	private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+
+	@BeforeClass
+	public static void createHDFS() throws IOException {
+		Configuration conf = new Configuration();
+
+		File dataDir = tempFolder.newFolder();
+
+		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
+		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+		hdfsCluster = builder.build();
+
+		dfs = hdfsCluster.getFileSystem();
+
+		outPath = "hdfs://"
+				+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+				+ "/string-non-rolling-out";
+	}
+
+	@AfterClass
+	public static void destroyHDFS() {
+		if (hdfsCluster != null) {
+			hdfsCluster.shutdown();
+		}
+	}
+
+	@Override
+	public void testProgram(StreamExecutionEnvironment env) {
+		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+
+		int PARALLELISM = 12;
+
+		env.enableCheckpointing(20);
+		env.setParallelism(PARALLELISM);
+		env.disableOperatorChaining();
+
+		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain();
+
+		DataStream<String> mapped = stream
+				.map(new OnceFailingIdentityMapper(NUM_STRINGS));
+
+		BucketingSink<String> sink = new BucketingSink<String>(outPath)
+				.setBucketer(new BasePathBucketer<String>())
+				.setBatchSize(10000)
+				.setValidLengthPrefix("")
+				.setPendingPrefix("")
+				.setPendingSuffix(PENDING_SUFFIX)
+				.setInProgressSuffix(IN_PROGRESS_SUFFIX);
+
+		mapped.addSink(sink);
+
+	}
+
+	@Override
+	public void postSubmit() throws Exception {
+		// We read the files and verify that we have read all the strings. If a valid-length
+		// file exists we only read the file to that point. (This test should work with
+		// FileSystems that support truncate() and with others as well.)
+
+		Pattern messageRegex = Pattern.compile("message (\\d*)");
+
+		// Keep a set of the message IDs that we read. Its size must equal NUM_STRINGS:
+		// if an ID is missing we have lost data, and duplicates within committed files
+		// are detected separately below.
+		Set<Integer> readNumbers = Sets.newHashSet();
+
+		HashSet<String> uniqMessagesRead = new HashSet<>();
+		HashSet<String> messagesInCommittedFiles = new HashSet<>();
+
+		RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(
+				outPath), true);
+
+		while (files.hasNext()) {
+			LocatedFileStatus file = files.next();
+
+			if (!file.getPath().toString().endsWith(".valid-length")) {
+				int validLength = (int) file.getLen();
+				if (dfs.exists(file.getPath().suffix(".valid-length"))) {
+					FSDataInputStream inStream = dfs.open(file.getPath().suffix(".valid-length"));
+					String validLengthString = inStream.readUTF();
+					validLength = Integer.parseInt(validLengthString);
+					System.out.println("VALID LENGTH: " + validLength);
+				}
+				FSDataInputStream inStream = dfs.open(file.getPath());
+				byte[] buffer = new byte[validLength];
+				inStream.readFully(0, buffer, 0, validLength);
+				inStream.close();
+
+				ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
+
+				InputStreamReader inStreamReader = new InputStreamReader(bais);
+				BufferedReader br = new BufferedReader(inStreamReader);
+
+				String line = br.readLine();
+				while (line != null) {
+					Matcher matcher = messageRegex.matcher(line);
+					if (matcher.matches()) {
+						uniqMessagesRead.add(line);
+
+						// check that in the committed files there are no duplicates
+						if (!file.getPath().toString().endsWith(IN_PROGRESS_SUFFIX) && !file.getPath().toString().endsWith(PENDING_SUFFIX)) {
+							if (!messagesInCommittedFiles.add(line)) {
+								Assert.fail("Duplicate entry in committed bucket.");
+							}
+						}
+
+						int messageId = Integer.parseInt(matcher.group(1));
+						readNumbers.add(messageId);
+					} else {
+						Assert.fail("Read line does not match expected pattern.");
+					}
+					line = br.readLine();
+				}
+				br.close();
+				inStreamReader.close();
+				bais.close();
+			}
+		}
+
+		// Verify that we read all strings (at-least-once)
+		Assert.assertEquals(NUM_STRINGS, readNumbers.size());
+
+		// Verify that we don't have duplicates (boom!, exactly-once)
+		Assert.assertEquals(NUM_STRINGS, uniqMessagesRead.size());
+	}
+
+	private static class OnceFailingIdentityMapper extends RichMapFunction<String, String> {
+		private static final long serialVersionUID = 1L;
+
+		private static volatile boolean hasFailed = false;
+
+		private final long numElements;
+
+		private long failurePos;
+		private long count;
+
+
+		OnceFailingIdentityMapper(long numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void open(org.apache.flink.configuration.Configuration parameters) throws IOException {
+			long failurePosMin = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+			long failurePosMax = (long) (0.9 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+
+			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
+			count = 0;
+		}
+
+		@Override
+		public String map(String value) throws Exception {
+			count++;
+			if (!hasFailed && count >= failurePos) {
+				hasFailed = true;
+				throw new Exception("Test Failure");
+			}
+
+			return value;
+		}
+	}
+
+	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
+			implements CheckpointedAsynchronously<Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final long numElements;
+
+		private int index;
+
+		private volatile boolean isRunning = true;
+
+
+		StringGeneratingSourceFunction(long numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			final Object lockingObject = ctx.getCheckpointLock();
+
+			final int step = getRuntimeContext().getNumberOfParallelSubtasks();
+
+			if (index == 0) {
+				index = getRuntimeContext().getIndexOfThisSubtask();
+			}
+
+			while (isRunning && index < numElements) {
+
+				Thread.sleep(1);
+				synchronized (lockingObject) {
+					ctx.collect("message " + index);
+					index += step;
+				}
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+
+		private static String randomString(StringBuilder bld, Random rnd) {
+			final int len = rnd.nextInt(10) + 5;
+
+			for (int i = 0; i < len; i++) {
+				char next = (char) (rnd.nextInt(20000) + 33);
+				bld.append(next);
+			}
+
+			return bld.toString();
+		}
+
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+			return index;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+			index = state;
+		}
+	}
+}
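
A minimal, illustrative sketch of how the BucketingSink exercised by this fault-tolerance test
is typically wired into a checkpointed job, assuming the BucketingSink/BasePathBucketer builder
API used above; the output path, checkpoint interval, batch size and job name are placeholders.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

    public class BucketingSinkExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(5000); // checkpointing enables the sink's exactly-once guarantees

            // "hdfs:///path/to/output" is a placeholder base path
            BucketingSink<String> sink = new BucketingSink<String>("hdfs:///path/to/output")
                    .setBucketer(new BasePathBucketer<String>()) // write everything to the base path
                    .setBatchSize(1024 * 1024 * 64);             // roll part files at ~64 MB

            env.fromElements("a", "b", "c").addSink(sink);
            env.execute("BucketingSink example");
        }
    }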

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
new file mode 100644
index 0000000..d671874
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -0,0 +1,867 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
+import org.apache.flink.streaming.connectors.fs.Clock;
+import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
+import org.apache.flink.streaming.connectors.fs.StringWriter;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.NetUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+public class BucketingSinkTest {
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static MiniDFSCluster hdfsCluster;
+	private static org.apache.hadoop.fs.FileSystem dfs;
+	private static String hdfsURI;
+
+	private static final String PART_PREFIX = "part";
+	private static final String PENDING_SUFFIX = ".pending";
+	private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+	private static final String VALID_LENGTH_SUFFIX = ".valid";
+
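+	/**
+	 * Creates a test harness around a {@link BucketingSink} that buckets elements by their value and
+	 * uses the given inactivity interval, so tests can trigger rolling purely via processing time.
+	 */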
+	private OneInputStreamOperatorTestHarness<String, Object> createRescalingTestSink(
+		File outDir, int totalParallelism, int taskIdx, long inactivityInterval) throws Exception {
+
+		BucketingSink<String> sink = new BucketingSink<String>(outDir.getAbsolutePath())
+			.setBucketer(new Bucketer<String>() {
+				private static final long serialVersionUID = 1L;
+
+				@Override
+				public Path getBucketPath(Clock clock, Path basePath, String element) {
+					return new Path(basePath, element);
+				}
+			})
+			.setWriter(new StringWriter<String>())
+			.setInactiveBucketCheckInterval(inactivityInterval)
+			.setInactiveBucketThreshold(inactivityInterval)
+			.setPartPrefix(PART_PREFIX)
+			.setInProgressPrefix("")
+			.setPendingPrefix("")
+			.setValidLengthPrefix("")
+			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
+			.setPendingSuffix(PENDING_SUFFIX)
+			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+		return createTestSink(sink, totalParallelism, taskIdx);
+	}
+
+	private OneInputStreamOperatorTestHarness<String, Object> createTestSink(File dataDir, int totalParallelism, int taskIdx) throws Exception {
+		BucketingSink<String> sink = new BucketingSink<String>(dataDir.getAbsolutePath())
+			.setBucketer(new Bucketer<String>() {
+				private static final long serialVersionUID = 1L;
+
+				@Override
+				public Path getBucketPath(Clock clock, Path basePath, String element) {
+					return new Path(basePath, element);
+				}
+			})
+			.setWriter(new StringWriter<String>())
+			.setPartPrefix(PART_PREFIX)
+			.setPendingPrefix("")
+			.setInactiveBucketCheckInterval(5*60*1000L)
+			.setInactiveBucketThreshold(5*60*1000L)
+			.setPendingSuffix(PENDING_SUFFIX)
+			.setInProgressSuffix(IN_PROGRESS_SUFFIX);
+
+		return createTestSink(sink, totalParallelism, taskIdx);
+	}
+
+	private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink(
+			BucketingSink<T> sink, int totalParallelism, int taskIdx) throws Exception {
+		return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, totalParallelism, taskIdx);
+	}
+
+	@BeforeClass
+	public static void createHDFS() throws IOException {
+		Configuration conf = new Configuration();
+
+		File dataDir = tempFolder.newFolder();
+
+		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
+		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+		hdfsCluster = builder.build();
+
+		dfs = hdfsCluster.getFileSystem();
+
+		hdfsURI = "hdfs://"
+			+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+			+ "/";
+	}
+
+	@AfterClass
+	public static void destroyHDFS() {
+		hdfsCluster.shutdown();
+	}
+
+	@Test
+	public void testInactivityPeriodWithLateNotify() throws Exception {
+		final File outDir = tempFolder.newFolder();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSink(outDir, 1, 0, 100);
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.setProcessingTime(0L);
+
+		testHarness.processElement(new StreamRecord<>("test1", 1L));
+		testHarness.processElement(new StreamRecord<>("test2", 1L));
+		checkFs(outDir, 2, 0, 0, 0);
+
+		testHarness.setProcessingTime(101L);	// the inactivity check moves the two in-progress files to pending
+		checkFs(outDir, 0, 2, 0, 0);
+
+		testHarness.snapshot(0, 0);				// register the pending files with checkpoint 0
+		checkFs(outDir, 0, 2, 0, 0);
+
+		testHarness.processElement(new StreamRecord<>("test3", 1L));
+		testHarness.processElement(new StreamRecord<>("test4", 1L));
+
+		testHarness.setProcessingTime(202L);	// the inactivity check moves the new in-progress files to pending
+
+		testHarness.snapshot(1, 0);				// register the new pending files with checkpoint 1
+		checkFs(outDir, 0, 4, 0, 0);
+
+		testHarness.notifyOfCompletedCheckpoint(0);	// put the pending for 0 to the "committed" state
+		checkFs(outDir, 0, 2, 2, 0);
+
+		testHarness.notifyOfCompletedCheckpoint(1); // put the pending for 1 to the "committed" state
+		checkFs(outDir, 0, 0, 4, 0);
+	}
+
+	@Test
+	public void testBucketStateTransitions() throws Exception {
+		final File outDir = tempFolder.newFolder();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSink(outDir, 1, 0, 100);
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.setProcessingTime(0L);
+
+		testHarness.processElement(new StreamRecord<>("test1", 1L));
+		testHarness.processElement(new StreamRecord<>("test2", 1L));
+		checkFs(outDir, 2, 0, 0, 0);
+
+		// this is to check the inactivity threshold
+		testHarness.setProcessingTime(101L);
+		checkFs(outDir, 0, 2, 0, 0);
+
+		testHarness.processElement(new StreamRecord<>("test3", 1L));
+		checkFs(outDir, 1, 2, 0, 0);
+
+		testHarness.snapshot(0, 0);
+		checkFs(outDir, 1, 2, 0, 0);
+
+		testHarness.notifyOfCompletedCheckpoint(0);
+		checkFs(outDir, 1, 0, 2, 0);
+
+		OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
+
+		testHarness.close();
+		checkFs(outDir, 0, 1, 2, 0);
+
+		testHarness = createRescalingTestSink(outDir, 1, 0, 100);
+		testHarness.setup();
+		testHarness.initializeState(snapshot);
+		testHarness.open();
+		checkFs(outDir, 0, 0, 3, 1);
+
+		snapshot = testHarness.snapshot(2, 0);
+
+		testHarness.processElement(new StreamRecord<>("test4", 10));
+		checkFs(outDir, 1, 0, 3, 1);
+
+		testHarness = createRescalingTestSink(outDir, 1, 0, 100);
+		testHarness.setup();
+		testHarness.initializeState(snapshot);
+		testHarness.open();
+
+		// the in-progress file remains as we do not clean up now
+		checkFs(outDir, 1, 0, 3, 1);
+
+		testHarness.close();
+
+		// at close it is not moved to final because it is not part
+		// of the current task's state, it was just a not cleaned up leftover.
+		checkFs(outDir, 1, 0, 3, 1);
+	}
+
+	@Test
+	public void testSameParallelismWithShufflingStates() throws Exception {
+		final File outDir = tempFolder.newFolder();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 2, 0, 100);
+		testHarness1.setup();
+		testHarness1.open();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 2, 1, 100);
+		testHarness2.setup();
+		testHarness2.open();
+
+		testHarness1.processElement(new StreamRecord<>("test1", 0L));
+		checkFs(outDir, 1, 0, 0, 0);
+
+		testHarness2.processElement(new StreamRecord<>("test2", 0L));
+		checkFs(outDir, 2, 0, 0, 0);
+
+		// intentionally we snapshot them in the reverse order so that the states are shuffled
+		OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
+			testHarness2.snapshot(0, 0),
+			testHarness1.snapshot(0, 0)
+		);
+
+		checkFs(outDir, 2, 0, 0, 0);
+
+		// this will not be included in any checkpoint so it can be cleaned up (although we do not)
+		testHarness2.processElement(new StreamRecord<>("test3", 0L));
+		checkFs(outDir, 3, 0, 0, 0);
+
+		testHarness1 = createRescalingTestSink(outDir, 2, 0, 100);
+		testHarness1.setup();
+		testHarness1.initializeState(mergedSnapshot);
+		testHarness1.open();
+
+		// the one in-progress will be the one assigned to the next instance,
+		// the other is the test3 which is just not cleaned up
+		checkFs(outDir, 2, 0, 1, 1);
+
+		testHarness2 = createRescalingTestSink(outDir, 2, 1, 100);
+		testHarness2.setup();
+		testHarness2.initializeState(mergedSnapshot);
+		testHarness2.open();
+
+		checkFs(outDir, 1, 0, 2, 2);
+
+		testHarness1.close();
+		testHarness2.close();
+
+		// the 1 in-progress can be discarded.
+		checkFs(outDir, 1, 0, 2, 2);
+	}
+
+	@Test
+	public void testScalingDown() throws Exception {
+		final File outDir = tempFolder.newFolder();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 3, 0, 100);
+		testHarness1.setup();
+		testHarness1.open();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 3, 1, 100);
+		testHarness2.setup();
+		testHarness2.open();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2, 100);
+		testHarness3.setup();
+		testHarness3.open();
+
+		testHarness1.processElement(new StreamRecord<>("test1", 0L));
+		checkFs(outDir, 1, 0, 0, 0);
+
+		testHarness2.processElement(new StreamRecord<>("test2", 0L));
+		checkFs(outDir, 2, 0, 0, 0);
+
+		testHarness3.processElement(new StreamRecord<>("test3", 0L));
+		testHarness3.processElement(new StreamRecord<>("test4", 0L));
+		checkFs(outDir, 4, 0, 0, 0);
+
+		// intentionally we snapshot them in the reverse order so that the states are shuffled
+		OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
+			testHarness3.snapshot(0, 0),
+			testHarness1.snapshot(0, 0),
+			testHarness2.snapshot(0, 0)
+		);
+
+		testHarness1 = createRescalingTestSink(outDir, 2, 0, 100);
+		testHarness1.setup();
+		testHarness1.initializeState(mergedSnapshot);
+		testHarness1.open();
+
+		checkFs(outDir, 1, 0, 3, 3);
+
+		testHarness2 = createRescalingTestSink(outDir, 2, 1, 100);
+		testHarness2.setup();
+		testHarness2.initializeState(mergedSnapshot);
+		testHarness2.open();
+
+		checkFs(outDir, 0, 0, 4, 4);
+	}
+
+	@Test
+	public void testScalingUp() throws Exception {
+		final File outDir = tempFolder.newFolder();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 2, 0, 100);
+		testHarness1.setup();
+		testHarness1.open();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 2, 0, 100);
+		testHarness2.setup();
+		testHarness2.open();
+
+		testHarness1.processElement(new StreamRecord<>("test1", 1L));
+		testHarness1.processElement(new StreamRecord<>("test2", 1L));
+
+		checkFs(outDir, 2, 0, 0, 0);
+
+		testHarness2.processElement(new StreamRecord<>("test3", 1L));
+		testHarness2.processElement(new StreamRecord<>("test4", 1L));
+		testHarness2.processElement(new StreamRecord<>("test5", 1L));
+
+		checkFs(outDir, 5, 0, 0, 0);
+
+		// intentionally we snapshot them in the reverse order so that the states are shuffled
+		OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
+			testHarness2.snapshot(0, 0),
+			testHarness1.snapshot(0, 0)
+		);
+
+		testHarness1 = createRescalingTestSink(outDir, 3, 0, 100);
+		testHarness1.setup();
+		testHarness1.initializeState(mergedSnapshot);
+		testHarness1.open();
+
+		checkFs(outDir, 2, 0, 3, 3);
+
+		testHarness2 = createRescalingTestSink(outDir, 3, 1, 100);
+		testHarness2.setup();
+		testHarness2.initializeState(mergedSnapshot);
+		testHarness2.open();
+
+		checkFs(outDir, 0, 0, 5, 5);
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2, 100);
+		testHarness3.setup();
+		testHarness3.initializeState(mergedSnapshot);
+		testHarness3.open();
+
+		checkFs(outDir, 0, 0, 5, 5);
+
+		testHarness1.processElement(new StreamRecord<>("test6", 0));
+		testHarness2.processElement(new StreamRecord<>("test6", 0));
+		testHarness3.processElement(new StreamRecord<>("test6", 0));
+
+		checkFs(outDir, 3, 0, 5, 5);
+
+		testHarness1.snapshot(1, 0);
+		testHarness2.snapshot(1, 0);
+		testHarness3.snapshot(1, 0);
+
+		testHarness1.close();
+		testHarness2.close();
+		testHarness3.close();
+
+		checkFs(outDir, 0, 3, 5, 5);
+	}
+
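+	/**
+	 * Asserts the number of in-progress, pending, completed (part) and valid-length files
+	 * under {@code outDir}, ignoring checksum (.crc) files.
+	 */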
+	private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
+		int inProg = 0;
+		int pend = 0;
+		int compl = 0;
+		int val = 0;
+
+		for (File file: FileUtils.listFiles(outDir, null, true)) {
+			if (file.getAbsolutePath().endsWith("crc")) {
+				continue;
+			}
+			String path = file.getPath();
+			if (path.endsWith(IN_PROGRESS_SUFFIX)) {
+				inProg++;
+			} else if (path.endsWith(PENDING_SUFFIX)) {
+				pend++;
+			} else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
+				val++;
+			} else if (path.contains(PART_PREFIX)) {
+				compl++;
+			}
+		}
+
+		Assert.assertEquals(inprogress, inProg);
+		Assert.assertEquals(pending, pend);
+		Assert.assertEquals(completed, compl);
+		Assert.assertEquals(valid, val);
+	}
+
+	/**
+	 * This tests {@link StringWriter} with
+	 * non-bucketing output.
+	 */
+	@Test
+	public void testNonRollingStringWriter() throws Exception {
+		final String outPath = hdfsURI + "/string-non-rolling-out";
+
+		final int numElements = 20;
+
+		BucketingSink<String> sink = new BucketingSink<String>(outPath)
+			.setBucketer(new BasePathBucketer<String>())
+			.setPartPrefix(PART_PREFIX)
+			.setPendingPrefix("")
+			.setPendingSuffix("");
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, 1, 0);
+
+		testHarness.setProcessingTime(0L);
+
+		testHarness.setup();
+		testHarness.open();
+
+		for (int i = 0; i < numElements; i++) {
+			testHarness.processElement(new StreamRecord<>("message #" + Integer.toString(i)));
+		}
+
+		testHarness.close();
+
+		FSDataInputStream inStream = dfs.open(new Path(outPath + "/" + PART_PREFIX + "-0-0"));
+
+		BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+		for (int i = 0; i < numElements; i++) {
+			String line = br.readLine();
+			Assert.assertEquals("message #" + i, line);
+		}
+
+		inStream.close();
+	}
+
+	/**
+	 * This tests {@link SequenceFileWriter}
+	 * with non-rolling output and without compression.
+	 */
+	@Test
+	public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception {
+		final String outPath = hdfsURI + "/seq-no-comp-non-rolling-out";
+
+		final int numElements = 20;
+
+		BucketingSink<Tuple2<IntWritable, Text>> sink = new BucketingSink<Tuple2<IntWritable, Text>>(outPath)
+			.setWriter(new SequenceFileWriter<IntWritable, Text>())
+			.setBucketer(new BasePathBucketer<Tuple2<IntWritable, Text>>())
+			.setPartPrefix(PART_PREFIX)
+			.setPendingPrefix("")
+			.setPendingSuffix("");
+
+		sink.setInputType(TypeInformation.of(new TypeHint<Tuple2<IntWritable, Text>>(){}), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<IntWritable, Text>, Object> testHarness =
+			createTestSink(sink, 1, 0);
+
+		testHarness.setProcessingTime(0L);
+
+		testHarness.setup();
+		testHarness.open();
+
+		for (int i = 0; i < numElements; i++) {
+			testHarness.processElement(new StreamRecord<>(Tuple2.of(
+				new IntWritable(i),
+				new Text("message #" + Integer.toString(i))
+			)));
+		}
+
+		testHarness.close();
+
+		FSDataInputStream inStream = dfs.open(new Path(outPath + "/" + PART_PREFIX + "-0-0"));
+
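+		// read the records back through the stream-based Reader (buffer size, start offset, length, conf)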
+		SequenceFile.Reader reader = new SequenceFile.Reader(inStream, 1000, 0, 100000, new Configuration());
+
+		IntWritable intWritable = new IntWritable();
+		Text txt = new Text();
+
+		for (int i = 0; i < numElements; i++) {
+			reader.next(intWritable, txt);
+			Assert.assertEquals(i, intWritable.get());
+			Assert.assertEquals("message #" + i, txt.toString());
+		}
+
+		reader.close();
+		inStream.close();
+	}
+
+	/**
+	 * This tests {@link AvroKeyValueSinkWriter}
+	 * with non-rolling output and with compression.
+	 */
+	@Test
+	public void testNonRollingAvroKeyValueWithCompressionWriter() throws Exception {
+		final String outPath = hdfsURI + "/avro-kv-comp-non-rolling-out";
+
+		final int numElements = 20;
+
+		Map<String, String> properties = new HashMap<>();
+		Schema keySchema = Schema.create(Schema.Type.INT);
+		Schema valueSchema = Schema.create(Schema.Type.STRING);
+		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString());
+		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString());
+		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, String.valueOf(true));
+		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);
+
+		BucketingSink<Tuple2<Integer, String>> sink = new BucketingSink<Tuple2<Integer, String>>(outPath)
+			.setWriter(new AvroKeyValueSinkWriter<Integer, String>(properties))
+			.setBucketer(new BasePathBucketer<Tuple2<Integer, String>>())
+			.setPartPrefix(PART_PREFIX)
+			.setPendingPrefix("")
+			.setPendingSuffix("");
+
+		OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, Object> testHarness =
+			createTestSink(sink, 1, 0);
+
+		testHarness.setProcessingTime(0L);
+
+		testHarness.setup();
+		testHarness.open();
+
+		for (int i = 0; i < numElements; i++) {
+			testHarness.processElement(new StreamRecord<>(Tuple2.of(
+				i, "message #" + Integer.toString(i)
+			)));
+		}
+
+		testHarness.close();
+
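+		// read the Avro key/value records back with the same key and value schemas the writer was configured with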
+		GenericData.setStringType(valueSchema, GenericData.StringType.String);
+		Schema elementSchema = AvroKeyValueSinkWriter.AvroKeyValue.getSchema(keySchema, valueSchema);
+
+		FSDataInputStream inStream = dfs.open(new Path(outPath + "/" + PART_PREFIX + "-0-0"));
+
+		SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<>(elementSchema);
+		DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(inStream, elementReader);
+		for (int i = 0; i < numElements; i++) {
+			AvroKeyValueSinkWriter.AvroKeyValue<Integer, String> wrappedEntry =
+				new AvroKeyValueSinkWriter.AvroKeyValue<>(dataFileStream.next());
+			int key = wrappedEntry.getKey();
+			Assert.assertEquals(i, key);
+			String value = wrappedEntry.getValue();
+			Assert.assertEquals("message #" + i, value);
+		}
+
+		dataFileStream.close();
+		inStream.close();
+	}
+
+	/**
+	 * This uses {@link DateTimeBucketer} to
+	 * produce rolling files. We use {@link OneInputStreamOperatorTestHarness} to manually
+	 * advance processing time.
+	 */
+	@Test
+	public void testDateTimeRollingStringWriter() throws Exception {
+		final int numElements = 20;
+
+		final String outPath = hdfsURI + "/rolling-out";
+
+		BucketingSink<String> sink = new BucketingSink<String>(outPath)
+			.setBucketer(new DateTimeBucketer<String>("ss"))
+			.setPartPrefix(PART_PREFIX)
+			.setPendingPrefix("")
+			.setPendingSuffix("");
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, 1, 0);
+
+		testHarness.setProcessingTime(0L);
+
+		testHarness.setup();
+		testHarness.open();
+
+		for (int i = 0; i < numElements; i++) {
+			// Every 5 elements, increase the clock time. We should end up with 5 elements per bucket.
+			if (i % 5 == 0) {
+				testHarness.setProcessingTime(i * 1000L);
+			}
+			testHarness.processElement(new StreamRecord<>("message #" + Integer.toString(i)));
+		}
+
+		testHarness.close();
+
+		RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(outPath), true);
+
+		// We should have 4 rolling files across 4 time intervals
+		int numFiles = 0;
+		while (files.hasNext()) {
+			LocatedFileStatus file = files.next();
+			numFiles++;
+			if (file.getPath().toString().contains("rolling-out/00")) {
+				FSDataInputStream inStream = dfs.open(file.getPath());
+
+				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+				for (int i = 0; i < 5; i++) {
+					String line = br.readLine();
+					Assert.assertEquals("message #" + i, line);
+				}
+
+				inStream.close();
+			} else if (file.getPath().toString().contains("rolling-out/05")) {
+				FSDataInputStream inStream = dfs.open(file.getPath());
+
+				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+				for (int i = 5; i < 10; i++) {
+					String line = br.readLine();
+					Assert.assertEquals("message #" + i, line);
+				}
+
+				inStream.close();
+			} else if (file.getPath().toString().contains("rolling-out/10")) {
+				FSDataInputStream inStream = dfs.open(file.getPath());
+
+				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+				for (int i = 10; i < 15; i++) {
+					String line = br.readLine();
+					Assert.assertEquals("message #" + i, line);
+				}
+
+				inStream.close();
+			} else if (file.getPath().toString().contains("rolling-out/15")) {
+				FSDataInputStream inStream = dfs.open(file.getPath());
+
+				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+				for (int i = 15; i < 20; i++) {
+					String line = br.readLine();
+					Assert.assertEquals("message #" + i, line);
+				}
+
+				inStream.close();
+			} else {
+				Assert.fail("File " + file + " does not match any expected roll pattern.");
+			}
+		}
+
+		Assert.assertEquals(4, numFiles);
+	}
+
+	/**
+	 * This uses a custom bucketing function which determines the bucket from the input.
+	 */
+	@Test
+	public void testCustomBucketing() throws Exception {
+		File dataDir = tempFolder.newFolder();
+
+		final int numIds = 4;
+		final int numElements = 20;
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir, 1, 0);
+
+		testHarness.setProcessingTime(0L);
+
+		testHarness.setup();
+		testHarness.open();
+
+		for (int i = 0; i < numElements; i++) {
+			testHarness.processElement(new StreamRecord<>(Integer.toString(i % numIds)));
+		}
+
+		testHarness.close();
+
+		// we should have 4 buckets, with 1 file each
+		int numFiles = 0;
+		for (File file: FileUtils.listFiles(dataDir, null, true)) {
+			if (file.getName().startsWith(PART_PREFIX)) {
+				numFiles++;
+			}
+		}
+
+		Assert.assertEquals(4, numFiles);
+	}
+
+	/**
+	 * This uses a custom bucketing function which determines the bucket from the input.
+	 * We use a simulated clock to reduce the number of buckets being written to over time.
+	 * This causes buckets to become 'inactive' and their file parts 'closed' by the sink.
+	 */
+	@Test
+	public void testCustomBucketingInactiveBucketCleanup() throws Exception {
+		File dataDir = tempFolder.newFolder();
+
+		final int step1NumIds = 4;
+		final int step2NumIds = 2;
+		final int numElementsPerStep = 20;
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir, 1, 0);
+
+		testHarness.setProcessingTime(0L);
+
+		testHarness.setup();
+		testHarness.open();
+
+		for (int i = 0; i < numElementsPerStep; i++) {
+			testHarness.processElement(new StreamRecord<>(Integer.toString(i % step1NumIds)));
+		}
+
+		testHarness.setProcessingTime(2*60*1000L);
+
+		for (int i = 0; i < numElementsPerStep; i++) {
+			testHarness.processElement(new StreamRecord<>(Integer.toString(i % step2NumIds)));
+		}
+
+		testHarness.setProcessingTime(6*60*1000L);
+
+		for (int i = 0; i < numElementsPerStep; i++) {
+			testHarness.processElement(new StreamRecord<>(Integer.toString(i % step2NumIds)));
+		}
+
+		// we should have 4 buckets, with 1 file each
+		// 2 of these buckets should have been finalised due to becoming inactive
+		int numFiles = 0;
+		int numInProgress = 0;
+		for (File file: FileUtils.listFiles(dataDir, null, true)) {
+			if (file.getAbsolutePath().endsWith("crc")) {
+				continue;
+			}
+			if (file.getPath().endsWith(IN_PROGRESS_SUFFIX)) {
+				numInProgress++;
+			}
+			numFiles++;
+		}
+
+		testHarness.close();
+
+		Assert.assertEquals(4, numFiles);
+		Assert.assertEquals(2, numInProgress);
+	}
+
+	/**
+	 * This tests that a user-defined Hadoop configuration set via {@code setFSConfig}
+	 * reaches the file system used by the sink's writers.
+	 */
+	@Test
+	public void testUserDefinedConfiguration() throws Exception {
+		final String outPath = hdfsURI + "/string-non-rolling-with-config";
+		final int numElements = 20;
+
+		Map<String, String> properties = new HashMap<>();
+		Schema keySchema = Schema.create(Schema.Type.INT);
+		Schema valueSchema = Schema.create(Schema.Type.STRING);
+		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString());
+		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString());
+		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, String.valueOf(true));
+		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);
+
+		Configuration conf = new Configuration();
+		conf.set("io.file.buffer.size", "40960");
+
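+		// the custom writer below verifies that the sink's Hadoop configuration reaches the writer's file system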
+		BucketingSink<Tuple2<Integer, String>> sink = new BucketingSink<Tuple2<Integer, String>>(outPath)
+			.setFSConfig(conf)
+			.setWriter(new StreamWriterWithConfigCheck<Integer, String>(properties, "io.file.buffer.size", "40960"))
+			.setBucketer(new BasePathBucketer<Tuple2<Integer, String>>())
+			.setPartPrefix(PART_PREFIX)
+			.setPendingPrefix("")
+			.setPendingSuffix("");
+
+		OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, Object> testHarness =
+			createTestSink(sink, 1, 0);
+
+		testHarness.setProcessingTime(0L);
+
+		testHarness.setup();
+		testHarness.open();
+
+		for (int i = 0; i < numElements; i++) {
+			testHarness.processElement(new StreamRecord<>(Tuple2.of(
+				i, "message #" + Integer.toString(i)
+			)));
+		}
+
+		testHarness.close();
+
+		GenericData.setStringType(valueSchema, GenericData.StringType.String);
+		Schema elementSchema = AvroKeyValueSinkWriter.AvroKeyValue.getSchema(keySchema, valueSchema);
+
+		FSDataInputStream inStream = dfs.open(new Path(outPath + "/" + PART_PREFIX + "-0-0"));
+
+		SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<>(elementSchema);
+		DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(inStream, elementReader);
+		for (int i = 0; i < numElements; i++) {
+			AvroKeyValueSinkWriter.AvroKeyValue<Integer, String> wrappedEntry =
+				new AvroKeyValueSinkWriter.AvroKeyValue<>(dataFileStream.next());
+			int key = wrappedEntry.getKey();
+			Assert.assertEquals(i, key);
+			String value = wrappedEntry.getValue();
+			Assert.assertEquals("message #" + i, value);
+		}
+
+		dataFileStream.close();
+		inStream.close();
+	}
+
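+	/**
+	 * An {@link AvroKeyValueSinkWriter} that additionally asserts that a given Hadoop configuration
+	 * key carries the expected value when the writer is opened.
+	 */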
+	private static class StreamWriterWithConfigCheck<K, V> extends AvroKeyValueSinkWriter<K, V> {
+		private Map<String, String> properties;
+		private String key;
+		private String expect;
+		public StreamWriterWithConfigCheck(Map<String, String> properties, String key, String expect) {
+			super(properties);
+			this.properties = properties;
+			this.key = key;
+			this.expect = expect;
+		}
+
+		@Override
+		public void open(FileSystem fs, Path path) throws IOException {
+			super.open(fs, path);
+			Assert.assertEquals(expect, fs.getConf().get(key));
+		}
+
+		@Override
+		public Writer<Tuple2<K, V>> duplicate() {
+			return new StreamWriterWithConfigCheck<>(properties, key, expect);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..5c22851
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
@@ -0,0 +1,29 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+log4j.logger.org.apache.directory=OFF, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml b/flink-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file