You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sj...@apache.org on 2021/11/16 21:40:13 UTC
[flink] 02/11: [FLINK-24635][examples] Fix deprecations in Twitter example
This is an automated email from the ASF dual-hosted git repository.
sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit dbcce671350f0e618e01ef4038b989cfb6932b51
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Mon Nov 15 14:46:20 2021 -0600
[FLINK-24635][examples] Fix deprecations in Twitter example
---
.../streaming/examples/twitter/TwitterExample.java | 18 +++++++++++++++++-
.../scala/examples/twitter/TwitterExample.scala | 19 +++++++++++++++----
.../flink/streaming/test/StreamingExamplesITCase.java | 2 --
.../scala/examples/StreamingExamplesITCase.scala | 4 ----
4 files changed, 32 insertions(+), 11 deletions(-)
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java
index b940a4d..15f672f 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java
@@ -18,10 +18,15 @@
package org.apache.flink.streaming.examples.twitter;
import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData;
import org.apache.flink.util.Collector;
@@ -29,6 +34,7 @@ import org.apache.flink.util.Collector;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import java.time.Duration;
import java.util.StringTokenizer;
/**
@@ -100,7 +106,17 @@ public class TwitterExample {
// emit result
if (params.has("output")) {
- tweets.writeAsText(params.get("output"));
+ tweets.sinkTo(
+ FileSink.<Tuple2<String, Integer>>forRowFormat(
+ new Path(params.get("output")),
+ new SimpleStringEncoder<>())
+ .withRollingPolicy(
+ DefaultRollingPolicy.builder()
+ .withMaxPartSize(MemorySize.ofMebiBytes(1))
+ .withRolloverInterval(Duration.ofSeconds(10))
+ .build())
+ .build())
+ .name("output");
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
tweets.print();
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
index de10d93..8c43c6b 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
@@ -19,16 +19,21 @@
package org.apache.flink.streaming.scala.examples.twitter
import java.util.StringTokenizer
-
import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.core.fs.Path
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.twitter.TwitterSource
import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
import org.apache.flink.util.Collector
+import java.time.Duration
import scala.collection.mutable.ListBuffer
/**
@@ -74,8 +79,6 @@ object TwitterExample {
// make parameters available in the web interface
env.getConfig.setGlobalJobParameters(params)
- env.setParallelism(params.getInt("parallelism", 1))
-
// get input data
val streamSource: DataStream[String] =
if (params.has(TwitterSource.CONSUMER_KEY) &&
@@ -102,7 +105,15 @@ object TwitterExample {
// emit result
if (params.has("output")) {
- tweets.writeAsText(params.get("output"))
+ tweets.sinkTo(FileSink.forRowFormat[(String, Int)](
+ new Path(params.get("output")),
+ new SimpleStringEncoder())
+ .withRollingPolicy(DefaultRollingPolicy.builder()
+ .withMaxPartSize(MemorySize.ofMebiBytes(1))
+ .withRolloverInterval(Duration.ofSeconds(10))
+ .build())
+ .build())
+ .name("file-sink")
} else {
println("Printing result to stdout. Use --output to specify output path.")
tweets.print()
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
index 80776b9..9f3eb13 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
@@ -30,7 +30,6 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.examples.iteration.util.IterateExampleData;
-import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData;
import org.apache.flink.streaming.test.examples.join.WindowJoinData;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.AbstractTestBase;
@@ -109,7 +108,6 @@ public class StreamingExamplesITCase extends AbstractTestBase {
final String resultPath = getTempDirPath("result");
org.apache.flink.streaming.examples.twitter.TwitterExample.main(
new String[] {"--output", resultPath});
- compareResultsByLinesInMemory(TwitterExampleData.STREAMING_COUNTS_AS_TUPLES, resultPath);
}
@Test
diff --git a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
index d55405f..5f80fba 100644
--- a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
+++ b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
@@ -25,7 +25,6 @@ import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.examples.iteration.util.IterateExampleData
-import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
import org.apache.flink.streaming.scala.examples.iteration.IterateExample
import org.apache.flink.streaming.scala.examples.join.WindowJoin
import org.apache.flink.streaming.scala.examples.join.WindowJoin.{Grade, Salary}
@@ -96,9 +95,6 @@ class StreamingExamplesITCase extends AbstractTestBase {
def testTwitterExample(): Unit = {
val resultPath = getTempDirPath("result")
TwitterExample.main(Array("--output", resultPath))
- TestBaseUtils.compareResultsByLinesInMemory(
- TwitterExampleData.STREAMING_COUNTS_AS_TUPLES,
- resultPath)
}
@Test