You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sj...@apache.org on 2021/11/16 21:40:18 UTC

[flink] 07/11: [FLINK-24635][examples] Fix deprecations in iterations example

This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0e3273d198ed0f69ffda44dd15daf0290a9a81c9
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Mon Nov 15 14:52:59 2021 -0600

    [FLINK-24635][examples] Fix deprecations in iterations example
---
 .../streaming/examples/iteration/IterateExample.java  | 16 +++++++++++++++-
 .../scala/examples/iteration/IterateExample.scala     | 19 +++++++++++++++++--
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index d8fed37..fa261cc 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -18,18 +18,24 @@
 package org.apache.flink.streaming.examples.iteration;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
+import java.time.Duration;
 import java.util.Random;
 
 /**
@@ -103,7 +109,15 @@ public class IterateExample {
 
         // emit results
         if (params.has("output")) {
-            numbers.writeAsText(params.get("output"));
+            numbers.sinkTo(
+                    FileSink.<Tuple2<Tuple2<Integer, Integer>, Integer>>forRowFormat(
+                                    new Path(params.get("output")), new SimpleStringEncoder<>())
+                            .withRollingPolicy(
+                                    DefaultRollingPolicy.builder()
+                                            .withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                            .withRolloverInterval(Duration.ofSeconds(10))
+                                            .build())
+                            .build());
         } else {
             System.out.println("Printing result to stdout. Use --output to specify output path.");
             numbers.print();
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
index 1fa3ace..4812ec4 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
@@ -18,14 +18,21 @@
 
 package org.apache.flink.streaming.scala.examples.iteration
 
-import java.util.Random
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
 
+import java.util.Random
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.core.fs.Path
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 
+import java.time.Duration
+
 /**
  * Example illustrating iterations in Flink streaming.
  *
@@ -95,7 +102,15 @@ object IterateExample {
       )
 
     if (params.has("output")) {
-      numbers.writeAsText(params.get("output"))
+      numbers.sinkTo(FileSink.forRowFormat[((Int, Int), Int)](
+          new Path(params.get("output")),
+          new SimpleStringEncoder())
+        .withRollingPolicy(DefaultRollingPolicy.builder()
+          .withMaxPartSize(MemorySize.ofMebiBytes(1))
+          .withRolloverInterval(Duration.ofSeconds(10))
+          .build())
+        .build())
+        .name("file-sink")
     } else {
       println("Printing result to stdout. Use --output to specify output path.")
       numbers.print()