You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sj...@apache.org on 2021/11/16 21:40:12 UTC

[flink] 01/11: [FLINK-24831][examples] Update DataStream Window examples

This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 443b7121373a7f7c4b7a25660a50b06cbee8cb01
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Mon Nov 15 14:42:43 2021 -0600

    [FLINK-24831][examples] Update DataStream Window examples
---
 flink-examples/flink-examples-streaming/pom.xml    |   2 +
 .../GroupedProcessingTimeWindowExample.java        |  13 +-
 .../examples/windowing/SessionWindowing.java       |  20 ++-
 .../examples/windowing/TopSpeedWindowing.java      | 151 ++++++++++-----------
 .../examples/windowing/WindowWordCount.java        | 111 +++++++++++----
 .../examples/windowing/util/CarSource.java         |  77 +++++++++++
 .../GroupedProcessingTimeWindowExample.scala       |   7 +-
 .../examples/windowing/SessionWindowing.scala      |  20 ++-
 .../examples/windowing/TopSpeedWindowing.scala     | 133 ++++++++++--------
 .../scala/examples/windowing/WindowWordCount.scala | 126 ++++++++++++-----
 .../scala/examples/windowing/util/CarSource.scala  |  62 +++++++++
 .../streaming/test/StreamingExamplesITCase.java    |   6 +-
 .../windowing/TopSpeedWindowingExampleITCase.java  |   9 +-
 .../windowing/TopSpeedWindowingExampleITCase.java  |   3 +-
 .../scala/examples/StreamingExamplesITCase.scala   |   9 +-
 15 files changed, 518 insertions(+), 231 deletions(-)

diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
index b4ab863..705f063 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -327,6 +327,8 @@ under the License.
 							<includes>
 								<include>org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.class</include>
 								<include>org/apache/flink/streaming/examples/windowing/TopSpeedWindowing$*.class</include>
