Posted to commits@flink.apache.org by al...@apache.org on 2016/10/28 08:59:00 UTC

[1/2] flink git commit: [FLINK-4378] Allow Setting Custom Configuration in BucketingSink

Repository: flink
Updated Branches:
  refs/heads/master fd08ad2e7 -> e9b20ec21


[FLINK-4378] Allow Setting Custom Configuration in BucketingSink
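
For context, a minimal usage sketch of the new setter; the base path, the buffer-size
value, and the commented-out pipeline hookup are illustrative placeholders, not part of
the commit:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

    // Entries set here are copied into the sink and later merged into the
    // Hadoop Configuration used to create the sink's FileSystem.
    Configuration fsConf = new Configuration();
    fsConf.setString("io.file.buffer.size", "40960"); // same key the new test verifies

    BucketingSink<String> sink = new BucketingSink<String>("hdfs:///tmp/flink-out") // placeholder path
        .setFSConfig(fsConf);
    // stream.addSink(sink); // attach to any DataStream<String>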


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9b20ec2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9b20ec2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9b20ec2

Branch: refs/heads/master
Commit: e9b20ec21ddd5f7d1440bccfb8622dcb47443fef
Parents: f8b162e
Author: wenlong.lwl <we...@alibaba-inc.com>
Authored: Thu Oct 27 11:44:41 2016 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 28 10:56:24 2016 +0200

----------------------------------------------------------------------
 .../connectors/fs/bucketing/BucketingSink.java  | 63 ++++++++++++--
 .../fs/bucketing/BucketingSinkTest.java         | 90 ++++++++++++++++++++
 2 files changed, 144 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e9b20ec2/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 52de00d..0791fb5 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -281,7 +281,15 @@ public class BucketingSink<T>
 	 */
 	private transient State<T> state;
 
-	private transient org.apache.hadoop.conf.Configuration hadoopConf;
+	/**
+	 * User-defined FileSystem parameters.
+	 */
+	private Configuration fsConfig = null;
+
+	/**
+	 * The FileSystem reference.
+	 */
+	private transient FileSystem fs;
 
 	private transient Clock clock;
 
@@ -302,6 +310,28 @@ public class BucketingSink<T>
 		this.writerTemplate = new StringWriter<>();
 	}
 
+	/**
+	 * Specify a custom {@code Configuration} that will be used when creating
+	 * the {@link FileSystem} for writing.
+	 */
+	public BucketingSink<T> setFSConfig(Configuration config) {
+		this.fsConfig = new Configuration();
+		fsConfig.addAll(config);
+		return this;
+	}
+
+	/**
+	 * Specify a custom {@code Configuration} that will be used when creating
+	 * the {@link FileSystem} for writing.
+	 */
+	public BucketingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
+		this.fsConfig = new Configuration();
+		for (Map.Entry<String, String> entry : config) {
+			fsConfig.setString(entry.getKey(), entry.getValue());
+		}
+		return this;
+	}
+
 	@Override
 	@SuppressWarnings("unchecked")
 	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
@@ -319,8 +349,7 @@ public class BucketingSink<T>
 		state = new State<T>();
 
 		Path baseDirectory = new Path(basePath);
-		hadoopConf = HadoopFileSystem.getHadoopConfiguration();
-		FileSystem fs = baseDirectory.getFileSystem(hadoopConf);
+		initFileSystem();
 		refTruncate = reflectTruncate(fs);
 
 		processingTimeService =
@@ -369,6 +398,27 @@ public class BucketingSink<T>
 		}
 	}
 
+	/**
+	 * Creates the {@link FileSystem} for writing, applying the user-defined
+	 * Hadoop configuration entries set via {@code setFSConfig}, if any.
+	 */
+	private void initFileSystem() throws IOException {
+		if (fs != null) {
+			return;
+		}
+		org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
+		if (fsConfig != null) {
+			String disableCacheName =
+				String.format("fs.%s.impl.disable.cache", new Path(basePath).toUri().getScheme());
+			hadoopConf.setBoolean(disableCacheName, true);
+			for (String key : fsConfig.keySet()) {
+				hadoopConf.set(key, fsConfig.getString(key, null));
+			}
+		}
+
+		fs = new Path(basePath).getFileSystem(hadoopConf);
+	}
+
 	@Override
 	public void close() throws Exception {
 		for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
@@ -456,8 +506,6 @@ public class BucketingSink<T>
 	private void openNewPartFile(Path bucketPath, BucketState<T> bucketState) throws Exception {
 		closeCurrentPartFile(bucketState);
 
-		FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
-
 		if (!fs.exists(bucketPath)) {
 			try {
 				if (fs.mkdirs(bucketPath)) {
@@ -511,7 +559,6 @@ public class BucketingSink<T>
 			Path currentPartPath = new Path(bucketState.currentFile);
 			Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix);
 			Path pendingPath = new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix);
-			FileSystem fs = inProgressPath.getFileSystem(hadoopConf);
 			fs.rename(inProgressPath, pendingPath);
 			LOG.debug("Moving in-progress bucket {} to pending file {}",
 				inProgressPath,
@@ -589,7 +636,6 @@ public class BucketingSink<T>
 								Path pendingPath = new Path(finalPath.getParent(),
 									pendingPrefix + finalPath.getName()).suffix(pendingSuffix);
 
-								FileSystem fs = pendingPath.getFileSystem(hadoopConf);
 								fs.rename(pendingPath, finalPath);
 								LOG.debug(
 									"Moving pending file {} to final location having completed checkpoint {}.",
@@ -634,9 +680,8 @@ public class BucketingSink<T>
 	public void restoreState(State<T> state) {
 		this.state = state;
 
-		FileSystem fs;
 		try {
-			fs = new Path(basePath).getFileSystem(HadoopFileSystem.getHadoopConfiguration());
+			initFileSystem();
 		} catch (IOException e) {
 			LOG.error("Error while creating FileSystem in checkpoint restore.", e);
 			throw new RuntimeException("Error while creating FileSystem in checkpoint restore.", e);
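
A note on initFileSystem() above: Hadoop caches FileSystem instances per URI scheme and
authority, so a previously cached instance created with default settings could be handed
back and the user-supplied entries silently ignored. Disabling the cache for the base
path's scheme forces a fresh instance that sees the custom configuration. With a
hypothetical base path (not from the commit), the computed key looks like this:

    // Hypothetical base path "hdfs://namenode:9000/flink/out" -> scheme "hdfs",
    // so initFileSystem() effectively sets:
    hadoopConf.setBoolean("fs.hdfs.impl.disable.cache", true);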

http://git-wip-us.apache.org/repos/asf/flink/blob/e9b20ec2/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index e4b0460..992c8c2 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -34,11 +34,13 @@ import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
 import org.apache.flink.streaming.connectors.fs.Clock;
 import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
+import org.apache.flink.streaming.connectors.fs.Writer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.NetUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -497,4 +499,92 @@ public class BucketingSinkTest {
 		Assert.assertEquals(4, numFiles);
 		Assert.assertEquals(2, numInProgress);
 	}
+
+	/**
+	 * Tests that a user-defined Hadoop configuration is picked up by the
+	 * {@link FileSystem} the sink writes to.
+	 */
+	@Test
+	public void testUserDefinedConfiguration() throws Exception {
+		final String outPath = hdfsURI + "/string-non-rolling-with-config";
+		final int numElements = 20;
+
+		Map<String, String> properties = new HashMap<>();
+		Schema keySchema = Schema.create(Schema.Type.INT);
+		Schema valueSchema = Schema.create(Schema.Type.STRING);
+		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString());
+		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString());
+		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, String.valueOf(true));
+		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);
+
+		Configuration conf = new Configuration();
+		conf.set("io.file.buffer.size", "40960");
+
+		BucketingSink<Tuple2<Integer, String>> sink = new BucketingSink<Tuple2<Integer, String>>(outPath)
+			.setFSConfig(conf)
+			.setWriter(new StreamWriterWithConfigCheck<Integer, String>(properties, "io.file.buffer.size", "40960"))
+			.setBucketer(new BasePathBucketer<Tuple2<Integer, String>>())
+			.setPartPrefix("part")
+			.setPendingPrefix("")
+			.setPendingSuffix("");
+
+		OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, Object> testHarness =
+			createTestSink(sink);
+
+		testHarness.setProcessingTime(0L);
+
+		testHarness.setup();
+		testHarness.open();
+
+		for (int i = 0; i < numElements; i++) {
+			testHarness.processElement(new StreamRecord<>(Tuple2.of(
+				i, "message #" + Integer.toString(i)
+			)));
+		}
+
+		testHarness.close();
+
+		GenericData.setStringType(valueSchema, GenericData.StringType.String);
+		Schema elementSchema = AvroKeyValueSinkWriter.AvroKeyValue.getSchema(keySchema, valueSchema);
+
+		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
+
+		SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<>(elementSchema);
+		DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(inStream, elementReader);
+		for (int i = 0; i < numElements; i++) {
+			AvroKeyValueSinkWriter.AvroKeyValue<Integer, String> wrappedEntry =
+				new AvroKeyValueSinkWriter.AvroKeyValue<>(dataFileStream.next());
+			int key = wrappedEntry.getKey();
+			Assert.assertEquals(i, key);
+			String value = wrappedEntry.getValue();
+			Assert.assertEquals("message #" + i, value);
+		}
+
+		dataFileStream.close();
+		inStream.close();
+	}
+
+	private static class StreamWriterWithConfigCheck<K, V> extends AvroKeyValueSinkWriter<K, V> {
+		private Map<String, String> properties;
+		private String key;
+		private String expect;
+		public StreamWriterWithConfigCheck(Map<String, String> properties, String key, String expect) {
+			super(properties);
+			this.properties = properties;
+			this.key = key;
+			this.expect = expect;
+		}
+
+		@Override
+		public void open(FileSystem fs, Path path) throws IOException {
+			super.open(fs, path);
+			Assert.assertEquals(expect, fs.getConf().get(key));
+		}
+
+		@Override
+		public Writer<Tuple2<K, V>> duplicate() {
+			return new StreamWriterWithConfigCheck<>(properties, key, expect);
+		}
+	}
+
 }


[2/2] flink git commit: [FLINK-4378] Allow Setting Custom Configuration in RollingSink

Posted by al...@apache.org.
[FLINK-4378] Allow Setting Custom Configuration in RollingSink
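
As above for BucketingSink, a minimal sketch of the Hadoop-flavored overload; the base
path and buffer-size value are illustrative placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.flink.streaming.connectors.fs.RollingSink;

    Configuration hadoopConf = new Configuration();
    hadoopConf.set("io.file.buffer.size", "40960"); // copied entry-by-entry into the sink's config

    RollingSink<String> sink = new RollingSink<String>("hdfs:///tmp/rolling-out") // placeholder path
        .setFSConfig(hadoopConf); // Hadoop Configuration overload added by this commit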


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f8b162e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f8b162e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f8b162e8

Branch: refs/heads/master
Commit: f8b162e8f36373e16916798e3cc899c190561967
Parents: fd08ad2
Author: wenlong.lwl <we...@alibaba-inc.com>
Authored: Thu Aug 11 21:26:01 2016 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 28 10:56:24 2016 +0200

----------------------------------------------------------------------
 .../streaming/connectors/fs/RollingSink.java    | 67 ++++++++++++---
 .../connectors/fs/RollingSinkITCase.java        | 88 +++++++++++++++++++-
 2 files changed, 139 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f8b162e8/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index 738857f..b959bf8 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -284,9 +284,16 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 	 * current part file path, the valid length of the in-progress files and pending part files.
 	 */
 	private transient BucketState bucketState;
-	
-	private transient org.apache.hadoop.conf.Configuration hadoopConf;
-	
+
+	/**
+	 * User-defined FileSystem parameters.
+	 */
+	private Configuration fsConfig = null;
+
+	/**
+	 * The FileSystem reference.
+	 */
+	private transient FileSystem fs;
+
 	/**
 	 * Creates a new {@code RollingSink} that writes files to the given base directory.
 	 *
@@ -303,6 +310,28 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 		this.writerTemplate = new StringWriter<>();
 	}
 
+	/**
+	 * Specify a custom {@code Configuration} that will be used when creating
+	 * the {@link FileSystem} for writing.
+	 */
+	public RollingSink<T> setFSConfig(Configuration config) {
+		this.fsConfig = new Configuration();
+		fsConfig.addAll(config);
+		return this;
+	}
+
+	/**
+	 * Specify a custom {@code Configuration} that will be used when creating
+	 * the {@link FileSystem} for writing.
+	 */
+	public RollingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
+		this.fsConfig = new Configuration();
+		for (Map.Entry<String, String> entry : config) {
+			fsConfig.setString(entry.getKey(), entry.getValue());
+		}
+		return this;
+	}
+
 	@Override
 	@SuppressWarnings("unchecked")
 	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
@@ -324,8 +353,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 			bucketState = new BucketState();
 		}
 
-		hadoopConf = HadoopFileSystem.getHadoopConfiguration();
-		FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
+		initFileSystem();
 		refTruncate = reflectTruncate(fs);
 
 		// delete pending/in-progress files that might be left if we fail while
@@ -358,6 +386,27 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 		}
 	}
 
+	/**
+	 * Creates the {@link FileSystem} for writing, applying the user-defined
+	 * Hadoop configuration entries set via {@code setFSConfig}, if any.
+	 */
+	private void initFileSystem() throws IOException {
+		if (fs != null) {
+			return;
+		}
+		org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
+		if (fsConfig != null) {
+			String disableCacheName =
+				String.format("fs.%s.impl.disable.cache", new Path(basePath).toUri().getScheme());
+			hadoopConf.setBoolean(disableCacheName, true);
+			for (String key : fsConfig.keySet()) {
+				hadoopConf.set(key, fsConfig.getString(key, null));
+			}
+		}
+
+		fs = new Path(basePath).getFileSystem(hadoopConf);
+	}
+
 	@Override
 	public void close() throws Exception {
 //		boolean interrupted = Thread.interrupted();
@@ -420,8 +469,6 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 	private void openNewPartFile() throws Exception {
 		closeCurrentPartFile();
 
-		FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
-
 		Path newBucketDirectory = bucketer.getNextBucketPath(new Path(basePath));
 
 		if (!newBucketDirectory.equals(currentBucketDirectory)) {
@@ -451,7 +498,6 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 		LOG.debug("Next part path is {}", currentPartPath.toString());
 
 		Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix);
-
 		writer.open(fs, inProgressPath);
 		isWriterOpen = true;
 	}
@@ -472,7 +518,6 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 		if (currentPartPath != null) {
 			Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix);
 			Path pendingPath = new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix);
-			FileSystem fs = inProgressPath.getFileSystem(hadoopConf);
 			fs.rename(inProgressPath, pendingPath);
 			LOG.debug("Moving in-progress bucket {} to pending file {}",
 					inProgressPath,
@@ -547,7 +592,6 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 						Path pendingPath = new Path(finalPath.getParent(),
 								pendingPrefix + finalPath.getName()).suffix(pendingSuffix);
 
-						FileSystem fs = pendingPath.getFileSystem(hadoopConf);
 						fs.rename(pendingPath, finalPath);
 						LOG.debug(
 								"Moving pending file {} to final location after complete checkpoint {}.",
@@ -583,9 +627,8 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 		// we can clean all the pending files since they were renamed to final files
 		// after this checkpoint was successful
 		bucketState.pendingFiles.clear();
-		FileSystem fs = null;
 		try {
-			fs = new Path(basePath).getFileSystem(HadoopFileSystem.getHadoopConfiguration());
+			initFileSystem();
 		} catch (IOException e) {
 			LOG.error("Error while creating FileSystem in checkpoint restore.", e);
 			throw new RuntimeException("Error while creating FileSystem in checkpoint restore.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/f8b162e8/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index c3c8df5..c8440ef 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -21,12 +21,10 @@ package org.apache.flink.streaming.connectors.fs;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileConstants;
-import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericData.StringType;
-import org.apache.avro.io.DatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFilterFunction;
@@ -42,11 +40,11 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.NetUtils;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -457,7 +455,68 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		dataFileStream.close();
 		inStream.close();
 	}
-	
+
+	/**
+	 * Tests that a user-defined Hadoop configuration is picked up by the
+	 * {@link FileSystem} the sink writes to.
+	 */
+	@Test
+	public void testUserDefinedConfiguration() throws Exception {
+		final int NUM_ELEMENTS = 20;
+		final int PARALLELISM = 2;
+		final String outPath = hdfsURI + "/string-non-rolling-with-config";
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+			.broadcast()
+			.filter(new OddEvenFilter());
+
+		Configuration conf = new Configuration();
+		conf.set("io.file.buffer.size", "40960");
+		RollingSink<String> sink = new RollingSink<String>(outPath)
+			.setFSConfig(conf)
+			.setWriter(new StreamWriterWithConfigCheck<String>("io.file.buffer.size", "40960"))
+			.setBucketer(new NonRollingBucketer())
+			.setPartPrefix("part")
+			.setPendingPrefix("")
+			.setPendingSuffix("");
+
+		source
+			.map(new MapFunction<Tuple2<Integer, String>, String>() {
+				private static final long serialVersionUID = 1L;
+				@Override
+				public String map(Tuple2<Integer, String> value) throws Exception {
+					return value.f1;
+				}
+			})
+			.addSink(sink);
+
+		env.execute("RollingSink with configuration Test");
+
+		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
+
+		BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+			String line = br.readLine();
+			Assert.assertEquals("message #" + i, line);
+		}
+
+		inStream.close();
+
+		inStream = dfs.open(new Path(outPath + "/part-1-0"));
+
+		br = new BufferedReader(new InputStreamReader(inStream));
+
+		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+			String line = br.readLine();
+			Assert.assertEquals("message #" + i, line);
+		}
+
+		inStream.close();
+	}
 
 	// we use this to synchronize the clock changes to elements being processed
 	final static MultiShotLatch latch1 = new MultiShotLatch();
@@ -639,6 +698,27 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		}
 	}
 
+
+	private static class StreamWriterWithConfigCheck<T> extends StringWriter<T> {
+		private String key;
+		private String expect;
+		public StreamWriterWithConfigCheck(String key, String expect) {
+			this.key = key;
+			this.expect = expect;
+		}
+
+		@Override
+		public void open(FileSystem fs, Path path) throws IOException {
+			super.open(fs, path);
+			Assert.assertEquals(expect, fs.getConf().get(key));
+		}
+
+		@Override
+		public Writer<T> duplicate() {
+			return new StreamWriterWithConfigCheck<>(key, expect);
+		}
+	}
+
 	public static class OddEvenFilter extends RichFilterFunction<Tuple2<Integer, String>> {
 		private static final long serialVersionUID = 1L;