You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sj...@apache.org on 2021/11/16 21:40:12 UTC
[flink] 01/11: [FLINK-24831][examples] Update DataStream Window examples
This is an automated email from the ASF dual-hosted git repository.
sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 443b7121373a7f7c4b7a25660a50b06cbee8cb01
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Mon Nov 15 14:42:43 2021 -0600
[FLINK-24831][examples] Update DataStream Window examples
---
flink-examples/flink-examples-streaming/pom.xml | 2 +
.../GroupedProcessingTimeWindowExample.java | 13 +-
.../examples/windowing/SessionWindowing.java | 20 ++-
.../examples/windowing/TopSpeedWindowing.java | 151 ++++++++++-----------
.../examples/windowing/WindowWordCount.java | 111 +++++++++++----
.../examples/windowing/util/CarSource.java | 77 +++++++++++
.../GroupedProcessingTimeWindowExample.scala | 7 +-
.../examples/windowing/SessionWindowing.scala | 20 ++-
.../examples/windowing/TopSpeedWindowing.scala | 133 ++++++++++--------
.../scala/examples/windowing/WindowWordCount.scala | 126 ++++++++++++-----
.../scala/examples/windowing/util/CarSource.scala | 62 +++++++++
.../streaming/test/StreamingExamplesITCase.java | 6 +-
.../windowing/TopSpeedWindowingExampleITCase.java | 9 +-
.../windowing/TopSpeedWindowingExampleITCase.java | 3 +-
.../scala/examples/StreamingExamplesITCase.scala | 9 +-
15 files changed, 518 insertions(+), 231 deletions(-)
diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
index b4ab863..705f063 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -327,6 +327,8 @@ under the License.
<includes>
<include>org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.class</include>
<include>org/apache/flink/streaming/examples/windowing/TopSpeedWindowing$*.class</include>
+ <include>org/apache/flink/streaming/examples/windowing/util/CarSource.class</include>
+ <include>org/apache/flink/streaming/examples/wordcount/util/CLI.class</include>
<include>META-INF/LICENSE</include>
<include>META-INF/NOTICE</include>
</includes>
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
index c39166a..0662e70 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
@@ -33,15 +33,14 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
/**
- * An example of grouped stream windowing into sliding time windows. This example uses
- * [[RichParallelSourceFunction]] to generate a list of key-value pairs.
+ * An example of grouped stream windowing into sliding time windows. This example uses {@link
+ * RichParallelSourceFunction} to generate a list of key-value pairs.
*/
public class GroupedProcessingTimeWindowExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
DataStream<Tuple2<Long, Long>> stream = env.addSource(new DataSource());
@@ -57,11 +56,7 @@ public class GroupedProcessingTimeWindowExample {
// Time.milliseconds(500)))
// .apply(new SummingWindowFunction())
- .addSink(
- new SinkFunction<Tuple2<Long, Long>>() {
- @Override
- public void invoke(Tuple2<Long, Long> value) {}
- });
+ .addSink(new DiscardingSink<>());
env.execute();
}
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index b36c5b2..ac59d89 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -17,15 +17,21 @@
package org.apache.flink.streaming.examples.windowing;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@@ -35,7 +41,6 @@ import java.util.List;
*/
public class SessionWindowing {
- @SuppressWarnings("serial")
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
@@ -85,7 +90,18 @@ public class SessionWindowing {
.sum(2);
if (fileOutput) {
- aggregated.writeAsText(params.get("output"));
+ aggregated
+ .sinkTo(
+ FileSink.<Tuple3<String, Long, Integer>>forRowFormat(
+ new Path(params.get("output")),
+ new SimpleStringEncoder<>())
+ .withRollingPolicy(
+ DefaultRollingPolicy.builder()
+ .withMaxPartSize(MemorySize.ofMebiBytes(1))
+ .withRolloverInterval(Duration.ofSeconds(10))
+ .build())
+ .build())
+ .name("output");
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
aggregated.print();
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index 02180d3..7808e4b 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -17,21 +17,26 @@
package org.apache.flink.streaming.examples.windowing;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
+import org.apache.flink.streaming.examples.windowing.util.CarSource;
+import org.apache.flink.streaming.examples.wordcount.util.CLI;
-import java.util.Arrays;
-import java.util.Random;
+import java.time.Duration;
import java.util.concurrent.TimeUnit;
/**
@@ -47,26 +52,64 @@ public class TopSpeedWindowing {
// *************************************************************************
public static void main(String[] args) throws Exception {
+ final CLI params = CLI.fromArgs(args);
- final ParameterTool params = ParameterTool.fromArgs(args);
-
+ // Create the execution environment. This is the main entrypoint
+ // to building a Flink application.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+ // application executed over bounded input will produce the same final results regardless
+ // of the configured execution mode. It is important to note what final means here: a job
+ // executing in STREAMING mode might produce incremental updates (think upserts in
+ // a database) while a BATCH job would only produce one final result at the end. The final
+ // result will be the same if interpreted correctly, but getting there can be different.
+ //
+ // The “classic” execution behavior of the DataStream API is called STREAMING execution
+ // mode. Applications should use streaming execution for unbounded jobs that require
+ // continuous incremental processing and are expected to stay online indefinitely.
+ //
+ // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+ // can only do when we know that our input is bounded. For example, different
+ // join/aggregation strategies can be used, in addition to a different shuffle
+ // implementation that allows more efficient task scheduling and failure recovery behavior.
+ //
+ // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+ // are bounded and otherwise STREAMING.
+ env.setRuntimeMode(params.getExecutionMode());
+
+ // This optional step makes the input parameters
+ // available in the Flink UI.
env.getConfig().setGlobalJobParameters(params);
- @SuppressWarnings({"rawtypes", "serial"})
DataStream<Tuple4<Integer, Integer, Double, Long>> carData;
- if (params.has("input")) {
- carData = env.readTextFile(params.get("input")).map(new ParseCarData());
+ if (params.getInputs().isPresent()) {
+ // Create a new file source that will read files from a given set of directories.
+ // Each file will be processed as plain text and split based on newlines.
+ FileSource.FileSourceBuilder<String> builder =
+ FileSource.forRecordStreamFormat(
+ new TextLineFormat(), params.getInputs().get());
+
+ // If a discovery interval is provided, the source will
+ // continuously watch the given directories for new files.
+ params.getDiscoveryInterval().ifPresent(builder::monitorContinuously);
+
+ carData =
+ env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input")
+ .map(new ParseCarData())
+ .name("parse-input");
} else {
- System.out.println("Executing TopSpeedWindowing example with default input data set.");
- System.out.println("Use --input to specify file input.");
- carData = env.addSource(CarSource.create(2));
+ carData = env.addSource(CarSource.create(2)).name("in-memory-source");
}
int evictionSec = 10;
double triggerMeters = 50;
DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds =
- carData.assignTimestampsAndWatermarks(new CarTimestamp())
+ carData.assignTimestampsAndWatermarks(
+ WatermarkStrategy
+ .<Tuple4<Integer, Integer, Double, Long>>
+ forMonotonousTimestamps()
+ .withTimestampAssigner((car, ts) -> car.f3))
.keyBy(value -> value.f0)
.window(GlobalWindows.create())
.evictor(TimeEvictor.of(Time.of(evictionSec, TimeUnit.SECONDS)))
@@ -89,10 +132,22 @@ public class TopSpeedWindowing {
carData.getType().createSerializer(env.getConfig())))
.maxBy(1);
- if (params.has("output")) {
- topSpeeds.writeAsText(params.get("output"));
+ if (params.getOutput().isPresent()) {
+ // Given an output directory, Flink will write the results to a file
+ // using a simple string encoding. In a production environment, this might
+ // be something more structured like CSV, Avro, JSON, or Parquet.
+ topSpeeds
+ .sinkTo(
+ FileSink.<Tuple4<Integer, Integer, Double, Long>>forRowFormat(
+ params.getOutput().get(), new SimpleStringEncoder<>())
+ .withRollingPolicy(
+ DefaultRollingPolicy.builder()
+ .withMaxPartSize(MemorySize.ofMebiBytes(1))
+ .withRolloverInterval(Duration.ofSeconds(10))
+ .build())
+ .build())
+ .name("file-sink");
} else {
- System.out.println("Printing result to stdout. Use --output to specify output path.");
topSpeeds.print();
}
@@ -103,58 +158,6 @@ public class TopSpeedWindowing {
// USER FUNCTIONS
// *************************************************************************
- private static class CarSource
- implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
-
- private static final long serialVersionUID = 1L;
- private Integer[] speeds;
- private Double[] distances;
-
- private Random rand = new Random();
-
- private volatile boolean isRunning = true;
-
- private CarSource(int numOfCars) {
- speeds = new Integer[numOfCars];
- distances = new Double[numOfCars];
- Arrays.fill(speeds, 50);
- Arrays.fill(distances, 0d);
- }
-
- public static CarSource create(int cars) {
- return new CarSource(cars);
- }
-
- @Override
- public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx)
- throws Exception {
-
- while (isRunning) {
- Thread.sleep(100);
- for (int carId = 0; carId < speeds.length; carId++) {
- if (rand.nextBoolean()) {
- speeds[carId] = Math.min(100, speeds[carId] + 5);
- } else {
- speeds[carId] = Math.max(0, speeds[carId] - 5);
- }
- distances[carId] += speeds[carId] / 3.6d;
- Tuple4<Integer, Integer, Double, Long> record =
- new Tuple4<>(
- carId,
- speeds[carId],
- distances[carId],
- System.currentTimeMillis());
- ctx.collect(record);
- }
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
- }
-
private static class ParseCarData
extends RichMapFunction<String, Tuple4<Integer, Integer, Double, Long>> {
private static final long serialVersionUID = 1L;
@@ -170,14 +173,4 @@ public class TopSpeedWindowing {
Long.valueOf(data[3]));
}
}
-
- private static class CarTimestamp
- extends AscendingTimestampExtractor<Tuple4<Integer, Integer, Double, Long>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public long extractAscendingTimestamp(Tuple4<Integer, Integer, Double, Long> element) {
- return element.f3;
- }
- }
}
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
index 9cdecdb..cbb4acc 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
@@ -17,13 +17,22 @@
package org.apache.flink.streaming.examples.windowing;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.examples.wordcount.WordCount;
+import org.apache.flink.streaming.examples.wordcount.util.CLI;
import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
+import java.time.Duration;
+
/**
* Implements a windowed version of the streaming "WordCount" program.
*
@@ -49,48 +58,96 @@ public class WindowWordCount {
// *************************************************************************
public static void main(String[] args) throws Exception {
+ final CLI params = CLI.fromArgs(args);
- final ParameterTool params = ParameterTool.fromArgs(args);
-
- // set up the execution environment
+ // Create the execution environment. This is the main entrypoint
+ // to building a Flink application.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // get input data
+ // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+ // application executed over bounded input will produce the same final results regardless
+ // of the configured execution mode. It is important to note what final means here: a job
+ // executing in STREAMING mode might produce incremental updates (think upserts in
+ // a database) while a BATCH job would only produce one final result at the end. The final
+ // result will be the same if interpreted correctly, but getting there can be different.
+ //
+ // The “classic” execution behavior of the DataStream API is called STREAMING execution
+ // mode. Applications should use streaming execution for unbounded jobs that require
+ // continuous incremental processing and are expected to stay online indefinitely.
+ //
+ // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+ // can only do when we know that our input is bounded. For example, different
+ // join/aggregation strategies can be used, in addition to a different shuffle
+ // implementation that allows more efficient task scheduling and failure recovery behavior.
+ //
+ // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+ // are bounded and otherwise STREAMING.
+ env.setRuntimeMode(params.getExecutionMode());
+
+ // This optional step makes the input parameters
+ // available in the Flink UI.
+ env.getConfig().setGlobalJobParameters(params);
+
DataStream<String> text;
- if (params.has("input")) {
- // read the text file from given input path
- text = env.readTextFile(params.get("input"));
+ if (params.getInputs().isPresent()) {
+ // Create a new file source that will read files from a given set of directories.
+ // Each file will be processed as plain text and split based on newlines.
+ FileSource.FileSourceBuilder<String> builder =
+ FileSource.forRecordStreamFormat(
+ new TextLineFormat(), params.getInputs().get());
+
+ // If a discovery interval is provided, the source will
+ // continuously watch the given directories for new files.
+ params.getDiscoveryInterval().ifPresent(builder::monitorContinuously);
+
+ text = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input");
} else {
- System.out.println("Executing WindowWordCount example with default input data set.");
- System.out.println("Use --input to specify file input.");
- // get default test text data
- text = env.fromElements(WordCountData.WORDS);
+ text = env.fromElements(WordCountData.WORDS).name("in-memory-input");
}
- // make parameters available in the web interface
- env.getConfig().setGlobalJobParameters(params);
-
- final int windowSize = params.getInt("window", 10);
- final int slideSize = params.getInt("slide", 5);
+ int windowSize = params.getInt("window").orElse(250);
+ int slideSize = params.getInt("slide").orElse(150);
DataStream<Tuple2<String, Integer>> counts =
- // split up the lines in pairs (2-tuples) containing: (word,1)
+ // The text lines read from the source are split into words
+ // using a user-defined function. The tokenizer, implemented below,
+ // will output each word as a 2-tuple containing (word, 1)
text.flatMap(new WordCount.Tokenizer())
- // create windows of windowSize records slided every slideSize records
+ .name("tokenizer")
+ // keyBy groups tuples based on the "0" field, the word.
+ // Using a keyBy allows performing aggregations and other
+ // stateful transformations over data on a per-key basis.
+ // This is similar to a GROUP BY clause in a SQL query.
.keyBy(value -> value.f0)
+ // create windows of windowSize records that slide every slideSize records
.countWindow(windowSize, slideSize)
- // group by the tuple field "0" and sum up tuple field "1"
- .sum(1);
+ // For each key, we perform a simple sum of the "1" field, the count.
+ // If the input data set is bounded, sum will output a final count for
+ // each word. If it is unbounded, it will continuously output updates
+ // each time it sees a new instance of each word in the stream.
+ .sum(1)
+ .name("counter");
- // emit result
- if (params.has("output")) {
- counts.writeAsText(params.get("output"));
+ if (params.getOutput().isPresent()) {
+ // Given an output directory, Flink will write the results to a file
+ // using a simple string encoding. In a production environment, this might
+ // be something more structured like CSV, Avro, JSON, or Parquet.
+ counts.sinkTo(
+ FileSink.<Tuple2<String, Integer>>forRowFormat(
+ params.getOutput().get(), new SimpleStringEncoder<>())
+ .withRollingPolicy(
+ DefaultRollingPolicy.builder()
+ .withMaxPartSize(MemorySize.ofMebiBytes(1))
+ .withRolloverInterval(Duration.ofSeconds(10))
+ .build())
+ .build())
+ .name("file-sink");
} else {
- System.out.println("Printing result to stdout. Use --output to specify output path.");
- counts.print();
+ counts.print().name("print-sink");
}
- // execute program
+ // Apache Flink applications are composed lazily. Calling execute
+ // submits the Job and begins processing.
env.execute("WindowWordCount");
}
}
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/CarSource.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/CarSource.java
new file mode 100644
index 0000000..e1c4d23
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/CarSource.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing.util;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import java.util.Arrays;
+import java.util.Random;
+
+/** A simple in-memory source emitting (carId, speed, distance, timestamp) tuples. */
+public class CarSource implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
+
+ private static final long serialVersionUID = 1L;
+ private Integer[] speeds;
+ private Double[] distances;
+
+ private Random rand = new Random();
+
+ private volatile boolean isRunning = true;
+
+ private CarSource(int numOfCars) {
+ speeds = new Integer[numOfCars];
+ distances = new Double[numOfCars];
+ Arrays.fill(speeds, 50);
+ Arrays.fill(distances, 0d);
+ }
+
+ public static CarSource create(int cars) {
+ return new CarSource(cars);
+ }
+
+ @Override
+ public void run(SourceFunction.SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx)
+ throws Exception {
+
+ while (isRunning) {
+ Thread.sleep(100);
+ for (int carId = 0; carId < speeds.length; carId++) {
+ if (rand.nextBoolean()) {
+ speeds[carId] = Math.min(100, speeds[carId] + 5);
+ } else {
+ speeds[carId] = Math.max(0, speeds[carId] - 5);
+ }
+ distances[carId] += speeds[carId] / 3.6d;
+ Tuple4<Integer, Integer, Double, Long> record =
+ new Tuple4<>(
+ carId, speeds[carId], distances[carId], System.currentTimeMillis());
+ ctx.collectWithTimestamp(record, record.f3);
+ }
+
+ ctx.emitWatermark(new Watermark(System.currentTimeMillis()));
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+}
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
index c12b118..a982feb 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
@@ -19,7 +19,7 @@
package org.apache.flink.streaming.scala.examples.windowing
import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.sink.{DiscardingSink, SinkFunction}
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
@@ -35,7 +35,6 @@ object GroupedProcessingTimeWindowExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setParallelism(4)
val stream: DataStream[(Long, Long)] = env.addSource(new DataSource)
@@ -43,9 +42,7 @@ object GroupedProcessingTimeWindowExample {
.keyBy(_._1)
.window(SlidingProcessingTimeWindows.of(Time.milliseconds(2500), Time.milliseconds(500)))
.reduce((value1, value2) => (value1._1, value1._2 + value2._2))
- .addSink(new SinkFunction[(Long, Long)]() {
- override def invoke(in: (Long, Long)): Unit = {}
- })
+ .addSink(new DiscardingSink[(Long, Long)])
env.execute()
}
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
index 7fe483c..a5d0b7e 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
@@ -18,9 +18,13 @@
package org.apache.flink.streaming.scala.examples.windowing
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.core.fs.Path
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
@@ -28,6 +32,8 @@ import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
+import java.time.Duration
+
/**
* An example of grouped stream windowing in session windows with session timeout of 3 msec.
* A source fetches elements with key, timestamp, and count.
@@ -40,8 +46,6 @@ object SessionWindowing {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setGlobalJobParameters(params)
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setParallelism(1)
val fileOutput = params.has("output")
@@ -80,7 +84,15 @@ object SessionWindowing {
.sum(2)
if (fileOutput) {
- aggregated.writeAsText(params.get("output"))
+ aggregated.sinkTo(FileSink.forRowFormat[(String, Long, Int)](
+ new Path(params.get("output")),
+ new SimpleStringEncoder())
+ .withRollingPolicy(DefaultRollingPolicy.builder()
+ .withMaxPartSize(MemorySize.ofMebiBytes(1))
+ .withRolloverInterval(Duration.ofSeconds(10))
+ .build())
+ .build())
+ .name("file-sink")
} else {
print("Printing result to stdout. Use --output to specify output path.")
aggregated.print()
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
index 33f9076..00cff26 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -19,22 +19,27 @@
package org.apache.flink.streaming.scala.examples.windowing
-import java.beans.Transient
-import java.util.concurrent.TimeUnit
-
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.core.fs.Path
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger
+import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.windowing.util.CarSource
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
-import scala.language.postfixOps
-import scala.util.Random
+import java.time.Duration
+import java.util.concurrent.TimeUnit
/**
* An example of grouped stream windowing where different eviction and
@@ -56,50 +61,55 @@ object TopSpeedWindowing {
val evictionSec = 10
val triggerMeters = 50d
- def main(args: Array[String]) {
-
- val params = ParameterTool.fromArgs(args)
+ def main(args: Array[String]): Unit = {
+ val params = CLI.fromArgs(args)
+ // Create the execution environment. This is the main entrypoint
+ // to building a Flink application.
val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+ // application executed over bounded input will produce the same final results regardless
+ // of the configured execution mode. It is important to note what final means here: a job
+ // executing in STREAMING mode might produce incremental updates (think upserts in
+ // a database) while in BATCH mode, it would only produce one final result at the end. The
+ // final result will be the same if interpreted correctly, but getting there can be
+ // different.
+ //
+ // The “classic” execution behavior of the DataStream API is called STREAMING execution
+ // mode. Applications should use streaming execution for unbounded jobs that require
+ // continuous incremental processing and are expected to stay online indefinitely.
+ //
+ // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+ // can only do when we know that our input is bounded. For example, different
+ // join/aggregation strategies can be used, in addition to a different shuffle
+ // implementation that allows more efficient task scheduling and failure recovery behavior.
+ //
+ // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+ // are bounded and otherwise STREAMING.
+ env.setRuntimeMode(params.executionMode)
+
+ // This optional step makes the input parameters
+ // available in the Flink UI.
env.getConfig.setGlobalJobParameters(params)
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setParallelism(1)
-
- val cars =
- if (params.has("input")) {
- env.readTextFile(params.get("input"))
- .map(parseMap(_))
- .map(x => CarEvent(x._1, x._2, x._3, x._4))
- } else {
- println("Executing TopSpeedWindowing example with default inputs data set.")
- println("Use --input to specify file input.")
- env.addSource(new SourceFunction[CarEvent]() {
-
- val speeds = Array.fill[Integer](numOfCars)(50)
- val distances = Array.fill[Double](numOfCars)(0d)
- @Transient lazy val rand = new Random()
-
- var isRunning:Boolean = true
-
- override def run(ctx: SourceContext[CarEvent]) = {
- while (isRunning) {
- Thread.sleep(100)
-
- for (carId <- 0 until numOfCars) {
- if (rand.nextBoolean) speeds(carId) = Math.min(100, speeds(carId) + 5)
- else speeds(carId) = Math.max(0, speeds(carId) - 5)
-
- distances(carId) += speeds(carId) / 3.6d
- val record = CarEvent(carId, speeds(carId),
- distances(carId), System.currentTimeMillis)
- ctx.collect(record)
- }
- }
- }
-
- override def cancel(): Unit = isRunning = false
- })
- }
+
+ val cars = params.input match {
+ case Some(input) =>
+ // Create a new file source that will read files from a given set of directories.
+ // Each file will be processed as plain text and split based on newlines.
+ val builder = FileSource.forRecordStreamFormat(new TextLineFormat, input:_*)
+ params.discoveryInterval.foreach { duration =>
+ // If a discovery interval is provided, the source will
+ // continuously watch the given directories for new files.
+ builder.monitorContinuously(duration)
+ }
+ env
+ .fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input")
+ .map(line => parseMap(line))
+ .name("parse-input")
+ case None =>
+ env.addSource(CarSource(2)).name("in-memory-input")
+ }
val topSpeeds = cars
.assignAscendingTimestamps( _.time )
@@ -108,17 +118,26 @@ object TopSpeedWindowing {
.evictor(TimeEvictor.of(Time.of(evictionSec * 1000, TimeUnit.MILLISECONDS)))
.trigger(DeltaTrigger.of(triggerMeters, new DeltaFunction[CarEvent] {
def getDelta(oldSp: CarEvent, newSp: CarEvent): Double = newSp.distance - oldSp.distance
- }, cars.getType().createSerializer(env.getConfig)))
+ }, cars.dataType.createSerializer(env.getConfig)))
// .window(Time.of(evictionSec * 1000, (car : CarEvent) => car.time))
// .every(Delta.of[CarEvent](triggerMeters,
// (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0)))
.maxBy("speed")
- if (params.has("output")) {
- topSpeeds.writeAsText(params.get("output"))
- } else {
- println("Printing result to stdout. Use --output to specify output path.")
- topSpeeds.print()
+ params.output match {
+ case Some(output) =>
+ // Given an output directory, Flink will write the results to a file
+ // using a simple string encoding. In a production environment, this might
+ // be something more structured like CSV, Avro, JSON, or Parquet.
+ topSpeeds.sinkTo(FileSink.forRowFormat[CarEvent](output, new SimpleStringEncoder())
+ .withRollingPolicy(DefaultRollingPolicy.builder()
+ .withMaxPartSize(MemorySize.ofMebiBytes(1))
+ .withRolloverInterval(Duration.ofSeconds(10))
+ .build())
+ .build())
+ .name("file-sink")
+
+ case None => topSpeeds.print().name("print-sink")
}
env.execute("TopSpeedWindowing")
@@ -129,8 +148,8 @@ object TopSpeedWindowing {
// USER FUNCTIONS
// *************************************************************************
- def parseMap(line : String): (Int, Int, Double, Long) = {
+ def parseMap(line : String): CarEvent = {
val record = line.substring(1, line.length - 1).split(",")
- (record(0).toInt, record(1).toInt, record(2).toDouble, record(3).toLong)
+ CarEvent(record(0).toInt, record(1).toInt, record(2).toDouble, record(3).toLong)
}
}
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala
index 07efd90..79ed824 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala
@@ -18,10 +18,20 @@
package org.apache.flink.streaming.scala.examples.windowing
-import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.wordcount.WordCount.Tokenizer
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
+
+import java.time.Duration
/**
* Implements a windowed version of the streaming "WordCount" program.
@@ -50,51 +60,93 @@ import org.apache.flink.streaming.examples.wordcount.util.WordCountData
object WindowWordCount {
def main(args: Array[String]): Unit = {
+ val params = CLI.fromArgs(args)
- val params = ParameterTool.fromArgs(args)
-
- // set up the execution environment
+ // Create the execution environment. This is the main entrypoint
+ // to building a Flink application.
val env = StreamExecutionEnvironment.getExecutionEnvironment
- // get input data
- val text =
- if (params.has("input")) {
- // read the text file from given input path
- env.readTextFile(params.get("input"))
- } else {
- println("Executing WindowWordCount example with default input data set.")
- println("Use --input to specify file input.")
- // get default test text data
- env.fromElements(WordCountData.WORDS: _*)
- }
+ // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+ // application executed over bounded input will produce the same final results regardless
+ // of the configured execution mode. It is important to note what final means here: a job
+ // executing in STREAMING mode might produce incremental updates (think upserts in
+ // a database) while in BATCH mode, it would only produce one final result at the end. The
+ // final result will be the same if interpreted correctly, but getting there can be
+ // different.
+ //
+ // The “classic” execution behavior of the DataStream API is called STREAMING execution
+ // mode. Applications should use streaming execution for unbounded jobs that require
+ // continuous incremental processing and are expected to stay online indefinitely.
+ //
+ // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+ // can only do when we know that our input is bounded. For example, different
+ // join/aggregation strategies can be used, in addition to a different shuffle
+ // implementation that allows more efficient task scheduling and failure recovery behavior.
+ //
+ // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+ // are bounded and otherwise STREAMING.
+ env.setRuntimeMode(params.executionMode)
- // make parameters available in the web interface
+ // This optional step makes the input parameters
+ // available in the Flink UI.
env.getConfig.setGlobalJobParameters(params)
- val windowSize = params.getInt("window", 250)
- val slideSize = params.getInt("slide", 150)
+ val text = params.input match {
+ case Some(input) =>
+ // Create a new file source that will read files from a given set of directories.
+ // Each file will be processed as plain text and split based on newlines.
+ val builder = FileSource.forRecordStreamFormat(new TextLineFormat, input:_*)
+ params.discoveryInterval.foreach { duration =>
+ // If a discovery interval is provided, the source will
+ // continuously watch the given directories for new files.
+ builder.monitorContinuously(duration)
+ }
+ env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input")
+ case None =>
+ env.fromElements(WordCountData.WORDS:_*).name("in-memory-input")
+ }
- val counts: DataStream[(String, Int)] = text
- // split up the lines in pairs (2-tuple) containing: (word,1)
- .flatMap(_.toLowerCase.split("\\W+"))
- .filter(_.nonEmpty)
- .map((_, 1))
- .keyBy(_._1)
- // create windows of windowSize records slided every slideSize records
- .countWindow(windowSize, slideSize)
- // group by the tuple field "0" and sum up tuple field "1"
- .sum(1)
+ val windowSize = params.getInt("window").getOrElse(250)
+ val slideSize = params.getInt("slide").getOrElse(150)
- // emit result
- if (params.has("output")) {
- counts.writeAsText(params.get("output"))
- } else {
- println("Printing result to stdout. Use --output to specify output path.")
- counts.print()
+ val counts =
+ // The text lines read from the source are split into words
+ // using a user-defined function. The Tokenizer, imported from the
+ // WordCount example, outputs each word as a 2-tuple containing (word, 1)
+ text.flatMap(new Tokenizer)
+ .name("tokenizer")
+ // keyBy groups tuples based on the "_1" field, the word.
+ // Using a keyBy allows performing aggregations and other
+ // stateful transformations over data on a per-key basis.
+ // This is similar to a GROUP BY clause in a SQL query.
+ .keyBy(_._1)
+ // create windows of windowSize records that slide every slideSize records
+ .countWindow(windowSize, slideSize)
+ // For each key, we sum the tuple field at position 1 (the "_2" field), the count.
+ // If the input data set is bounded, sum will output a final count for
+ // each word. If it is unbounded, it will continuously output updates
+ // each time it sees a new instance of each word in the stream.
+ .sum(1)
+ .name("counter")
+
+ params.output match {
+ case Some(output) =>
+ // Given an output directory, Flink will write the results to a file
+ // using a simple string encoding. In a production environment, this might
+ // be something more structured like CSV, Avro, JSON, or Parquet.
+ counts.sinkTo(FileSink.forRowFormat[(String, Int)](output, new SimpleStringEncoder())
+ .withRollingPolicy(DefaultRollingPolicy.builder()
+ .withMaxPartSize(MemorySize.ofMebiBytes(1))
+ .withRolloverInterval(Duration.ofSeconds(10))
+ .build())
+ .build())
+ .name("file-sink")
+
+ case None => counts.print().name("print-sink")
}
- // execute program
+ // Apache Flink applications are composed lazily. Calling execute
+ // submits the Job and begins processing.
env.execute("WindowWordCount")
}
-
}
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/util/CarSource.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/util/CarSource.scala
new file mode 100644
index 0000000..8f1d4c4
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/util/CarSource.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing.util
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.examples.windowing.util.{CarSource => JCarSource}
+import org.apache.flink.streaming.scala.examples.windowing.TopSpeedWindowing.CarEvent
+import org.apache.flink.api.java.tuple.{Tuple4 => JTuple4}
+
+import java.lang.{Integer => JInt, Double => JDouble, Long => JLong}
+
+/** A simple in-memory source. */
+object CarSource {
+ def apply(cars: Int): CarSource =
+ new CarSource(JCarSource.create(cars))
+}
+
+class CarSource private (inner: JCarSource) extends SourceFunction[CarEvent] {
+
+ override def run(ctx: SourceFunction.SourceContext[CarEvent]): Unit = {
+ inner.run(new WrappingCollector(ctx))
+ }
+
+ override def cancel(): Unit = inner.cancel()
+}
+
+private class WrappingCollector(ctx: SourceFunction.SourceContext[CarEvent])
+ extends SourceFunction.SourceContext[JTuple4[JInt, JInt, JDouble, JLong]] {
+
+ override def collect(element: JTuple4[JInt, JInt, JDouble, JLong]): Unit =
+ ctx.collect(CarEvent(element.f0, element.f1, element.f2, element.f3))
+
+ override def collectWithTimestamp(
+ element: JTuple4[JInt, JInt, JDouble, JLong],
+ timestamp: Long): Unit =
+ ctx.collectWithTimestamp(CarEvent(element.f0, element.f1, element.f2, element.f3), timestamp)
+
+ override def emitWatermark(mark: Watermark): Unit = ctx.emitWatermark(mark)
+
+ override def markAsTemporarilyIdle(): Unit = ctx.markAsTemporarilyIdle()
+
+ override def getCheckpointLock: AnyRef = ctx.getCheckpointLock
+
+ override def close(): Unit = ctx.close()
+}
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
index e45994f8..80776b9 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
@@ -31,7 +31,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.examples.iteration.util.IterateExampleData;
import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData;
-import org.apache.flink.streaming.examples.windowing.util.SessionWindowingData;
import org.apache.flink.streaming.test.examples.join.WindowJoinData;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.AbstractTestBase;
@@ -118,13 +117,12 @@ public class StreamingExamplesITCase extends AbstractTestBase {
final String resultPath = getTempDirPath("result");
org.apache.flink.streaming.examples.windowing.SessionWindowing.main(
new String[] {"--output", resultPath});
- compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath);
}
@Test
public void testWindowWordCount() throws Exception {
- final String windowSize = "250";
- final String slideSize = "150";
+ final String windowSize = "25";
+ final String slideSize = "15";
final String textPath = createTempFile("text.txt", WordCountData.TEXT);
final String resultPath = getTempDirPath("result");
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
index f980a1a..72dd508 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
@@ -53,7 +53,14 @@ public class TopSpeedWindowingExampleITCase extends TestLogger {
final String resultPath = temporaryFolder.newFolder().toURI().toString();
TopSpeedWindowing.main(
- new String[] {"--input", inputFile.getAbsolutePath(), "--output", resultPath});
+ new String[] {
+ "--input",
+ inputFile.getAbsolutePath(),
+ "--output",
+ resultPath,
+ "--execution-mode",
+ "AUTOMATIC"
+ });
compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_SPEEDS, resultPath);
}
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java
index 1e33899..0c993c7 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java
@@ -34,7 +34,8 @@ public class TopSpeedWindowingExampleITCase extends AbstractTestBase {
TopSpeedWindowing.main(
new String[] {
"--input", textPath,
- "--output", resultPath
+ "--output", resultPath,
+ "--execution-mode", "AUTOMATIC"
});
compareResultsByLinesInMemory(
diff --git a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
index 4d6fe8b..d55405f 100644
--- a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
+++ b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
@@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.examples.iteration.util.IterateExampleData
import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
-import org.apache.flink.streaming.examples.windowing.util.SessionWindowingData
import org.apache.flink.streaming.scala.examples.iteration.IterateExample
import org.apache.flink.streaming.scala.examples.join.WindowJoin
import org.apache.flink.streaming.scala.examples.join.WindowJoin.{Grade, Salary}
@@ -106,13 +105,12 @@ class StreamingExamplesITCase extends AbstractTestBase {
def testSessionWindowing(): Unit = {
val resultPath = getTempDirPath("result")
SessionWindowing.main(Array("--output", resultPath))
- TestBaseUtils.compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath)
}
@Test
def testWindowWordCount(): Unit = {
- val windowSize = "250"
- val slideSize = "150"
+ val windowSize = "25"
+ val slideSize = "15"
val textPath = createTempFile("text.txt", WordCountData.TEXT)
val resultPath = getTempDirPath("result")
@@ -120,7 +118,8 @@ class StreamingExamplesITCase extends AbstractTestBase {
"--input", textPath,
"--output", resultPath,
"--window", windowSize,
- "--slide", slideSize
+ "--slide", slideSize,
+ "--execution-mode", "AUTOMATIC"
))
// since the parallel tokenizers might have different speed