Posted to commits@flink.apache.org by sj...@apache.org on 2021/11/15 21:23:01 UTC
[flink] 04/04: [FLINK-24830][examples] Update DataStream WordCount example
This is an automated email from the ASF dual-hosted git repository.
sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 83a2541475228a4ff9e9a9def4049fb742353549
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Wed Nov 10 15:16:54 2021 -0600
[FLINK-24830][examples] Update DataStream WordCount example
This closes #17760
---
flink-examples/flink-examples-streaming/pom.xml | 44 +-----
.../streaming/examples/wordcount/WordCount.java | 142 ++++++++++++-----
.../streaming/examples/wordcount/util/CLI.java | 149 ++++++++++++++++++
.../scala/examples/wordcount/WordCount.scala | 169 +++++++++++++++------
.../scala/examples/wordcount/util/CLI.scala | 65 ++++++++
.../streaming/test/StreamingExamplesITCase.java | 5 +-
.../scala/examples/StreamingExamplesITCase.scala | 5 +-
7 files changed, 454 insertions(+), 125 deletions(-)
diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
index cff0fea..b4ab863 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -69,6 +69,12 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-files</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${project.version}</version>
</dependency>
@@ -134,7 +140,6 @@ under the License.
</configuration>
</plugin>
- <!-- get default data from flink-examples-batch package -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
@@ -147,16 +152,6 @@ under the License.
</goals>
<configuration>
<artifactItems>
- <!-- For WordCount example data -->
- <artifactItem>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>jar</type>
- <overWrite>false</overWrite>
- <outputDirectory>${project.build.directory}/classes</outputDirectory>
- <includes>org/apache/flink/streaming/examples/wordcount/util/WordCountData.class</includes>
- </artifactItem>
<!-- For JSON utilities -->
<artifactItem>
<groupId>org.apache.flink</groupId>
@@ -261,32 +256,6 @@ under the License.
</configuration>
</execution>
- <!-- WordCountPOJO -->
- <execution>
- <id>WordCountPOJO</id>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <classifier>WordCountPOJO</classifier>
-
- <archive>
- <manifestEntries>
- <program-class>org.apache.flink.streaming.examples.wordcount.PojoExample</program-class>
- </manifestEntries>
- </archive>
-
- <includes>
- <include>org/apache/flink/streaming/examples/wordcount/PojoExample.class</include>
- <include>org/apache/flink/streaming/examples/wordcount/PojoExample$*.class</include>
- <include>org/apache/flink/streaming/examples/wordcount/util/WordCountData.class</include>
- <include>META-INF/LICENSE</include>
- <include>META-INF/NOTICE</include>
- </includes>
- </configuration>
- </execution>
-
<!-- WordCount -->
<execution>
<id>WordCount</id>
@@ -307,6 +276,7 @@ under the License.
<include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
<include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
<include>org/apache/flink/streaming/examples/wordcount/util/WordCountData.class</include>
+ <include>org/apache/flink/streaming/examples/wordcount/util/CLI.class</include>
<include>META-INF/LICENSE</include>
<include>META-INF/NOTICE</include>
</includes>
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
index aa2323e..402ac8b 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -17,30 +17,50 @@
package org.apache.flink.streaming.examples.wordcount;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.examples.wordcount.util.CLI;
import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
+
+import java.time.Duration;
/**
* Implements the "WordCount" program that computes a simple word occurrence histogram over text
- * files in a streaming fashion.
+ * files. This Job can be executed in both streaming and batch execution modes.
+ *
+ * <p>The input is a [list of] plain text file[s] with lines separated by a newline character.
*
- * <p>The input is a plain text file with lines separated by newline characters.
+ * <p>Usage:
*
- * <p>Usage: <code>WordCount --input <path> --output <path></code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <ul>
+ <li><code>--input <path></code> A list of input files and/or directories to read. If no
+ input is provided, the program is run with default data from {@link WordCountData}.
+ <li><code>--discovery-interval <duration></code> Turns the file reader into a continuous
+ source that will monitor the provided input directories every interval and read any new
+ files.
+ <li><code>--output <path></code> The output directory where the Job will write the
+ results. If no output path is provided, the Job will print the results to <code>stdout
+ </code>.
+ <li><code>--execution-mode <mode></code> The execution mode (BATCH, STREAMING, or
+ AUTOMATIC) of this pipeline.
+ * </ul>
*
* <p>This example shows how to:
*
* <ul>
- * <li>write a simple Flink Streaming program,
- * <li>use tuple data types,
- * <li>write and use user-defined functions.
+ * <li>Write a simple Flink DataStream program
+ * <li>Use tuple data types
+ * <li>Write and use a user-defined function
* </ul>
*/
public class WordCount {
@@ -50,51 +70,93 @@ public class WordCount {
// *************************************************************************
public static void main(String[] args) throws Exception {
+ final CLI params = CLI.fromArgs(args);
- // Checking input parameters
- final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
-
- // set up the execution environment
+ // Create the execution environment. This is the main entrypoint
+ // to building a Flink application.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // make parameters available in the web interface
+ // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+ // application executed over bounded input will produce the same final results regardless
+ // of the configured execution mode. It is important to note what final means here: a job
+ // executing in STREAMING mode might produce incremental updates (think upserts in
+ // a database) while in BATCH mode, it would only produce one final result at the end. The
+ // final result will be the same if interpreted correctly, but getting there can be
+ // different.
+ //
+ // The “classic” execution behavior of the DataStream API is called STREAMING execution
+ // mode. Applications should use streaming execution for unbounded jobs that require
+ // continuous incremental processing and are expected to stay online indefinitely.
+ //
+ // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+ // can only do when we know that our input is bounded. For example, different
+ // join/aggregation strategies can be used, in addition to a different shuffle
+ // implementation that allows more efficient task scheduling and failure recovery behavior.
+ //
+ // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+ // are bounded and otherwise STREAMING.
+ env.setRuntimeMode(params.getExecutionMode());
+
+ // This optional step makes the input parameters
+ // available in the Flink UI.
env.getConfig().setGlobalJobParameters(params);
- // get input data
- DataStream<String> text = null;
- if (params.has("input")) {
- // union all the inputs from text files
- for (String input : params.getMultiParameterRequired("input")) {
- if (text == null) {
- text = env.readTextFile(input);
- } else {
- text = text.union(env.readTextFile(input));
- }
- }
- Preconditions.checkNotNull(text, "Input DataStream should not be null.");
+ DataStream<String> text;
+ if (params.getInputs().isPresent()) {
+ // Create a new file source that will read files from a given set of directories.
+ // Each file will be processed as plain text and split based on newlines.
+ FileSource.FileSourceBuilder<String> builder =
+ FileSource.forRecordStreamFormat(
+ new TextLineFormat(), params.getInputs().get());
+
+ // If a discovery interval is provided, the source will
+ // continuously watch the given directories for new files.
+ params.getDiscoveryInterval().ifPresent(builder::monitorContinuously);
+
+ text = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input");
} else {
- System.out.println("Executing WordCount example with default input data set.");
- System.out.println("Use --input to specify file input.");
- // get default test text data
- text = env.fromElements(WordCountData.WORDS);
+ text = env.fromElements(WordCountData.WORDS).name("in-memory-input");
}
DataStream<Tuple2<String, Integer>> counts =
- // split up the lines in pairs (2-tuples) containing: (word,1)
+ // The text lines read from the source are split into words
+ // using a user-defined function. The tokenizer, implemented below,
+ // will output each word as a (2-tuple) containing (word, 1)
text.flatMap(new Tokenizer())
- // group by the tuple field "0" and sum up tuple field "1"
+ .name("tokenizer")
+ // keyBy groups tuples based on the "0" field, the word.
+ // Using a keyBy allows performing aggregations and other
+ // stateful transformations over data on a per-key basis.
+ // This is similar to a GROUP BY clause in a SQL query.
.keyBy(value -> value.f0)
- .sum(1);
+ // For each key, we perform a simple sum of the "1" field, the count.
+ // If the input data stream is bounded, sum will output a final count for
+ // each word. If it is unbounded, it will continuously output updates
+ // each time it sees a new instance of each word in the stream.
+ .sum(1)
+ .name("counter");
- // emit result
- if (params.has("output")) {
- counts.writeAsText(params.get("output"));
+ if (params.getOutput().isPresent()) {
+ // Given an output directory, Flink will write the results to a file
+ // using a simple string encoding. In a production environment, this might
+ // be something more structured like CSV, Avro, JSON, or Parquet.
+ counts.sinkTo(
+ FileSink.<Tuple2<String, Integer>>forRowFormat(
+ params.getOutput().get(), new SimpleStringEncoder<>())
+ .withRollingPolicy(
+ DefaultRollingPolicy.builder()
+ .withMaxPartSize(MemorySize.ofMebiBytes(1))
+ .withRolloverInterval(Duration.ofSeconds(10))
+ .build())
+ .build())
+ .name("file-sink");
} else {
- System.out.println("Printing result to stdout. Use --output to specify output path.");
- counts.print();
+ counts.print().name("print-sink");
}
- // execute program
- env.execute("Streaming WordCount");
+
+ // Apache Flink applications are composed lazily. Calling execute
+ // submits the Job and begins processing.
+ env.execute("WordCount");
}
// *************************************************************************
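[Editor's note: the runtime-mode commentary above is easiest to see in action with a small
standalone sketch. The class name and sample words below are illustrative, not part of this
commit. With a bounded source, BATCH mode emits one final count per word, while STREAMING
mode emits an incremental update for every element:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecutionModeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Swap in STREAMING (or AUTOMATIC) to compare the output behavior.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromElements("to", "be", "or", "not", "to", "be")
                .map(word -> Tuple2.of(word, 1))
                // Lambdas erase tuple generics, so declare the type explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                // In BATCH mode this prints (to,2) once; in STREAMING mode it
                // prints (to,1) and then (to,2) as each element is processed.
                .sum(1)
                .print();

        env.execute("execution-mode-demo");
    }
}]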
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/util/CLI.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/util/CLI.java
new file mode 100644
index 0000000..ddf1111
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/util/CLI.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.wordcount.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TimeUtils;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+/**
+ * A simple CLI parser for the {@link org.apache.flink.streaming.examples.wordcount.WordCount}
+ * example application.
+ */
+public class CLI extends ExecutionConfig.GlobalJobParameters {
+
+ public static final String INPUT_KEY = "input";
+ public static final String OUTPUT_KEY = "output";
+ public static final String DISCOVERY_INTERVAL = "discovery-interval";
+ public static final String EXECUTION_MODE = "execution-mode";
+
+ public static CLI fromArgs(String[] args) throws Exception {
+ MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+ Path[] inputs = null;
+ if (params.has(INPUT_KEY)) {
+ inputs =
+ params.getMultiParameterRequired(INPUT_KEY).stream()
+ .map(Path::new)
+ .toArray(Path[]::new);
+ } else {
+ System.out.println("Executing example with default input data.");
+ System.out.println("Use --input to specify file input.");
+ }
+
+ Path output = null;
+ if (params.has(OUTPUT_KEY)) {
+ output = new Path(params.get(OUTPUT_KEY));
+ } else {
+ System.out.println("Printing result to stdout. Use --output to specify output path.");
+ }
+
+ Duration watchInterval = null;
+ if (params.has(DISCOVERY_INTERVAL)) {
+ watchInterval = TimeUtils.parseDuration(params.get(DISCOVERY_INTERVAL));
+ }
+
+ RuntimeExecutionMode executionMode = ExecutionOptions.RUNTIME_MODE.defaultValue();
+ if (params.has(EXECUTION_MODE)) {
+ executionMode = RuntimeExecutionMode.valueOf(params.get(EXECUTION_MODE).toUpperCase());
+ }
+
+ return new CLI(inputs, output, watchInterval, executionMode, params);
+ }
+
+ private final Path[] inputs;
+ private final Path output;
+ private final Duration discoveryInterval;
+ private final RuntimeExecutionMode executionMode;
+ private final MultipleParameterTool params;
+
+ private CLI(
+ Path[] inputs,
+ Path output,
+ Duration discoveryInterval,
+ RuntimeExecutionMode executionMode,
+ MultipleParameterTool params) {
+ this.inputs = inputs;
+ this.output = output;
+ this.discoveryInterval = discoveryInterval;
+ this.executionMode = executionMode;
+ this.params = params;
+ }
+
+ public Optional<Path[]> getInputs() {
+ return Optional.ofNullable(inputs);
+ }
+
+ public Optional<Duration> getDiscoveryInterval() {
+ return Optional.ofNullable(discoveryInterval);
+ }
+
+ public Optional<Path> getOutput() {
+ return Optional.ofNullable(output);
+ }
+
+ public RuntimeExecutionMode getExecutionMode() {
+ return executionMode;
+ }
+
+ public OptionalInt getInt(String key) {
+ if (params.has(key)) {
+ return OptionalInt.of(params.getInt(key));
+ }
+
+ return OptionalInt.empty();
+ }
+
+ @Override
+ public Map<String, String> toMap() {
+ return params.toMap();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ CLI cli = (CLI) o;
+ return Arrays.equals(inputs, cli.inputs)
+ && Objects.equals(output, cli.output)
+ && Objects.equals(discoveryInterval, cli.discoveryInterval);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(output, discoveryInterval);
+ result = 31 * result + Arrays.hashCode(inputs);
+ return result;
+ }
+}
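[Editor's note: a quick way to sanity-check the parser added above is to drive it directly.
A minimal sketch, with argument values invented for illustration:

import org.apache.flink.streaming.examples.wordcount.util.CLI;

public class CLIDemo {
    public static void main(String[] args) throws Exception {
        CLI params =
                CLI.fromArgs(new String[] {"--input", "/tmp/words", "--execution-mode", "batch"});

        // valueOf(...toUpperCase()) makes the flag case-insensitive.
        System.out.println(params.getExecutionMode()); // BATCH

        // --input was given, so the Optional is populated.
        params.getInputs().ifPresent(paths -> System.out.println(paths[0]));

        // No --output flag: the job would fall back to printing to stdout.
        System.out.println(params.getOutput().isPresent()); // false

        // Keys that were never passed come back as OptionalInt.empty().
        System.out.println(params.getInt("parallelism").isPresent()); // false
    }
}]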
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
index 271d737..46350a5 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
@@ -18,74 +18,155 @@
package org.apache.flink.streaming.scala.examples.wordcount
-import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
+import org.apache.flink.util.Collector
+
+import java.time.Duration
/**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over text files in a streaming fashion.
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text
+ * files. This Job can be executed in both streaming and batch execution modes.
*
- * The input is a plain text file with lines separated by newline characters.
+ * The input is a [list of] plain text file[s] with lines separated by a newline character.
*
* Usage:
- * {{{
- * WordCount --input <path> --output <path>
- * }}}
*
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
+ * {{{ --input <path> }}} A list of input files and/or directories to read.
+ * If no input is provided, the program is run with default data from [[WordCountData]].
*
- * This example shows how to:
+ * {{{--discovery-interval <duration> }}} Turns the file reader
+ * into a continuous source that will monitor the provided input directories
+ * every interval and read any new files.
+ *
+ * {{{--output <path> }}} The output directory where the Job will
+ * write the results. If no output path is provided, the Job will print the results
+ * to `stdout`.
*
- * - write a simple Flink Streaming program,
- * - use tuple data types,
- * - write and use transformation functions.
+ * {{{--execution-mode <mode> }}} The execution mode (BATCH, STREAMING, or AUTOMATIC) of this
+ * pipeline.
*
+ * This example shows how to:
+ *
+ * - Write a simple Flink DataStream program
+ * - Use tuple data types
+ * - Write and use a user-defined function
*/
object WordCount {
- def main(args: Array[String]) {
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
- // Checking input parameters
- val params = ParameterTool.fromArgs(args)
+ def main(args: Array[String]): Unit = {
+ val params = CLI.fromArgs(args)
- // set up the execution environment
+ // Create the execution environment. This is the main entrypoint
+ // to building a Flink application.
val env = StreamExecutionEnvironment.getExecutionEnvironment
- // make parameters available in the web interface
+ // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+ // application executed over bounded input will produce the same final results regardless
+ // of the configured execution mode. It is important to note what final means here: a job
+ // executing in STREAMING mode might produce incremental updates (think upserts in
+ // a database) while in BATCH mode, it would only produce one final result at the end. The
+ // final result will be the same if interpreted correctly, but getting there can be
+ // different.
+ //
+ // The “classic” execution behavior of the DataStream API is called STREAMING execution
+ // mode. Applications should use streaming execution for unbounded jobs that require
+ // continuous incremental processing and are expected to stay online indefinitely.
+ //
+ // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+ // can only do when we know that our input is bounded. For example, different
+ // join/aggregation strategies can be used, in addition to a different shuffle
+ // implementation that allows more efficient task scheduling and failure recovery behavior.
+ //
+ // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+ // are bounded and otherwise STREAMING.
+ env.setRuntimeMode(params.executionMode)
+
+ // This optional step makes the input parameters
+ // available in the Flink UI.
env.getConfig.setGlobalJobParameters(params)
// get input data
- val text =
- // read the text file from given input path
- if (params.has("input")) {
- env.readTextFile(params.get("input"))
- } else {
- println("Executing WordCount example with default inputs data set.")
- println("Use --input to specify file input.")
- // get default test text data
- env.fromElements(WordCountData.WORDS: _*)
+ val text = params.input match {
+ case Some(input) =>
+ // Create a new file source that will read files from a given set of directories.
+ // Each file will be processed as plain text and split based on newlines.
+ val builder = FileSource.forRecordStreamFormat(new TextLineFormat, input:_*)
+ params.discoveryInterval.foreach { duration =>
+ // If a discovery interval is provided, the source will
+ // continuously watch the given directories for new files.
+ builder.monitorContinuously(duration)
+ }
+ env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input")
+ case None =>
+ env.fromElements(WordCountData.WORDS:_*).name("in-memory-input")
}
- val counts: DataStream[(String, Int)] = text
- // split up the lines in pairs (2-tuples) containing: (word,1)
- .flatMap(_.toLowerCase.split("\\W+"))
- .filter(_.nonEmpty)
- .map((_, 1))
- // group by the tuple field "0" and sum up tuple field "1"
- .keyBy(_._1)
- .sum(1)
-
- // emit result
- if (params.has("output")) {
- counts.writeAsText(params.get("output"))
- } else {
- println("Printing result to stdout. Use --output to specify output path.")
- counts.print()
+ val counts =
+ // The text lines read from the source are split into words
+ // using a user-defined function. The tokenizer, implemented below,
+ // will output each word as a (2-tuple) containing (word, 1)
+ text.flatMap(new Tokenizer)
+ .name("tokenizer")
+ // keyBy groups tuples based on the "_1" field, the word.
+ // Using a keyBy allows performing aggregations and other
+ // stateful transformations over data on a per-key basis.
+ // This is similar to a GROUP BY clause in a SQL query.
+ .keyBy(_._1)
+ // For each key, we perform a simple sum of the "1" field, the count.
+ // If the input data stream is bounded, sum will output a final count for
+ // each word. If it is unbounded, it will continuously output updates
+ // each time it sees a new instance of each word in the stream.
+ .sum(1)
+ .name("counter")
+
+ params.output match {
+ case Some(output) =>
+ // Given an output directory, Flink will write the results to a file
+ // using a simple string encoding. In a production environment, this might
+ // be something more structured like CSV, Avro, JSON, or Parquet.
+ counts.sinkTo(FileSink.forRowFormat[(String, Int)](output, new SimpleStringEncoder())
+ .withRollingPolicy(DefaultRollingPolicy.builder()
+ .withMaxPartSize(MemorySize.ofMebiBytes(1))
+ .withRolloverInterval(Duration.ofSeconds(10))
+ .build())
+ .build())
+ .name("file-sink")
+
+ case None => counts.print().name("print-sink")
}
- // execute program
- env.execute("Streaming WordCount")
+ // Apache Flink applications are composed lazily. Calling execute
+ // submits the Job and begins processing.
+ env.execute("WordCount")
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+ * Implements the string tokenizer that splits a sentence into words as a user-defined
+ * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
+ * form of "(word,1)".
+ */
+ final class Tokenizer extends FlatMapFunction[String, (String, Int)] {
+ override def flatMap(value: String, out: Collector[(String, Int)]): Unit = for {
+ token <- value.toLowerCase.split("\\W+")
+ if token.nonEmpty
+ } out.collect((token, 1))
}
}
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/util/CLI.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/util/CLI.scala
new file mode 100644
index 0000000..f7f7b0b
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/util/CLI.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.wordcount.util
+
+import org.apache.flink.api.common.{ExecutionConfig, RuntimeExecutionMode}
+import org.apache.flink.core.fs.Path
+import org.apache.flink.streaming.examples.wordcount.util.{CLI => JCLI}
+
+import java.time.Duration
+import java.util
+import java.util.Optional
+
+/**
+ * A simple CLI parser for the [[org.apache.flink.streaming.scala.examples.wordcount.WordCount]]
+ * example application.
+ */
+object CLI {
+ def fromArgs(args: Array[String]) = new CLI(JCLI.fromArgs(args))
+}
+
+class CLI private (val inner: JCLI) extends ExecutionConfig.GlobalJobParameters {
+
+ def input: Option[Array[Path]] = asScala(inner.getInputs)
+
+ def discoveryInterval: Option[Duration] = asScala(inner.getDiscoveryInterval)
+
+ def output: Option[Path] = asScala(inner.getOutput)
+
+ def executionMode: RuntimeExecutionMode = inner.getExecutionMode
+
+ def getInt(key: String): Option[Int] = {
+ val result = inner.getInt(key)
+ if (result.isPresent) {
+ Option(result.getAsInt)
+ } else {
+ None
+ }
+ }
+
+ override def equals(obj: Any): Boolean =
+ obj.isInstanceOf[CLI] && inner.equals(obj.asInstanceOf[CLI].inner)
+
+ override def hashCode(): Int = inner.hashCode()
+
+ override def toMap: util.Map[String, String] = inner.toMap
+
+ private def asScala[T](optional: Optional[T]): Option[T] =
+ Option(optional.orElse(null.asInstanceOf[T]))
+}
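[Editor's note: the --discovery-interval behavior that both WordCount variants document can
also be exercised on its own. A minimal sketch, assuming a watched directory at an invented
path; adding files to that directory while the job runs produces new output:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class ContinuousInputDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FileSource<String> source =
                FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("/tmp/watched"))
                        // Re-scan the directory every 10 seconds. This makes the
                        // source unbounded, so AUTOMATIC resolves to STREAMING.
                        .monitorContinuously(Duration.ofSeconds(10))
                        .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-input").print();
        env.execute("continuous-input-demo");
    }
}]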
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
index 7598f98..e45994f8 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
@@ -150,10 +150,11 @@ public class StreamingExamplesITCase extends AbstractTestBase {
org.apache.flink.streaming.examples.wordcount.WordCount.main(
new String[] {
"--input", textPath,
- "--output", resultPath
+ "--output", resultPath,
+ "--execution-mode", "automatic"
});
- compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+ compareResultsByLinesInMemory(WordCountData.COUNTS_AS_TUPLES, resultPath);
}
/**
diff --git a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
index de00b95..4d6fe8b 100644
--- a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
+++ b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
@@ -136,11 +136,12 @@ class StreamingExamplesITCase extends AbstractTestBase {
WordCount.main(Array(
"--input", textPath,
- "--output", resultPath
+ "--output", resultPath,
+ "--execution-mode", "automatic"
))
TestBaseUtils.compareResultsByLinesInMemory(
- WordCountData.STREAMING_COUNTS_AS_TUPLES,
+ WordCountData.COUNTS_AS_TUPLES,
resultPath)
}
}