You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/28 08:59:00 UTC
[1/2] flink git commit: [FLINK-4378] Allow Setting Custom
Configuration in BucketingSink
Repository: flink
Updated Branches:
refs/heads/master fd08ad2e7 -> e9b20ec21
[FLINK-4378] Allow Setting Custom Configuration in BucketingSink
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9b20ec2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9b20ec2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9b20ec2
Branch: refs/heads/master
Commit: e9b20ec21ddd5f7d1440bccfb8622dcb47443fef
Parents: f8b162e
Author: wenlong.lwl <we...@alibaba-inc.com>
Authored: Thu Oct 27 11:44:41 2016 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 28 10:56:24 2016 +0200
----------------------------------------------------------------------
.../connectors/fs/bucketing/BucketingSink.java | 63 ++++++++++++--
.../fs/bucketing/BucketingSinkTest.java | 90 ++++++++++++++++++++
2 files changed, 144 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e9b20ec2/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 52de00d..0791fb5 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -281,7 +281,15 @@ public class BucketingSink<T>
*/
private transient State<T> state;
- private transient org.apache.hadoop.conf.Configuration hadoopConf;
+ /**
+ * User-defined FileSystem parameters
+ */
+ private Configuration fsConfig = null;
+
+ /**
+ * The FileSystem reference.
+ */
+ private transient FileSystem fs;
private transient Clock clock;
@@ -302,6 +310,28 @@ public class BucketingSink<T>
this.writerTemplate = new StringWriter<>();
}
+ /**
+ * Specify a custom {@code Configuration} that will be used when creating
+ * the {@link FileSystem} for writing.
+ */
+ public BucketingSink<T> setFSConfig(Configuration config) {
+ this.fsConfig = new Configuration();
+ fsConfig.addAll(config);
+ return this;
+ }
+
+ /**
+ * Specify a custom {@code Configuration} that will be used when creating
+ * the {@link FileSystem} for writing.
+ */
+ public BucketingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
+ this.fsConfig = new Configuration();
+ for(Map.Entry<String, String> entry : config) {
+ fsConfig.setString(entry.getKey(), entry.getValue());
+ };
+ return this;
+ }
+
@Override
@SuppressWarnings("unchecked")
public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
@@ -319,8 +349,7 @@ public class BucketingSink<T>
state = new State<T>();
Path baseDirectory = new Path(basePath);
- hadoopConf = HadoopFileSystem.getHadoopConfiguration();
- FileSystem fs = baseDirectory.getFileSystem(hadoopConf);
+ initFileSystem();
refTruncate = reflectTruncate(fs);
processingTimeService =
@@ -369,6 +398,27 @@ public class BucketingSink<T>
}
}
+ /**
+ * create a file system with the user defined hdfs config
+ * @throws IOException
+ */
+ private void initFileSystem() throws IOException {
+ if(fs != null) {
+ return;
+ }
+ org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
+ if(fsConfig != null) {
+ String disableCacheName
+ = String.format("fs.%s.impl.disable.cache", new Object[]{new Path(basePath).toUri().getScheme()});
+ hadoopConf.setBoolean(disableCacheName, true);
+ for (String key : fsConfig.keySet()) {
+ hadoopConf.set(key, fsConfig.getString(key, null));
+ }
+ }
+
+ fs = new Path(basePath).getFileSystem(hadoopConf);
+ }
+
@Override
public void close() throws Exception {
for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
@@ -456,8 +506,6 @@ public class BucketingSink<T>
private void openNewPartFile(Path bucketPath, BucketState<T> bucketState) throws Exception {
closeCurrentPartFile(bucketState);
- FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
-
if (!fs.exists(bucketPath)) {
try {
if (fs.mkdirs(bucketPath)) {
@@ -511,7 +559,6 @@ public class BucketingSink<T>
Path currentPartPath = new Path(bucketState.currentFile);
Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix);
Path pendingPath = new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix);
- FileSystem fs = inProgressPath.getFileSystem(hadoopConf);
fs.rename(inProgressPath, pendingPath);
LOG.debug("Moving in-progress bucket {} to pending file {}",
inProgressPath,
@@ -589,7 +636,6 @@ public class BucketingSink<T>
Path pendingPath = new Path(finalPath.getParent(),
pendingPrefix + finalPath.getName()).suffix(pendingSuffix);
- FileSystem fs = pendingPath.getFileSystem(hadoopConf);
fs.rename(pendingPath, finalPath);
LOG.debug(
"Moving pending file {} to final location having completed checkpoint {}.",
@@ -634,9 +680,8 @@ public class BucketingSink<T>
public void restoreState(State<T> state) {
this.state = state;
- FileSystem fs;
try {
- fs = new Path(basePath).getFileSystem(HadoopFileSystem.getHadoopConfiguration());
+ initFileSystem();
} catch (IOException e) {
LOG.error("Error while creating FileSystem in checkpoint restore.", e);
throw new RuntimeException("Error while creating FileSystem in checkpoint restore.", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/e9b20ec2/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index e4b0460..992c8c2 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -34,11 +34,13 @@ import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
import org.apache.flink.streaming.connectors.fs.StringWriter;
+import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.NetUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
@@ -497,4 +499,92 @@ public class BucketingSinkTest {
Assert.assertEquals(4, numFiles);
Assert.assertEquals(2, numInProgress);
}
+
+ /**
+ * This tests user defined hdfs configuration
+ * @throws Exception
+ */
+ @Test
+ public void testUserDefinedConfiguration() throws Exception {
+ final String outPath = hdfsURI + "/string-non-rolling-with-config";
+ final int numElements = 20;
+
+ Map<String, String> properties = new HashMap<>();
+ Schema keySchema = Schema.create(Schema.Type.INT);
+ Schema valueSchema = Schema.create(Schema.Type.STRING);
+ properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString());
+ properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString());
+ properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, String.valueOf(true));
+ properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);
+
+ Configuration conf = new Configuration();
+ conf.set("io.file.buffer.size", "40960");
+
+ BucketingSink<Tuple2<Integer,String>> sink = new BucketingSink<Tuple2<Integer, String>>(outPath)
+ .setFSConfig(conf)
+ .setWriter(new StreamWriterWithConfigCheck<Integer, String>(properties, "io.file.buffer.size", "40960"))
+ .setBucketer(new BasePathBucketer<Tuple2<Integer,String>>())
+ .setPartPrefix("part")
+ .setPendingPrefix("")
+ .setPendingSuffix("");
+
+ OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, Object> testHarness =
+ createTestSink(sink);
+
+ testHarness.setProcessingTime(0L);
+
+ testHarness.setup();
+ testHarness.open();
+
+ for (int i = 0; i < numElements; i++) {
+ testHarness.processElement(new StreamRecord<>(Tuple2.of(
+ i, "message #" + Integer.toString(i)
+ )));
+ }
+
+ testHarness.close();
+
+ GenericData.setStringType(valueSchema, GenericData.StringType.String);
+ Schema elementSchema = AvroKeyValueSinkWriter.AvroKeyValue.getSchema(keySchema, valueSchema);
+
+ FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
+
+ SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<>(elementSchema);
+ DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(inStream, elementReader);
+ for (int i = 0; i < numElements; i++) {
+ AvroKeyValueSinkWriter.AvroKeyValue<Integer, String> wrappedEntry =
+ new AvroKeyValueSinkWriter.AvroKeyValue<>(dataFileStream.next());
+ int key = wrappedEntry.getKey();
+ Assert.assertEquals(i, key);
+ String value = wrappedEntry.getValue();
+ Assert.assertEquals("message #" + i, value);
+ }
+
+ dataFileStream.close();
+ inStream.close();
+ }
+
+ private static class StreamWriterWithConfigCheck<K, V> extends AvroKeyValueSinkWriter<K, V> {
+ private Map<String, String> properties;
+ private String key;
+ private String expect;
+ public StreamWriterWithConfigCheck(Map<String, String> properties, String key, String expect) {
+ super(properties);
+ this.properties = properties;
+ this.key = key;
+ this.expect = expect;
+ }
+
+ @Override
+ public void open(FileSystem fs, Path path) throws IOException {
+ super.open(fs, path);
+ Assert.assertEquals(expect, fs.getConf().get(key));
+ }
+
+ @Override
+ public Writer<Tuple2<K, V>> duplicate() {
+ return new StreamWriterWithConfigCheck<>(properties, key, expect);
+ }
+ }
+
}
[2/2] flink git commit: [FLINK-4378] Allow Setting Custom
Configuration in RollingSink
Posted by al...@apache.org.
[FLINK-4378] Allow Setting Custom Configuration in RollingSink
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f8b162e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f8b162e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f8b162e8
Branch: refs/heads/master
Commit: f8b162e8f36373e16916798e3cc899c190561967
Parents: fd08ad2
Author: wenlong.lwl <we...@alibaba-inc.com>
Authored: Thu Aug 11 21:26:01 2016 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 28 10:56:24 2016 +0200
----------------------------------------------------------------------
.../streaming/connectors/fs/RollingSink.java | 67 ++++++++++++---
.../connectors/fs/RollingSinkITCase.java | 88 +++++++++++++++++++-
2 files changed, 139 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f8b162e8/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index 738857f..b959bf8 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -284,9 +284,16 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
* current part file path, the valid length of the in-progress files and pending part files.
*/
private transient BucketState bucketState;
-
- private transient org.apache.hadoop.conf.Configuration hadoopConf;
-
+
+ /**
+ * User-defined FileSystem parameters.
+ */
+ private Configuration fsConfig = null;
+
+ /**
+ * The FileSystem reference.
+ */
+ private transient FileSystem fs;
/**
* Creates a new {@code RollingSink} that writes files to the given base directory.
*
@@ -303,6 +310,28 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
this.writerTemplate = new StringWriter<>();
}
+ /**
+ * Specify a custom {@code Configuration} that will be used when creating
+ * the {@link FileSystem} for writing.
+ */
+ public RollingSink<T> setFSConfig(Configuration config) {
+ this.fsConfig = new Configuration();
+ fsConfig.addAll(config);
+ return this;
+ }
+
+ /**
+ * Specify a custom {@code Configuration} that will be used when creating
+ * the {@link FileSystem} for writing.
+ */
+ public RollingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
+ this.fsConfig = new Configuration();
+ for(Map.Entry<String, String> entry : config) {
+ fsConfig.setString(entry.getKey(), entry.getValue());
+ };
+ return this;
+ }
+
@Override
@SuppressWarnings("unchecked")
public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
@@ -324,8 +353,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
bucketState = new BucketState();
}
- hadoopConf = HadoopFileSystem.getHadoopConfiguration();
- FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
+ initFileSystem();
refTruncate = reflectTruncate(fs);
// delete pending/in-progress files that might be left if we fail while
@@ -358,6 +386,27 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
}
}
+ /**
+ * create a file system with the user defined hdfs config
+ * @throws IOException
+ */
+ private void initFileSystem() throws IOException {
+ if(fs != null) {
+ return;
+ }
+ org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
+ if(fsConfig != null) {
+ String disableCacheName
+ = String.format("fs.%s.impl.disable.cache", new Object[]{new Path(basePath).toUri().getScheme()});
+ hadoopConf.setBoolean(disableCacheName, true);
+ for (String key : fsConfig.keySet()) {
+ hadoopConf.set(key, fsConfig.getString(key, null));
+ }
+ }
+
+ fs = new Path(basePath).getFileSystem(hadoopConf);
+ }
+
@Override
public void close() throws Exception {
// boolean interrupted = Thread.interrupted();
@@ -420,8 +469,6 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
private void openNewPartFile() throws Exception {
closeCurrentPartFile();
- FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
-
Path newBucketDirectory = bucketer.getNextBucketPath(new Path(basePath));
if (!newBucketDirectory.equals(currentBucketDirectory)) {
@@ -451,7 +498,6 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
LOG.debug("Next part path is {}", currentPartPath.toString());
Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix);
-
writer.open(fs, inProgressPath);
isWriterOpen = true;
}
@@ -472,7 +518,6 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
if (currentPartPath != null) {
Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix);
Path pendingPath = new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix);
- FileSystem fs = inProgressPath.getFileSystem(hadoopConf);
fs.rename(inProgressPath, pendingPath);
LOG.debug("Moving in-progress bucket {} to pending file {}",
inProgressPath,
@@ -547,7 +592,6 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
Path pendingPath = new Path(finalPath.getParent(),
pendingPrefix + finalPath.getName()).suffix(pendingSuffix);
- FileSystem fs = pendingPath.getFileSystem(hadoopConf);
fs.rename(pendingPath, finalPath);
LOG.debug(
"Moving pending file {} to final location after complete checkpoint {}.",
@@ -583,9 +627,8 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
// we can clean all the pending files since they where renamed to final files
// after this checkpoint was successfull
bucketState.pendingFiles.clear();
- FileSystem fs = null;
try {
- fs = new Path(basePath).getFileSystem(HadoopFileSystem.getHadoopConfiguration());
+ initFileSystem();
} catch (IOException e) {
LOG.error("Error while creating FileSystem in checkpoint restore.", e);
throw new RuntimeException("Error while creating FileSystem in checkpoint restore.", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/f8b162e8/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index c3c8df5..c8440ef 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -21,12 +21,10 @@ package org.apache.flink.streaming.connectors.fs;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileConstants;
-import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericData.StringType;
-import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
@@ -42,11 +40,11 @@ import org.apache.flink.util.Collector;
import org.apache.flink.util.NetUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
@@ -457,7 +455,68 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
dataFileStream.close();
inStream.close();
}
-
+
+
+ /**
+ * This tests user defined hdfs configuration
+ * @throws Exception
+ */
+ @Test
+ public void testUserDefinedConfiguration() throws Exception {
+ final int NUM_ELEMENTS = 20;
+ final int PARALLELISM = 2;
+ final String outPath = hdfsURI + "/string-non-rolling-with-config";
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+
+ DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+ .broadcast()
+ .filter(new OddEvenFilter());
+
+ Configuration conf = new Configuration();
+ conf.set("io.file.buffer.size", "40960");
+ RollingSink<String> sink = new RollingSink<String>(outPath)
+ .setFSConfig(conf)
+ .setWriter(new StreamWriterWithConfigCheck<String>("io.file.buffer.size", "40960"))
+ .setBucketer(new NonRollingBucketer())
+ .setPartPrefix("part")
+ .setPendingPrefix("")
+ .setPendingSuffix("");
+
+ source
+ .map(new MapFunction<Tuple2<Integer,String>, String>() {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public String map(Tuple2<Integer, String> value) throws Exception {
+ return value.f1;
+ }
+ })
+ .addSink(sink);
+
+ env.execute("RollingSink with configuration Test");
+
+ FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
+
+ BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+ for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+ String line = br.readLine();
+ Assert.assertEquals("message #" + i, line);
+ }
+
+ inStream.close();
+
+ inStream = dfs.open(new Path(outPath + "/part-1-0"));
+
+ br = new BufferedReader(new InputStreamReader(inStream));
+
+ for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+ String line = br.readLine();
+ Assert.assertEquals("message #" + i, line);
+ }
+
+ inStream.close();
+ }
// we use this to synchronize the clock changes to elements being processed
final static MultiShotLatch latch1 = new MultiShotLatch();
@@ -639,6 +698,27 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
}
}
+
+ private static class StreamWriterWithConfigCheck<T> extends StringWriter<T> {
+ private String key;
+ private String expect;
+ public StreamWriterWithConfigCheck(String key, String expect) {
+ this.key = key;
+ this.expect = expect;
+ }
+
+ @Override
+ public void open(FileSystem fs, Path path) throws IOException {
+ super.open(fs, path);
+ Assert.assertEquals(expect, fs.getConf().get(key));
+ }
+
+ @Override
+ public Writer<T> duplicate() {
+ return new StreamWriterWithConfigCheck<>(key, expect);
+ }
+ }
+
public static class OddEvenFilter extends RichFilterFunction<Tuple2<Integer, String>> {
private static final long serialVersionUID = 1L;