Posted to commits@flink.apache.org by sj...@apache.org on 2021/11/15 21:23:01 UTC

[flink] 04/04: [FLINK-24830][examples] Update DataStream WordCount example

This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 83a2541475228a4ff9e9a9def4049fb742353549
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Wed Nov 10 15:16:54 2021 -0600

    [FLINK-24830][examples] Update DataStream WordCount example
    
    This closes #17760
---
 flink-examples/flink-examples-streaming/pom.xml    |  44 +-----
 .../streaming/examples/wordcount/WordCount.java    | 142 ++++++++++++-----
 .../streaming/examples/wordcount/util/CLI.java     | 149 ++++++++++++++++++
 .../scala/examples/wordcount/WordCount.scala       | 169 +++++++++++++++------
 .../scala/examples/wordcount/util/CLI.scala        |  65 ++++++++
 .../streaming/test/StreamingExamplesITCase.java    |   5 +-
 .../scala/examples/StreamingExamplesITCase.scala   |   5 +-
 7 files changed, 454 insertions(+), 125 deletions(-)

diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
index cff0fea..b4ab863 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -69,6 +69,12 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-files</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-kafka</artifactId>
 			<version>${project.version}</version>
 		</dependency>
@@ -134,7 +140,6 @@ under the License.
 				</configuration>
 			</plugin>
 			
-			<!-- get default data from flink-examples-batch package -->
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-dependency-plugin</artifactId>
@@ -147,16 +152,6 @@ under the License.
 						</goals>
 						<configuration>
 							<artifactItems>
-								<!-- For WordCount example data -->
-								<artifactItem>
-									<groupId>org.apache.flink</groupId>
-									<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
-									<version>${project.version}</version>
-									<type>jar</type>
-									<overWrite>false</overWrite>
-									<outputDirectory>${project.build.directory}/classes</outputDirectory>
-									<includes>org/apache/flink/streaming/examples/wordcount/util/WordCountData.class</includes>
-								</artifactItem>
 								<!-- For JSON utilities -->
 								<artifactItem>
 									<groupId>org.apache.flink</groupId>
@@ -261,32 +256,6 @@ under the License.
 						</configuration>
 					</execution>
 
-					<!-- WordCountPOJO -->
-					<execution>
-						<id>WordCountPOJO</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>WordCountPOJO</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.wordcount.PojoExample</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>org/apache/flink/streaming/examples/wordcount/PojoExample.class</include>
-								<include>org/apache/flink/streaming/examples/wordcount/PojoExample$*.class</include>
-								<include>org/apache/flink/streaming/examples/wordcount/util/WordCountData.class</include>
-								<include>META-INF/LICENSE</include>
-								<include>META-INF/NOTICE</include>
-							</includes>
-						</configuration>
-					</execution>
-
 					<!-- WordCount -->
 					<execution>
 						<id>WordCount</id>
@@ -307,6 +276,7 @@ under the License.
 								<include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
 								<include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
 								<include>org/apache/flink/streaming/examples/wordcount/util/WordCountData.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/util/CLI.class</include>
 								<include>META-INF/LICENSE</include>
 								<include>META-INF/NOTICE</include>
 							</includes>
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
index aa2323e..402ac8b 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -17,30 +17,50 @@
 
 package org.apache.flink.streaming.examples.wordcount;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.examples.wordcount.util.CLI;
 import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
+
+import java.time.Duration;
 
 /**
  * Implements the "WordCount" program that computes a simple word occurrence histogram over text
- * files in a streaming fashion.
+ * files. This Job can be executed in both streaming and batch execution modes.
+ *
+ * <p>The input is a [list of] plain text file[s] with lines separated by a newline character.
  *
- * <p>The input is a plain text file with lines separated by newline characters.
+ * <p>Usage:
  *
- * <p>Usage: <code>WordCount --input &lt;path&gt; --output &lt;path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <ul>
+ *   <li><code>--input &lt;path&gt;</code> A list of input files and/or directories to read. If no
+ *       input is provided, the program is run with default data from {@link WordCountData}.
+ *   <li><code>--discovery-interval &lt;duration&gt;</code> Turns the file reader into a continuous
+ *       source that will monitor the provided input directories every interval and read any new
+ *       files.
+ *   <li><code>--output &lt;path&gt;</code> The output directory where the Job will write the
+ *       results. If no output path is provided, the Job will print the results to <code>stdout
+ *       </code>.
+ *   <li><code>--execution-mode &lt;mode&gt;</code> The execution mode (BATCH, STREAMING, or
+ *       AUTOMATIC) of this pipeline.
+ * </ul>
  *
  * <p>This example shows how to:
  *
  * <ul>
- *   <li>write a simple Flink Streaming program,
- *   <li>use tuple data types,
- *   <li>write and use user-defined functions.
+ *   <li>Write a simple Flink DataStream program
+ *   <li>Use tuple data types
+ *   <li>Write and use a user-defined function
  * </ul>
  */
 public class WordCount {
@@ -50,51 +70,93 @@ public class WordCount {
     // *************************************************************************
 
     public static void main(String[] args) throws Exception {
+        final CLI params = CLI.fromArgs(args);
 
-        // Checking input parameters
-        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
-
-        // set up the execution environment
+        // Create the execution environment. This is the main entrypoint
+        // to building a Flink application.
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-        // make parameters available in the web interface
+        // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+        // application executed over bounded input will produce the same final results regardless
+        // of the configured execution mode. It is important to note what final means here: a job
+        // executing in STREAMING mode might produce incremental updates (think upserts in
+        // a database) while in BATCH mode, it would only produce one final result at the end. The
+        // final result will be the same if interpreted correctly, but getting there can be
+        // different.
+        //
+        // The “classic” execution behavior of the DataStream API is called STREAMING execution
+        // mode. Applications should use streaming execution for unbounded jobs that require
+        // continuous incremental processing and are expected to stay online indefinitely.
+        //
+        // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+        // can only do when we know that our input is bounded. For example, different
+        // join/aggregation strategies can be used, in addition to a different shuffle
+        // implementation that allows more efficient task scheduling and failure recovery behavior.
+        //
+        // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+        // are bounded and otherwise STREAMING.
+        env.setRuntimeMode(params.getExecutionMode());
+
+        // This optional step makes the input parameters
+        // available in the Flink UI.
         env.getConfig().setGlobalJobParameters(params);
 
-        // get input data
-        DataStream<String> text = null;
-        if (params.has("input")) {
-            // union all the inputs from text files
-            for (String input : params.getMultiParameterRequired("input")) {
-                if (text == null) {
-                    text = env.readTextFile(input);
-                } else {
-                    text = text.union(env.readTextFile(input));
-                }
-            }
-            Preconditions.checkNotNull(text, "Input DataStream should not be null.");
+        DataStream<String> text;
+        if (params.getInputs().isPresent()) {
+            // Create a new file source that will read files from a given set of directories.
+            // Each file will be processed as plain text and split based on newlines.
+            FileSource.FileSourceBuilder<String> builder =
+                    FileSource.forRecordStreamFormat(
+                            new TextLineFormat(), params.getInputs().get());
+
+            // If a discovery interval is provided, the source will
+            // continuously watch the given directories for new files.
+            params.getDiscoveryInterval().ifPresent(builder::monitorContinuously);
+
+            text = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input");
         } else {
-            System.out.println("Executing WordCount example with default input data set.");
-            System.out.println("Use --input to specify file input.");
-            // get default test text data
-            text = env.fromElements(WordCountData.WORDS);
+            text = env.fromElements(WordCountData.WORDS).name("in-memory-input");
         }
 
         DataStream<Tuple2<String, Integer>> counts =
-                // split up the lines in pairs (2-tuples) containing: (word,1)
+                // The text lines read from the source are split into words
+                // using a user-defined function. The tokenizer, implemented below,
+                // will output each word as a (2-tuple) containing (word, 1)
                 text.flatMap(new Tokenizer())
-                        // group by the tuple field "0" and sum up tuple field "1"
+                        .name("tokenizer")
+                        // keyBy groups tuples based on the "0" field, the word.
+                        // Using a keyBy allows performing aggregations and other
+                        // stateful transformations over data on a per-key basis.
+                        // This is similar to a GROUP BY clause in a SQL query.
                         .keyBy(value -> value.f0)
-                        .sum(1);
+                        // For each key, we perform a simple sum of the "1" field, the count.
+                        // If the input data stream is bounded, sum will output a final count for
+                        // each word. If it is unbounded, it will continuously output updates
+                        // each time it sees a new instance of each word in the stream.
+                        .sum(1)
+                        .name("counter");
 
-        // emit result
-        if (params.has("output")) {
-            counts.writeAsText(params.get("output"));
+        if (params.getOutput().isPresent()) {
+            // Given an output directory, Flink will write the results to a file
+            // using a simple string encoding. In a production environment, this might
+            // be something more structured like CSV, Avro, JSON, or Parquet.
+            counts.sinkTo(
+                            FileSink.<Tuple2<String, Integer>>forRowFormat(
+                                            params.getOutput().get(), new SimpleStringEncoder<>())
+                                    .withRollingPolicy(
+                                            DefaultRollingPolicy.builder()
+                                                    .withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                                    .withRolloverInterval(Duration.ofSeconds(10))
+                                                    .build())
+                                    .build())
+                    .name("file-sink");
         } else {
-            System.out.println("Printing result to stdout. Use --output to specify output path.");
-            counts.print();
+            counts.print().name("print-sink");
         }
-        // execute program
-        env.execute("Streaming WordCount");
+
+        // Apache Flink applications are composed lazily. Calling execute
+        // submits the Job and begins processing.
+        env.execute("WordCount");
     }
 
     // *************************************************************************
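The new flags line up with the arguments that StreamingExamplesITCase passes
further down in this commit. A minimal smoke run of the updated example, with
hypothetical local paths, could look like the following sketch (assuming
flink-examples-streaming and the file connector are on the classpath):

    // Sketch only: drives the updated example the same way the ITCase does.
    // The input/output paths are placeholders, not part of this commit.
    public class WordCountSmokeRun {
        public static void main(String[] args) throws Exception {
            org.apache.flink.streaming.examples.wordcount.WordCount.main(
                    new String[] {
                        "--input", "/tmp/words.txt",
                        "--output", "/tmp/wordcount-out",
                        "--execution-mode", "batch"
                    });
        }
    }

With --output set, results are written as part files under the output
directory; the DefaultRollingPolicy above rolls a part file once it reaches
1 MiB or every 10 seconds while the job runs in STREAMING mode.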
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/util/CLI.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/util/CLI.java
new file mode 100644
index 0000000..ddf1111
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/util/CLI.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.wordcount.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TimeUtils;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+/**
+ * A simple CLI parser for the {@link org.apache.flink.streaming.examples.wordcount.WordCount}
+ * example application.
+ */
+public class CLI extends ExecutionConfig.GlobalJobParameters {
+
+    public static final String INPUT_KEY = "input";
+    public static final String OUTPUT_KEY = "output";
+    public static final String DISCOVERY_INTERVAL = "discovery-interval";
+    public static final String EXECUTION_MODE = "execution-mode";
+
+    public static CLI fromArgs(String[] args) throws Exception {
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+        Path[] inputs = null;
+        if (params.has(INPUT_KEY)) {
+            inputs =
+                    params.getMultiParameterRequired(INPUT_KEY).stream()
+                            .map(Path::new)
+                            .toArray(Path[]::new);
+        } else {
+            System.out.println("Executing example with default input data.");
+            System.out.println("Use --input to specify file input.");
+        }
+
+        Path output = null;
+        if (params.has(OUTPUT_KEY)) {
+            output = new Path(params.get(OUTPUT_KEY));
+        } else {
+            System.out.println("Printing result to stdout. Use --output to specify output path.");
+        }
+
+        Duration watchInterval = null;
+        if (params.has(DISCOVERY_INTERVAL)) {
+            watchInterval = TimeUtils.parseDuration(params.get(DISCOVERY_INTERVAL));
+        }
+
+        RuntimeExecutionMode executionMode = ExecutionOptions.RUNTIME_MODE.defaultValue();
+        if (params.has(EXECUTION_MODE)) {
+            executionMode = RuntimeExecutionMode.valueOf(params.get(EXECUTION_MODE).toUpperCase());
+        }
+
+        return new CLI(inputs, output, watchInterval, executionMode, params);
+    }
+
+    private final Path[] inputs;
+    private final Path output;
+    private final Duration discoveryInterval;
+    private final RuntimeExecutionMode executionMode;
+    private final MultipleParameterTool params;
+
+    private CLI(
+            Path[] inputs,
+            Path output,
+            Duration discoveryInterval,
+            RuntimeExecutionMode executionMode,
+            MultipleParameterTool params) {
+        this.inputs = inputs;
+        this.output = output;
+        this.discoveryInterval = discoveryInterval;
+        this.executionMode = executionMode;
+        this.params = params;
+    }
+
+    public Optional<Path[]> getInputs() {
+        return Optional.ofNullable(inputs);
+    }
+
+    public Optional<Duration> getDiscoveryInterval() {
+        return Optional.ofNullable(discoveryInterval);
+    }
+
+    public Optional<Path> getOutput() {
+        return Optional.ofNullable(output);
+    }
+
+    public RuntimeExecutionMode getExecutionMode() {
+        return executionMode;
+    }
+
+    public OptionalInt getInt(String key) {
+        if (params.has(key)) {
+            return OptionalInt.of(params.getInt(key));
+        }
+
+        return OptionalInt.empty();
+    }
+
+    @Override
+    public Map<String, String> toMap() {
+        return params.toMap();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        CLI cli = (CLI) o;
+        return Arrays.equals(inputs, cli.inputs)
+                && Objects.equals(output, cli.output)
+                && Objects.equals(discoveryInterval, cli.discoveryInterval);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(output, discoveryInterval);
+        result = 31 * result + Arrays.hashCode(inputs);
+        return result;
+    }
+}
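For reference, a short sketch of how the new helper is consumed; the argument
values here are hypothetical:

    import org.apache.flink.streaming.examples.wordcount.util.CLI;

    public class CliDemo {
        public static void main(String[] args) throws Exception {
            CLI params =
                    CLI.fromArgs(
                            new String[] {
                                "--input", "/data/in",
                                "--discovery-interval", "10s",
                                "--execution-mode", "streaming"
                            });
            // Accessors are Optional-based; absent flags come back empty.
            params.getInputs().ifPresent(in -> System.out.println(in.length + " input path(s)"));
            params.getDiscoveryInterval().ifPresent(d -> System.out.println("watching every " + d));
            System.out.println(params.getExecutionMode()); // prints STREAMING
        }
    }

Note that "10s" is parsed via TimeUtils.parseDuration, and the execution mode
is upper-cased before RuntimeExecutionMode.valueOf, so lower-case values are
accepted on the command line.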
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
index 271d737..46350a5 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
@@ -18,74 +18,155 @@
 
 package org.apache.flink.streaming.scala.examples.wordcount
 
-import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
+import org.apache.flink.util.Collector
+
+import java.time.Duration
 
 /**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over text files in a streaming fashion.
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text
+ * files. This Job can be executed in both streaming and batch execution modes.
  *
- * The input is a plain text file with lines separated by newline characters.
+ * The input is a [list of] plain text file[s] with lines separated by a newline character.
  *
  * Usage:
- * {{{
- * WordCount --input <path> --output <path>
- * }}}
  *
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
+ * {{{ --input <path> }}} A list of input files and/or directories to read.
+ * If no input is provided, the program is run with default data from [[WordCountData]].
  *
- * This example shows how to:
+ * {{{--discovery-interval <duration> }}} Turns the file reader
+ * into a continuous source that will monitor the provided input directories
+ * every interval and read any new files.
+ *
+ * {{{--output <path> }}} The output directory where the Job will
+ * write the results. If no output path is provided, the Job will print the results
+ * to `stdout`.
  *
- *  - write a simple Flink Streaming program,
- *  - use tuple data types,
- *  - write and use transformation functions.
+ * {{{--execution-mode <mode> }}} The execution mode (BATCH, STREAMING, or AUTOMATIC) of this
+ * pipeline.
  *
+ * This example shows how to:
+ *
+ *  - Write a simple Flink DataStream program
+ *  - Use tuple data types
+ *  - Write and use a user-defined function
  */
 object WordCount {
 
-  def main(args: Array[String]) {
+  // *************************************************************************
+  // PROGRAM
+  // *************************************************************************
 
-    // Checking input parameters
-    val params = ParameterTool.fromArgs(args)
+  def main(args: Array[String]): Unit = {
+    val params = CLI.fromArgs(args)
 
-    // set up the execution environment
+    // Create the execution environment. This is the main entrypoint
+    // to building a Flink application.
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-    // make parameters available in the web interface
+    // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+    // application executed over bounded input will produce the same final results regardless
+    // of the configured execution mode. It is important to note what final means here: a job
+    // executing in STREAMING mode might produce incremental updates (think upserts in
+    // a database) while in BATCH mode, it would only produce one final result at the end. The
+    // final result will be the same if interpreted correctly, but getting there can be
+    // different.
+    //
+    // The “classic” execution behavior of the DataStream API is called STREAMING execution
+    // mode. Applications should use streaming execution for unbounded jobs that require
+    // continuous incremental processing and are expected to stay online indefinitely.
+    //
+    // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+    // can only do when we know that our input is bounded. For example, different
+    // join/aggregation strategies can be used, in addition to a different shuffle
+    // implementation that allows more efficient task scheduling and failure recovery behavior.
+    //
+    // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+    // are bounded and otherwise STREAMING.
+    env.setRuntimeMode(params.executionMode)
+
+    // This optional step makes the input parameters
+    // available in the Flink UI.
     env.getConfig.setGlobalJobParameters(params)
 
     // get input data
-    val text =
-    // read the text file from given input path
-    if (params.has("input")) {
-      env.readTextFile(params.get("input"))
-    } else {
-      println("Executing WordCount example with default inputs data set.")
-      println("Use --input to specify file input.")
-      // get default test text data
-      env.fromElements(WordCountData.WORDS: _*)
+    val text = params.input match {
+      case Some(input) =>
+        // Create a new file source that will read files from a given set of directories.
+        // Each file will be processed as plain text and split based on newlines.
+        val builder = FileSource.forRecordStreamFormat(new TextLineFormat, input:_*)
+        params.discoveryInterval.foreach { duration =>
+          // If a discovery interval is provided, the source will
+          // continuously watch the given directories for new files.
+          builder.monitorContinuously(duration)
+        }
+        env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input")
+      case None =>
+        env.fromElements(WordCountData.WORDS:_*).name("in-memory-input")
     }
 
-    val counts: DataStream[(String, Int)] = text
-      // split up the lines in pairs (2-tuples) containing: (word,1)
-      .flatMap(_.toLowerCase.split("\\W+"))
-      .filter(_.nonEmpty)
-      .map((_, 1))
-      // group by the tuple field "0" and sum up tuple field "1"
-      .keyBy(_._1)
-      .sum(1)
-
-    // emit result
-    if (params.has("output")) {
-      counts.writeAsText(params.get("output"))
-    } else {
-      println("Printing result to stdout. Use --output to specify output path.")
-      counts.print()
+    val counts =
+      // The text lines read from the source are split into words
+      // using a user-defined function. The tokenizer, implemented below,
+      // will output each word as a (2-tuple) containing (word, 1)
+      text.flatMap(new Tokenizer)
+        .name("tokenizer")
+        // keyBy groups tuples based on the "_1" field, the word.
+        // Using a keyBy allows performing aggregations and other
+        // stateful transformations over data on a per-key basis.
+        // This is similar to a GROUP BY clause in a SQL query.
+        .keyBy(_._1)
+        // For each key, we perform a simple sum of the "1" field, the count.
+        // If the input data stream is bounded, sum will output a final count for
+        // each word. If it is unbounded, it will continuously output updates
+        // each time it sees a new instance of each word in the stream.
+        .sum(1)
+        .name("counter")
+
+    params.output match {
+      case Some(output) =>
+        // Given an output directory, Flink will write the results to a file
+        // using a simple string encoding. In a production environment, this might
+        // be something more structured like CSV, Avro, JSON, or Parquet.
+        counts.sinkTo(FileSink.forRowFormat[(String, Int)](output, new SimpleStringEncoder())
+          .withRollingPolicy(DefaultRollingPolicy.builder()
+              .withMaxPartSize(MemorySize.ofMebiBytes(1))
+              .withRolloverInterval(Duration.ofSeconds(10))
+            .build())
+          .build())
+          .name("file-sink")
+
+      case None => counts.print().name("print-sink")
     }
 
-    // execute program
-    env.execute("Streaming WordCount")
+    // Apache Flink applications are composed lazily. Calling execute
+    // submits the Job and begins processing.
+    env.execute("WordCount")
+  }
+
+  // *************************************************************************
+  // USER FUNCTIONS
+  // *************************************************************************
+
+  /**
+   * Implements the string tokenizer that splits a sentence into words as a user-defined
+   * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
+   * form of "(word,1)".
+   */
+  final class Tokenizer extends FlatMapFunction[String, (String, Int)] {
+    override def flatMap(value: String, out: Collector[(String, Int)]): Unit = for {
+      token <- value.toLowerCase.split("\\W+")
+      if token.nonEmpty
+    } out.collect((token, 1))
   }
 }
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/util/CLI.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/util/CLI.scala
new file mode 100644
index 0000000..f7f7b0b
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/util/CLI.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.wordcount.util
+
+import org.apache.flink.api.common.{ExecutionConfig, RuntimeExecutionMode}
+import org.apache.flink.core.fs.Path
+import org.apache.flink.streaming.examples.wordcount.util.{CLI => JCLI}
+
+import java.time.Duration
+import java.util
+import java.util.Optional
+
+/**
+ * A simple CLI parser for the [[org.apache.flink.streaming.scala.examples.wordcount.WordCount]]
+ * example application.
+ */
+object CLI {
+  def fromArgs(args: Array[String]) = new CLI(JCLI.fromArgs(args))
+}
+
+class CLI private (val inner: JCLI) extends ExecutionConfig.GlobalJobParameters {
+
+  def input: Option[Array[Path]] = asScala(inner.getInputs)
+
+  def discoveryInterval: Option[Duration] = asScala(inner.getDiscoveryInterval)
+
+  def output: Option[Path] = asScala(inner.getOutput)
+
+  def executionMode: RuntimeExecutionMode = inner.getExecutionMode
+
+  def getInt(key: String): Option[Int] = {
+    val result = inner.getInt(key)
+    if (result.isPresent) {
+      Option(result.getAsInt)
+    } else {
+      None
+    }
+  }
+
+  override def equals(obj: Any): Boolean =
+    obj.isInstanceOf[CLI] && inner.equals(obj.asInstanceOf[CLI].inner)
+
+  override def hashCode(): Int = inner.hashCode()
+
+  override def toMap: util.Map[String, String] = inner.toMap
+
+  private def asScala[T](optional: Optional[T]): Option[T] =
+    Option(optional.orElse(null.asInstanceOf[T]))
+}
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
index 7598f98..e45994f8 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
@@ -150,10 +150,11 @@ public class StreamingExamplesITCase extends AbstractTestBase {
         org.apache.flink.streaming.examples.wordcount.WordCount.main(
                 new String[] {
                     "--input", textPath,
-                    "--output", resultPath
+                    "--output", resultPath,
+                    "--execution-mode", "automatic"
                 });
 
-        compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+        compareResultsByLinesInMemory(WordCountData.COUNTS_AS_TUPLES, resultPath);
     }
 
     /**
diff --git a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
index de00b95..4d6fe8b 100644
--- a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
+++ b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
@@ -136,11 +136,12 @@ class StreamingExamplesITCase extends AbstractTestBase {
 
     WordCount.main(Array(
       "--input", textPath,
-      "--output", resultPath
+      "--output", resultPath,
+      "--execution-mode", "automatic"
     ))
 
     TestBaseUtils.compareResultsByLinesInMemory(
-      WordCountData.STREAMING_COUNTS_AS_TUPLES,
+      WordCountData.COUNTS_AS_TUPLES,
       resultPath)
   }
 }
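The switch from STREAMING_COUNTS_AS_TUPLES to COUNTS_AS_TUPLES in both tests
follows from the execution-mode discussion above: with a bounded file input
and --execution-mode automatic, the job runs in BATCH mode and sum(1) emits
one final count per word instead of a running update per occurrence. A small
illustration with a hypothetical input line:

    // Input line: "to be or not to be"
    // STREAMING mode, sum(1) emits running updates:
    //   (to,1) (be,1) (or,1) (not,1) (to,2) (be,2)
    // BATCH mode, sum(1) emits one final result per key:
    //   (to,2) (be,2) (or,1) (not,1)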