You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sj...@apache.org on 2021/11/16 21:40:18 UTC
[flink] 07/11: [FLINK-24635][examples] Fix deprecations in iterations example
This is an automated email from the ASF dual-hosted git repository.
sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0e3273d198ed0f69ffda44dd15daf0290a9a81c9
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Mon Nov 15 14:52:59 2021 -0600
[FLINK-24635][examples] Fix deprecations in iterations example
---
.../streaming/examples/iteration/IterateExample.java | 16 +++++++++++++++-
.../scala/examples/iteration/IterateExample.scala | 19 +++++++++++++++++--
2 files changed, 32 insertions(+), 3 deletions(-)
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index d8fed37..fa261cc 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -18,18 +18,24 @@
package org.apache.flink.streaming.examples.iteration;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
+import java.time.Duration;
import java.util.Random;
/**
@@ -103,7 +109,15 @@ public class IterateExample {
// emit results
if (params.has("output")) {
- numbers.writeAsText(params.get("output"));
+ numbers.sinkTo(
+ FileSink.<Tuple2<Tuple2<Integer, Integer>, Integer>>forRowFormat(
+ new Path(params.get("output")), new SimpleStringEncoder<>())
+ .withRollingPolicy(
+ DefaultRollingPolicy.builder()
+ .withMaxPartSize(MemorySize.ofMebiBytes(1))
+ .withRolloverInterval(Duration.ofSeconds(10))
+ .build())
+ .build());
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
numbers.print();
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
index 1fa3ace..4812ec4 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
@@ -18,14 +18,21 @@
package org.apache.flink.streaming.scala.examples.iteration
-import java.util.Random
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
+import java.util.Random
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.core.fs.Path
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import java.time.Duration
+
/**
* Example illustrating iterations in Flink streaming.
*
@@ -95,7 +102,15 @@ object IterateExample {
)
if (params.has("output")) {
- numbers.writeAsText(params.get("output"))
+ numbers.sinkTo(FileSink.forRowFormat[((Int, Int), Int)](
+ new Path(params.get("output")),
+ new SimpleStringEncoder())
+ .withRollingPolicy(DefaultRollingPolicy.builder()
+ .withMaxPartSize(MemorySize.ofMebiBytes(1))
+ .withRolloverInterval(Duration.ofSeconds(10))
+ .build())
+ .build())
+ .name("file-sink")
} else {
println("Printing result to stdout. Use --output to specify output path.")
numbers.print()