You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sj...@apache.org on 2021/11/15 21:22:57 UTC

[flink] branch master updated (c40081d -> 83a2541)

This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from c40081d  [FLINK-24689][runtime-web] Add log's last modified time in log list view
     new df2c47d  [hotfix][testutils] Read result files recursively
     new 3762c3b  [hotfix][yarn-tests] drop assertion on streaming word count output
     new a3830ed  [hotfix][e2e] Run kerberos test under AUTOMATIC execution
     new 83a2541  [FLINK-24830][examples] Update DataStream WordCount example

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../test_yarn_application_kerberos_docker.sh       |  10 --
 .../test-scripts/test_yarn_job_kerberos_docker.sh  |  10 --
 flink-examples/flink-examples-streaming/pom.xml    |  44 +-----
 .../streaming/examples/wordcount/WordCount.java    | 142 ++++++++++++-----
 .../streaming/examples/wordcount/util/CLI.java     | 149 ++++++++++++++++++
 .../scala/examples/wordcount/WordCount.scala       | 169 +++++++++++++++------
 .../scala/examples/wordcount/util/CLI.scala        |  65 ++++++++
 .../streaming/test/StreamingExamplesITCase.java    |   5 +-
 .../scala/examples/StreamingExamplesITCase.scala   |   5 +-
 .../org/apache/flink/test/util/TestBaseUtils.java  |  35 +++--
 .../yarn/YARNSessionCapacitySchedulerITCase.java   |  12 --
 11 files changed, 475 insertions(+), 171 deletions(-)
 create mode 100644 flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/util/CLI.java
 create mode 100644 flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/util/CLI.scala

[flink] 04/04: [FLINK-24830][examples] Update DataStream WordCount example

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 83a2541475228a4ff9e9a9def4049fb742353549
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Wed Nov 10 15:16:54 2021 -0600

    [FLINK-24830][examples] Update DataStream WordCount example
    
    This closes #17760
---
 flink-examples/flink-examples-streaming/pom.xml    |  44 +-----
 .../streaming/examples/wordcount/WordCount.java    | 142 ++++++++++++-----
 .../streaming/examples/wordcount/util/CLI.java     | 149 ++++++++++++++++++
 .../scala/examples/wordcount/WordCount.scala       | 169 +++++++++++++++------
 .../scala/examples/wordcount/util/CLI.scala        |  65 ++++++++
 .../streaming/test/StreamingExamplesITCase.java    |   5 +-
 .../scala/examples/StreamingExamplesITCase.scala   |   5 +-
 7 files changed, 454 insertions(+), 125 deletions(-)

diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
index cff0fea..b4ab863 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -69,6 +69,12 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-files</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-kafka</artifactId>
 			<version>${project.version}</version>
 		</dependency>
@@ -134,7 +140,6 @@ under the License.
 				</configuration>
 			</plugin>
 			
-			<!-- get default data from flink-examples-batch package -->
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-dependency-plugin</artifactId>
@@ -147,16 +152,6 @@ under the License.
 						</goals>
 						<configuration>
 							<artifactItems>
-								<!-- For WordCount example data -->
-								<artifactItem>
-									<groupId>org.apache.flink</groupId>
-									<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
-									<version>${project.version}</version>
-									<type>jar</type>
-									<overWrite>false</overWrite>
-									<outputDirectory>${project.build.directory}/classes</outputDirectory>
-									<includes>org/apache/flink/streaming/examples/wordcount/util/WordCountData.class</includes>
-								</artifactItem>
 								<!-- For JSON utilities -->
 								<artifactItem>
 									<groupId>org.apache.flink</groupId>
@@ -261,32 +256,6 @@ under the License.
 						</configuration>
 					</execution>
 