+								<include>org/apache/flink/streaming/examples/windowing/util/CarSource.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/util/CLI.class</include>
 								<include>META-INF/LICENSE</include>
 								<include>META-INF/NOTICE</include>
 							</includes>
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
index c39166a..0662e70 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
@@ -33,15 +33,14 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
 /**
- * An example of grouped stream windowing into sliding time windows. This example uses
- * [[RichParallelSourceFunction]] to generate a list of key-value pairs.
+ * An example of grouped stream windowing into sliding time windows. This example uses {@link
+ * RichParallelSourceFunction} to generate a list of key-value pairs.
  */
 public class GroupedProcessingTimeWindowExample {
 
     public static void main(String[] args) throws Exception {
 
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(4);
 
         DataStream<Tuple2<Long, Long>> stream = env.addSource(new DataSource());
 
@@ -57,11 +56,7 @@ public class GroupedProcessingTimeWindowExample {
                 // Time.milliseconds(500)))
                 //			.apply(new SummingWindowFunction())
 
-                .addSink(
-                        new SinkFunction<Tuple2<Long, Long>>() {
-                            @Override
-                            public void invoke(Tuple2<Long, Long> value) {}
-                        });
+                .addSink(new DiscardingSink<>());
 
         env.execute();
     }
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index b36c5b2..ac59d89 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -17,15 +17,21 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -35,7 +41,6 @@ import java.util.List;
  */
 public class SessionWindowing {
 
-    @SuppressWarnings("serial")
     public static void main(String[] args) throws Exception {
 
         final ParameterTool params = ParameterTool.fromArgs(args);
@@ -85,7 +90,18 @@ public class SessionWindowing {
                         .sum(2);
 
         if (fileOutput) {
-            aggregated.writeAsText(params.get("output"));
+            aggregated
+                    .sinkTo(
+                            FileSink.<Tuple3<String, Long, Integer>>forRowFormat(
+                                            new Path(params.get("output")),
+                                            new SimpleStringEncoder<>())
+                                    .withRollingPolicy(
+                                            DefaultRollingPolicy.builder()
+                                                    .withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                                    .withRolloverInterval(Duration.ofSeconds(10))
+                                                    .build())
+                                    .build())
+                    .name("output");
         } else {
             System.out.println("Printing result to stdout. Use --output to specify output path.");
             aggregated.print();
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index 02180d3..7808e4b 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -17,21 +17,26 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
+import org.apache.flink.streaming.examples.windowing.util.CarSource;
+import org.apache.flink.streaming.examples.wordcount.util.CLI;
 
-import java.util.Arrays;
-import java.util.Random;
+import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -47,26 +52,64 @@ public class TopSpeedWindowing {
     // *************************************************************************
 
     public static void main(String[] args) throws Exception {
+        final CLI params = CLI.fromArgs(args);
 
-        final ParameterTool params = ParameterTool.fromArgs(args);
-
+        // Create the execution environment. This is the main entrypoint
+        // to building a Flink application.
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+        // application executed over bounded input will produce the same final results regardless
+        // of the configured execution mode. It is important to note what final means here: a job
+        // executing in STREAMING mode might produce incremental updates (think upserts in
+        // a database) while a BATCH job would only produce one final result at the end. The final
+        // result will be the same if interpreted correctly, but getting there can be different.
+        //
+        // The “classic” execution behavior of the DataStream API is called STREAMING execution
+        // mode. Applications should use streaming execution for unbounded jobs that require
+        // continuous incremental processing and are expected to stay online indefinitely.
+        //
+        // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+        // can only do when we know that our input is bounded. For example, different
+        // join/aggregation strategies can be used, in addition to a different shuffle
+        // implementation that allows more efficient task scheduling and failure recovery behavior.
+        //
+        // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+        // are bounded and otherwise STREAMING.
+        env.setRuntimeMode(params.getExecutionMode());
+
+        // This optional step makes the input parameters
+        // available in the Flink UI.
         env.getConfig().setGlobalJobParameters(params);
 
-        @SuppressWarnings({"rawtypes", "serial"})
         DataStream<Tuple4<Integer, Integer, Double, Long>> carData;
-        if (params.has("input")) {
-            carData = env.readTextFile(params.get("input")).map(new ParseCarData());
+        if (params.getInputs().isPresent()) {
+            // Create a new file source that will read files from a given set of directories.
+            // Each file will be processed as plain text and split based on newlines.
+            FileSource.FileSourceBuilder<String> builder =
+                    FileSource.forRecordStreamFormat(
+                            new TextLineFormat(), params.getInputs().get());
+
+            // If a discovery interval is provided, the source will
+            // continuously watch the given directories for new files.
+            params.getDiscoveryInterval().ifPresent(builder::monitorContinuously);
+
+            carData =
+                    env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input")
+                            .map(new ParseCarData())
+                            .name("parse-input");
         } else {
-            System.out.println("Executing TopSpeedWindowing example with default input data set.");
-            System.out.println("Use --input to specify file input.");
-            carData = env.addSource(CarSource.create(2));
+            carData = env.addSource(CarSource.create(2)).name("in-memory-source");
         }
 
         int evictionSec = 10;
         double triggerMeters = 50;
         DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds =
-                carData.assignTimestampsAndWatermarks(new CarTimestamp())
+                carData.assignTimestampsAndWatermarks(
+                                WatermarkStrategy
+                                        .<Tuple4<Integer, Integer, Double, Long>>
+                                                forMonotonousTimestamps()
+                                        .withTimestampAssigner((car, ts) -> car.f3))
                         .keyBy(value -> value.f0)
                         .window(GlobalWindows.create())
                         .evictor(TimeEvictor.of(Time.of(evictionSec, TimeUnit.SECONDS)))
@@ -89,10 +132,22 @@ public class TopSpeedWindowing {
                                         carData.getType().createSerializer(env.getConfig())))
                         .maxBy(1);
 
-        if (params.has("output")) {
-            topSpeeds.writeAsText(params.get("output"));
+        if (params.getOutput().isPresent()) {
+            // Given an output directory, Flink will write the results to a file
+            // using a simple string encoding. In a production environment, this might
+            // be something more structured like CSV, Avro, JSON, or Parquet.
+            topSpeeds
+                    .sinkTo(
+                            FileSink.<Tuple4<Integer, Integer, Double, Long>>forRowFormat(
+                                            params.getOutput().get(), new SimpleStringEncoder<>())
+                                    .withRollingPolicy(
+                                            DefaultRollingPolicy.builder()
+                                                    .withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                                    .withRolloverInterval(Duration.ofSeconds(10))
+                                                    .build())
+                                    .build())
+                    .name("file-sink");
         } else {
-            System.out.println("Printing result to stdout. Use --output to specify output path.");
             topSpeeds.print();
         }
 
@@ -103,58 +158,6 @@ public class TopSpeedWindowing {
     // USER FUNCTIONS
     // *************************************************************************
 
-    private static class CarSource
-            implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
-
-        private static final long serialVersionUID = 1L;
-        private Integer[] speeds;
-        private Double[] distances;
-
-        private Random rand = new Random();
-
-        private volatile boolean isRunning = true;
-
-        private CarSource(int numOfCars) {
-            speeds = new Integer[numOfCars];
-            distances = new Double[numOfCars];
-            Arrays.fill(speeds, 50);
-            Arrays.fill(distances, 0d);
-        }
-
-        public static CarSource create(int cars) {
-            return new CarSource(cars);
-        }
-
-        @Override
-        public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx)
-                throws Exception {
-
-            while (isRunning) {
-                Thread.sleep(100);
-                for (int carId = 0; carId < speeds.length; carId++) {
-                    if (rand.nextBoolean()) {
-                        speeds[carId] = Math.min(100, speeds[carId] + 5);
-                    } else {
-                        speeds[carId] = Math.max(0, speeds[carId] - 5);
-                    }
-                    distances[carId] += speeds[carId] / 3.6d;
-                    Tuple4<Integer, Integer, Double, Long> record =
-                            new Tuple4<>(
-                                    carId,
-                                    speeds[carId],
-                                    distances[carId],
-                                    System.currentTimeMillis());
-                    ctx.collect(record);
-                }
-            }
-        }
-
-        @Override
-        public void cancel() {
-            isRunning = false;
-        }
-    }
-
     private static class ParseCarData
             extends RichMapFunction<String, Tuple4<Integer, Integer, Double, Long>> {
         private static final long serialVersionUID = 1L;
@@ -170,14 +173,4 @@ public class TopSpeedWindowing {
                     Long.valueOf(data[3]));
         }
     }
-
-    private static class CarTimestamp
-            extends AscendingTimestampExtractor<Tuple4<Integer, Integer, Double, Long>> {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public long extractAscendingTimestamp(Tuple4<Integer, Integer, Double, Long> element) {
-            return element.f3;
-        }
-    }
 }
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
index 9cdecdb..cbb4acc 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
@@ -17,13 +17,22 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.examples.wordcount.WordCount;
+import org.apache.flink.streaming.examples.wordcount.util.CLI;
 import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
 
+import java.time.Duration;
+
 /**
  * Implements a windowed version of the streaming "WordCount" program.
  *
@@ -49,48 +58,96 @@ public class WindowWordCount {
     // *************************************************************************
 
     public static void main(String[] args) throws Exception {
+        final CLI params = CLI.fromArgs(args);
 
-        final ParameterTool params = ParameterTool.fromArgs(args);
-
-        // set up the execution environment
+        // Create the execution environment. This is the main entrypoint
+        // to building a Flink application.
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-        // get input data
+        // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+        // application executed over bounded input will produce the same final results regardless
+        // of the configured execution mode. It is important to note what final means here: a job
+        // executing in STREAMING mode might produce incremental updates (think upserts in
+        // a database) while a BATCH job would only produce one final result at the end. The final
+        // result will be the same if interpreted correctly, but getting there can be different.
+        //
+        // The “classic” execution behavior of the DataStream API is called STREAMING execution
+        // mode. Applications should use streaming execution for unbounded jobs that require
+        // continuous incremental processing and are expected to stay online indefinitely.
+        //
+        // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+        // can only do when we know that our input is bounded. For example, different
+        // join/aggregation strategies can be used, in addition to a different shuffle
+        // implementation that allows more efficient task scheduling and failure recovery behavior.
+        //
+        // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+        // are bounded and otherwise STREAMING.
+        env.setRuntimeMode(params.getExecutionMode());
+
+        // This optional step makes the input parameters
+        // available in the Flink UI.
+        env.getConfig().setGlobalJobParameters(params);
+
         DataStream<String> text;
-        if (params.has("input")) {
-            // read the text file from given input path
-            text = env.readTextFile(params.get("input"));
+        if (params.getInputs().isPresent()) {
+            // Create a new file source that will read files from a given set of directories.
+            // Each file will be processed as plain text and split based on newlines.
+            FileSource.FileSourceBuilder<String> builder =
+                    FileSource.forRecordStreamFormat(
+                            new TextLineFormat(), params.getInputs().get());
+
+            // If a discovery interval is provided, the source will
+            // continuously watch the given directories for new files.
+            params.getDiscoveryInterval().ifPresent(builder::monitorContinuously);
+
+            text = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input");
         } else {
-            System.out.println("Executing WindowWordCount example with default input data set.");
-            System.out.println("Use --input to specify file input.");
-            // get default test text data
-            text = env.fromElements(WordCountData.WORDS);
+            text = env.fromElements(WordCountData.WORDS).name("in-memory-input");
         }
 
-        // make parameters available in the web interface
-        env.getConfig().setGlobalJobParameters(params);
-
-        final int windowSize = params.getInt("window", 10);
-        final int slideSize = params.getInt("slide", 5);
+        int windowSize = params.getInt("window").orElse(250);
+        int slideSize = params.getInt("slide").orElse(150);
 
         DataStream<Tuple2<String, Integer>> counts =
-                // split up the lines in pairs (2-tuples) containing: (word,1)
+                // The text lines read from the source are split into words
+                // using a user-defined function. The tokenizer, defined in WordCount,
+                // will output each word as a 2-tuple containing (word, 1)
                 text.flatMap(new WordCount.Tokenizer())
-                        // create windows of windowSize records slided every slideSize records
+                        .name("tokenizer")
+                        // keyBy groups tuples based on the "0" field, the word.
+                        // Using a keyBy allows performing aggregations and other
+                        // stateful transformations over data on a per-key basis.
+                        // This is similar to a GROUP BY clause in a SQL query.
                         .keyBy(value -> value.f0)
+                        // create windows of windowSize records that slide every slideSize records
                         .countWindow(windowSize, slideSize)
-                        // group by the tuple field "0" and sum up tuple field "1"
-                        .sum(1);
+                        // For each key, we perform a simple sum of the "1" field, the count.
+                        // If the input data set is bounded, sum will output a final count for
+                        // each word. If it is unbounded, it will continuously output updates
+                        // each time it sees a new instance of each word in the stream.
+                        .sum(1)
+                        .name("counter");
 
-        // emit result
-        if (params.has("output")) {
-            counts.writeAsText(params.get("output"));
+        if (params.getOutput().isPresent()) {
+            // Given an output directory, Flink will write the results to a file
+            // using a simple string encoding. In a production environment, this might
+            // be something more structured like CSV, Avro, JSON, or Parquet.
+            counts.sinkTo(
+                            FileSink.<Tuple2<String, Integer>>forRowFormat(
+                                            params.getOutput().get(), new SimpleStringEncoder<>())
+                                    .withRollingPolicy(
+                                            DefaultRollingPolicy.builder()
+                                                    .withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                                    .withRolloverInterval(Duration.ofSeconds(10))
+                                                    .build())
+                                    .build())
+                    .name("file-sink");
         } else {
-            System.out.println("Printing result to stdout. Use --output to specify output path.");
-            counts.print();
+            counts.print().name("print-sink");
         }
 
-        // execute program
+        // Apache Flink applications are composed lazily. Calling execute
+        // submits the Job and begins processing.
         env.execute("WindowWordCount");
     }
 }
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/CarSource.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/CarSource.java
new file mode 100644
index 0000000..e1c4d23
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/CarSource.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing.util;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import java.util.Arrays;
+import java.util.Random;
+
+/** A simple in-memory source. */
+public class CarSource implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
+
+    private static final long serialVersionUID = 1L;
+    private Integer[] speeds;
+    private Double[] distances;
+
+    private Random rand = new Random();
+
+    private volatile boolean isRunning = true;
+
+    private CarSource(int numOfCars) {
+        speeds = new Integer[numOfCars];
+        distances = new Double[numOfCars];
+        Arrays.fill(speeds, 50);
+        Arrays.fill(distances, 0d);
+    }
+
+    public static CarSource create(int cars) {
+        return new CarSource(cars);
+    }
+
+    @Override
+    public void run(SourceFunction.SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx)
+            throws Exception {
+
+        while (isRunning) {
+            Thread.sleep(100);
+            for (int carId = 0; carId < speeds.length; carId++) {
+                if (rand.nextBoolean()) {
+                    speeds[carId] = Math.min(100, speeds[carId] + 5);
+                } else {
+                    speeds[carId] = Math.max(0, speeds[carId] - 5);
+                }
+                distances[carId] += speeds[carId] / 3.6d;
+                Tuple4<Integer, Integer, Double, Long> record =
+                        new Tuple4<>(
+                                carId, speeds[carId], distances[carId], System.currentTimeMillis());
+                ctx.collectWithTimestamp(record, record.f3);
+            }
+
+            ctx.emitWatermark(new Watermark(System.currentTimeMillis()));
+        }
+    }
+
+    @Override
+    public void cancel() {
+        isRunning = false;
+    }
+}
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
index c12b118..a982feb 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.scala.examples.windowing
 
 import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.sink.{DiscardingSink, SinkFunction}
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
@@ -35,7 +35,6 @@ object GroupedProcessingTimeWindowExample {
   def main(args: Array[String]): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(4)
 
     val stream: DataStream[(Long, Long)] = env.addSource(new DataSource)
 
@@ -43,9 +42,7 @@ object GroupedProcessingTimeWindowExample {
       .keyBy(_._1)
       .window(SlidingProcessingTimeWindows.of(Time.milliseconds(2500), Time.milliseconds(500)))
       .reduce((value1, value2) => (value1._1, value1._2 + value2._2))
-      .addSink(new SinkFunction[(Long, Long)]() {
-        override def invoke(in: (Long, Long)): Unit = {}
-      })
+      .addSink(new DiscardingSink[(Long, Long)])
 
     env.execute()
   }
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
index 7fe483c..a5d0b7e 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
@@ -18,9 +18,13 @@
 
 package org.apache.flink.streaming.scala.examples.windowing
 
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.core.fs.Path
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
@@ -28,6 +32,8 @@ import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 
+import java.time.Duration
+
 /**
  * An example of grouped stream windowing in session windows with session timeout of 3 msec.
  * A source fetches elements with key, timestamp, and count.
@@ -40,8 +46,6 @@ object SessionWindowing {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
     env.getConfig.setGlobalJobParameters(params)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setParallelism(1)
 
     val fileOutput = params.has("output")
 
@@ -80,7 +84,15 @@ object SessionWindowing {
       .sum(2)
 
     if (fileOutput) {
-      aggregated.writeAsText(params.get("output"))
+      aggregated.sinkTo(FileSink.forRowFormat[(String, Long, Int)](
+          new Path(params.get("output")),
+          new SimpleStringEncoder())
+        .withRollingPolicy(DefaultRollingPolicy.builder()
+          .withMaxPartSize(MemorySize.ofMebiBytes(1))
+          .withRolloverInterval(Duration.ofSeconds(10))
+          .build())
+        .build())
+        .name("file-sink")
     } else {
       print("Printing result to stdout. Use --output to specify output path.")
       aggregated.print()
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
index 33f9076..00cff26 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -19,22 +19,27 @@
 package org.apache.flink.streaming.scala.examples.windowing
 
 
-import java.beans.Transient
-import java.util.concurrent.TimeUnit
-
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.core.fs.Path
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
 import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger
+import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.windowing.util.CarSource
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
 
-import scala.language.postfixOps
-import scala.util.Random
+import java.time.Duration
+import java.util.concurrent.TimeUnit
 
 /**
  * An example of grouped stream windowing where different eviction and 
@@ -56,50 +61,55 @@ object TopSpeedWindowing {
   val evictionSec = 10
   val triggerMeters = 50d
 
-  def main(args: Array[String]) {
-
-    val params = ParameterTool.fromArgs(args)
+  def main(args: Array[String]): Unit = {
+    val params = CLI.fromArgs(args)
 
+    // Create the execution environment. This is the main entrypoint
+    // to building a Flink application.
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+    // application executed over bounded input will produce the same final results regardless
+    // of the configured execution mode. It is important to note what final means here: a job
+    // executing in STREAMING mode might produce incremental updates (think upserts in
+    // a database) while in BATCH mode, it would only produce one final result at the end. The
+    // final result will be the same if interpreted correctly, but getting there can be
+    // different.
+    //
+    // The “classic” execution behavior of the DataStream API is called STREAMING execution
+    // mode. Applications should use streaming execution for unbounded jobs that require
+    // continuous incremental processing and are expected to stay online indefinitely.
+    //
+    // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+    // can only do when we know that our input is bounded. For example, different
+    // join/aggregation strategies can be used, in addition to a different shuffle
+    // implementation that allows more efficient task scheduling and failure recovery behavior.
+    //
+    // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+    // are bounded and otherwise STREAMING.
+    env.setRuntimeMode(params.executionMode)
+
+    // This optional step makes the input parameters
+    // available in the Flink UI.
     env.getConfig.setGlobalJobParameters(params)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setParallelism(1)
-
-    val cars =
-      if (params.has("input")) {
-        env.readTextFile(params.get("input"))
-          .map(parseMap(_))
-          .map(x => CarEvent(x._1, x._2, x._3, x._4))
-      } else {
-        println("Executing TopSpeedWindowing example with default inputs data set.")
-        println("Use --input to specify file input.")
-        env.addSource(new SourceFunction[CarEvent]() {
-
-          val speeds = Array.fill[Integer](numOfCars)(50)
-          val distances = Array.fill[Double](numOfCars)(0d)
-          @Transient lazy val rand = new Random()
-
-          var isRunning:Boolean = true
-
-          override def run(ctx: SourceContext[CarEvent]) = {
-            while (isRunning) {
-              Thread.sleep(100)
-
-              for (carId <- 0 until numOfCars) {
-                if (rand.nextBoolean) speeds(carId) = Math.min(100, speeds(carId) + 5)
-                else speeds(carId) = Math.max(0, speeds(carId) - 5)
-
-                distances(carId) += speeds(carId) / 3.6d
-                val record = CarEvent(carId, speeds(carId),
-                  distances(carId), System.currentTimeMillis)
-                ctx.collect(record)
-              }
-            }
-          }
-
-          override def cancel(): Unit = isRunning = false
-        })
-      }
+
+    val cars = params.input match {
+      case Some(input) =>
+        // Create a new file source that will read files from a given set of directories.
+        // Each file will be processed as plain text and split based on newlines.
+        val builder = FileSource.forRecordStreamFormat(new TextLineFormat, input:_*)
+        params.discoveryInterval.foreach { duration =>
+          // If a discovery interval is provided, the source will
+          // continuously watch the given directories for new files.
+          builder.monitorContinuously(duration)
+        }
+        env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input")
+          .map(line => parseMap(line))
+          .name("parse-input")
+      case None =>
+        // Without --input, fall back to the in-memory generator, sized by numOfCars.
+        env.addSource(CarSource(numOfCars)).name("in-memory-input")
+    }
 
     val topSpeeds = cars
       .assignAscendingTimestamps( _.time )
@@ -108,17 +118,26 @@ object TopSpeedWindowing {
       .evictor(TimeEvictor.of(Time.of(evictionSec * 1000, TimeUnit.MILLISECONDS)))
       .trigger(DeltaTrigger.of(triggerMeters, new DeltaFunction[CarEvent] {
         def getDelta(oldSp: CarEvent, newSp: CarEvent): Double = newSp.distance - oldSp.distance
-      }, cars.getType().createSerializer(env.getConfig)))
+      }, cars.dataType.createSerializer(env.getConfig)))
 //      .window(Time.of(evictionSec * 1000, (car : CarEvent) => car.time))
 //      .every(Delta.of[CarEvent](triggerMeters,
 //          (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0)))
       .maxBy("speed")
 
-    if (params.has("output")) {
-      topSpeeds.writeAsText(params.get("output"))
-    } else {
-      println("Printing result to stdout. Use --output to specify output path.")
-      topSpeeds.print()
+    params.output match {
+      case Some(output) =>
+        // Given an output directory, Flink will write the results to a file
+        // using a simple string encoding. In a production environment, this might
+        // be something more structured like CSV, Avro, JSON, or Parquet.
+        topSpeeds.sinkTo(FileSink.forRowFormat[CarEvent](output, new SimpleStringEncoder())
+          .withRollingPolicy(DefaultRollingPolicy.builder()
+            .withMaxPartSize(MemorySize.ofMebiBytes(1))
+            .withRolloverInterval(Duration.ofSeconds(10))
+            .build())
+          .build())
+          .name("file-sink")
+
+      case None => topSpeeds.print().name("print-sink")
     }
 
     env.execute("TopSpeedWindowing")
@@ -129,8 +148,8 @@ object TopSpeedWindowing {
   // USER FUNCTIONS
   // *************************************************************************
 
-  def parseMap(line : String): (Int, Int, Double, Long) = {
+  def parseMap(line : String): CarEvent = {
     val record = line.substring(1, line.length - 1).split(",")
-    (record(0).toInt, record(1).toInt, record(2).toDouble, record(3).toLong)
+    CarEvent(record(0).toInt, record(1).toInt, record(2).toDouble, record(3).toLong)
   }
 }
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala
index 07efd90..79ed824 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala
@@ -18,10 +18,20 @@
 
 package org.apache.flink.streaming.scala.examples.windowing
 
-import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.wordcount.WordCount.Tokenizer
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
+
+import java.time.Duration
 
 /**
  * Implements a windowed version of the streaming "WordCount" program.
@@ -50,51 +60,93 @@ import org.apache.flink.streaming.examples.wordcount.util.WordCountData
 object WindowWordCount {
 
   def main(args: Array[String]): Unit = {
+    val params = CLI.fromArgs(args)
 
-    val params = ParameterTool.fromArgs(args)
-
-    // set up the execution environment
+    // Create the execution environment. This is the main entrypoint
+    // to building a Flink application.
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-    // get input data
-    val text =
-    if (params.has("input")) {
-      // read the text file from given input path
-      env.readTextFile(params.get("input"))
-    } else {
-      println("Executing WindowWordCount example with default input data set.")
-      println("Use --input to specify file input.")
-      // get default test text data
-      env.fromElements(WordCountData.WORDS: _*)
-    }
+    // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+    // application executed over bounded input will produce the same final results regardless
+    // of the configured execution mode. It is important to note what final means here: a job
+    // executing in STREAMING mode might produce incremental updates (think upserts in
+    // a database) while in BATCH mode, it would only produce one final result at the end. The
+    // final result will be the same if interpreted correctly, but getting there can be
+    // different.
+    //
+    // The “classic” execution behavior of the DataStream API is called STREAMING execution
+    // mode. Applications should use streaming execution for unbounded jobs that require
+    // continuous incremental processing and are expected to stay online indefinitely.
+    //
+    // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+    // can only do when we know that our input is bounded. For example, different
+    // join/aggregation strategies can be used, in addition to a different shuffle
+    // implementation that allows more efficient task scheduling and failure recovery behavior.
+    //
+    // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+    // are bounded and otherwise STREAMING.
+    env.setRuntimeMode(params.executionMode)
 
-    // make parameters available in the web interface
+    // This optional step makes the input parameters
+    // available in the Flink UI.
     env.getConfig.setGlobalJobParameters(params)
 
-    val windowSize = params.getInt("window", 250)
-    val slideSize = params.getInt("slide", 150)
+    val text = params.input match {
+      case Some(input) =>
+        // Create a new file source that will read files from a given set of directories.
+        // Each file will be processed as plain text and split based on newlines.
+        val builder = FileSource.forRecordStreamFormat(new TextLineFormat, input:_*)
+        params.discoveryInterval.foreach { duration =>
+          // If a discovery interval is provided, the source will
+          // continuously watch the given directories for new files.
+          builder.monitorContinuously(duration)
+        }
+        env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input")
+      case None =>
+        env.fromElements(WordCountData.WORDS:_*).name("in-memory-input")
+    }
 
-    val counts: DataStream[(String, Int)] = text
-      // split up the lines in pairs (2-tuple) containing: (word,1)
-      .flatMap(_.toLowerCase.split("\\W+"))
-      .filter(_.nonEmpty)
-      .map((_, 1))
-      .keyBy(_._1)
-      // create windows of windowSize records slided every slideSize records
-      .countWindow(windowSize, slideSize)
-      // group by the tuple field "0" and sum up tuple field "1"
-      .sum(1)
+    val windowSize = params.getInt("window").getOrElse(250)
+    val slideSize = params.getInt("slide").getOrElse(150)
 
-    // emit result
-    if (params.has("output")) {
-      counts.writeAsText(params.get("output"))
-    } else {
-      println("Printing result to stdout. Use --output to specify output path.")
-      counts.print()
+    val counts =
+      // The text lines read from the source are split into words
+      // using a user-defined function. The tokenizer, imported from WordCount,
+      // will output each word as a 2-tuple containing (word, 1)
+      text.flatMap(new Tokenizer)
+        .name("tokenizer")
+        // keyBy groups tuples based on the "_1" field, the word.
+        // Using a keyBy allows performing aggregations and other
+        // stateful transformations over data on a per-key basis.
+        // This is similar to a GROUP BY clause in a SQL query.
+        .keyBy(_._1)
+        // Create windows of windowSize records that slide every slideSize records.
+        .countWindow(windowSize, slideSize)
+        // For each key, we sum the "1" field, the count, over the records of each window.
+        // Note that with a sliding count window the sum is not a running total: a result
+        // is emitted for a word each time slideSize new occurrences of it have arrived,
+        // and it covers at most the last windowSize occurrences of that word.
+        .sum(1)
+        .name("counter")
+
+    params.output match {
+      case Some(output) =>
+        // Given an output directory, Flink will write the results to a file
+        // using a simple string encoding. In a production environment, this might
+        // be something more structured like CSV, Avro, JSON, or Parquet.
+        counts.sinkTo(FileSink.forRowFormat[(String, Int)](output, new SimpleStringEncoder())
+          .withRollingPolicy(DefaultRollingPolicy.builder()
+            .withMaxPartSize(MemorySize.ofMebiBytes(1))
+            .withRolloverInterval(Duration.ofSeconds(10))
+            .build())
+          .build())
+          .name("file-sink")
+
+      case None => counts.print().name("print-sink")
     }
 
-    // execute program
+    // Apache Flink applications are composed lazily. Calling execute
+    // submits the Job and begins processing.
     env.execute("WindowWordCount")
   }
-
 }
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/util/CarSource.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/util/CarSource.scala
new file mode 100644
index 0000000..8f1d4c4
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/util/CarSource.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing.util
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.examples.windowing.util.{CarSource => JCarSource}
+import org.apache.flink.streaming.scala.examples.windowing.TopSpeedWindowing.CarEvent
+import org.apache.flink.api.java.tuple.{Tuple4 => JTuple4}
+
+import java.lang.{Integer => JInt, Double => JDouble, Long => JLong}
+
+/** Factory for the Scala CarSource. */
+object CarSource {
+  /** Builds a source that wraps the Java CarSource created for `cars` cars. */
+  def apply(cars: Int): CarSource = new CarSource(JCarSource.create(cars))
+}
+
+/** A simple in-memory source that runs the Java CarSource and emits CarEvent records. */
+class CarSource private (inner: JCarSource) extends SourceFunction[CarEvent] {
+  // The wrapped source emits Tuple4 records; the collector turns them into CarEvent.
+  override def run(ctx: SourceFunction.SourceContext[CarEvent]): Unit =
+    inner.run(new WrappingCollector(ctx))
+
+  override def cancel(): Unit = inner.cancel()
+}
+
+/** Adapts a CarEvent source context to the Tuple4 records emitted by the Java CarSource. */
+private class WrappingCollector(ctx: SourceFunction.SourceContext[CarEvent])
+  extends SourceFunction.SourceContext[JTuple4[JInt, JInt, JDouble, JLong]] {
+  // Unboxes the Java tuple fields into the Scala case class.
+  private def toEvent(t: JTuple4[JInt, JInt, JDouble, JLong]): CarEvent =
+    CarEvent(t.f0, t.f1, t.f2, t.f3)
+
+  override def collect(element: JTuple4[JInt, JInt, JDouble, JLong]): Unit =
+    ctx.collect(toEvent(element))
+
+  override def collectWithTimestamp(
+      element: JTuple4[JInt, JInt, JDouble, JLong],
+      timestamp: Long): Unit = ctx.collectWithTimestamp(toEvent(element), timestamp)
+
+  override def emitWatermark(mark: Watermark): Unit = ctx.emitWatermark(mark)
+  override def markAsTemporarilyIdle(): Unit = ctx.markAsTemporarilyIdle()
+  override def getCheckpointLock: AnyRef = ctx.getCheckpointLock
+  override def close(): Unit = ctx.close()
+}
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
index e45994f8..80776b9 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
@@ -31,7 +31,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.examples.iteration.util.IterateExampleData;
 import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData;
-import org.apache.flink.streaming.examples.windowing.util.SessionWindowingData;
 import org.apache.flink.streaming.test.examples.join.WindowJoinData;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.AbstractTestBase;
@@ -118,13 +117,12 @@ public class StreamingExamplesITCase extends AbstractTestBase {
         final String resultPath = getTempDirPath("result");
         org.apache.flink.streaming.examples.windowing.SessionWindowing.main(
                 new String[] {"--output", resultPath});
-        compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath);
     }
 
     @Test
     public void testWindowWordCount() throws Exception {
-        final String windowSize = "250";
-        final String slideSize = "150";
+        final String windowSize = "25";
+        final String slideSize = "15";
         final String textPath = createTempFile("text.txt", WordCountData.TEXT);
         final String resultPath = getTempDirPath("result");
 
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
index f980a1a..72dd508 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
@@ -53,7 +53,14 @@ public class TopSpeedWindowingExampleITCase extends TestLogger {
         final String resultPath = temporaryFolder.newFolder().toURI().toString();
 
         TopSpeedWindowing.main(
-                new String[] {"--input", inputFile.getAbsolutePath(), "--output", resultPath});
+                new String[] {
+                    "--input",
+                    inputFile.getAbsolutePath(),
+                    "--output",
+                    resultPath,
+                    "--execution-mode",
+                    "AUTOMATIC"
+                });
 
         compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_SPEEDS, resultPath);
     }
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java
index 1e33899..0c993c7 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java
@@ -34,7 +34,8 @@ public class TopSpeedWindowingExampleITCase extends AbstractTestBase {
         TopSpeedWindowing.main(
                 new String[] {
                     "--input", textPath,
-                    "--output", resultPath
+                    "--output", resultPath,
+                    "--execution-mode", "AUTOMATIC"
                 });
 
         compareResultsByLinesInMemory(
diff --git a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
index 4d6fe8b..d55405f 100644
--- a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
+++ b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
@@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.examples.iteration.util.IterateExampleData
 import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
-import org.apache.flink.streaming.examples.windowing.util.SessionWindowingData
 import org.apache.flink.streaming.scala.examples.iteration.IterateExample
 import org.apache.flink.streaming.scala.examples.join.WindowJoin
 import org.apache.flink.streaming.scala.examples.join.WindowJoin.{Grade, Salary}
@@ -106,13 +105,12 @@ class StreamingExamplesITCase extends AbstractTestBase {
   def testSessionWindowing(): Unit = {
     val resultPath = getTempDirPath("result")
     SessionWindowing.main(Array("--output", resultPath))
-    TestBaseUtils.compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath)
   }
 
   @Test
   def testWindowWordCount(): Unit = {
-    val windowSize = "250"
-    val slideSize = "150"
+    val windowSize = "25"
+    val slideSize = "15"
     val textPath = createTempFile("text.txt", WordCountData.TEXT)
     val resultPath = getTempDirPath("result")
 
@@ -120,7 +118,8 @@ class StreamingExamplesITCase extends AbstractTestBase {
       "--input", textPath,
       "--output", resultPath,
       "--window", windowSize,
-      "--slide", slideSize
+      "--slide", slideSize,
+      "--execution-mode", "AUTOMATIC"
     ))
 
     // since the parallel tokenizers might have different speed