You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2023/01/09 12:26:51 UTC
[flink] 06/08: Revert "[FLINK-30166][Tests] Remove StreamingFileSink as option for tests"
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch revert-21371-30166
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 37258fbe6305d10a62a977e398018f79cb51d080
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Mon Jan 9 13:26:23 2023 +0100
Revert "[FLINK-30166][Tests] Remove StreamingFileSink as option for tests"
This reverts commit 440a275cad1092be7245f0438b7e5b5b1691f708.
---
.../flink/connector/file/sink/FileSinkProgram.java | 21 ++++++++++++++++++---
1 file changed, 18 insertions(+), 3 deletions(-)
diff --git a/flink-end-to-end-tests/flink-file-sink-test/src/main/java/org/apache/flink/connector/file/sink/FileSinkProgram.java b/flink-end-to-end-tests/flink-file-sink-test/src/main/java/org/apache/flink/connector/file/sink/FileSinkProgram.java
index 85064adb6ce..8ec6d013dfa 100644
--- a/flink-end-to-end-tests/flink-file-sink-test/src/main/java/org/apache/flink/connector/file/sink/FileSinkProgram.java
+++ b/flink-end-to-end-tests/flink-file-sink-test/src/main/java/org/apache/flink/connector/file/sink/FileSinkProgram.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -42,7 +43,7 @@ import java.io.PrintStream;
import java.util.concurrent.TimeUnit;
/**
- * Test program for the {@link FileSink}.
+ * Test program for the {@link StreamingFileSink} and {@link FileSink}.
*
* <p>Uses a source that steadily emits a deterministic set of records over 60 seconds, after which
* it idles and waits for job cancellation. Every record has a unique index that is written to the
@@ -71,7 +72,21 @@ public enum FileSinkProgram {
// generate data, shuffle, sink
DataStream<Tuple2<Integer, Integer>> source = env.addSource(new Generator(10, 10, 60));
- if (sinkToTest.equalsIgnoreCase("FileSink")) {
+ if (sinkToTest.equalsIgnoreCase("StreamingFileSink")) {
+ final StreamingFileSink<Tuple2<Integer, Integer>> sink =
+ StreamingFileSink.forRowFormat(
+ new Path(outputPath),
+ (Encoder<Tuple2<Integer, Integer>>)
+ (element, stream) -> {
+ PrintStream out = new PrintStream(stream);
+ out.println(element.f1);
+ })
+ .withBucketAssigner(new KeyBucketAssigner())
+ .withRollingPolicy(OnCheckpointRollingPolicy.build())
+ .build();
+
+ source.keyBy(0).addSink(sink);
+ } else if (sinkToTest.equalsIgnoreCase("FileSink")) {
FileSink<Tuple2<Integer, Integer>> sink =
FileSink.forRowFormat(
new Path(outputPath),
@@ -88,7 +103,7 @@ public enum FileSinkProgram {
throw new UnsupportedOperationException("Unsupported sink type: " + sinkToTest);
}
- env.execute("FileSinkProgram");
+ env.execute("StreamingFileSinkProgram");
}
/** Use first field for buckets. */