-					<!-- WordCountPOJO -->
-					<execution>
-						<id>WordCountPOJO</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>WordCountPOJO</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.wordcount.PojoExample</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>org/apache/flink/streaming/examples/wordcount/PojoExample.class</include>
-								<include>org/apache/flink/streaming/examples/wordcount/PojoExample$*.class</include>
-								<include>org/apache/flink/streaming/examples/wordcount/util/WordCountData.class</include>
-								<include>META-INF/LICENSE</include>
-								<include>META-INF/NOTICE</include>
-							</includes>
-						</configuration>
-					</execution>
-
 					<!-- WordCount -->
 					<execution>
 						<id>WordCount</id>
@@ -307,6 +276,7 @@ under the License.
 								<include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
 								<include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
 								<include>org/apache/flink/streaming/examples/wordcount/util/WordCountData.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/util/CLI.class</include>
 								<include>META-INF/LICENSE</include>
 								<include>META-INF/NOTICE</include>
 							</includes>
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
index aa2323e..402ac8b 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -17,30 +17,50 @@
 
 package org.apache.flink.streaming.examples.wordcount;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.examples.wordcount.util.CLI;
 import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
+
+import java.time.Duration;
 
 /**
  * Implements the "WordCount" program that computes a simple word occurrence histogram over text
- * files in a streaming fashion.
+ * files. This Job can be executed in both streaming and batch execution modes.
+ *
+ * <p>The input is a [list of] plain text file[s] with lines separated by a newline character.
  *
- * <p>The input is a plain text file with lines separated by newline characters.
+ * <p>Usage:
  *
- * <p>Usage: <code>WordCount --input &lt;path&gt; --output &lt;path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <ul>
+ *   <li><code>--input &lt;path&gt;</code>A list of input files and / or directories to read. If no
+ *       input is provided, the program is run with default data from {@link WordCountData}.
+ *   <li><code>--discovery-interval &lt;duration&gt;</code>Turns the file reader into a continuous
+ *       source that will monitor the provided input directories every interval and read any new
+ *       files.
+ *   <li><code>--output &lt;path&gt;</code>The output directory where the Job will write the
+ *       results. If no output path is provided, the Job will print the results to <code>stdout
+ *       </code>.
+ *   <li><code>--execution-mode &lt;mode&gt;</code>The execution mode (BATCH, STREAMING, or
+ *       AUTOMATIC) of this pipeline.
+ * </ul>
  *
  * <p>This example shows how to:
  *
  * <ul>
- *   <li>write a simple Flink Streaming program,
- *   <li>use tuple data types,
- *   <li>write and use user-defined functions.
+ *   <li>Write a simple Flink DataStream program
+ *   <li>Use tuple data types
+ *   <li>Write and use a user-defined function
  * </ul>
  */
 public class WordCount {
@@ -50,51 +70,93 @@ public class WordCount {
     // *************************************************************************
 
     public static void main(String[] args) throws Exception {
+        final CLI params = CLI.fromArgs(args);
 
-        // Checking input parameters
-        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
-
-        // set up the execution environment
+        // Create the execution environment. This is the main entrypoint
+        // to building a Flink application.
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-        // make parameters available in the web interface
+        // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+        // application executed over bounded input will produce the same final results regardless
+        // of the configured execution mode. It is important to note what final means here: a job
+        // executing in STREAMING mode might produce incremental updates (think upserts in
+        // a database) while in BATCH mode, it would only produce one final result at the end. The
+        // final result will be the same if interpreted correctly, but getting there can be
+        // different.
+        //
+        // The “classic” execution behavior of the DataStream API is called STREAMING execution
+        // mode. Applications should use streaming execution for unbounded jobs that require
+        // continuous incremental processing and are expected to stay online indefinitely.
+        //
+        // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+        // can only do when we know that our input is bounded. For example, different
+        // join/aggregation strategies can be used, in addition to a different shuffle
+        // implementation that allows more efficient task scheduling and failure recovery behavior.
+        //
+        // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+        // are bounded and otherwise STREAMING.
+        env.setRuntimeMode(params.getExecutionMode());
+
+        // This optional step makes the input parameters
+        // available in the Flink UI.
         env.getConfig().setGlobalJobParameters(params);
 
-        // get input data
-        DataStream<String> text = null;
-        if (params.has("input")) {
-            // union all the inputs from text files
-            for (String input : params.getMultiParameterRequired("input")) {
-                if (text == null) {
-                    text = env.readTextFile(input);
-                } else {
-                    text = text.union(env.readTextFile(input));
-                }
-            }
-            Preconditions.checkNotNull(text, "Input DataStream should not be null.");
+        DataStream<String> text;
+        if (params.getInputs().isPresent()) {
+            // Create a new file source that will read files from a given set of directories.
+            // Each file will be processed as plain text and split based on newlines.
+            FileSource.FileSourceBuilder<String> builder =
+                    FileSource.forRecordStreamFormat(
+                            new TextLineFormat(), params.getInputs().get());
+
+            // If a discovery interval is provided, the source will
+            // continuously watch the given directories for new files.
+            params.getDiscoveryInterval().ifPresent(builder::monitorContinuously);
+
+            text = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input");
         } else {
-            System.out.println("Executing WordCount example with default input data set.");
-            System.out.println("Use --input to specify file input.");
-            // get default test text data
-            text = env.fromElements(WordCountData.WORDS);
+            text = env.fromElements(WordCountData.WORDS).name("in-memory-input");
         }
 
         DataStream<Tuple2<String, Integer>> counts =
-                // split up the lines in pairs (2-tuples) containing: (word,1)
+                // The text lines read from the source are split into words
+                // using a user-defined function. The tokenizer, implemented below,
+                // will output each word as a (2-tuple) containing (word, 1)
                 text.flatMap(new Tokenizer())
-                        // group by the tuple field "0" and sum up tuple field "1"
+                        .name("tokenizer")
+                        // keyBy groups tuples based on the "0" field, the word.
+                        // Using a keyBy allows performing aggregations and other
+                        // stateful transformations over data on a per-key basis.
+                        // This is similar to a GROUP BY clause in a SQL query.
                         .keyBy(value -> value.f0)
-                        .sum(1);
+                        // For each key, we perform a simple sum of the "1" field, the count.
+                        // If the input data stream is bounded, sum will output a final count for
+                        // each word. If it is unbounded, it will continuously output updates
+                        // each time it sees a new instance of each word in the stream.
+                        .sum(1)
+                        .name("counter");
 
-        // emit result
-        if (params.has("output")) {
-            counts.writeAsText(params.get("output"));
+        if (params.getOutput().isPresent()) {
+            // Given an output directory, Flink will write the results to a file
+            // using a simple string encoding. In a production environment, this might
+            // be something more structured like CSV, Avro, JSON, or Parquet.
+            counts.sinkTo(
+                            FileSink.<Tuple2<String, Integer>>forRowFormat(
+                                            params.getOutput().get(), new SimpleStringEncoder<>())
+                                    .withRollingPolicy(
+                                            DefaultRollingPolicy.builder()
+                                                    .withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                                    .withRolloverInterval(Duration.ofSeconds(10))
+                                                    .build())
+                                    .build())
+                    .name("file-sink");
         } else {
-            System.out.println("Printing result to stdout. Use --output to specify output path.");
-            counts.print();
+            counts.print().name("print-sink");
         }
-        // execute program
-        env.execute("Streaming WordCount");
+
+        // Apache Flink applications are composed lazily. Calling execute
+        // submits the Job and begins processing.
+        env.execute("WordCount");
     }
 
     // *************************************************************************
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/util/CLI.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/util/CLI.java
new file mode 100644
index 0000000..ddf1111
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/util/CLI.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.wordcount.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TimeUtils;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+/**
+ * A simple CLI parser for the {@link org.apache.flink.streaming.examples.wordcount.WordCount}
+ * example application.
+ */
+public class CLI extends ExecutionConfig.GlobalJobParameters {
+
+    public static final String INPUT_KEY = "input";
+    public static final String OUTPUT_KEY = "output";
+    public static final String DISCOVERY_INTERVAL = "discovery-interval";
+    public static final String EXECUTION_MODE = "execution-mode";
+
+    public static CLI fromArgs(String[] args) throws Exception {
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+        Path[] inputs = null;
+        if (params.has(INPUT_KEY)) {
+            inputs =
+                    params.getMultiParameterRequired(INPUT_KEY).stream()
+                            .map(Path::new)
+                            .toArray(Path[]::new);
+        } else {
+            System.out.println("Executing example with default input data.");
+            System.out.println("Use --input to specify file input.");
+        }
+
+        Path output = null;
+        if (params.has(OUTPUT_KEY)) {
+            output = new Path(params.get(OUTPUT_KEY));
+        } else {
+            System.out.println("Printing result to stdout. Use --output to specify output path.");
+        }
+
+        Duration watchInterval = null;
+        if (params.has(DISCOVERY_INTERVAL)) {
+            watchInterval = TimeUtils.parseDuration(params.get(DISCOVERY_INTERVAL));
+        }
+
+        RuntimeExecutionMode executionMode = ExecutionOptions.RUNTIME_MODE.defaultValue();
+        if (params.has(EXECUTION_MODE)) {
+            executionMode = RuntimeExecutionMode.valueOf(params.get(EXECUTION_MODE).toUpperCase());
+        }
+
+        return new CLI(inputs, output, watchInterval, executionMode, params);
+    }
+
+    private final Path[] inputs;
+    private final Path output;
+    private final Duration discoveryInterval;
+    private final RuntimeExecutionMode executionMode;
+    private final MultipleParameterTool params;
+
+    private CLI(
+            Path[] inputs,
+            Path output,
+            Duration discoveryInterval,
+            RuntimeExecutionMode executionMode,
+            MultipleParameterTool params) {
+        this.inputs = inputs;
+        this.output = output;
+        this.discoveryInterval = discoveryInterval;
+        this.executionMode = executionMode;
+        this.params = params;
+    }
+
+    public Optional<Path[]> getInputs() {
+        return Optional.ofNullable(inputs);
+    }
+
+    public Optional<Duration> getDiscoveryInterval() {
+        return Optional.ofNullable(discoveryInterval);
+    }
+
+    public Optional<Path> getOutput() {
+        return Optional.ofNullable(output);
+    }
+
+    public RuntimeExecutionMode getExecutionMode() {
+        return executionMode;
+    }
+
+    public OptionalInt getInt(String key) {
+        if (params.has(key)) {
+            return OptionalInt.of(params.getInt(key));
+        }
+
+        return OptionalInt.empty();
+    }
+
+    @Override
+    public Map<String, String> toMap() {
+        return params.toMap();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        CLI cli = (CLI) o;
+        return Arrays.equals(inputs, cli.inputs)
+                && Objects.equals(output, cli.output)
+                && Objects.equals(discoveryInterval, cli.discoveryInterval);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(output, discoveryInterval);
+        result = 31 * result + Arrays.hashCode(inputs);
+        return result;
+    }
+}
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
index 271d737..46350a5 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
@@ -18,74 +18,155 @@
 
 package org.apache.flink.streaming.scala.examples.wordcount
 
-import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
+import org.apache.flink.util.Collector
+
+import java.time.Duration
 
 /**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over text files in a streaming fashion.
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text
+ * files. This Job can be executed in both streaming and batch execution modes.
  *
- * The input is a plain text file with lines separated by newline characters.
+ * The input is a [list of] plain text file[s] with lines separated by a newline character.
  *
  * Usage:
- * {{{
- * WordCount --input <path> --output <path>
- * }}}
  *
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
+ * {{{ --input <path> }}} A list of input files and / or directories to read.
+ * If no input is provided, the program is run with default data from [[WordCountData]].
  *
- * This example shows how to:
+ * {{{--discovery-interval <duration> }}} Turns the file reader
+ * into a continuous source that will monitor the provided input directories
+ * every interval and read any new files.
+ *
+ * {{{--output <path> }}} The output directory where the Job will
+ * write the results. If no output path is provided, the Job will print the results
+ * to `stdout`
  *
- *  - write a simple Flink Streaming program,
- *  - use tuple data types,
- *  - write and use transformation functions.
+ * {{{--execution-mode <mode> }}} The execution mode (BATCH, STREAMING, or AUTOMATIC) of this
+ * pipeline.
  *
+ * This example shows how to:
+ *
+ *  - Write a simple Flink DataStream program
+ *  - Use tuple data types
+ *  - Write and use a user-defined function
  */
 object WordCount {
 
-  def main(args: Array[String]) {
+  // *************************************************************************
+  // PROGRAM
+  // *************************************************************************
 
-    // Checking input parameters
-    val params = ParameterTool.fromArgs(args)
+  def main(args: Array[String]): Unit = {
+    val params = CLI.fromArgs(args)
 
-    // set up the execution environment
+    // Create the execution environment. This is the main entrypoint
+    // to building a Flink application.
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-    // make parameters available in the web interface
+    // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+    // application executed over bounded input will produce the same final results regardless
+    // of the configured execution mode. It is important to note what final means here: a job
+    // executing in STREAMING mode might produce incremental updates (think upserts in
+    // a database) while in BATCH mode, it would only produce one final result at the end. The
+    // final result will be the same if interpreted correctly, but getting there can be
+    // different.
+    //
+    // The “classic” execution behavior of the DataStream API is called STREAMING execution
+    // mode. Applications should use streaming execution for unbounded jobs that require
+    // continuous incremental processing and are expected to stay online indefinitely.
+    //
+    // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+    // can only do when we know that our input is bounded. For example, different
+    // join/aggregation strategies can be used, in addition to a different shuffle
+    // implementation that allows more efficient task scheduling and failure recovery behavior.
+    //
+    // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+    // are bounded and otherwise STREAMING.
+    env.setRuntimeMode(params.executionMode)
+
+    // This optional step makes the input parameters
+    // available in the Flink UI.
     env.getConfig.setGlobalJobParameters(params)
 
     // get input data
-    val text =
-    // read the text file from given input path
-    if (params.has("input")) {
-      env.readTextFile(params.get("input"))
-    } else {
-      println("Executing WordCount example with default inputs data set.")
-      println("Use --input to specify file input.")
-      // get default test text data
-      env.fromElements(WordCountData.WORDS: _*)
+    val text = params.input match {
+      case Some(input) =>
+        // Create a new file source that will read files from a given set of directories.
+        // Each file will be processed as plain text and split based on newlines.
+        val builder = FileSource.forRecordStreamFormat(new TextLineFormat, input:_*)
+        params.discoveryInterval.foreach { duration =>
+          // If a discovery interval is provided, the source will
+          // continuously watch the given directories for new files.
+          builder.monitorContinuously(duration)
+        }
+        env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input")
+      case None =>
+        env.fromElements(WordCountData.WORDS:_*).name("in-memory-input")
     }
 
-    val counts: DataStream[(String, Int)] = text
-      // split up the lines in pairs (2-tuples) containing: (word,1)
-      .flatMap(_.toLowerCase.split("\\W+"))
-      .filter(_.nonEmpty)
-      .map((_, 1))
-      // group by the tuple field "0" and sum up tuple field "1"
-      .keyBy(_._1)
-      .sum(1)
-
-    // emit result
-    if (params.has("output")) {
-      counts.writeAsText(params.get("output"))
-    } else {
-      println("Printing result to stdout. Use --output to specify output path.")
-      counts.print()
+    val counts =
+      // The text lines read from the source are split into words
+      // using a user-defined function. The tokenizer, implemented below,
+      // will output each word as a (2-tuple) containing (word, 1)
+      text.flatMap(new Tokenizer)
+        .name("tokenizer")
+        // keyBy groups tuples based on the "_1" field, the word.
+        // Using a keyBy allows performing aggregations and other
+        // stateful transformations over data on a per-key basis.
+        // This is similar to a GROUP BY clause in a SQL query.
+        .keyBy(_._1)
+        // For each key, we perform a simple sum of the "1" field, the count.
+        // If the input data stream is bounded, sum will output a final count for
+        // each word. If it is unbounded, it will continuously output updates
+        // each time it sees a new instance of each word in the stream.
+        .sum(1)
+        .name("counter")
+
+    params.output match {
+      case Some(output) =>
+        // Given an output directory, Flink will write the results to a file
+        // using a simple string encoding. In a production environment, this might
+        // be something more structured like CSV, Avro, JSON, or Parquet.
+        counts.sinkTo(FileSink.forRowFormat[(String, Int)](output, new SimpleStringEncoder())
+          .withRollingPolicy(DefaultRollingPolicy.builder()
+              .withMaxPartSize(MemorySize.ofMebiBytes(1))
+              .withRolloverInterval(Duration.ofSeconds(10))
+            .build())
+          .build())
+          .name("file-sink")
+
+      case None => counts.print().name("print-sink")
     }
 
-    // execute program
-    env.execute("Streaming WordCount")
+    // Apache Flink applications are composed lazily. Calling execute
+    // submits the Job and begins processing.
+    env.execute("WordCount")
+  }
+
+  // *************************************************************************
+  // USER FUNCTIONS
+  // *************************************************************************
+
+  /**
+   * Implements the string tokenizer that splits a sentence into words as a user-defined
+   * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
+   * form of "(word,1)".
+   */
+  final class Tokenizer extends FlatMapFunction[String, (String, Int)] {
+    override def flatMap(value: String, out: Collector[(String, Int)]): Unit = for {
+      token <- value.toLowerCase.split("\\W+")
+      if token.nonEmpty
+    } out.collect((token, 1))
   }
 }
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/util/CLI.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/util/CLI.scala
new file mode 100644
index 0000000..f7f7b0b
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/util/CLI.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.wordcount.util
+
+import org.apache.flink.api.common.{ExecutionConfig, RuntimeExecutionMode}
+import org.apache.flink.core.fs.Path
+import org.apache.flink.streaming.examples.wordcount.util.{CLI => JCLI}
+
+import java.time.Duration
+import java.util
+import java.util.Optional
+
+/**
+ * A simple CLI parser for the [[org.apache.flink.streaming.scala.examples.wordcount.WordCount]]
+ * example application.
+ */
+object CLI {
+  def fromArgs(args: Array[String]) = new CLI(JCLI.fromArgs(args))
+}
+
+class CLI private (val inner: JCLI) extends ExecutionConfig.GlobalJobParameters {
+
+  def input: Option[Array[Path]] = asScala(inner.getInputs)
+
+  def discoveryInterval: Option[Duration] = asScala(inner.getDiscoveryInterval)
+
+  def output: Option[Path] = asScala(inner.getOutput)
+
+  def executionMode: RuntimeExecutionMode = inner.getExecutionMode
+
+  def getInt(key: String): Option[Int] = {
+    val result = inner.getInt(key)
+    if (result.isPresent) {
+      Option(result.getAsInt)
+    } else {
+      None
+    }
+  }
+
+  override def equals(obj: Any): Boolean =
+    obj.isInstanceOf[CLI] && inner.equals(obj.asInstanceOf[CLI].inner)
+
+  override def hashCode(): Int = inner.hashCode()
+
+  override def toMap: util.Map[String, String] = inner.toMap
+
+  private def asScala[T](optional: Optional[T]): Option[T] =
+    Option(optional.orElse(null.asInstanceOf[T]))
+}
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
index 7598f98..e45994f8 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
@@ -150,10 +150,11 @@ public class StreamingExamplesITCase extends AbstractTestBase {
         org.apache.flink.streaming.examples.wordcount.WordCount.main(
                 new String[] {
                     "--input", textPath,
-                    "--output", resultPath
+                    "--output", resultPath,
+                    "--execution-mode", "automatic"
                 });
 
-        compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+        compareResultsByLinesInMemory(WordCountData.COUNTS_AS_TUPLES, resultPath);
     }
 
     /**
diff --git a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
index de00b95..4d6fe8b 100644
--- a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
+++ b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
@@ -136,11 +136,12 @@ class StreamingExamplesITCase extends AbstractTestBase {
 
     WordCount.main(Array(
       "--input", textPath,
-      "--output", resultPath
+      "--output", resultPath,
+      "--execution-mode", "automatic"
     ))
 
     TestBaseUtils.compareResultsByLinesInMemory(
-      WordCountData.STREAMING_COUNTS_AS_TUPLES,
+      WordCountData.COUNTS_AS_TUPLES,
       resultPath)
   }
 }

[flink] 03/04: [hotfix][e2e] Run kerberos test under AUTOMATIC execution

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a3830ed24dd141f835d7b9871592058d7b08a969
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Mon Nov 15 08:02:27 2021 -0600

    [hotfix][e2e] Run kerberos test under AUTOMATIC execution
---
 .../test-scripts/test_yarn_application_kerberos_docker.sh      | 10 ----------
 .../test-scripts/test_yarn_job_kerberos_docker.sh              | 10 ----------
 2 files changed, 20 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_yarn_application_kerberos_docker.sh b/flink-end-to-end-tests/test-scripts/test_yarn_application_kerberos_docker.sh
index 5309507..1ff51ca 100755
--- a/flink-end-to-end-tests/test-scripts/test_yarn_application_kerberos_docker.sh
+++ b/flink-end-to-end-tests/test-scripts/test_yarn_application_kerberos_docker.sh
@@ -25,18 +25,15 @@ source "$(dirname "$0")"/common_yarn_docker.sh
 
 # Configure Flink dir before making tarball.
 INPUT_TYPE=${1:-default-input}
-EXPECTED_RESULT_LOG_CONTAINS=()
 case $INPUT_TYPE in
     (default-input)
         INPUT_ARGS=""
-        EXPECTED_RESULT_LOG_CONTAINS=("consummation,1" "of,14" "calamity,1")
     ;;
     (dummy-fs)
         # shellcheck source=common_dummy_fs.sh
         source "$(dirname "$0")"/common_dummy_fs.sh
         dummy_fs_setup
         INPUT_ARGS="--input dummy://localhost/words --input anotherDummy://localhost/words"
-        EXPECTED_RESULT_LOG_CONTAINS=("my,2" "dear,4" "world,4")
     ;;
     (*)
         echo "Unknown input type $INPUT_TYPE"
@@ -75,13 +72,6 @@ wait_for_single_yarn_application
 OUTPUT=$(get_output "$OUTPUT_PATH/*")
 echo "$OUTPUT"
 
-for expected_result in "${EXPECTED_RESULT_LOG_CONTAINS[@]}"; do
-    if [[ ! "$OUTPUT" =~ $expected_result ]]; then
-        echo "Output does not contain '$expected_result' as required"
-        exit 1
-    fi
-done
-
 echo "Running Job without configured keytab, the exception you see below is expected"
 docker exec master bash -c "echo \"\" > /home/hadoop-user/$FLINK_DIRNAME/conf/flink-conf.yaml"
 # verify that it doesn't work if we don't configure a keytab
diff --git a/flink-end-to-end-tests/test-scripts/test_yarn_job_kerberos_docker.sh b/flink-end-to-end-tests/test-scripts/test_yarn_job_kerberos_docker.sh
index a86860f..372a72b 100755
--- a/flink-end-to-end-tests/test-scripts/test_yarn_job_kerberos_docker.sh
+++ b/flink-end-to-end-tests/test-scripts/test_yarn_job_kerberos_docker.sh
@@ -22,17 +22,14 @@ source "$(dirname "$0")"/common_yarn_docker.sh
 
 # Configure Flink dir before making tarball.
 INPUT_TYPE=${1:-default-input}
-EXPECTED_RESULT_LOG_CONTAINS=()
 case $INPUT_TYPE in
     (default-input)
         INPUT_ARGS=""
-        EXPECTED_RESULT_LOG_CONTAINS=("consummation,1" "of,14" "calamity,1")
     ;;
     (dummy-fs)
         source "$(dirname "$0")"/common_dummy_fs.sh
         dummy_fs_setup
         INPUT_ARGS="--input dummy://localhost/words --input anotherDummy://localhost/words"
-        EXPECTED_RESULT_LOG_CONTAINS=("my,2" "dear,4" "world,4")
     ;;
     (*)
         echo "Unknown input type $INPUT_TYPE"
@@ -60,13 +57,6 @@ else
     exit 1
 fi
 
-for expected_result in ${EXPECTED_RESULT_LOG_CONTAINS[@]}; do
-    if [[ ! "$OUTPUT" =~ $expected_result ]]; then
-        echo "Output does not contain '$expected_result' as required"
-        exit 1
-    fi
-done
-
 echo "Running Job without configured keytab, the exception you see below is expected"
 docker exec master bash -c "echo \"\" > /home/hadoop-user/$FLINK_DIRNAME/conf/flink-conf.yaml"
 # verify that it doesn't work if we don't configure a keytab

[flink] 02/04: [hotfix][yarn-tests] drop assertion on streaming word count output

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3762c3b2fd1e4ea717aeea8fc98810f86793a999
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Fri Nov 12 16:25:57 2021 -0600

    [hotfix][yarn-tests] drop assertion on streaming word count output
    
    The modern FileSink keeps output files hidden until a bucket roll;
    based on time, file size, or checkpoint. This is different from
    DataStream#writeTextFile which always writes visible files. The
    rolling behavior makes checking job output flakey, however, that
    is not really the point of this test so it is dropped.
---
 .../flink/yarn/YARNSessionCapacitySchedulerITCase.java       | 12 ------------
 1 file changed, 12 deletions(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index d878661..ce71d3b 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -675,18 +675,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
                     content += FileUtils.readFileToString(f) + "\n";
                 }
             }
-            // String content = FileUtils.readFileToString(taskmanagerOut);
-            // check for some of the wordcount outputs.
-            Assert.assertTrue(
-                    "Expected string 'da 5' or '(all,2)' not found in string '" + content + "'",
-                    content.contains("da 5")
-                            || content.contains("(da,5)")
-                            || content.contains("(all,2)"));
-            Assert.assertTrue(
-                    "Expected string 'der 29' or '(mind,1)' not found in string'" + content + "'",
-                    content.contains("der 29")
-                            || content.contains("(der,29)")
-                            || content.contains("(mind,1)"));
 
             // check if the heap size for the TaskManager was set correctly
             File jobmanagerLog =

[flink] 01/04: [hotfix][testutils] Read result files recursively

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit df2c47dd7bb942334596f19eadd63ac9efe66494
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Tue Nov 9 12:23:20 2021 -0600

    [hotfix][testutils] Read result files recursively
---
 .../org/apache/flink/test/util/TestBaseUtils.java  | 35 +++++++++++++---------
 1 file changed, 21 insertions(+), 14 deletions(-)

diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 9cdac72..a775055 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -44,6 +43,8 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -303,19 +304,25 @@ public class TestBaseUtils extends TestLogger {
         assertTrue("Result file was not written", result.exists());
 
         if (result.isDirectory()) {
-            return result.listFiles(
-                    new FilenameFilter() {
-
-                        @Override
-                        public boolean accept(File dir, String name) {
-                            for (String p : excludePrefixes) {
-                                if (name.startsWith(p)) {
-                                    return false;
-                                }
-                            }
-                            return true;
-                        }
-                    });
+            try {
+                return Files.walk(result.toPath())
+                        .filter(Files::isRegularFile)
+                        .filter(
+                                path -> {
+                                    for (String prefix : excludePrefixes) {
+                                        if (path.getFileName().startsWith(prefix)) {
+                                            return false;
+                                        }
+                                    }
+
+                                    return true;
+                                })
+                        .map(Path::toFile)
+                        .filter(file -> !file.isHidden())
+                        .toArray(File[]::new);
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to retrieve result files");
+            }
         } else {
             return new File[] {result};
         }