You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2023/01/09 12:26:51 UTC

[flink] 06/08: Revert "[FLINK-30166][Tests] Remove StreamingFileSink as option for tests"

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch revert-21371-30166
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 37258fbe6305d10a62a977e398018f79cb51d080
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Mon Jan 9 13:26:23 2023 +0100

    Revert "[FLINK-30166][Tests] Remove StreamingFileSink as option for tests"
    
    This reverts commit 440a275cad1092be7245f0438b7e5b5b1691f708.
---
 .../flink/connector/file/sink/FileSinkProgram.java  | 21 ++++++++++++++++++---
 1 file changed, 18 insertions(+), 3 deletions(-)

diff --git a/flink-end-to-end-tests/flink-file-sink-test/src/main/java/org/apache/flink/connector/file/sink/FileSinkProgram.java b/flink-end-to-end-tests/flink-file-sink-test/src/main/java/org/apache/flink/connector/file/sink/FileSinkProgram.java
index 85064adb6ce..8ec6d013dfa 100644
--- a/flink-end-to-end-tests/flink-file-sink-test/src/main/java/org/apache/flink/connector/file/sink/FileSinkProgram.java
+++ b/flink-end-to-end-tests/flink-file-sink-test/src/main/java/org/apache/flink/connector/file/sink/FileSinkProgram.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -42,7 +43,7 @@ import java.io.PrintStream;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Test program for the {@link FileSink}.
+ * Test program for the {@link StreamingFileSink} and {@link FileSink}.
  *
  * <p>Uses a source that steadily emits a deterministic set of records over 60 seconds, after which
  * it idles and waits for job cancellation. Every record has a unique index that is written to the
@@ -71,7 +72,21 @@ public enum FileSinkProgram {
         // generate data, shuffle, sink
         DataStream<Tuple2<Integer, Integer>> source = env.addSource(new Generator(10, 10, 60));
 
-        if (sinkToTest.equalsIgnoreCase("FileSink")) {
+        if (sinkToTest.equalsIgnoreCase("StreamingFileSink")) {
+            final StreamingFileSink<Tuple2<Integer, Integer>> sink =
+                    StreamingFileSink.forRowFormat(
+                                    new Path(outputPath),
+                                    (Encoder<Tuple2<Integer, Integer>>)
+                                            (element, stream) -> {
+                                                PrintStream out = new PrintStream(stream);
+                                                out.println(element.f1);
+                                            })
+                            .withBucketAssigner(new KeyBucketAssigner())
+                            .withRollingPolicy(OnCheckpointRollingPolicy.build())
+                            .build();
+
+            source.keyBy(0).addSink(sink);
+        } else if (sinkToTest.equalsIgnoreCase("FileSink")) {
             FileSink<Tuple2<Integer, Integer>> sink =
                     FileSink.forRowFormat(
                                     new Path(outputPath),
@@ -88,7 +103,7 @@ public enum FileSinkProgram {
             throw new UnsupportedOperationException("Unsupported sink type: " + sinkToTest);
         }
 
-        env.execute("FileSinkProgram");
+        env.execute("StreamingFileSinkProgram");
     }
 
     /** Use first field for buckets. */